Descargue la distribución desde el enlace http://nereida.deioc.ull.es/~pp2/perlexamples/Farm-Simple-0.2.tar.gz
pp2@nereida:~/LFARM/script$ farm.pl
pi0 100% 7361 7.2KB/s 00:00
pi0 100% 7361 7.2KB/s 00:00
pi0 100% 7361 7.2KB/s 00:00
/usr/bin/ssh beowulf ./pi0 0 4096 8
/usr/bin/ssh orion ./pi0 1 4096 8
/usr/bin/ssh nereida ./pi0 2 4096 8
reaping 11144 (2 4096 8) from worker nereida. Errvars: 5888 29 Desplazamiento ilegal
From nereida (11144) received result:
0.39279065746825037042
for task (2 4096 8)
reaping 11142 (0 4096 8) from worker beowulf. Errvars: 5888 0
From beowulf (11142) received result:
0.39291265325220731119
for task (0 4096 8)
reaping 11143 (1 4096 8) from worker orion. Errvars: 5888 9 Descriptor de fichero erróneo
From orion (11143) received result:
0.39285166283354705508
for task (1 4096 8)
/usr/bin/ssh nereida ./pi0 3 4096 8
/usr/bin/ssh beowulf ./pi0 4 4096 8
/usr/bin/ssh orion ./pi0 5 4096 8
reaping 11151 (4 4096 8) from worker beowulf. Errvars: 5888 29 Desplazamiento ilegal
From beowulf (11151) received result:
0.39266860197050978964
for task (4 4096 8)
reaping 11150 (3 4096 8) from worker nereida. Errvars: 5888 10 No hay ningún proceso hijo
From nereida (11150) received result:
0.39272963717450859455
for task (3 4096 8)
/usr/bin/ssh beowulf ./pi0 6 4096 8
/usr/bin/ssh nereida ./pi0 7 4096 8
Last tasks
reaping 11152 (5 4096 8) from worker orion. Errvars: 5888 29 Desplazamiento ilegal
From orion (11152) received result:
0.39260755187444501546
for task (5 4096 8)
reaping 11159 (7 4096 8) from worker nereida. Errvars: 5888 25 Función ioctl no apropiada para el dispositivo
From nereida (11159) received result:
0.39248540707887613621
for task (7 4096 8)
reaping 11158 (6 4096 8) from worker beowulf. Errvars: 5888 25 Función ioctl no apropiada para el dispositivo
From beowulf (11158) received result:
0.39254648690450405502
for task (6 4096 8)
Result: 3.14159265855685
pp2@nereida:~/LFARM/script$
pp2@nereida:~/LFARM/script$ cat -n farm.pl
1 #!/usr/bin/perl -w
2 # Author: Casiano
3 use strict;
4 use Farm::Simple;
5 # Usage: farm.pl [rcp-program [executable]]
6 #### main
7
8 # Warning: the format of the printf in pi0.c influences
9 # the precision
10 #
11 use constant NT => 8; # Num tasks (one task per index 0..LT)
12 use constant LT => NT-1; # Index of the last task
13 use constant N => 8**5; # Num intervals
14 use constant NperT => N/NT; # Assume N%NT == 0
15
16 my $dir = `pwd`; # NOTE(review): $dir is not used anywhere else in this script
17 chomp $dir;
18 my $rcp = (shift || "/usr/bin/scp");
19 my $executable = (shift || "./pi0");
20 my @tasks = map { [$executable, $_, NperT, NT] } 0..LT; # Tasks for pi
21
22 my $default = {
23 rsh => '/usr/bin/ssh',
24 rcp => $rcp, # copy program taken from the first command-line argument
25 user => 'casiano',
26 };
27
28 my $cluster = Farm::Cluster->new(
29 orion => $default,
30 nereida => $default,
31 beowulf => $default,
32 );
33 # A task result is accepted when it is a non-empty array ref whose first line contains a digit.
34 my $looks_ok = sub {
35 my $r = shift;
36
37 return ((ref($r) eq 'ARRAY') && @$r && ($r->[0] =~ m{\d+}));
38 };
39
40 my $job = Farm::Job->new(
41 tasks => \@tasks,
42 combine => \&combine,
43 initialize => \&initialize,
44 finalize => \&finalize,
45 looks_ok => $looks_ok,
46 );
47
48 my $farm = Farm::Simple->new(
49 checkpointing => 1,
50 debug=>1,
51 tracedir => 'FARM',
52 );
53
54 my $out = $farm->run($cluster, $job);
55 print "Result: $out\n";
56 # Job start-up hook: ships the executable to every machine and returns the initial accumulator.
57 sub initialize {
58 my ($job, $cluster, $run) = @_;
59
60 # Assume all machines have the same architecture
61 $cluster->cp($executable);
62
63 return 0; # initial value for the accumulator
64 }
65 # Job tear-down hook: removes the executable from every machine.
66 sub finalize {
67 my ($job, $cluster, $run) = @_;
68
69 $cluster->rm($executable);
70 }
71 # Adds the first output line of a finished task to the accumulator; $_[0] aliases the caller's variable.
72 sub combine {
73 $_[0] += $_[1]->{result}[0];
74 }
pp2@nereida:~/LFARM/script$ sed -ne '1,153p' Simple.pm | cat -n
1 package Farm::Simple;
2 use 5.008008;
3 use strict;
4 use warnings;
5
6 use POSIX;
7 use IO::Handle;
8 use IO::File;
9 use YAML qw(DumpFile LoadFile);
10 use Time::HiRes qw(time);
11
12 use base qw(Class::Accessor);
13 Farm::Simple->mk_accessors(qw(checkpointing debug tracedir));
14 # Constructor. Options and defaults: checkpointing => 1, debug => 0, tracedir => 'FARMLOG'.
15 sub new {
16 my $class = shift;
17 my %args = @_;
18
19 $args{checkpointing} = 1 unless defined($args{checkpointing});
20 $args{debug} = 0 unless defined($args{debug});
21 $args{tracedir} = 'FARMLOG' unless defined($args{tracedir});
22
23 bless \%args, $class;
24 }
25 # Runs the tasks of $job on the machines of $cluster, one task per idle machine, and returns the accumulator.
26 sub run {
27 my $self = shift;
28 my $cluster = shift;
29 my $job = shift;
30
31 my @idles = $cluster->processornames;
32 my %rsh = $cluster->rsh;
33 my @tasks = @{$job->tasks};
34 my $combine = $job->combine;
35 my $looks_ok = $job->looks_ok;
36 my $checkpointing = $self->checkpointing;
37 my $debug = $self->debug;
38
39 my %process; # Key = PID Value = {task=>$t, pid=>$p, worker=>$w, sorder=>$s, begin=>$t}
40 my $accum; # Accumulator
41 my @results; # Results.
42
43 my %workers; # Set of workers
44 @workers{@idles} = ();
45 my %error; # $Error{$w} = [ tasks that failed when executed on $w ]
46
47 my $rorder = 0; # reception order
48 my $tmpdir = $self->tracedir;
49 mkdir $tmpdir if $checkpointing;
50 # Handles one finished child: reads its output and either queues a result record or logs the failure.
51 my $process_child = sub {
52 my $child = shift;
53 my $p = $process{$child};
54
55 my @errmsg = ($?, 0+$!, "$!"); # NOTE(review): $! is simply the current errno; it may be unrelated to this child
56 my @t = @{$p->task()};
57 my $w = $p->worker;
58
59 warn "reaping $child (@t) from worker $w. Errvars: @errmsg\n" if $debug;
60
61 my $handle = $p->fromchild; # Pipe opened to read this child's output
62 my @result = <$handle>;
63 my $end = time();
64 if ($looks_ok->(\@result, $p, @errmsg)) {
65 push @idles, $w; # Now $w is idle again
66 my $r = {
67 task => \@t,
68 result => \@result,
69 worker => $w,
70 begin => $p->begin(),
71 end => $end,
72 PID => $child,
73 sorder => $p->sorder(),
74 rorder => $rorder++,
75 };
76 push @results, $r;
77
78 my @segtoshow = @result>2? @result[0,1]:@result;
79 warn "From $w ($child) received result:\n@segtoshow".
80 "for task (@t)\n" if $debug;
81 }
82 else { # the worker is not returned to @idles and the task is not retried
83 warn "Error processing task @t on $w. Errmsg: @errmsg\n";
84 push @{$error{$w}}, \@t;
85 DumpFile("$tmpdir/Error".$p->sorder().".yml",
86 \$w, \@t, $p->begin(), $end, \@errmsg)
87 if $checkpointing;
88 die "No machines left\n" if (keys %workers == keys %error);
89 }
90
91 delete $process{$child};
92
93 };
94 # SIGCHLD handler: processes every child that has already terminated, without blocking.
95 my $reaper = sub {
96 my $child;
97
98 $process_child->($child) while (($child = waitpid(-1, WNOHANG)) > 0);
99 }; # end reaper
100
101 # Initialize
102 $accum = $job->initialize->($job, $cluster, $self);
103
104 my $sorder = 0; # Current task position
105 {
106 local $SIG{CHLD} = $reaper;
107 while (@tasks) { # NOTE(review): spins without sleeping while no machine is idle
108 while (@idles and @tasks) {
109 my $t = shift @tasks;
110 my $w = shift @idles;
111 my $handle = IO::Handle->new();
112
113 my $c = shift @$t; # first element of a task is the command, the rest are its arguments
114 my $rcmd = $rsh{$w}? "$rsh{$w} $w $c @$t" : "$c @$t";
115 warn "$rcmd\n" if $debug;
116
117 my $p = Farm::Process->new(
118 fromchild => $handle, task => $t, worker => $w, sorder => $sorder,
119 );
120
121 $job->on_start($p);
122 my $pid = open($handle, "$rcmd |") || die "Error: can't fork child to $rcmd\n";
123 # NOTE(review): a SIGCHLD delivered before %process is updated would find no entry for this pid
124 $p->pid($pid);
125 $process{$pid} = $p;
126
127 $sorder++;
128 } # end while @idles and @tasks
129
130 my $r;
131 while ($r = shift @results) {
132 $combine->($accum, $r);
133 DumpFile "$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum if $checkpointing;
134 }
135 } # end while (@tasks)
136 } # end scope of reaper
137 warn "Last tasks\n" if $debug;
138 my $pid = 0; # 0 on the first pass: nothing reaped yet, only drain what the reaper left in @results
139 do {
140 $process_child->($pid) if $pid > 0;
141 while (my $r = shift @results) { # a failed task queues nothing, so undef is never combined
142 $combine->($accum, $r);
143 DumpFile "$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum if $checkpointing;
144 }
145 } while (($pid = wait) > 0);
146 # Finalize
147 $job->finalize->($job, $cluster, $self);
148
149 return $accum;
150 }
151
152
153 1;
pp2@nereida:~/LFARM/script$ sed -ne '155,194p' Simple.pm | cat -n
1 package Farm::Job;
2 use base qw(Class::Accessor);
3 Farm::Job->mk_accessors(qw(tasks combine on_start looks_ok initialize finalize));
4
5 # Default acceptance test for a task's output: a non-empty array ref.
6 my $default_looks_ok = sub { # something written to STDOUT
7 my $res = shift;
8 return (ref($res) eq 'ARRAY') && (@{$res} > 0)
9 };
10 # Constructor. 'tasks' (a non-empty array ref) is mandatory; every callback has a default.
11 sub new {
12 my $class = shift || die "Error building Farm::Job\n";
13 my %args = @_;
14
15 die "farm Error! Supply tasks argument\n" unless defined($args{tasks})
16 and UNIVERSAL::isa($args{tasks}, 'ARRAY')
17 and @{$args{tasks}};
18 # Default combine: append the second argument and a newline to the accumulator aliased by $_[0].
19 $args{combine} = sub { $_[0] .= "$_[1]\n"; } unless defined($args{combine});
20
21 $args{on_start} = sub { } unless defined($args{on_start});
22
23 $args{looks_ok} = $default_looks_ok unless defined($args{looks_ok});
24
25 # Initialize
26 $args{initialize} = sub { } unless defined($args{initialize});
27
28 die "Error creating job: 'initialize' is not a CODE ref\n"
29 unless UNIVERSAL::isa($args{initialize}, 'CODE');
30
31 # Finalize
32 $args{finalize} = sub { } unless defined($args{finalize});
33
34 die "Error creating job: 'finalize' is not a CODE ref\n"
35 unless UNIVERSAL::isa($args{finalize}, 'CODE');
36
37 bless \%args, $class;
38 }
39
40 1;
pp2@nereida:~/LFARM/script$ sed -ne '196,216p' Simple.pm | cat -n
1 package Farm::Process;
2 use base qw(Class::Accessor);
3
4 Farm::Process->mk_accessors(qw(task worker fromchild pid sorder begin));
5
6 # Mapping of a task onto a machine
7 # $process{$pid} = Farm::Process->new(
8 # fromchild => $handle,
9 # task => $t,
10 # pid => $pid,
11 # worker => $w,
12 # sorder => $sorder);
13 #
14 # Constructor: stores the given fields and stamps 'begin' with the creation time.
15 sub new {
16 my $class = shift || die "Error building Farm::Process\n";
17 my %args = @_;
18 # Time::HiRes::time is imported only into Farm::Simple; qualify it so 'begin' is sub-second like the 'end' stamp.
19 $args{begin} = Time::HiRes::time();
20 bless \%args, $class;
21 }
pp2@nereida:~/LFARM/script$ sed -ne '218,295p' Simple.pm | cat -n
1 package Farm::Cluster;
2 # A cluster object is a blessed hash: machine name => Farm::Machine object.
3 # Defines the cluster
4 # Farm::Cluster->new(
5 # beowulf => { rsh => '/usr/bin/ssh', ... }
6 # orion => { rsh => $rsh, rcp => $rcp, user=> 'fulano', ... }
7 # ...
8 # );
9 #
10 sub new {
11 my $class = shift || die "Error building Farm::Cluster\n";
12 my %args = @_; # machine name => hash ref with the options for that machine
13 my %cluster = ();
14
15 for (keys %args) {
16 die "Illegal machine name $_\n" unless defined($_) and m{\w+}; # unanchored: one word character anywhere is enough
17 my $m = Farm::Machine->new(name => $_, %{$args{$_}});
18 $cluster{$_} = $m;
19 }
20 # user and dir
21 # NOTE(review): nothing is done with 'user' or a directory here; presumably a pending feature
22
23 bless \%cluster, $class;
24 }
25 # Returns the names of the machines in the cluster (the hash keys).
26 sub processornames {
27 my $self = shift;
28
29 return keys(%$self);
30 }
31 # Getter/setter for one machine: stores $val only when it is a Farm::Machine; returns the entry for $name.
32 sub processor {
33 my $self = shift; # cluster
34 my $name = shift; # processor name
35 my $val = shift;
36
37 $self->{$name} = $val if defined($val) and UNIVERSAL::isa($val, 'Farm::Machine');
38 return $self->{$name};
39 }
40 # Returns the Farm::Machine objects of the cluster (the hash values).
41 sub processors {
42 my $self = shift;
43
44 return values(%$self);
45 }
46
47 # Returns a hash (beowulf=>'/usr/bin/ssh', nereida => '', ... )
48 sub rsh {
49 my $self = shift;
50
51 return map { $_ => $self->{$_}{rsh} } keys(%$self);
52 }
53 # Copies local file $src to "name:$dest" on every machine with that machine's rcp program.
54 sub cp {
55 my $cluster = shift;
56 my $src = shift;
57 my $dest = shift || ''; # an empty $dest gives the target "name:"
58
59 die "Can't copy file $src\n" unless -r $src;
60 foreach my $machine ($cluster->processors()) {
61 if (system($machine->rcp, $src, "$machine->{name}:$dest")) { # non-zero means the copy failed
62 warn "Couldn't copy $src to $machine->{name}:$dest: $?\n"
63 }
64 }
65 }
66 # Runs "rm $src" on every machine through that machine's rsh program; a failure only warns.
67 sub rm {
68 my $cluster = shift;
69 my $src = shift || die "Cluster 'rm' error: provide a filename\n";
70
71 foreach my $machine ($cluster->processors()) {
72 if (system($machine->rsh, "$machine->{name}", "rm $src")) {
73 warn "couldn't rm $src in $machine->{name}: $?\n"
74 }
75 }
76 }
77
78 1;
pp2@nereida:~/LFARM/script$ sed -ne '297,370p' Simple.pm | cat -n
1 package Farm::Machine;
2 use IPC::Run3;
3 use base qw(Class::Accessor);
4 Farm::Machine->mk_accessors(qw(name rsh rcp stdout stderr));
5
6 use constant NULDEV => \undef;
7
8 # Defines machine
9 # only rsh field now
10 # Constructor. 'name' is mandatory; rsh and rcp default to whatever `which ssh` / `which scp` report.
11 sub new {
12 my $class = shift || die "Error building Farm::Machine\n";
13 my %arg = @_;
14
15 die "Provide a name for the machine\n" unless defined($arg{name});
16
17 unless (defined($arg{rsh})) {
18 my $rsh = `which ssh`;
19 die "Error: define ssh\n" unless defined($rsh) and $rsh =~ m{\S}; # backticks give '' when `which` prints nothing
20 chomp($rsh);
21 $arg{rsh} = $rsh;
22 }
23
24 unless (defined($arg{rcp})) {
25 my $rcp = `which scp`;
26 die "Error: define scp\n" unless defined($rcp) and $rcp =~ m{\S};
27 chomp($rcp);
28 $arg{rcp} = $rcp;
29 }
30
31 # Add user field
32
33 # Home directory for this machine
34 $arg{home} = $arg{name} unless exists($arg{home});
35
36 # Local log files for this machine, created in the current directory; a failure only warns
37 open($arg{stdout}, '>', "$arg{name}.output") or warn "Can't write $arg{name}.output: $!\n";
38 open($arg{stderr}, '>', ".$arg{name}.err") or warn "Can't write .$arg{name}.err: $!\n";
39
40 # Add environment variables
41 # stdin stdout stderr
42
43 bless \%arg, $class;
44 }
45 # Closes the two log handles opened by the constructor.
46 sub DESTROY {
47 my $self = shift;
48
49 close($self->stdout);
50 close($self->stderr);
51 }
52
53 # True if the machine runs 'ps' over ssh successfully within $seconds (default 1).
54 # The ALRM handler is installed inside the sub: a 'local' placed in a
55 # file-level block is undone when that block finishes at load time,
56 # which would leave SIGALRM with its default action when the alarm fires.
57
58 sub isoperative {
59 my $self = shift;
60 my $seconds = shift || 1;
61
62 my $machine = ['ssh', $self->name,' ps'];
63 $self->{operative} = 1;
64 eval {
65 local $SIG{ALRM} = sub { die "timeout\n" }; # leave the eval when the time is up
66 alarm($seconds);
67 run3($machine, undef, NULDEV, $self->stderr);
68 alarm(0);
69 };
70 alarm(0); # no alarm may stay pending if run3 itself died
71 $self->{operative} = 0 if $@ or $?; # timeout, run3 failure, or non-zero exit status from ssh
72 return $self->{operative};
73 }
74 1;
pp2@nereida:~/LFARM/script$ cat -n operative.pl
1 #!/usr/bin/perl -w
2 # Author: Casiano
3 use strict;
4 use Farm::Simple;
5
6 #### main
7
8 # Usage: operative.pl [machine] (default machine: beowulf)
9 # Prints the value returned by isoperative for that machine: 1 or 0.
10 #
11 my $m = shift || "beowulf";
12
13 my $default = {
14 rsh => '/usr/bin/ssh',
15 rcp => '/usr/bin/scp',
16 user => 'casiano',
17 };
18
19 my @processors = qw(orion nereida beowulf);
20 my $cluster = Farm::Cluster->new( map { $_ => $default } @processors );
21 # processor() returns undef for a name that is not in the cluster; fail with a clear message instead.
22 my $rm = $cluster->processor($m) or die "Unknown machine '$m'. Known machines: @processors\n";
23 print $rm->isoperative."\n";
