Práctica: Extensión de Parallel::Simple con Pipes

Escriba un módulo Parallel::Simple::Pipe que extienda el módulo Parallel::Simple introducido en la sección 3.9 con comunicaciones punto a punto usando la función pipe. Puede ayudarle el ejemplo desarrollado en la sección 4.1.8.

Una mejora es crear - al estilo MPI - una clase Communicator que encapsule los servicios proveídos por un comunicador:

lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/lib/Parallel/Simple$ cat -n Communicator.pm
 1  #!/usr/bin/perl -w
 2  package Parallel::Simple::Communicator;
 3
 4  use strict;
 5  use warnings;
 6  use Data::Dumper;
 7
 8  sub new {
 9    die "Incorrect number of parameters in Communicator->new\n" unless @_ >= 4;
10    my ($class, $name, $r, $w) = @_;
11    my $self = { name => $name, rpipe => $r, wpipes => $w, buffer => {}, delim =>"\cC" };
12    bless $self, $class;
13    return $self;
14  }
15
16  sub send {
17    die "Incorrect number of parameters in Communicator->send\n" unless @_ >= 4;
18    my ($self, $target, $tag, $mesg) = @_;
19    my $all = { source => $self->{name}, tag => $tag, message => $mesg };
20    my $dumped = Data::Dumper->Dump([$all],['all']);
21    my $chan = $self->{wpipes}{$target};
22    print $chan $dumped . $self->{delim};
23  }
24
25  sub recv {
26    die "Incorrect number of parameters in Communicator->send\n" unless @_ >= 3;
27    my ($self, $source, $tag) = @_;
28
29    exists $self->{buffer}{$source}{$tag} and do {
30      my $result = $self->{buffer}{$source}{$tag};
31      delete $self->{buffer}{$source}{$tag};
32      return $result;
33    };
34    {
35      local $/ = $self->{delim};
36      my $chan = $self->{rpipe};
37      my $rcv = <$chan>; chomp $rcv;
38      my $all; eval $rcv;
39      ($all->{source} eq $source) and ($all->{tag} eq $tag) and
40        return $all->{message};
41      $self->{buffer}{$all->{source}}{$all->{tag}} = $all->{message};
42      redo;
43    }
44  }
45
46  1;
Cada proceso hijo después de su creación y antes de realizar su tarea crea su comunicador. Sigue el fragmento de prun en el que aparece la ejecución de un proceso hijo. Observe la línea 64.
pp2@nereida:/tmp/Parallel-Simple-Pipe-0.01/lib/Parallel/Simple$ cat -n Pipe.pm
1  package Parallel::Simple::Pipe;
2
3  use strict;
4  use warnings;
5  use IO::Handle;
6  use Parallel::Simple::Communicator;
.  ...................................
22  {
23    my (%R, %W); # pipe channels
24
25    sub init_pipes {
26      my (@blocks) = @_;
27      foreach (@blocks) {
28        pipe($R{$_}, $W{$_});
29        $W{$_}->autoflush();
30      }
31    }
.  ...................................
41  sub prun {
42    ( $error, $return_values ) = ( undef, undef );         # reset globals
43    return 1 unless ( @_ );
.  ...................................
64    if ( $child == 0 ) {  # child
65      my ( $subref, @args ) = ref($block) =~ /ARRAY/ ? @$block : ( $block );
66      close $W{$name};
67      my $comm = Parallel::Simple::Communicator->new($name, $R{$name}, \%W);
68      my $return_value = eval { $subref->( $comm, @args ) };
69      warn( $@ ) if ( $@ );  # print death message, because eval doesn't
70      exit( $@ ? 255 : $options{use_return} ? $return_value : 0 );
71    }
.  ...................................
Sigue un ejemplo de uso:
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/script$ cat -n test.pl
     1  #!/usr/bin/perl -w -I../lib
     2  use strict;
     3  use Parallel::Simple::Pipe qw ( prun );
     4
     5  sub foo {
     6    my $c = shift;
     7    $c->send("bar", "mytag", "from foo to bar\n");
     8    my $line = $c->recv("bar", "myothertag");
     9    print $line;
    10  }
    11
    12  sub bar {
    13    my $c = shift;
    14    my $line = $c->recv("foo", "mytag");
    15    $c->send("foo", "myothertag", "from bar to foo\n");
    16    print $line;
    17  }
    18
    19  prun (
    20    foo => \&foo,
    21    bar => \&bar
    22  );
lhp@nereida:/tmp/Parallel-Simple-Pipe-0.01/script$ test.pl
from foo to bar
from bar to foo

Casiano Rodríguez León
Licencia de Creative Commons
Programación Distribuida y Mejora del Rendimiento
por Casiano Rodríguez León is licensed under a Creative Commons Reconocimiento 3.0 Unported License.

Permissions beyond the scope of this license may be available at http://campusvirtual.ull.es/ocw/course/view.php?id=44.
2012-06-19