En la sección anterior vimos un pipeline con $N
etapas
que resuelve el problema de la mochila 0-1. Se arrancan $N
threads cada una de las cuales calcula una columna de la
tabla de programación dinámica
.
Si tenemos un sistema multiprocesador es posible confiar que el sistema operativo asigne las threads a los procesadores de forma adecuada y existe alguna mejora en el rendimiento.
Sin embargo, tal esperanza es infundada en la mayoría de las aplicaciones y plataformas. Si repasamos el algoritmo anterior veremos que el tiempo invertido en cómputo entre dos comunicaciones es pequeño:
62 $left[$c] = $from_left->dequeue(); 63 if ($c >= $w[$k]) { 64 my $y = $left[$c-$w[$k]]+$p[$k]; 65 $f[$c] = ($left[$c] < $y)? $y : $left[$c]; 66 } 67 else { 68 $f[$c] = $left[$c]; 69 } 70 $to_right->enqueue($f[$c]);Después de recibir el valor en la línea 62 se realizan los (breves) computos de las líneas 64-65 y se procede a un envío. Sólo podemos tener expectativas de mejorar el rendimiento si el tiempo invertido entre las dos comunicaciones es mayor que el tiempo de cómputo entre ellas. Que no es el caso.
Otro problema es que, en la práctica, el número de procesadores
disponibles en una máquina es mucho menor que el número de threads.
Mantener un gran número de threads supone una sobrecarga para el sistema
mayor que mantener un número pequeño.
El siguiente código permite la creación de un pipeline
con un número arbitrario $N
de etapas que son asignadas
a un número fijo $nt
de threads (posiblemente igual o del orden
del número de procesadores en el sistema). El comportamiento de la subrutina pipe
es equivalente al presentado en la sección anterior pero el número
de threads utilizado es fijo y no depende del tamaño del problema.
El programa principal es similar al de la sección anterior.
La diferencia está en la introducción del parámetro
especificando el número $numthreads
que es ahora
diferente del número de etapas:
lhp@nereida:~/Lperl/src/threads$ cat -n pipeknap.pl 1 #!/usr/bin/perl -w 2 # Virtualización de un pipe. 3 # Se supone que el número de procesos virtuales requeridos 4 # es conocido de antemano. 5 6 use strict; 7 use threads; 8 use threads::shared; 9 use Thread::Queue; 10 use Carp; 11 use IO::File; 12 13 ### main 14 my $numthreads = shift || 2; # número de threads "físicas" 15 croak "Usage:\n$0 num_threads inputfile\n" unless @ARGV; 16 my ($M, $w, $p) = ReadKnap($ARGV[0]); 17 my $N = @$w; 18 19 &pipe($numthreads, $N, \&knap, $M, $w, $p);
Ahora el bucle de creación de threads crea $nt
threads.
Estas threads ejecutan una subrutina que envuelve la tarea $task
pasada como parámetro.
Las subrutina-envoltorio ejecuta un bucle (líneas 43-45)
que asigna a la thread $id
el cálculo de las etapas $id
, $id+$nt
, $id+2*$nt
, etc.
21 ### subrutinas de ayuda 22 23 sub pipe { # crea el anillo que virtualiza el pipe de tamaño N 24 my $nt = shift(); # número de threads 25 my $N = shift(); # número de procesadores virtuales 26 my $task = shift(); # subrutina a ejecutar 27 my @channel : shared; # array de colas 28 my @t; # array de tareas 29 # El resto de argumentos son argumentos de $task 30 31 my $id; # identificador de procesador físico 32 33 # creamos canales ... 34 for($id=0; $id < $nt; $id++) { 35 $channel[$id] = new Thread::Queue; # canal comunicando procesos $id y ($id+1)%$nt 36 } 37 38 # creamos threads ... 39 for($id=0; $id < $nt; $id++) { 40 my $wrap = sub { # clausura que envuelve a la función de usuario $task 41 my $i; # identificador de proceso virtual 42 43 for($i = $id; $i < $N; $i += $nt) { 44 $task->($i, $channel[($id+$nt-1)%$nt], $channel[$id], @_); 45 } 46 }; # end wrap 47 48 $t[$id] = threads->new($wrap, @_); 49 } 50 # La thread 0 no trabaja, sólo espera ... 51 for($id=0; $id < $nt; $id++) { 52 $t[$id]->join(); # join debe ser ejecutado por el padre 53 } 54 }
Las restantes subrutinas son iguales que en la sección anterior.
$N
no es multiplo de $nt
?