Parallel::Simple::Pipe que
extienda el módulo Parallel::Simple introducido en la
sección
3.9
con comunicaciones punto a punto
usando la función pipe. Puede ayudarle el ejemplo desarrollado en la sección
4.1.8.
Una mejora
es crear —al estilo MPI— una clase Communicator
que encapsule los servicios provistos por un comunicador:
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/lib/Parallel/Simple$ cat -n Communicator.pm
1 #!/usr/bin/perl -w
2 package Parallel::Simple::Communicator;
3
4 use strict;
5 use warnings;
6 use Data::Dumper;
7
8 # Point-to-point message passing between named processes over pipes.
9 # A message travels as a Data::Dumper dump of a hash with the keys
10 # source, tag and message, followed by the delimiter character.
11
12 # new($name, $r, $w): build the communicator of the process called $name.
13 #   $r - handle this process reads its incoming messages from
14 #   $w - hash reference: process name => handle used to write to it
15 # Dies when fewer than three arguments follow the class name.
16 sub new {
17     die "Incorrect number of parameters in Communicator->new\n" unless @_ >= 4;
18     my ($class, $name, $r, $w) = @_;
19     my $self = {
20         name   => $name,
21         rpipe  => $r,
22         wpipes => $w,
23         buffer => {},       # {source}{tag} => queue of messages read ahead
24         delim  => "\cC",    # end-of-message mark, used as $/ by recv
25     };
26     return bless $self, $class;
27 }
28
29 # send($target, $tag, $mesg): deliver $mesg, labelled $tag, to $target.
30 # Returns the result of print. Dies when arguments are missing or when
31 # there is no write handle for $target.
32 sub send {
33     die "Incorrect number of parameters in Communicator->send\n" unless @_ >= 4;
34     my ($self, $target, $tag, $mesg) = @_;
35     my $chan = $self->{wpipes}{$target}
36         or die "Communicator->send: unknown target '$target'\n";
37     my $all = { source => $self->{name}, tag => $tag, message => $mesg };
38     # Useqq makes Dumper escape control characters, so a message that
39     # contains the delimiter cannot cut the frame in two.
40     local $Data::Dumper::Useqq = 1;
41     my $dumped = Data::Dumper->Dump([$all], ['all']);
42     return print {$chan} $dumped . $self->{delim};
43 }
44
45 # recv($source, $tag): return the next message sent by $source with $tag.
46 # Blocks reading the pipe until such a message arrives; other messages
47 # read meanwhile are queued, in arrival order, for later calls.
48 # Dies when arguments are missing, when a frame cannot be decoded or
49 # when the pipe reaches end of file before the message shows up.
50 sub recv {
51     die "Incorrect number of parameters in Communicator->recv\n" unless @_ >= 3;
52     my ($self, $source, $tag) = @_;
53
54     # A matching message may already have been read by an earlier call.
55     my $queue = $self->{buffer}{$source}{$tag};
56     return shift @$queue if $queue and @$queue;
57
58     local $/ = $self->{delim};    # read one whole frame per <$chan>
59     my $chan = $self->{rpipe};
60     while (defined(my $rcv = <$chan>)) {
61         chomp $rcv;
62         # String eval runs whatever was read: acceptable only because
63         # the frames are expected to come from send in sibling processes.
64         my $all;
65         eval $rcv;    # the dump assigns to $all
66         die "Communicator->recv: malformed message\n"
67             if $@ or ref $all ne 'HASH';
68         return $all->{message}
69             if $all->{source} eq $source and $all->{tag} eq $tag;
70         push @{ $self->{buffer}{ $all->{source} }{ $all->{tag} } }, $all->{message};
71     }
72     die "Communicator->recv: channel closed while waiting for '$tag' from '$source'\n";
73 }
74
75 1;
Cada proceso hijo, después de su creación y antes
de realizar su tarea, crea su comunicador. Sigue el fragmento
de prun en el que aparece la ejecución de un proceso hijo.
Observe las líneas 64-71 y, en particular, la línea 67, en la que el hijo crea su comunicador.
pp2@nereida:/tmp/Parallel-Simple-Pipe-0.01/lib/Parallel/Simple$ cat -n Pipe.pm
1 package Parallel::Simple::Pipe;
2
3 use strict;
4 use warnings;
5 use IO::Handle;
6 use Parallel::Simple::Communicator;
. ...................................
22 {
23 my (%R, %W); # pipe channels
24
25 sub init_pipes { # create one pipe per block name, filling %R and %W
26 my (@blocks) = @_; # names under which the pipe ends are stored
27 foreach my $name (@blocks) {
28 pipe($R{$name}, $W{$name}) or die "Can't create pipe for '$name': $!\n";
29 $W{$name}->autoflush(); # flush every print so messages are not held in a buffer
30 }
31 }
. ...................................
41 sub prun {
42 ( $error, $return_values ) = ( undef, undef ); # reset globals
43 return 1 unless ( @_ );
. ...................................
64 if ( $child == 0 ) { # child
65 my ( $subref, @args ) = ref($block) =~ /ARRAY/ ? @$block : ( $block );
66 close $W{$name};
67 my $comm = Parallel::Simple::Communicator->new($name, $R{$name}, \%W);
68 my $return_value = eval { $subref->( $comm, @args ) };
69 warn( $@ ) if ( $@ ); # print death message, because eval doesn't
70 exit( $@ ? 255 : $options{use_return} ? $return_value : 0 );
71 }
. ...................................
Sigue un ejemplo de uso:
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/script$ cat -n test.pl
1 #!/usr/bin/perl -w -I../lib
2 use strict;
3 use Parallel::Simple::Pipe qw ( prun );
4
5 # foo: speaks first, then waits for the answer of bar and prints it.
6 sub foo {
7     my ($comm) = @_;
8     $comm->send("bar", "mytag", "from foo to bar\n");
9     my $answer = $comm->recv("bar", "myothertag");
10     print $answer;
11 }
12
13 # bar: waits for the message of foo, answers it and prints what it got.
14 sub bar {
15     my ($comm) = @_;
16     my $greeting = $comm->recv("foo", "mytag");
17     $comm->send("foo", "myothertag", "from bar to foo\n");
18     print $greeting;
19 }
20
21 # Run both subroutines under their names; each one is handed its communicator.
22 prun(foo => \&foo, bar => \&bar);
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/script$ test.pl
from foo to bar
from bar to foo
Casiano Rodríguez León
