package Threads::Pipe;
use 5.008004;
use strict;
use warnings;
use threads;
use Thread::Queue;
use Carp;
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = ( 'pipe' );
our $VERSION = '0.01';
# Virtualización de un pipe.
# Se supone que el número de procesos virtuales requeridos
# es conocido de antemano.
sub pipe { # crea el anillo que virtualiza el pipe de tamaño N
my $nt = shift(); # número de threads
my $N = shift(); # número de procesadores virtuales
my $task = shift(); # subrutina a ejecutar
my @t; # array de tareas
my @channel; # array de colas
# El resto de argumentos son argumentos de $task
my $id; # identificador de procesador físico
# creamos canales ...
for($id=0; $id < $nt; $id++) {
$channel[$id] = new Thread::Queue; # canal comunicando procesos $id y ($id+1)%$nt
}
# creamos threads ...
for($id=0; $id < $nt; $id++) {
my $wrap = sub { # clausura que envuelve a la función de usuario $task
my $i; # identificador de proceso virtual
my @results;
for($i = $id; $i < $N; $i += $nt) {
my $result_i = $task->($i, $channel[($id+$nt-1)%$nt], $channel[$id], @_);
push @results, $result_i;
}
return \@results;
}; # end wrap
$t[$id] = threads->new($wrap, @_); # contexto escalar: retorna escalar
}
# La thread 0 no trabaja, sólo espera ...
my @result;
for($id=0; $id < $nt; $id++) {
$result[$id] = $t[$id]->join(); # join debe ser ejecutado por el padre
}
# shuffle
my @R;
for($id=0; $id < $nt; $id++) {
my @aux = @{$result[$id]};
for my $i (0..$#aux) {
$R[$id+$nt*$i] = $aux[$i];
}
}
return @R;
}
1;
Programa:
$ cat -n pipe3.pl
1 #!/usr/bin/perl -w -I../lib
2 # File: pipe3.pl
3
4 use strict;
5 use threads;
6 use Thread::Queue;
7 use Threads::Pipe qw(pipe);
8
9 our $numthreads = (shift || 2); # número de threads "físicas"
10 our $numvirtual = (shift || 8); # número de threads "virtuales"
11 our $nummessages = (shift || 4); # número de mensajes por etapa
12
13 ### main ###
14 &pipe($numthreads, $numvirtual, \&job, $nummessages);
15
16 sub job {
17 my $vid = shift;
18 my $from_left = shift;
19 my $to_right = shift;
20 ##############
21 my $nummessages = shift;
22 my $id = threads->self->tid(); # identificado de ithread Perl
23
24 my ($i, $num);
25 if ($vid) {
26 for $i (1..$nummessages) {
27 $num = $from_left->dequeue;
28 # procesar número ...
29 my $processed = $num*$vid;
30 print "id=$id vid=$vid: num=$num processed=$processed\n";
31 $to_right->enqueue($num);
32 }
33 }
34 else {
35 for $i (1..$nummessages) {
36 print "id=$id vid=$vid: num=$i\n";
37 $to_right->enqueue($i);
38 }
39 }
40 }
Ejecución:
$ pipe3.pl id=1 vid=0: num=1 id=1 vid=0: num=2 id=1 vid=0: num=3 id=1 vid=0: num=4 id=2 vid=1: num=1 processed=1 id=1 vid=2: num=1 processed=2 id=2 vid=1: num=2 processed=2 id=1 vid=2: num=2 processed=4 id=2 vid=1: num=3 processed=3 id=1 vid=2: num=3 processed=6 id=2 vid=1: num=4 processed=4 id=1 vid=2: num=4 processed=8 id=2 vid=3: num=1 processed=3 id=1 vid=4: num=1 processed=4 id=2 vid=3: num=2 processed=6 id=2 vid=3: num=3 processed=9 id=1 vid=4: num=2 processed=8 id=2 vid=3: num=4 processed=12 id=1 vid=4: num=3 processed=12 id=2 vid=5: num=1 processed=5 id=1 vid=4: num=4 processed=16 id=2 vid=5: num=2 processed=10 id=1 vid=6: num=1 processed=6 id=1 vid=6: num=2 processed=12 id=2 vid=5: num=3 processed=15 id=1 vid=6: num=3 processed=18 id=2 vid=5: num=4 processed=20 id=1 vid=6: num=4 processed=24 id=2 vid=7: num=1 processed=7 id=2 vid=7: num=2 processed=14 id=2 vid=7: num=3 processed=21 id=2 vid=7: num=4 processed=28
Casiano Rodríguez León
