En la sección anterior vimos un pipeline con $N etapas
que resuelve el problema de la mochila 0-1. Se arrancan $N
threads cada una de las cuales calcula una columna de la
tabla de programación dinámica
.
Si tenemos un sistema multiprocesador es posible confiar que el sistema operativo asigne las threads a los procesadores de forma adecuada y existe alguna mejora en el rendimiento.
Sin embargo, tal esperanza es infundada en la mayoría de las aplicaciones y plataformas. Si repasamos el algoritmo anterior veremos que el tiempo invertido en cómputo entre dos comunicaciones es pequeño:
62 $left[$c] = $from_left->dequeue();
63 if ($c >= $w[$k]) {
64 my $y = $left[$c-$w[$k]]+$p[$k];
65 $f[$c] = ($left[$c] < $y)? $y : $left[$c];
66 }
67 else {
68 $f[$c] = $left[$c];
69 }
70 $to_right->enqueue($f[$c]);
Después de recibir el valor en la línea 62 se realizan
los (breves) computos de las líneas 64-65 y se procede
a un envío. Sólo podemos tener expectativas de mejorar
el rendimiento si el tiempo invertido entre las dos comunicaciones
es mayor que el tiempo de cómputo entre ellas. Que no es
el caso.
Otro problema es que, en la práctica, el número de procesadores
disponibles en una máquina es mucho menor que el número de threads.
Mantener un gran número de threads supone una sobrecarga para el sistema
mayor que mantener un número pequeño.
El siguiente código permite la creación de un pipeline
con un número arbitrario $N de etapas que son asignadas
a un número fijo $nt de threads (posiblemente igual o del orden
del número de procesadores en el sistema). El comportamiento de la subrutina pipe
es equivalente al presentado en la sección anterior pero el número
de threads utilizado es fijo y no depende del tamaño del problema.
El programa principal es similar al de la sección anterior.
La diferencia está en la introducción del parámetro
especificando el número $numthreads que es ahora
diferente del número de etapas:
lhp@nereida:~/Lperl/src/threads$ cat -n pipeknap.pl 1 #!/usr/bin/perl -w 2 # Virtualización de un pipe. 3 # Se supone que el número de procesos virtuales requeridos 4 # es conocido de antemano. 5 6 use strict; 7 use threads; 8 use threads::shared; 9 use Thread::Queue; 10 use Carp; 11 use IO::File; 12 13 ### main 14 my $numthreads = shift || 2; # número de threads "físicas" 15 croak "Usage:\n$0 num_threads inputfile\n" unless @ARGV; 16 my ($M, $w, $p) = ReadKnap($ARGV[0]); 17 my $N = @$w; 18 19 &pipe($numthreads, $N, \&knap, $M, $w, $p);
Ahora el bucle de creación de threads crea $nt threads.
Estas threads ejecutan una subrutina que envuelve la tarea $task
pasada como parámetro.
Las subrutina-envoltorio ejecuta un bucle (líneas 43-45)
que asigna a la thread $id
el cálculo de las etapas $id, $id+$nt, $id+2*$nt, etc.
21 ### subrutinas de ayuda
22
23 sub pipe { # crea el anillo que virtualiza el pipe de tamaño N
24 my $nt = shift(); # número de threads
25 my $N = shift(); # número de procesadores virtuales
26 my $task = shift(); # subrutina a ejecutar
27 my @channel : shared; # array de colas
28 my @t; # array de tareas
29 # El resto de argumentos son argumentos de $task
30
31 my $id; # identificador de procesador físico
32
33 # creamos canales ...
34 for($id=0; $id < $nt; $id++) {
35 $channel[$id] = new Thread::Queue; # canal comunicando procesos $id y ($id+1)%$nt
36 }
37
38 # creamos threads ...
39 for($id=0; $id < $nt; $id++) {
40 my $wrap = sub { # clausura que envuelve a la función de usuario $task
41 my $i; # identificador de proceso virtual
42
43 for($i = $id; $i < $N; $i += $nt) {
44 $task->($i, $channel[($id+$nt-1)%$nt], $channel[$id], @_);
45 }
46 }; # end wrap
47
48 $t[$id] = threads->new($wrap, @_);
49 }
50 # La thread 0 no trabaja, sólo espera ...
51 for($id=0; $id < $nt; $id++) {
52 $t[$id]->join(); # join debe ser ejecutado por el padre
53 }
54 }
Las restantes subrutinas son iguales que en la sección anterior.
$N no es multiplo de $nt?
