Escriba un módulo Batch::Cluster que provee:
new que recibe
my $cluster = Batch::Cluster->new('machine1', 'machine2', ...);
La llamada a new retorna un objeto de la clase Batch::Cluster que
es una colección de objetos GRID::Machine. Para cada máquina, se abre una conexión SSH
y se arranca un intérprete Perl que permanece a la espera de comandos desde
la máquina cliente.
Batch::Cluster disponen de un método run que ejecutará un array
de comandos en cada una de las máquinas:
my $result = $cluster->run(@commands);Recibe una lista de comandos y ejecuta cada comando como un proceso remoto. Retorna un objeto
Batch::Cluster::Result.
Un objeto Batch::Cluster::Result describe una colección de objetos GRID::Machine::Result
copyandmake y chdir:
my $c = Batch::Cluster->new(@machine);
|| die "No machines has been initialized in the cluster";
# Transference of files to remote hosts
$c->copyandmake(
dir => 'pi',
makeargs => 'pi',
files => [ qw{pi.c Makefile} ],
cleanfiles => $clean,
cleandirs => $clean, # remove the whole directory at the end
keepdir => 1,
);
# This method changes the remote working directory in all hosts
$c->chdir("pi/") || die "Can't change to pi/\n";
1 use List::Util qw(sum);
2 use Batch::Cluster;
3
4 my $np = 20; # Number of processes
5 my $N = 1000; # Number of iterations
6 my $clean = 0; # The files are not removed when the execution is finished
7
8 my @cluster = ( 'host1', 'host2', 'host3' ); # Hosts
9
10 my $c = Batch::Cluster->new(@cluster) || die "Error Intializing Cluster";
11
12 # Transference of files to remote hosts
13 $c->copyandmake(
14 dir => 'pi',
15 makeargs => 'pi',
16 files => [ qw{pi.c Makefile} ],
17 cleanfiles => $clean,
18 cleandirs => $clean, # remove the whole directory at the end
19 keepdir => 1,
20 );
21
22 # This method changes the remote working directory on all hosts
23 $c->chdir("pi/") || die "Can't change to pi/\n";
24
25 my @commands = map { "./pi $_ $N $np" } 0..$np-1
26 print "Pi Value: ".sum($c->run(@commands))."\n";
Aqui vemos un borrador de
como podría esribirse el método chdir de un objeto Batch::Cluster:
sub chdir {
my ($self, $dir) = @_;
my $r = Batch::Cluster::Result->new();
for ($self->HostGMs()) {
my $machine_result = $_->chdir($dir);
$r->add(host_name => $_->host, machine_result => $machine_result);
}
return $r;
}
HostGMs devuelve la lista de objetos GRID::Machines
de las máquinas que integran el cluster.
$self->host($_) devuelve el objeto GRID::Machine
cuyo nombre es $_.
add de un objeto Batch::Cluster::Result añade un nuevo
objeto GRID::Machine::Result
al objeto Batch::Cluster::Result anotando que fué obtenido
en la máquina $_->host.
$dir
se admita también un array de argumentos @dir que permita el cambio a un directorio
diferente en cada máquina
La llamada a chdir de la línea 23:
$c->chdir("pi/") || die "Can't change to pi/\n";
Supone que la evaluación de un objeto Batch::Cluster::Result en un contexto booleano
esta sobrecargada:
use overload q("") => 'toStr',
bool => 'allOK';
Este código es un posible borrador del método allOK
que maneja la sobrecarga:
sub allOK {
my $self = shift;
return (first { !$_ } $self->Results)? 0 : 1;
}
El método first esta definido en List::Util.
El borrador del método copyandmake es similar al de chdir:
sub copyandmake {
my ($self, %opts) = @_;
my $r = GRID::Cluster::Result->new();
for ($self->HostGMs()) {
my $machine_result = $_->copyandmake(%opts);
$r->add(host_name => $_->host(), machine_result => $machine_result);
}
return $r;
}
De nuevo copyandmake podría ser mas versátil y admitir que parámetros
como dir, makeargs, etc. admitieran arrays anónimos para poder hacer
que diferentes máquinas hiceran cosas distintas.
El método run parece el más difícil de escribir.
Pueden simplificarnos su escritura alguno de los siguientes
módulos:
También puede ayudarnos este borrador que hace uso de IO::Select.
Retorna una lista de cadenas y no un objeto Batch::Cluster::Result:
sub run {
my $self = shift;
my @commands = map { "$_ | " } @_;
my @proc;
my @pid;
my %hostName; # task number => host name
my %taskNumber; # stream handler => task number
my $nlt = 0; # number of launched tasks
my $np = @commands; # number of tasks
my $lp = $np - 1; # last task
my $readset = IO::Select->new();
# sub launchTask assumes $_ has the name of the machine
my $launchTask = sub {
my $m = $self->get_host($_);
($proc[$nlt], $pid[$nlt]) = $m->open(shift @commands);
$proc[$nlt]->blocking(1);
$readset->add($proc[$nlt]);
$hostName{$nlt} = $_;
my $address = 0 + $proc[$nlt];
$taskNumber{$address} = $nlt;
$nlt++;
};
for ($self->hostNames()) {
$launchTask->();
last if ($nlt > $lp);
}
my $nunFinished = 0;
my @ready;
my @result;
do {
push @ready, $readset->can_read unless @ready;
my $handle = shift @ready;
my $tn = $taskNumber{0 + $handle};
my ($aux, $bytes, $r);
while ((!defined($bytes)) || ($bytes)) {
$bytes = sysread($handle, $aux, BUFFERSIZE);
$r .= $aux if ((defined($bytes)) && ($bytes));
}
$result[$tn] = $r;
$readset->remove($handle);
close $handle;
if (@commands) {
given ($hostName{$tn}) {
$launchTask->();
}
}
} until (++$nunFinished == $np);
return @result;
}
