package Threads::Pipe; use 5.008004; use strict; use warnings; use threads; use Thread::Queue; use Carp; require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = ( 'pipe' ); our $VERSION = '0.01'; # Virtualización de un pipe. # Se supone que el número de procesos virtuales requeridos # es conocido de antemano. sub pipe { # crea el anillo que virtualiza el pipe de tamaño N my $nt = shift(); # número de threads my $N = shift(); # número de procesadores virtuales my $task = shift(); # subrutina a ejecutar my @t; # array de tareas my @channel; # array de colas # El resto de argumentos son argumentos de $task my $id; # identificador de procesador físico # creamos canales ... for($id=0; $id < $nt; $id++) { $channel[$id] = new Thread::Queue; # canal comunicando procesos $id y ($id+1)%$nt } # creamos threads ... for($id=0; $id < $nt; $id++) { my $wrap = sub { # clausura que envuelve a la función de usuario $task my $i; # identificador de proceso virtual my @results; for($i = $id; $i < $N; $i += $nt) { my $result_i = $task->($i, $channel[($id+$nt-1)%$nt], $channel[$id], @_); push @results, $result_i; } return \@results; }; # end wrap $t[$id] = threads->new($wrap, @_); # contexto escalar: retorna escalar } # La thread 0 no trabaja, sólo espera ... my @result; for($id=0; $id < $nt; $id++) { $result[$id] = $t[$id]->join(); # join debe ser ejecutado por el padre } # shuffle my @R; for($id=0; $id < $nt; $id++) { my @aux = @{$result[$id]}; for my $i (0..$#aux) { $R[$id+$nt*$i] = $aux[$i]; } } return @R; } 1;Programa:
$ cat -n pipe3.pl 1 #!/usr/bin/perl -w -I../lib 2 # File: pipe3.pl 3 4 use strict; 5 use threads; 6 use Thread::Queue; 7 use Threads::Pipe qw(pipe); 8 9 our $numthreads = (shift || 2); # número de threads "físicas" 10 our $numvirtual = (shift || 8); # número de threads "virtuales" 11 our $nummessages = (shift || 4); # número de mensajes por etapa 12 13 ### main ### 14 &pipe($numthreads, $numvirtual, \&job, $nummessages); 15 16 sub job { 17 my $vid = shift; 18 my $from_left = shift; 19 my $to_right = shift; 20 ############## 21 my $nummessages = shift; 22 my $id = threads->self->tid(); # identificado de ithread Perl 23 24 my ($i, $num); 25 if ($vid) { 26 for $i (1..$nummessages) { 27 $num = $from_left->dequeue; 28 # procesar número ... 29 my $processed = $num*$vid; 30 print "id=$id vid=$vid: num=$num processed=$processed\n"; 31 $to_right->enqueue($num); 32 } 33 } 34 else { 35 for $i (1..$nummessages) { 36 print "id=$id vid=$vid: num=$i\n"; 37 $to_right->enqueue($i); 38 } 39 } 40 }Ejecución:
$ pipe3.pl id=1 vid=0: num=1 id=1 vid=0: num=2 id=1 vid=0: num=3 id=1 vid=0: num=4 id=2 vid=1: num=1 processed=1 id=1 vid=2: num=1 processed=2 id=2 vid=1: num=2 processed=2 id=1 vid=2: num=2 processed=4 id=2 vid=1: num=3 processed=3 id=1 vid=2: num=3 processed=6 id=2 vid=1: num=4 processed=4 id=1 vid=2: num=4 processed=8 id=2 vid=3: num=1 processed=3 id=1 vid=4: num=1 processed=4 id=2 vid=3: num=2 processed=6 id=2 vid=3: num=3 processed=9 id=1 vid=4: num=2 processed=8 id=2 vid=3: num=4 processed=12 id=1 vid=4: num=3 processed=12 id=2 vid=5: num=1 processed=5 id=1 vid=4: num=4 processed=16 id=2 vid=5: num=2 processed=10 id=1 vid=6: num=1 processed=6 id=1 vid=6: num=2 processed=12 id=2 vid=5: num=3 processed=15 id=1 vid=6: num=3 processed=18 id=2 vid=5: num=4 processed=20 id=1 vid=6: num=4 processed=24 id=2 vid=7: num=1 processed=7 id=2 vid=7: num=2 processed=14 id=2 vid=7: num=3 processed=21 id=2 vid=7: num=4 processed=28
Casiano Rodríguez León