Parallel::Simple::Pipe que extienda el módulo Parallel::Simple introducido en la sección 3.9 con comunicaciones punto a punto usando la función pipe. Puede ayudarle el ejemplo desarrollado en la sección 4.1.8.
Una mejora es crear —al estilo MPI— una clase Communicator que encapsule los servicios proveídos por un comunicador:
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/lib/Parallel/Simple$ cat -n Communicator.pm
     1  #!/usr/bin/perl -w
     2  package Parallel::Simple::Communicator;
     3
     4  use strict;
     5  use warnings;
     6  use Data::Dumper;
     7
     8  sub new {
     9    die "Incorrect number of parameters in Communicator->new\n" unless @_ >= 4;
    10    my ($class, $name, $r, $w) = @_;
    11    my $self = { name => $name, rpipe => $r, wpipes => $w, buffer => {}, delim =>"\cC" };
    12    bless $self, $class;
    13    return $self;
    14  }
    15
    16  sub send {
    17    die "Incorrect number of parameters in Communicator->send\n" unless @_ >= 4;
    18    my ($self, $target, $tag, $mesg) = @_;
    19    my $all = { source => $self->{name}, tag => $tag, message => $mesg };
    20    my $dumped = Data::Dumper->Dump([$all],['all']);
    21    my $chan = $self->{wpipes}{$target};
    22    print $chan $dumped . $self->{delim};
    23  }
    24
    25  sub recv {
    26    die "Incorrect number of parameters in Communicator->send\n" unless @_ >= 3;
    27    my ($self, $source, $tag) = @_;
    28
    29    exists $self->{buffer}{$source}{$tag} and do {
    30      my $result = $self->{buffer}{$source}{$tag};
    31      delete $self->{buffer}{$source}{$tag};
    32      return $result;
    33    };
    34    {
    35      local $/ = $self->{delim};
    36      my $chan = $self->{rpipe};
    37      my $rcv = <$chan>; chomp $rcv;
    38      my $all; eval $rcv;
    39      ($all->{source} eq $source) and ($all->{tag} eq $tag) and
    40        return $all->{message};
    41      $self->{buffer}{$all->{source}}{$all->{tag}} = $all->{message};
    42      redo;
    43    }
    44  }
    45
    46  1;

Cada proceso hijo, después de su creación y antes de realizar su tarea, crea su comunicador. Sigue el fragmento de prun en el que aparece la ejecución de un proceso hijo. Observe la línea 64.
pp2@nereida:/tmp/Parallel-Simple-Pipe-0.01/lib/Parallel/Simple$ cat -n Pipe.pm
     1  package Parallel::Simple::Pipe;
     2
     3  use strict;
     4  use warnings;
     5  use IO::Handle;
     6  use Parallel::Simple::Communicator;
     .  ...................................
    22  {
    23    my (%R, %W); # pipe channels
    24
    25    sub init_pipes {
    26      my (@blocks) = @_;
    27      foreach (@blocks) {
    28        pipe($R{$_}, $W{$_});
    29        $W{$_}->autoflush();
    30      }
    31    }
     .  ...................................
    41    sub prun {
    42      ( $error, $return_values ) = ( undef, undef ); # reset globals
    43      return 1 unless ( @_ );
     .  ...................................
    64      if ( $child == 0 ) { # child
    65        my ( $subref, @args ) = ref($block) =~ /ARRAY/ ? @$block : ( $block );
    66        close $W{$name};
    67        my $comm = Parallel::Simple::Communicator->new($name, $R{$name}, \%W);
    68        my $return_value = eval { $subref->( $comm, @args ) };
    69        warn( $@ ) if ( $@ ); # print death message, because eval doesn't
    70        exit( $@ ? 255 : $options{use_return} ? $return_value : 0 );
    71      }
     .  ...................................

Sigue un ejemplo de uso:
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/script$ cat -n test.pl
     1  #!/usr/bin/perl -w -I../lib
     2  use strict;
     3  use Parallel::Simple::Pipe qw ( prun );
     4
     5  sub foo {
     6    my $c = shift;
     7    $c->send("bar", "mytag", "from foo to bar\n");
     8    my $line = $c->recv("bar", "myothertag");
     9    print $line;
    10  }
    11
    12  sub bar {
    13    my $c = shift;
    14    my $line = $c->recv("foo", "mytag");
    15    $c->send("foo", "myothertag", "from bar to foo\n");
    16    print $line;
    17  }
    18
    19  prun (
    20    foo => \&foo,
    21    bar => \&bar
    22  );
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/script$ test.pl
from foo to bar
from bar to foo
Casiano Rodríguez León