Granja usando pipes y wait

En el paradigma de paralelismo conocido como Farm o Granja de Procesadores la tarea es dividida en el subconjunto de tareas a realizar. Un procesador denominado maestro o capataz envia las tareas a las restantes estaciones-trabajadores. Tan pronto como un trabajador devuelve el resultado de una tarea el capataz le da una nueva subtarea. El capataz combina el resultado parcial con los que haya obtenido hasta ese momento. Una ventaja que tiene este paradigma es que consigue equilibrar la carga de trabajo entre las máquinas, independientemente de que estas sean heterogéneas o no, independientemente de cual sea la carga dinámica de las estaciones por la presencia de otros procesos y usuarios e independientemente de que las tareas sean heterogéneas en sus necesidades de tiempo de cómputo.

Ejecución

El siguiente código implanta una granja de procesadores usando pipe s y wait s. La llamada se aplica al ejemplo del cálculo de $ \pi $ . En el ejemplo se supone que tenemos un número pequeño de procesadores NP y un número comparativamente grande NT de tareas: Sigue un ejemplo de ejecución:

lhp@nereida:~/Lperl/src/perl_networking/ch2$ farm_pi1.pl
pi0            100% 7386     7.2KB/s   00:00
pi0            100% 7386     7.2KB/s   00:00
/usr/bin/ssh -l casiano orion '/tmp/pi0 0 2500 4'
/usr/bin/ssh -l casiano beowulf '/tmp/pi0 1 2500 4'
From beowulf (12515) received result:
0.785498
for task (1 2500 4)
/usr/bin/ssh -l casiano beowulf '/tmp/pi0 2 2500 4'
From orion (12514) received result:
0.785698
for task (0 2500 4)
/usr/bin/ssh -l casiano orion '/tmp/pi0 3 2500 4'
Last tasks
beowulf (12516) task (2 2500 4), Combined = 2.356494
orion (12517) task (3 2500 4), Combined = 3.141592
Result: 3.141592

La LLamada a farm

La subrutina farm implanta la gestión de la granja. En la llamada a farm (líneas 106-113 del código que sigue) es necesario especificar al menos:

  1. La referencia a la lista de tareas, caracterizada por la lista de argumentos al programa a ejecutar,
  2. La referencia a la lista de nombres de las máquinas que participarán en el cómputo. Se asume que las máquinas están accesibles por ssh o rsh .
  3. El manejador del comando (command_handler en nuestro ejemplo) el cuál se encargará de preparar la cadena conteniendo la orden o programa a ejecutar
  4. La función (clave combine) que debe usar el maestro para combinar el resultado parcial devuelto por el trabajador con el resultado acumulado hasta el momento (primer argumento $_[0]). El segundo argumento $_[1] es una referencia a la lista de líneas producida por la salida del último comando.

lhp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n farm_pi1.pl
 1  #!/usr/bin/perl -w
 2  # Author: Casiano
 3  use strict;
 4  use POSIX;
 5  use IO::Handle;
 6
 7  my $dir = `pwd`;
 8  chomp $dir;
 9  my $rcp = (shift || "/usr/bin/scp");
10  my $rdir = (shift || "/tmp/");
11  my $executable = (shift || "pi0");
..  .................................
99  #### main
100  use constant NT => 4;
101  use constant LT => NT-1;
102  use constant N => 10000; # Num intervals
103  use constant NperT => N/NT; # Assume N%NT == 0
104  my @processors = qw/orion beowulf/;
105  my @tasks = map { [$_, NperT, NT] } 0..LT; # Tasks for pi
106  my $out = farm(tasks=> \@tasks,
107                processors=> \@processors,
108                command=> \&command_handler,
109                combine=> \&combine,
110                initialize  =>\&initialize,
111                rsh => '/usr/bin/ssh -l casiano',
112                debug=>1
113            );
114  print "Result: $out\n";

La Tarea: Código C

En este caso, cada tarea $ k$ consiste en hacer una de las NT sumas necesarias para calcular $ \pi $ :

$\displaystyle \sum_{i=k, i+=NT}^{N-1} \frac{4}{N \times \left (1+ (\frac{i+0.5}{N})^2 \right)}$   $\displaystyle \mbox{ con $k = 0 \ldots NT-1$}$ (3.2)

Para realizar la tarea usamos un programa escrito en C que calcula la suma parcial explicitada en la ecuación anterior:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n pi0.c
 1  #include <stdio.h>
 2  #include <stdlib.h>
 3
 4  main(int argc, char **argv) {
 5    int id, N, np, i;
 6    double sum, left;
 7
 8    if (argc != 4) {
 9      printf("Uso:\n%s id N np\n",argv[0]);
10      exit(1);
11    }
12    id = atoi(argv[1]);
13    N = atoi(argv[2]);
14    np = atoi(argv[3]);
15    for(i=id, sum = 0; i<N; i+=np) {
16      double x = (i + 0.5)/N;
17      sum += 4 / (1 + x*x);
18    }
19    sum /= N;
20    printf("%lf\n", sum);
21  }
El programa espera tres argumentos:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ pi0
Uso:
pi0 id N np
lhp@nereida:~/Lperl/src/perl_networking/ch2$ pi0 2 1000 4
0.785148
Estos argumentos son: el identificador lógico del procesador, el número de subintervalos en que se particiona el intervalo [0,1] y el número de procesos.

Inicialización

El ejecutable -originalmente en la máquina capataz o farmer - se copia en cada máquina obrero o worker (se asume compatibilidad del ejecutable). Esta tarea (línea 13, subrutina initialize) es uno de los parámetros/manejadores que recibe la subrutina farm. Será disparada por farm al comienzo de la computación:

13  sub initialize {
14    my ($tasks, $procs) = @_;
15
16    # Assume all machines have the same architecture
17    my %Machines;
18    @Machines{@$procs} = ();
19    foreach my $machine (keys %Machines) {
20      die "couldn't copy $executable to $machine:$rdir: $?\n"
21         if system($rcp, $executable, "$machine:$rdir");
22    }
23    return 0; # initial value for the accumulator
24  }

La función initialize retorna el valor inicial para el ''acumulador'' del granjero (en este caso 0, línea 23). Cada vez que termina una tarea se ejecuta un manejador de combinación. El acumulador es una variable que será pasada a dicho manejador de combinación junto con el resultado de la ejecución de la tarea que acaba de finalizar. El resultado retornado por el manejador se guarda en el ''acumulador''.

El Granjero

Después de iniciar el acumulador en las líneas 53-54 el granjero (subrutina farm en la línea 37) divide su tiempo dentro del bucle (líneas 56-86) en enviar tareas a los procesadores ociosos (líneas 57-72) y recolectar resultados desde los que han acabado una tarea (líneas 74-85).

La salida del bucle principal indica que todas las tareas han sido enviadas. No significa sin embargo que todos los resultados hayan sido recibidos, pues puede suceder que los últimos procesos envíados no hayan terminado.

En las líneas 89-95 se espera por la resolución de las últimas tareas.

37  sub farm {
38    my %args = @_;
39    my @tasks = @{$args{tasks} || die "farm Error! Supply tasks argument\n"};
40    my @idles = @{$args{processors} || die "farm Error! Supply processors argument\n"};
41    my $rsh = ($args{rsh} || "/usr/bin/ssh" || "/usr/bin/rsh"); chomp($rsh);
42    my $command = ($args{command} || die "farm Error! Supply a command argument\n");
43    my $combine = ($args{combine} || sub { $_[0] .= "$[1]\n"; });
44    my $debug = ($args{debug} || 0);
45
46    my %FROMCHILD; # Key = PID Value = IO handler for that process
47    my %Task;      # Key = PID Value = [ lista de parámetros de la tarea ]
48    my %Worker;    # Key = PID Value = machine name or address
49
50    my $handle;
51
52    # Initialize
53    my $accum = defined($args{initialize})?
54          $args{initialize}->(\@tasks, \@idles, $rsh,  $command, $debug):undef;
55
56    while (@tasks) {
57      if (@idles) {
58        my $t = shift @tasks;
59        my $w = shift @idles;
60        $handle = IO::Handle->new();
61
62        my $rcmd = "$rsh $w '".
63                  $command->($t, $w, $debug).
64                  "'";
65        warn "$rcmd\n" if $debug;
66
67        my $pid = open($handle, "$rcmd |");
68
69        $FROMCHILD{$pid} = $handle;
70        $Task{$pid} = $t;
71        $Worker{$pid} = $w;
72      }
73
74      my $child = waitpid(-1, WNOHANG);
75      if ($child > 0) { # Hijo cosechado
76        my @t = @{$Task{$child}}; delete($Task{$child});
77        my $w = $Worker{$child};  delete($Worker{$child});
78
79        $handle = $FROMCHILD{$child}; # Recuperamos el canal con ese hijo
80        my @result = <$handle>;
81        push @idles, $w; # Now $w is idle again
82        $combine->($accum, \@result, \@t, $w);
83        warn "From $w ($child) received result:\n@result[0..$#result>1?1:$#result]".
84             "for task (@t)\n" if $debug;
85      }
86    }
87
88    warn "Last tasks\n" if $debug;
89    while (($_ = wait) > 0) {
90      my @task = @{$Task{$_}};
91      $handle = $FROMCHILD{$_};
92      my @result = <$handle>;
93      $combine->($accum, \@result);
94      warn "$Worker{$_} ($_) task (@task), Combined = $accum\n" if $debug;
95    }
96    return $accum;
97  }

El manejador command_handler recibe la tarea (la lista anónima de parámetros) y retorna la cadena con el comando a ejecutar.

25  sub command_handler {
26    my $t = shift;
27    my @parameters = @$t;
28
29    return "/tmp/pi0 @parameters";
30  }
El manejador de combinación suma los dos parámetros que recibe:
32  sub combine {
33    $_[0] += ${$_[1]}[0];
34  }

Controlando el Login de Usuario

El guión asume que se ha instalado un sistema de autentificación automática usando parejas clave pública-clave privada y agentes. El guión no considera el caso en que el login de usuario cambia de máquina a máquina. Una solución a este problema es incluir un fichero de configuración ssh:

hp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n ~/.ssh/config
     1  # man  ssh_config
     2  Host machine1.domain
     3  user casiano
     4
     5  Host machine2.domain
     6  user pp2



Subsecciones
Casiano Rodríguez León
Licencia de Creative Commons
Programación Distribuida y Mejora del Rendimiento
por Casiano Rodríguez León is licensed under a Creative Commons Reconocimiento 3.0 Unported License.

Permissions beyond the scope of this license may be available at http://campusvirtual.ull.es/ocw/course/view.php?id=44.
2012-06-19