Descargue la distribución desde el enlace http://nereida.deioc.ull.es/~pp2/perlexamples/Farm-Simple-0.2.tar.gz
pp2@nereida:~/LFARM/script$ farm.pl pi0 100% 7361 7.2KB/s 00:00 pi0 100% 7361 7.2KB/s 00:00 pi0 100% 7361 7.2KB/s 00:00 /usr/bin/ssh beowulf ./pi0 0 4096 8 /usr/bin/ssh orion ./pi0 1 4096 8 /usr/bin/ssh nereida ./pi0 2 4096 8 reaping 11144 (2 4096 8) from worker nereida. Errvars: 5888 29 Desplazamiento ilegal From nereida (11144) received result: 0.39279065746825037042 for task (2 4096 8) reaping 11142 (0 4096 8) from worker beowulf. Errvars: 5888 0 From beowulf (11142) received result: 0.39291265325220731119 for task (0 4096 8) reaping 11143 (1 4096 8) from worker orion. Errvars: 5888 9 Descriptor de fichero erróneo From orion (11143) received result: 0.39285166283354705508 for task (1 4096 8) /usr/bin/ssh nereida ./pi0 3 4096 8 /usr/bin/ssh beowulf ./pi0 4 4096 8 /usr/bin/ssh orion ./pi0 5 4096 8 reaping 11151 (4 4096 8) from worker beowulf. Errvars: 5888 29 Desplazamiento ilegal From beowulf (11151) received result: 0.39266860197050978964 for task (4 4096 8) reaping 11150 (3 4096 8) from worker nereida. Errvars: 5888 10 No hay ningún proceso hijo From nereida (11150) received result: 0.39272963717450859455 for task (3 4096 8) /usr/bin/ssh beowulf ./pi0 6 4096 8 /usr/bin/ssh nereida ./pi0 7 4096 8 Last tasks reaping 11152 (5 4096 8) from worker orion. Errvars: 5888 29 Desplazamiento ilegal From orion (11152) received result: 0.39260755187444501546 for task (5 4096 8) reaping 11159 (7 4096 8) from worker nereida. Errvars: 5888 25 Función ioctl no apropiada para el dispositivo From nereida (11159) received result: 0.39248540707887613621 for task (7 4096 8) reaping 11158 (6 4096 8) from worker beowulf. Errvars: 5888 25 Función ioctl no apropiada para el dispositivo From beowulf (11158) received result: 0.39254648690450405502 for task (6 4096 8) Result: 3.14159265855685 pp2@nereida:~/LFARM/script$
#!/usr/bin/perl -w
# farm.pl -- approximates pi by farming NT partial sums out to a cluster.
# (listing produced with: cat -n farm.pl)
# Author: Casiano
#
# Usage: farm.pl [remote-copy-program [executable]]
use strict;
use Farm::Simple;

#### main

# Warning: the format of the printf in pi0.c influences
# the precision
#
use constant NT    => 8;       # Number of tasks
use constant LT    => NT-1;    # Index of the last task
use constant N     => 8**5;    # Num intervals
use constant NperT => N/NT;    # Assume N%NT == 0

my $rcp        = (shift || "/usr/bin/scp");
my $executable = (shift || "./pi0");

# Each task is [ program, task number, intervals per task, number of tasks ]
my @tasks = map { [$executable, $_, NperT, NT] } 0..LT; # Tasks for pi

# The copy program given on the command line is honoured here; it used to
# be parsed and then ignored in favour of a hard-coded path.
my $default = {
  rsh  => '/usr/bin/ssh',
  rcp  => $rcp,
  user => 'casiano',
};

my $cluster = Farm::Cluster->new(
  orion   => $default,
  nereida => $default,
  beowulf => $default,
);

# A result is acceptable when the worker printed at least one line and the
# first line contains a digit.
my $looks_ok = sub {
  my $r = shift;

  return ((ref($r) eq 'ARRAY') && @$r && ($r->[0] =~ m{\d+}));
};

my $job = Farm::Job->new(
  tasks      => \@tasks,
  combine    => \&combine,
  initialize => \&initialize,
  finalize   => \&finalize,
  looks_ok   => $looks_ok,
);

my $farm = Farm::Simple->new(
  checkpointing => 1,
  debug         => 1,
  tracedir      => 'FARM',
);

my $out = $farm->run($cluster, $job);
print "Result: $out\n";

# Copies the executable to every machine and returns the initial value of
# the accumulator.
sub initialize {
  my ($job, $cluster, $run) = @_;

  # Assume all machines have the same architecture
  $cluster->cp($executable);

  return 0; # initial value for the accumulator
}

# Removes the executable from every machine once the job is done.
sub finalize {
  my ($job, $cluster, $run) = @_;

  $cluster->rm($executable);
}

# Adds the partial sum (first output line of the task) to the accumulator.
# $_[0] is deliberately modified in place: it aliases the farm's accumulator.
sub combine {
  $_[0] += $_[1]->{result}[0];
}
# Simple.pm, lines 1-153: package Farm::Simple
# (listing produced with: sed -ne '1,153p' Simple.pm | cat -n)
package Farm::Simple;
use 5.008008;
use strict;
use warnings;

use POSIX;
use IO::Handle;
use IO::File;
use YAML qw(DumpFile LoadFile);
use Time::HiRes qw(time);

use base qw(Class::Accessor);
Farm::Simple->mk_accessors(qw(checkpointing debug tracedir));

# Farm::Simple->new(checkpointing => BOOL, debug => BOOL, tracedir => DIR)
#
# Builds a farm driver. Defaults: checkpointing on, debug off, trace
# directory 'FARMLOG'.
sub new {
  my $class = shift;
  my %args = @_;

  $args{checkpointing} = 1         unless defined($args{checkpointing});
  $args{debug}         = 0         unless defined($args{debug});
  $args{tracedir}      = 'FARMLOG' unless defined($args{tracedir});

  return bless \%args, $class;
}

# $farm->run($cluster, $job)
#
# Runs every task of $job (a Farm::Job) on the machines of $cluster (a
# Farm::Cluster), one task per idle machine at a time, and returns the
# accumulator produced by the job's 'initialize' and 'combine' callbacks.
#
# Each task is an array ref [ command, args... ]. Its standard output is read
# through a pipe once the child has exited, so tasks are expected to print
# small results (NOTE(review): a task writing more than the pipe capacity
# would block before exiting -- confirm this is acceptable).
#
# The farm blocks in wait() while every machine is busy; no SIGCHLD handler
# is installed, so a child can never be reaped before it is registered in
# %process.
#
# Dies with "No machines left\n" when no machine can take the pending tasks,
# and when a child cannot be forked.
sub run {
  my $self    = shift;
  my $cluster = shift;
  my $job     = shift;

  my @idles         = $cluster->processornames;
  my %rsh           = $cluster->rsh;
  my @tasks         = @{$job->tasks};
  my $combine       = $job->combine;
  my $looks_ok      = $job->looks_ok;
  my $on_start      = $job->on_start;
  my $checkpointing = $self->checkpointing;
  my $debug         = $self->debug;

  my %process; # Key = PID Value = Farm::Process (task, pid, worker, sorder, begin)
  my $accum;   # Accumulator

  my %workers; # Set of workers
  @workers{@idles} = ();
  my %error;   # $error{$w} = [ tasks that failed when executed on $w ]

  my $rorder = 0; # reception order
  my $tmpdir = $self->tracedir;
  if ($checkpointing and not -d $tmpdir) {
    mkdir($tmpdir) or die "Can't create trace directory $tmpdir: $!\n";
  }

  # Handles a child that has just been reaped by wait(). Returns the result
  # record (hash ref) when the output looks fine, nothing otherwise.
  my $process_child = sub {
    my $child  = shift;
    my @errmsg = ($?, 0+$!, "$!"); # grab the status before anything clobbers it

    my $p = delete $process{$child};
    unless (defined $p) { # a child this farm did not start
      warn "reaped unknown child $child\n" if $debug;
      return;
    }

    my @t = @{$p->task()};
    my $w = $p->worker;

    warn "reaping $child (@t) from worker $w. Errvars: @errmsg\n" if $debug;

    my $handle = $p->fromchild; # the pipe coming from that child
    my @result = <$handle>;
    my $end    = time();

    unless ($looks_ok->(\@result, $p, @errmsg)) {
      # NOTE(review): the failing worker is not made idle again and the task
      # is not rescheduled; it is only recorded in %error and in the trace.
      warn "Error processing task @t on $w. Errmsg: @errmsg\n";
      push @{$error{$w}}, \@t;
      DumpFile("$tmpdir/Error".$p->sorder().".yml",
               \$w, \@t, $p->begin(), $end, \@errmsg)
        if $checkpointing;
      die "No machines left\n" if (keys %workers == keys %error);
      return;
    }

    push @idles, $w; # Now $w is idle again

    my @segtoshow = @result>2? @result[0,1]:@result;
    warn "From $w ($child) received result:\n@segtoshow".
         "for task (@t)\n" if $debug;

    return {
      task   => \@t,
      result => \@result,
      worker => $w,
      begin  => $p->begin(),
      end    => $end,
      PID    => $child,
      sorder => $p->sorder(),
      rorder => $rorder++,
    };
  };

  # Initialize
  $accum = $job->initialize->($job, $cluster, $self);

  my $sorder    = 0; # Current task position
  my $announced = 0; # "Last tasks" is traced only once
  while (@tasks or %process) {

    # Give a task to every idle machine
    while (@idles and @tasks) {
      my $t = shift @tasks;
      my $w = shift @idles;

      # Work on a copy: the job's task list must stay intact
      my ($c, @args) = @$t;
      my $rcmd = $rsh{$w}? "$rsh{$w} $w $c @args" : "$c @args";
      warn "$rcmd\n" if $debug;

      my $handle = IO::Handle->new();
      my $p = Farm::Process->new(
        fromchild => $handle, task => \@args, worker => $w, sorder => $sorder,
      );

      # Invoke the callback. Calling the accessor with an argument would
      # overwrite the callback instead of running it.
      $on_start->($p) if ref($on_start) eq 'CODE';

      # Same shell semantics as "$rcmd |", with the mode kept apart
      my $pid = open($handle, '-|', $rcmd)
        or die "Error: can't fork child to $rcmd: $!\n";

      $p->pid($pid);
      $process{$pid} = $p;

      $sorder++;
    } # end while @idles and @tasks

    die "No machines left\n" if @tasks and not %process;
    warn "Last tasks\n" if $debug and not @tasks and not $announced++;

    # Every machine is busy (or no task is left): sleep until a child ends
    my $child = wait;
    last if $child <= 0;

    my $r = $process_child->($child) or next;
    $combine->($accum, $r);
    DumpFile("$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum) if $checkpointing;
  }

  # Finalize
  $job->finalize->($job, $cluster, $self);

  return $accum;
}


1;
# Simple.pm, lines 155-194: package Farm::Job
# (listing produced with: sed -ne '155,194p' Simple.pm | cat -n)
package Farm::Job;
use base qw(Class::Accessor);
Farm::Job->mk_accessors(qw(tasks combine on_start looks_ok initialize finalize));


# Default acceptance test: something was written to STDOUT
my $default_looks_ok = sub {
  my $res = shift;
  return (ref($res) eq 'ARRAY') && (@{$res} > 0)
};

# Default combine: appends the new result to the accumulator ($_[0], which
# is modified in place). A result record (hash ref with a 'result' array of
# output lines) contributes its output lines; anything else is appended as
# text followed by a newline.
my $default_combine = sub {
  my $r = $_[1];

  if (ref($r) eq 'HASH' and ref($r->{result}) eq 'ARRAY') {
    $_[0] .= join('', @{$r->{result}});
  }
  else {
    $_[0] .= "$r\n";
  }
};

# Farm::Job->new(tasks => \@tasks, combine => \&c, on_start => \&s,
#                looks_ok => \&ok, initialize => \&i, finalize => \&f)
#
# 'tasks' is mandatory and must be a non-empty array ref. Every callback is
# optional and gets a default; a callback that is supplied must be a CODE
# ref. Dies on invalid arguments.
sub new {
  my $class = shift || die "Error building Farm::Job\n";
  my %args = @_;

  die "farm Error! Supply tasks argument\n" unless defined($args{tasks})
                                               and UNIVERSAL::isa($args{tasks}, 'ARRAY')
                                               and @{$args{tasks}};

  my %default = (
    combine    => $default_combine,
    on_start   => sub { },
    looks_ok   => $default_looks_ok,
    initialize => sub { },
    finalize   => sub { },
  );

  for my $callback (qw(combine on_start looks_ok initialize finalize)) {
    $args{$callback} = $default{$callback} unless defined($args{$callback});

    die "Error creating job: '$callback' is not a CODE ref\n"
      unless UNIVERSAL::isa($args{$callback}, 'CODE');
  }

  return bless \%args, $class;
}

1;
# Simple.pm, lines 196-216: package Farm::Process
# (listing produced with: sed -ne '196,216p' Simple.pm | cat -n)
package Farm::Process;
use base qw(Class::Accessor);

# Imports are per package: without this line time() here would be the core
# one (whole seconds), while the 'end' timestamps taken in Farm::Simple are
# high resolution.
use Time::HiRes qw(time);

Farm::Process->mk_accessors(qw(task worker fromchild pid sorder begin));

# Mapping of a task onto a machine
# $process{$pid} = Farm::Process->new(
#    fromchild => $handle,
#    task      => $t,
#    pid       => $pid,
#    worker    => $w,
#    sorder    => $sorder);
#

# Farm::Process->new(%fields)
#
# Stores the given fields and stamps the object with its creation time in
# the 'begin' field (any 'begin' supplied by the caller is overwritten).
sub new {
  my $class = shift || die "Error building Farm::Process\n";
  my %args = @_;

  $args{begin} = time();
  return bless \%args, $class;
}
# Simple.pm, lines 218-295: package Farm::Cluster
# (listing produced with: sed -ne '218,295p' Simple.pm | cat -n)
package Farm::Cluster;

# A cluster is a hash: machine name => Farm::Machine object.
#
# Farm::Cluster->new(
#    beowulf => { rsh => '/usr/bin/ssh', ... }
#    orion   => { rsh => $rsh, rcp => $rcp, user=> 'fulano', ... }
#    ...
# );
#
# Builds one Farm::Machine per name from its option hash. Dies when a name
# contains no word character.
sub new {
  my $class = shift || die "Error building Farm::Cluster\n";
  my %options_for = @_;
  my %machine_named;

  for my $name (keys %options_for) {
    die "Illegal machine name $name\n" unless defined($name) and $name =~ m{\w+};
    $machine_named{$name} = Farm::Machine->new(name => $name, %{$options_for{$name}});
  }
  # user and dir
  #

  return bless \%machine_named, $class;
}

# Names of the machines in the cluster
sub processornames {
  my ($self) = @_;

  return keys %{$self};
}

# $cluster->processor($name [, $machine])
# Returns the machine called $name. When $machine is a Farm::Machine it is
# stored under that name first.
sub processor {
  my ($self, $name, $machine) = @_;

  if (defined($machine) and UNIVERSAL::isa($machine, 'Farm::Machine')) {
    $self->{$name} = $machine;
  }
  return $self->{$name};
}

# The Farm::Machine objects of the cluster
sub processors {
  my ($self) = @_;

  return values %{$self};
}

# Returns a hash (beowulf=>'/usr/bin/ssh', nereida => '', ... )
sub rsh {
  my ($self) = @_;
  my @rsh_of;

  for my $name (keys %{$self}) {
    push @rsh_of, $name, $self->{$name}{rsh};
  }
  return @rsh_of;
}

# $cluster->cp($src [, $dest])
# Copies the local file $src to every machine using the machine's remote
# copy program. Dies when $src is not readable; a failed copy only warns.
sub cp {
  my $cluster = shift;
  my $src     = shift;
  my $dest    = shift || '';

  -r $src or die "Can't copy file $src\n";

  for my $machine ($cluster->processors()) {
    my $status = system($machine->rcp, $src, "$machine->{name}:$dest");
    next unless $status;
    warn "Couldn't copy $src to $machine->{name}:$dest: $?\n";
  }
}

# $cluster->rm($file)
# Removes $file on every machine through the machine's remote shell. A
# failed removal only warns.
sub rm {
  my $cluster = shift;
  my $src     = shift || die "Cluster 'rm' error: provide a filename\n";

  for my $machine ($cluster->processors()) {
    my $status = system($machine->rsh, "$machine->{name}", "rm $src");
    next unless $status;
    warn "couldn't rm $src in $machine->{name}: $?\n";
  }
}

1;
# Simple.pm, lines 297-370: package Farm::Machine
# (listing produced with: sed -ne '297,370p' Simple.pm | cat -n)
package Farm::Machine;
use IPC::Run3;
use base qw(Class::Accessor);
Farm::Machine->mk_accessors(qw(name rsh rcp stdout stderr));

# Passed to run3 as a redirection target to discard the output
use constant NULDEV => \undef;

# Defines machine
# only rsh field now

# Returns the path `which` reports for $program, or '' when there is none.
sub _find_program {
  my $program = shift;

  my $path = `which $program`;
  $path = '' unless defined($path);
  chomp($path);
  return $path;
}

# Farm::Machine->new(name => $name, rsh => $ssh, rcp => $scp, ...)
#
# 'name' is mandatory. 'rsh' and 'rcp' default to the ssh and scp found in
# the PATH; dies when one of them is needed and cannot be found. Two log
# files are opened in the current directory, "$name.output" (field
# 'stdout') and ".$name.err" (field 'stderr'). A log that cannot be opened
# only produces a warning and leaves its field undefined.
sub new {
  my $class = shift || die "Error building Farm::Machine\n";
  my %arg = @_;

  die "Provide a name for the machine\n" unless defined($arg{name});

  unless (defined($arg{rsh})) {
    $arg{rsh} = _find_program('ssh');
    die "Error: define ssh\n" unless length($arg{rsh});
  }

  unless (defined($arg{rcp})) {
    $arg{rcp} = _find_program('scp');
    die "Error: define scp\n" unless length($arg{rcp});
  }

  # Add user field

  # Home directory for this machine
  $arg{home} = $arg{name} unless exists($arg{home});

  # Local directories for this machine
  my %logfile = (stdout => "$arg{name}.output", stderr => ".$arg{name}.err");
  for my $field (qw(stdout stderr)) {
    my $fh;
    unless (open($fh, '>', $logfile{$field})) {
      warn "Can't open $logfile{$field} for writing: $!\n";
      $fh = undef;
    }
    $arg{$field} = $fh;
  }

  # Add environment variables
  # stdin stdout stderr

  return bless \%arg, $class;
}

# Closes the log files of the machine
sub DESTROY {
  my $self = shift;
  local ($!, $?); # a destructor must not disturb the caller's status

  for my $fh ($self->{stdout}, $self->{stderr}) {
    close($fh) if defined($fh);
  }
}

# $machine->isoperative([$seconds])
#
# True if machine accepts ssh connections: runs 'ps' on the machine through
# its remote shell and returns 1 when the command ends successfully within
# $seconds seconds (default 1), 0 otherwise. The answer is also kept in
# $machine->{operative}.
sub isoperative {
  my $self    = shift;
  my $seconds = shift || 1;

  my @probe = ($self->rsh, $self->name, 'ps');

  my $ok = eval {
    # The handler must be in force while the alarm is pending, and it has to
    # die to get out of the blocking run3 call.
    local $SIG{ALRM} = sub { die "timeout\n" };
    alarm($seconds);
    run3(\@probe, undef, NULDEV, $self->stderr);
    alarm(0);
    $? == 0;
  };
  alarm(0); # the eval may have been left through an exception

  $self->{operative} = $ok ? 1 : 0;
  return $self->{operative};
}

1;
#!/usr/bin/perl -w
# operative.pl -- tells whether a machine of the cluster accepts connections.
# (listing produced with: cat -n operative.pl)
# Author: Casiano
#
# Usage: operative.pl [machine]     (default machine: beowulf)
use strict;
use Farm::Simple;

#### main

my $m = shift || "beowulf";

my $default = {
  rsh  => '/usr/bin/ssh',
  rcp  => '/usr/bin/scp',
  user => 'casiano',
};

my @processors = qw(orion nereida beowulf);
my $cluster = Farm::Cluster->new( map { $_ => $default } @processors );

# processor() returns nothing for a name that is not in the cluster
my $rm = $cluster->processor($m)
  or die "Unknown machine '$m'. Known machines: @processors\n";
print $rm->isoperative."\n";