Escriba un módulo Batch::Cluster
que provee:
new
que recibe
my $cluster = Batch::Cluster->new('machine1', 'machine2', ...);
La llamada a new
retorna un objeto de la clase Batch::Cluster
que
es una colección de objetos GRID::Machine. Para cada máquina, se abre una conexión SSH
y se arranca un intérprete Perl
que permanece a la espera de comandos desde
la máquina cliente.
Batch::Cluster
disponen de un método run
que ejecutará un array
de comandos en cada una de las máquinas:
my $result = $cluster->run(@commands);Recibe una lista de comandos y ejecuta cada comando como un proceso remoto. Retorna un objeto
Batch::Cluster::Result
.
Un objeto Batch::Cluster::Result
describe una colección de objetos GRID::Machine::Result
copyandmake
y chdir
:
my $c = Batch::Cluster->new(@machine); || die "No machines has been initialized in the cluster"; # Transference of files to remote hosts $c->copyandmake( dir => 'pi', makeargs => 'pi', files => [ qw{pi.c Makefile} ], cleanfiles => $clean, cleandirs => $clean, # remove the whole directory at the end keepdir => 1, ); # This method changes the remote working directory in all hosts $c->chdir("pi/") || die "Can't change to pi/\n";
1 use List::Util qw(sum); 2 use Batch::Cluster; 3 4 my $np = 20; # Number of processes 5 my $N = 1000; # Number of iterations 6 my $clean = 0; # The files are not removed when the execution is finished 7 8 my @cluster = ( 'host1', 'host2', 'host3' ); # Hosts 9 10 my $c = Batch::Cluster->new(@cluster) || die "Error Intializing Cluster"; 11 12 # Transference of files to remote hosts 13 $c->copyandmake( 14 dir => 'pi', 15 makeargs => 'pi', 16 files => [ qw{pi.c Makefile} ], 17 cleanfiles => $clean, 18 cleandirs => $clean, # remove the whole directory at the end 19 keepdir => 1, 20 ); 21 22 # This method changes the remote working directory on all hosts 23 $c->chdir("pi/") || die "Can't change to pi/\n"; 24 25 my @commands = map { "./pi $_ $N $np" } 0..$np-1 26 print "Pi Value: ".sum($c->run(@commands))."\n";
Aqui vemos un borrador de
como podría esribirse el método chdir
de un objeto Batch::Cluster
:
sub chdir { my ($self, $dir) = @_; my $r = Batch::Cluster::Result->new(); for ($self->HostGMs()) { my $machine_result = $_->chdir($dir); $r->add(host_name => $_->host, machine_result => $machine_result); } return $r; }
HostGMs
devuelve la lista de objetos GRID::Machines
de las máquinas que integran el cluster.
$self->host($_)
devuelve el objeto GRID::Machine
cuyo nombre es $_
.
add
de un objeto Batch::Cluster::Result
añade un nuevo
objeto GRID::Machine::Result
al objeto Batch::Cluster::Result
anotando que fué obtenido
en la máquina $_->host
.
$dir
se admita también un array de argumentos @dir
que permita el cambio a un directorio
diferente en cada máquina
La llamada a chdir de la línea 23:
$c->chdir("pi/") || die "Can't change to pi/\n";Supone que la evaluación de un objeto
Batch::Cluster::Result
en un contexto booleano
esta sobrecargada:
use overload q("") => 'toStr', bool => 'allOK';
Este código es un posible borrador del método allOK
que maneja la sobrecarga:
sub allOK { my $self = shift; return (first { !$_ } $self->Results)? 0 : 1; }El método
first
esta definido en List::Util.
El borrador del método copyandmake
es similar al de chdir
:
sub copyandmake { my ($self, %opts) = @_; my $r = GRID::Cluster::Result->new(); for ($self->HostGMs()) { my $machine_result = $_->copyandmake(%opts); $r->add(host_name => $_->host(), machine_result => $machine_result); } return $r; }De nuevo
copyandmake
podría ser mas versátil y admitir que parámetros
como dir
, makeargs
, etc. admitieran arrays anónimos para poder hacer
que diferentes máquinas hiceran cosas distintas.
El método run
parece el más difícil de escribir.
Pueden simplificarnos su escritura alguno de los siguientes
módulos:
También puede ayudarnos este borrador que hace uso de IO::Select.
Retorna una lista de cadenas y no un objeto Batch::Cluster::Result
:
sub run { my $self = shift; my @commands = map { "$_ | " } @_; my @proc; my @pid; my %hostName; # task number => host name my %taskNumber; # stream handler => task number my $nlt = 0; # number of launched tasks my $np = @commands; # number of tasks my $lp = $np - 1; # last task my $readset = IO::Select->new(); # sub launchTask assumes $_ has the name of the machine my $launchTask = sub { my $m = $self->get_host($_); ($proc[$nlt], $pid[$nlt]) = $m->open(shift @commands); $proc[$nlt]->blocking(1); $readset->add($proc[$nlt]); $hostName{$nlt} = $_; my $address = 0 + $proc[$nlt]; $taskNumber{$address} = $nlt; $nlt++; }; for ($self->hostNames()) { $launchTask->(); last if ($nlt > $lp); } my $nunFinished = 0; my @ready; my @result; do { push @ready, $readset->can_read unless @ready; my $handle = shift @ready; my $tn = $taskNumber{0 + $handle}; my ($aux, $bytes, $r); while ((!defined($bytes)) || ($bytes)) { $bytes = sysread($handle, $aux, BUFFERSIZE); $r .= $aux if ((defined($bytes)) && ($bytes)); } $result[$tn] = $r; $readset->remove($handle); close $handle; if (@commands) { given ($hostName{$tn}) { $launchTask->(); } } } until (++$nunFinished == $np); return @result; }