El siguiente código implanta una granja de procesadores
usando pipe s y wait s. La llamada se aplica
al ejemplo del cálculo de
. En el ejemplo se supone
que tenemos un número pequeño de procesadores NP
y un número comparativamente grande NT
de tareas:
Sigue un ejemplo de ejecución:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ farm_pi1.pl pi0 100% 7386 7.2KB/s 00:00 pi0 100% 7386 7.2KB/s 00:00 /usr/bin/ssh -l casiano orion '/tmp/pi0 0 2500 4' /usr/bin/ssh -l casiano beowulf '/tmp/pi0 1 2500 4' From beowulf (12515) received result: 0.785498 for task (1 2500 4) /usr/bin/ssh -l casiano beowulf '/tmp/pi0 2 2500 4' From orion (12514) received result: 0.785698 for task (0 2500 4) /usr/bin/ssh -l casiano orion '/tmp/pi0 3 2500 4' Last tasks beowulf (12516) task (2 2500 4), Combined = 2.356494 orion (12517) task (3 2500 4), Combined = 3.141592 Result: 3.141592
La subrutina farm
implanta la gestión de la granja.
En la llamada a farm
(líneas 106-113 del código que sigue)
es necesario especificar al menos:
command_handler
en nuestro ejemplo) el
cuál se encargará de preparar la cadena conteniendo la orden o programa
a ejecutar
combine
) que debe usar el maestro para
combinar el resultado parcial devuelto por
el trabajador
con el resultado acumulado hasta el momento (primer argumento
$_[0]
).
El segundo argumento $_[1]
es una referencia
a la lista de líneas producida por la salida del
último comando.
lhp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n farm_pi1.pl 1 #!/usr/bin/perl -w 2 # Author: Casiano 3 use strict; 4 use POSIX; 5 use IO::Handle; 6 7 my $dir = `pwd`; 8 chomp $dir; 9 my $rcp = (shift || "/usr/bin/scp"); 10 my $rdir = (shift || "/tmp/"); 11 my $executable = (shift || "pi0"); .. ................................. 99 #### main 100 use constant NT => 4; 101 use constant LT => NT-1; 102 use constant N => 10000; # Num intervals 103 use constant NperT => N/NT; # Assume N%NT == 0 104 my @processors = qw/orion beowulf/; 105 my @tasks = map { [$_, NperT, NT] } 0..LT; # Tasks for pi 106 my $out = farm(tasks=> \@tasks, 107 processors=> \@processors, 108 command=> \&command_handler, 109 combine=> \&combine, 110 initialize =>\&initialize, 111 rsh => '/usr/bin/ssh -l casiano', 112 debug=>1 113 ); 114 print "Result: $out\n";
En este caso, cada tarea
consiste en hacer una de las NT
sumas
necesarias para calcular
:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n pi0.c 1 #include <stdio.h> 2 #include <stdlib.h> 3 4 main(int argc, char **argv) { 5 int id, N, np, i; 6 double sum, left; 7 8 if (argc != 4) { 9 printf("Uso:\n%s id N np\n",argv[0]); 10 exit(1); 11 } 12 id = atoi(argv[1]); 13 N = atoi(argv[2]); 14 np = atoi(argv[3]); 15 for(i=id, sum = 0; i<N; i+=np) { 16 double x = (i + 0.5)/N; 17 sum += 4 / (1 + x*x); 18 } 19 sum /= N; 20 printf("%lf\n", sum); 21 }El programa espera tres argumentos:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ pi0 Uso: pi0 id N np lhp@nereida:~/Lperl/src/perl_networking/ch2$ pi0 2 1000 4 0.785148Estos argumentos son: el identificador lógico del procesador, el número de subintervalos en que se particiona el intervalo
[0,1]
y el número de procesos.
El ejecutable -originalmente en la máquina capataz o farmer - se copia en cada
máquina obrero o worker (se asume compatibilidad del ejecutable). Esta tarea
(línea 13, subrutina initialize
) es uno de los parámetros/manejadores que recibe
la subrutina farm
. Será disparada por farm
al comienzo de la
computación:
13 sub initialize { 14 my ($tasks, $procs) = @_; 15 16 # Assume all machines have the same architecture 17 my %Machines; 18 @Machines{@$procs} = (); 19 foreach my $machine (keys %Machines) { 20 die "couldn't copy $executable to $machine:$rdir: $?\n" 21 if system($rcp, $executable, "$machine:$rdir"); 22 } 23 return 0; # initial value for the accumulator 24 }
La función initialize
retorna el valor inicial para
el ''acumulador'' del granjero (en este caso 0, línea 23). Cada vez que termina
una tarea se ejecuta un manejador de combinación. El acumulador
es una variable que será pasada a dicho manejador de combinación
junto con el resultado
de la ejecución de la tarea que acaba de finalizar. El resultado
retornado por el manejador se guarda en el ''acumulador''.
Después de iniciar el acumulador en las líneas 53-54
el granjero (subrutina farm
en la línea 37) divide
su tiempo dentro del bucle (líneas 56-86)
en enviar tareas a los procesadores ociosos (líneas 57-72)
y recolectar resultados desde los que
han acabado una tarea (líneas 74-85).
La salida del bucle principal indica que todas las tareas han sido enviadas. No significa sin embargo que todos los resultados hayan sido recibidos, pues puede suceder que los últimos procesos envíados no hayan terminado.
En las líneas 89-95 se espera por la resolución de las últimas tareas.
37 sub farm { 38 my %args = @_; 39 my @tasks = @{$args{tasks} || die "farm Error! Supply tasks argument\n"}; 40 my @idles = @{$args{processors} || die "farm Error! Supply processors argument\n"}; 41 my $rsh = ($args{rsh} || "/usr/bin/ssh" || "/usr/bin/rsh"); chomp($rsh); 42 my $command = ($args{command} || die "farm Error! Supply a command argument\n"); 43 my $combine = ($args{combine} || sub { $_[0] .= "$[1]\n"; }); 44 my $debug = ($args{debug} || 0); 45 46 my %FROMCHILD; # Key = PID Value = IO handler for that process 47 my %Task; # Key = PID Value = [ lista de parámetros de la tarea ] 48 my %Worker; # Key = PID Value = machine name or address 49 50 my $handle; 51 52 # Initialize 53 my $accum = defined($args{initialize})? 54 $args{initialize}->(\@tasks, \@idles, $rsh, $command, $debug):undef; 55 56 while (@tasks) { 57 if (@idles) { 58 my $t = shift @tasks; 59 my $w = shift @idles; 60 $handle = IO::Handle->new(); 61 62 my $rcmd = "$rsh $w '". 63 $command->($t, $w, $debug). 64 "'"; 65 warn "$rcmd\n" if $debug; 66 67 my $pid = open($handle, "$rcmd |"); 68 69 $FROMCHILD{$pid} = $handle; 70 $Task{$pid} = $t; 71 $Worker{$pid} = $w; 72 } 73 74 my $child = waitpid(-1, WNOHANG); 75 if ($child > 0) { # Hijo cosechado 76 my @t = @{$Task{$child}}; delete($Task{$child}); 77 my $w = $Worker{$child}; delete($Worker{$child}); 78 79 $handle = $FROMCHILD{$child}; # Recuperamos el canal con ese hijo 80 my @result = <$handle>; 81 push @idles, $w; # Now $w is idle again 82 $combine->($accum, \@result, \@t, $w); 83 warn "From $w ($child) received result:\n@result[0..$#result>1?1:$#result]". 84 "for task (@t)\n" if $debug; 85 } 86 } 87 88 warn "Last tasks\n" if $debug; 89 while (($_ = wait) > 0) { 90 my @task = @{$Task{$_}}; 91 $handle = $FROMCHILD{$_}; 92 my @result = <$handle>; 93 $combine->($accum, \@result); 94 warn "$Worker{$_} ($_) task (@task), Combined = $accum\n" if $debug; 95 } 96 return $accum; 97 }
El manejador command_handler
recibe
la tarea (la lista anónima de parámetros)
y retorna la cadena con el comando a ejecutar.
25 sub command_handler { 26 my $t = shift; 27 my @parameters = @$t; 28 29 return "/tmp/pi0 @parameters"; 30 }El manejador de combinación suma los dos parámetros que recibe:
32 sub combine { 33 $_[0] += ${$_[1]}[0]; 34 }
El guión asume que se ha instalado un sistema de autentificación
automática usando parejas clave pública-clave privada y agentes.
El guión no considera el caso en que el login de usuario cambia de máquina a máquina.
Una solución a este problema es incluir un fichero de configuración ssh
:
hp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n ~/.ssh/config 1 # man ssh_config 2 Host machine1.domain 3 user casiano 4 5 Host machine2.domain 6 user pp2