Granja con Extensiones

Descargue la distribución desde el enlace http://nereida.deioc.ull.es/~pp2/perlexamples/Farm-Simple-0.2.tar.gz

Ejecución

pp2@nereida:~/LFARM/script$ farm.pl
pi0          100% 7361     7.2KB/s   00:00
pi0          100% 7361     7.2KB/s   00:00
pi0          100% 7361     7.2KB/s   00:00
/usr/bin/ssh beowulf ./pi0 0 4096 8
/usr/bin/ssh orion ./pi0 1 4096 8
/usr/bin/ssh nereida ./pi0 2 4096 8
reaping 11144 (2 4096 8) from worker nereida. Errvars: 5888 29 Desplazamiento ilegal
From nereida (11144) received result:
0.39279065746825037042
for task (2 4096 8)
reaping 11142 (0 4096 8) from worker beowulf. Errvars: 5888 0
From beowulf (11142) received result:
0.39291265325220731119
for task (0 4096 8)
reaping 11143 (1 4096 8) from worker orion. Errvars: 5888 9 Descriptor de fichero erróneo
From orion (11143) received result:
0.39285166283354705508
for task (1 4096 8)
/usr/bin/ssh nereida ./pi0 3 4096 8
/usr/bin/ssh beowulf ./pi0 4 4096 8
/usr/bin/ssh orion ./pi0 5 4096 8
reaping 11151 (4 4096 8) from worker beowulf. Errvars: 5888 29 Desplazamiento ilegal
From beowulf (11151) received result:
0.39266860197050978964
for task (4 4096 8)
reaping 11150 (3 4096 8) from worker nereida. Errvars: 5888 10 No hay ningún proceso hijo
From nereida (11150) received result:
0.39272963717450859455
for task (3 4096 8)
/usr/bin/ssh beowulf ./pi0 6 4096 8
/usr/bin/ssh nereida ./pi0 7 4096 8
Last tasks
reaping 11152 (5 4096 8) from worker orion. Errvars: 5888 29 Desplazamiento ilegal
From orion (11152) received result:
0.39260755187444501546
for task (5 4096 8)
reaping 11159 (7 4096 8) from worker nereida. Errvars: 5888 25 Función ioctl no apropiada para el dispositivo
From nereida (11159) received result:
0.39248540707887613621
for task (7 4096 8)
reaping 11158 (6 4096 8) from worker beowulf. Errvars: 5888 25 Función ioctl no apropiada para el dispositivo
From beowulf (11158) received result:
0.39254648690450405502
for task (6 4096 8)
Result: 3.14159265855685
pp2@nereida:~/LFARM/script$

El Programa Cliente

pp2@nereida:~/LFARM/script$ cat -n farm.pl
 1  #!/usr/bin/perl -w
 2  # Author: Casiano
 3  use strict;
 4  use Farm::Simple;
 5
 6  #### main
 7  # Approximates pi on a farm of three machines: NT tasks, each one a remote run of the executable
 8  # Warning: the format of the printf in pi0.c influences
 9  # the precision
10  #
11  use constant NT => 8;          # number of tasks
12  use constant LT => NT-1;       # index of the last task
13  use constant N => 8**5; # Num intervals
14  use constant NperT => N/NT; # Assume N%NT == 0
15  # NOTE: $dir and $rcp below are computed but never used in this script
16  my $dir = `pwd`;
17  chomp $dir;
18  my $rcp = (shift || "/usr/bin/scp");      # 1st command-line argument
19  my $executable = (shift || "./pi0");      # 2nd command-line argument: program run by every task
20  my @tasks = map { [$executable, $_, NperT, NT] } 0..LT; # Tasks for pi
21  # Each task is [command, task index, NperT, NT]; how pi0 uses them is not shown here
22  my $default = {
23    rsh => '/usr/bin/ssh',
24    rcp => '/usr/bin/scp',
25    user => 'casiano',
26  };
27  # The three machines share the same option hash
28  my $cluster = Farm::Cluster->new(
29    orion   => $default,
30    nereida => $default,
31    beowulf => $default,
32  );
33  # A result is accepted when it is a non-empty array ref whose first line contains a digit
34  my $looks_ok = sub {
35    my $r = shift;
36
37    return ((ref($r) eq 'ARRAY') && @$r && ($r->[0] =~ m{\d+}));
38  };
39
40  my $job = Farm::Job->new(
41    tasks       => \@tasks,
42    combine     => \&combine,
43    initialize  => \&initialize,
44    finalize    => \&finalize,
45    looks_ok    => $looks_ok,
46  );
47
48  my $farm = Farm::Simple->new(
49    checkpointing => 1,
50    debug=>1,
51    tracedir => 'FARM',
52  );
53  # run returns the accumulator built by 'combine' (here the sum of the partial results)
54  my $out = $farm->run($cluster, $job);
55  print "Result: $out\n";
56  # 'initialize' callback: copies the executable to every machine and returns the initial accumulator
57  sub initialize {
58    my ($job, $cluster, $run) = @_;
59    # $job and $run are received but not used here
60    # Assume all machines have the same architecture
61    $cluster->cp($executable);
62
63    return 0; # initial value for the accumulator
64  }
65  # 'finalize' callback: removes the executable from every machine of the cluster
66  sub finalize {
67    my ($job, $cluster, $run) = @_;
68    # $job and $run are received but not used here
69    $cluster->rm($executable);
70  }
71  # 'combine' callback: adds the first output line of a result record to the accumulator
72  sub combine {
73    $_[0] += $_[1]->{result}[0];   # @_ aliases the caller's variables, so $_[0] updates the accumulator in place
74  }

El Paquete Farm::Simple

pp2@nereida:~/LFARM/script$ sed -ne '1,153p' Simple.pm | cat -n
  1  package Farm::Simple;
  2  use 5.008008;
  3  use strict;
  4  use warnings;
  5
  6  use POSIX;
  7  use IO::Handle;
  8  use IO::File;
  9  use YAML qw(DumpFile LoadFile);
 10  use Time::HiRes qw(time);
 11
 12  use base qw(Class::Accessor);
 13  Farm::Simple->mk_accessors(qw(checkpointing debug tracedir));
 14  # Constructor. Options: checkpointing (default 1), debug (default 0), tracedir (default 'FARMLOG')
 15  sub new {
 16    my ($class, %args) = @_;
 17    my %default = (checkpointing => 1, debug => 0, tracedir => 'FARMLOG');
 18
 19    # A default is applied only when the option is undefined (0 and '' are kept)
 20    for my $option (keys %default) {
 21      $args{$option} = $default{$option} unless defined($args{$option});
 22    }
 23    return bless \%args, $class;
 24  }
 25  # Runs $job on $cluster: one task per idle worker; returns the accumulator built by the 'combine' callback
 26  sub run {
 27    my $self = shift;
 28    my $cluster = shift;
 29    my $job = shift;
 30
 31    my @idles = $cluster->processornames;
 32    my %rsh = $cluster->rsh;
 33    my @tasks = @{$job->tasks};
 34    my $combine = $job->combine;
 35    my $looks_ok = $job->looks_ok;
 36    my $checkpointing = $self->checkpointing;
 37    my $debug = $self->debug;
 38
 39    my %process;   # Key = PID Value = {task=>$t, pid=>$p, worker=>$w, sorder=>$s, begin=>$t}
 40    my $accum;     # Accumulator
 41    my @results;   # Results.
 42
 43    my %workers;   # Set of workers
 44    @workers{@idles} = ();
 45    my %error;     # $Error{$w} = [ tasks that failed when executed on $w ]
 46
 47    my $rorder = 0;  # reception order
 48    my $tmpdir = $self->tracedir;
 49    mkdir $tmpdir if $checkpointing;
 50    # Reads the output of a reaped child, then queues a result record or logs the failure
 51    my $process_child = sub {
 52        my $child = shift;
 53        my $p = $process{$child};
 54
 55        my @errmsg = ($?, 0+$!, "$!");
 56        my @t = @{$p->task()};
 57        my $w = $p->worker;
 58
 59        warn "reaping $child (@t) from worker $w. Errvars: @errmsg\n" if $debug;
 60
 61        my $handle = $p->fromchild; # Recover the channel to that child
 62        my @result = <$handle>;
 63        my $end = time();
 64        if ($looks_ok->(\@result, $p, @errmsg)) {
 65          push @idles, $w;                              # Now $w is idle again
 66          my $r = {
 67            task => \@t,
 68            result => \@result,
 69            worker => $w,
 70            begin => $p->begin(),
 71            end => $end,
 72            PID => $child,
 73            sorder => $p->sorder(),
 74            rorder => $rorder++,
 75          };
 76          push @results, $r;
 77
 78          my @segtoshow = @result>2? @result[0,1]:@result;
 79          warn "From $w ($child) received result:\n@segtoshow".
 80               "for task (@t)\n" if $debug;
 81        }
 82        else {   # the task is not re-queued and $w does not go back to @idles
 83          warn "Error processing task @t on $w. Errmsg: @errmsg\n";
 84          push @{$error{$w}}, \@t;
 85            DumpFile("$tmpdir/Error".$p->sorder().".yml",
 86                     \$w, \@t, $p->begin(), $end,  \@errmsg)
 87          if $checkpointing;
 88          die "No machines left\n" if (keys %workers == keys %error);
 89        }
 90
 91        delete $process{$child};
 92
 93    };
 94    # SIGCHLD handler: processes every child that has already finished, without blocking
 95    my $reaper = sub {
 96      my $child;
 97
 98      $process_child->($child) while (($child = waitpid(-1, WNOHANG)) > 0);
 99    }; # end reaper
100
101    # Initialize
102    $accum = $job->initialize->($job, $cluster, $self);
103
104    my $sorder = 0; # Current task position
105    {
106      local $SIG{CHLD} = $reaper;
107      while (@tasks) {
108        while (@idles and @tasks) {
109          my $t = shift @tasks;
110          my $w = shift @idles;
111          my $handle = IO::Handle->new();
112          # The first element of a task is the command; it is removed from the caller's array
113          my $c = shift @$t;
114          my $rcmd = $rsh{$w}? "$rsh{$w} $w $c @$t" : "$c @$t";
115          warn "$rcmd\n" if $debug;
116
117          my $p = Farm::Process->new(
118              fromchild => $handle, task => $t, worker => $w, sorder => $sorder,
119          );
120
121          $job->on_start($p);
122          my $pid = open($handle, "$rcmd |") || die "Error: can't fork child to $rcmd\n";
123
124          $p->pid($pid);
125          $process{$pid} = $p;
126
127          $sorder++;
128        } # end while @idles and @tasks
129        # NOTE: with no idle worker this loop spins until the CHLD handler frees one
130        my $r;
131        while ($r = shift @results) {
132          $combine->($accum, $r);
133          DumpFile "$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum if $checkpointing;
134        }
135      } # end while (@tasks)
136    } # end scope of reaper
137
138    warn "Last tasks\n" if $debug;
139    while ((my $pid = wait) > 0 or @results) {   # also drains results queued by the reaper just before it was removed
140      $process_child->($pid) if $pid > 0;
141      while (my $r = shift @results) {           # a failed task queues nothing, so never combine an undefined record
142        $combine->($accum, $r);
143        DumpFile "$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum if $checkpointing;
144      }
145    }
146    # Finalize
147    $job->finalize->($job, $cluster, $self);
148
149    return $accum;
150  }
151
152
153  1;

El Paquete Farm::Job

pp2@nereida:~/LFARM/script$ sed -ne '155,194p' Simple.pm | cat -n
 1  package Farm::Job;
 2  use base qw(Class::Accessor);
 3  Farm::Job->mk_accessors(qw(tasks combine on_start looks_ok initialize finalize));
 4
 5  # Default 'looks_ok' test: accepts a task whose output is a non-empty array ref
 6  my $default_looks_ok = sub { # something written to STDOUT
 7    my ($output) = @_;
 8    return (ref($output) eq 'ARRAY') && (@{$output} > 0)
 9  };
10  # Constructor: requires a non-empty 'tasks' array ref; every missing callback gets a default
11  sub new {
12    my $class = shift || die "Error building Farm::Job\n";
13    my %args = @_;
14
15    die "farm Error! Supply tasks argument\n" unless defined($args{tasks})
16                                  and UNIVERSAL::isa($args{tasks}, 'ARRAY')
17                                  and @{$args{tasks}};
18    # Default combine: append the second argument to the accumulator ($_[0] aliases the caller's variable)
19    $args{combine} = sub { $_[0] .= "$_[1]\n"; } unless defined($args{combine});
20    # NOTE(review): Farm::Simple::run passes a result record (hash ref) as $_[1]; confirm this default is what is wanted
21    $args{on_start} = sub { } unless defined($args{on_start});
22
23    $args{looks_ok} =  $default_looks_ok unless defined($args{looks_ok});
24
25    # Initialize
26    $args{initialize} = sub { } unless defined($args{initialize});
27
28      die "Error creating job: 'initialize' is not a CODE ref\n"
29    unless  UNIVERSAL::isa($args{initialize}, 'CODE');
30
31    # Finalize
32    $args{finalize} = sub { } unless defined($args{finalize});
33
34      die "Error creating job: 'finalize' is not a CODE ref\n"
35    unless  UNIVERSAL::isa($args{finalize}, 'CODE');
36
37    bless \%args, $class;
38  }
39
40  1;

El Paquete Farm::Process

pp2@nereida:~/LFARM/script$ sed -ne '196,216p' Simple.pm | cat -n
 1  package Farm::Process;
 2  use base qw(Class::Accessor);
 3
 4  Farm::Process->mk_accessors(qw(task worker fromchild pid sorder begin));
 5
 6  # Mapping of a task onto a machine
 7  # $process{$pid} = Farm::Process->new(
 8  #    fromchild => $handle,
 9  #    task => $t,
10  #    pid => $pid,
11  #    worker => $w,
12  #    sorder => $sorder);
13  #
14  # Constructor: stores the given fields and stamps the creation time in 'begin'
15  sub new {
16    my $class = shift || die "Error building Farm::Process\n";
17    my %args = @_;
18    # Fully qualified: the 'time' imported from Time::HiRes is only visible inside package Farm::Simple
19    $args{begin} = Time::HiRes::time();
20    bless \%args, $class;
21  }

El Paquete Farm::Cluster

pp2@nereida:~/LFARM/script$ sed -ne '218,295p' Simple.pm | cat -n
 1  package Farm::Cluster;
 2
 3  # Defines the cluster
 4  # Farm::Cluster->new(
 5  #       beowulf => { rsh => '/usr/bin/ssh', ... }
 6  #       orion   => { rsh => $rsh, rcp => $rcp, user=> 'fulano', ... }
 7  #       ...
 8  #       );
 9  # The object is a hash: machine name => Farm::Machine built from that machine's options
10  sub new {
11    my $class = shift || die "Error building Farm::Cluster\n";
12    my %args = @_;
13    my %cluster = ();
14
15    for (keys %args) {
16      die "Illegal machine name $_\n" unless defined($_) and m{\w+};   # any name containing a word character passes
17      my $m = Farm::Machine->new(name => $_, %{$args{$_}});
18      $cluster{$_} = $m;
19    }
20    # user and dir
21    #
22
23    bless \%cluster, $class;
24  }
25  # Returns the list of machine names (in hash order)
26  sub processornames {
27    my $self = shift;
28
29    return keys(%$self);
30  }
31  # Getter/setter: processor($name [, $machine]); the value is stored only when it is a Farm::Machine
32  sub processor {
33    my $self = shift; # cluster
34    my $name = shift; # processor name
35    my $val = shift;
36
37    $self->{$name} = $val if defined($val) and UNIVERSAL::isa($val, 'Farm::Machine');
38    return $self->{$name};
39  }
40  # Returns the list of Farm::Machine objects
41  sub processors {
42    my $self = shift;
43
44    return values(%$self);
45  }
46
47  # Returns a hash (beowulf=>'/usr/bin/ssh', nereida => '', ... )
48  sub rsh {
49    my $self = shift;
50
51    return map { $_ => $self->{$_}{rsh} } keys(%$self);
52  }
53  # Copies local file $src to "<machine>:$dest" on every machine, using each machine's rcp command
54  sub cp {
55    my $cluster = shift;
56    my $src     = shift;
57    my $dest    = shift || '';
58    # Dies when $src is not readable; a failed copy only produces a warning
59    die "Can't copy file $src\n" unless -r $src;
60    foreach my $machine ($cluster->processors()) {
61      if (system($machine->rcp, $src, "$machine->{name}:$dest")) {
62        warn "Couldn't copy $src to $machine->{name}:$dest: $?\n"
63      }
64    }
65  }
66  # Runs "rm $src" on every machine through its rsh command; a failure only produces a warning
67  sub rm {
68    my $cluster = shift;
69    my $src     = shift || die "Cluster 'rm' error: provide a filename\n";
70
71    foreach my $machine ($cluster->processors()) {
72      if (system($machine->rsh, "$machine->{name}", "rm $src")) {
73        warn "couldn't rm $src in $machine->{name}: $?\n"
74      }
75    }
76  }
77
78  1;

El Paquete Farm::Machine

pp2@nereida:~/LFARM/script$ sed -ne '297,370p' Simple.pm | cat -n
 1  package Farm::Machine;
 2  use IPC::Run3;
 3  use base qw(Class::Accessor);
 4  Farm::Machine->mk_accessors(qw(name rsh rcp stdout stderr));
 5
 6  use constant NULDEV => \undef;
 7
 8  # Defines machine: a named host, the commands used to reach it (rsh, rcp)
 9  # and two local log files, <name>.output and .<name>.err
10  # new(name => $name, ...) dies without a name; rsh and rcp default to the ssh/scp found by 'which'
11  sub new {
12    my $class = shift || die "Error building Farm::Machine\n";
13    my %arg = @_;
14
15    die "Provide a name for the machine\n" unless defined($arg{name});
16
17    unless (defined($arg{rsh})) {
18      my $rsh = `which ssh`;
19      chomp($rsh) if defined($rsh);
20      die "Error: define ssh\n" unless defined($rsh) and length($rsh);   # '' means 'which' found nothing
21      $arg{rsh} = $rsh;
22    }
23
24    unless (defined($arg{rcp})) {
25      my $rcp = `which scp`;
26      chomp($rcp) if defined($rcp);
27      die "Error: define scp\n" unless defined($rcp) and length($rcp);   # '' means 'which' found nothing
28      $arg{rcp} = $rcp;
29    }
30
31    # Add user field
32
33    # Home directory for this machine
34    $arg{home} = $arg{name} unless exists($arg{home});
35
36    # Local log files for this machine (three-argument open; a failure is reported but not fatal)
37    open $arg{stdout}, '>', "$arg{name}.output" or warn "Can't write $arg{name}.output: $!\n";
38    open $arg{stderr}, '>', ".$arg{name}.err" or warn "Can't write .$arg{name}.err: $!\n";
39
40    # Add environment variables
41    # stdin stdout stderr
42
43    bless \%arg, $class;
44  }
45  # Closes both log files when the object is destroyed
46  sub DESTROY {
47    my $self = shift;
48
49    close($self->stdout);
50    close($self->stderr);
51  }
52
53  # True if machine accepts ssh connections within $seconds (default 1).
54  # The ALRM handler is installed inside the sub: a 'local' placed in a
55  # file-level block is already undone by the time the method is called,
56  # and only a handler that dies can interrupt the wait for ssh.
57  sub isoperative {
58    my $self = shift;
59    my $seconds = shift || 1;
60
61    my $machine = ['ssh', $self->name,' ps'];
62    $self->{operative} = 1;
63    eval {
64      local $SIG{ALRM} = sub { die "timeout\n" };
65      alarm($seconds);
66      run3($machine, undef, NULDEV, $self->stderr);
67      alarm(0);
68    };
69    alarm(0);   # run3 may have died with the alarm still pending
70    $self->{operative} = 0 if $@;
71    return $self->{operative};
72  }
73
74  1;

El Ejecutable operative.pl

pp2@nereida:~/LFARM/script$ cat -n operative.pl
 1  #!/usr/bin/perl -w
 2  # Author: Casiano
 3  use strict;
 4  use Farm::Simple;
 5
 6  #### main
 7
 8  # Usage: operative.pl [machine]   (machine defaults to "beowulf")
 9  # Prints the value returned by Farm::Machine::isoperative
10  # for that machine (1 or 0)
11  my $m = shift || "beowulf";
12
13  my $default = {
14    rsh => '/usr/bin/ssh',
15    rcp => '/usr/bin/scp',
16    user => 'casiano',
17  };
18  # Every machine of the cluster shares the same option hash
19  my @processors = qw(orion nereida beowulf);
20  my $cluster = Farm::Cluster->new( map { $_ => $default } @processors );
21  # NOTE: processor() returns undef for a name outside @processors, and the method call below then dies
22  my $rm = $cluster->processor($m);
23  print $rm->isoperative."\n";



Subsecciones
Casiano Rodríguez León
Licencia de Creative Commons
Programación Distribuida y Mejora del Rendimiento
por Casiano Rodríguez León is licensed under a Creative Commons Reconocimiento 3.0 Unported License.

Permissions beyond the scope of this license may be available at http://campusvirtual.ull.es/ocw/course/view.php?id=44.
2012-06-19