Mapping our server to POE.

Now we'll translate the IO::Select server into one using POE. We'll use some of POE's lowest-level features. We won't save much effort this way, but the new program will retain a lot of the structure of the last.

Believe it or not, the IO::Select server is already driven by events. It contains a main loop that detects events and dispatches them, and it has a series of functions that handle those events.

To begin with, we'll throw together an empty skeleton of a POE program. Many of the IO::Select program's pieces will be draped on it shortly.

#!/usr/bin/perl

use warnings;
use strict;

use POSIX;
use IO::Socket;
use POE;

POE::Session->create
  ( inline_states =>
      {
      }
  );

POE::Kernel->run();
exit;

Before we can continue, we need to decide what the significant events are in the program. This will flesh out the program's overall structure.

Once we know what we'll be doing, we can finish off the POE::Session constructor. We create names for the events and define the functions that will handle them. Here's the filled in Session constructor.

POE::Session->create
  ( inline_states =>
      { _start => \&server_start,
        event_accept => \&server_accept,
        event_read   => \&client_read,
        event_write  => \&client_write,
        event_error  => \&client_error,
      }
  );

Now it's time to start porting the IO::Select code over. We still need to track the input and output buffers for client connections, but we won't use %ready hash here. The structures can remain global because they're keyed on socket handles, and those never collide.

my %inbuffer  = ();
my %outbuffer = ();

Next we bring over large chunks of the IO::Select. Each is triggered by one of the events we've specified, so each will migrate into one of their handlers.

First the remaining initialization code goes into _start. The _start handler creates the server socket and allocates its event generator with select_read(). POE::Kernel's select_read() method takes two parameters: a socket handle to watch and an event to dispatch when the handle is ready for reading.

sub server_start {
    my $server = IO::Socket::INET->new
      ( LocalPort => 12345,
        Listen => 10,
        Reuse  => "yes",
      ) or die "can't make server socket: $@\n";

    $_[KERNEL]->select_read( $server, "event_accept" );
}

Notice that we don't save the server socket. POE::Kernel keeps track of it for us and will pass it back as an argument to event_accept. We only need a copy of the socket if we want to do something special.

Looking back to the POE::Session constructor, the event_accept event is handled by server_accept(). This handler will accept the new client socket and allocate a watcher for it.

sub server_accept {
    my ( $kernel, $server ) = @_[ KERNEL, ARG0 ];

    my $new_client = $server->accept();
    $kernel->select_read( $new_client, "event_read" );
}

Next we handle input from the client in client_read(). It is called when an input event from select_read() is dispatched to the session. That first (0th) argument of that event is the handle that's become ready, so we can read from the socket without keeping a copy of it all the time.

The new client_read() is mostly the same as handle_read() from the IO::Select server. The accept() code has moved to another handler, and we don't bother with %ready anymore.

Errors are passed to event_error's handler via POE::Kernel's yield() method. The yield() method posts events just the way we want them, so it's up to client_read() to pass the client socket itself. The socket is included with event_error as its first argument, $_[ARG0].

Finally, if any output is buffered at the end of this handler, we make sure the client socket is watched for writability. The event_write handler will be called when the client socket can be written to.

sub client_read {
    my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];

    my $data = "";
    my $rv   = $client->recv( $data, POSIX::BUFSIZ, 0 );

    unless ( defined($rv) and length($data) ) {
        $kernel->yield( event_error => $client );
        return;
    }

    $inbuffer{$client} .= $data;
    while ( $inbuffer{$client} =~ s/(.*\n)// ) {
        $outbuffer{$client} .= $1;
    }

    if ( exists $outbuffer{$client} ) {
        $kernel->select_write( $client, "event_write" );
    }
}

Next we define what happens when client sockets can be written to. Again, the first argument for this event is the socket that can be worked with.

If the client's output buffer is empty, we stop watching it for writability and return immediately. Otherwise we try to write the entire buffer to the socket. Whatever isn't written remains in the buffer for the next time. If it was all written, though, we destroy the buffer entirely.

The client_write() function handles errors similar to the way client_read() does.

sub client_write {
    my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];

    unless ( exists $outbuffer{$client} ) {
        $kernel->select_write($client);
        return;
    }

    my $rv = $client->send( $outbuffer{$client}, 0 );
    unless ( defined $rv ) {
        warn "I was told I could write, but I can't.\n";
        return;
    }

    if ( $rv == length( $outbuffer{$client} ) or
        $! == POSIX::EWOULDBLOCK
      ) {
        substr( $outbuffer{$client}, 0, $rv ) = "";
        delete $outbuffer{$client} unless length $outbuffer{$client};
        return;
    }

    $kernel->yield( event_error => $client );
}

Finally we handle any errors that occurred along the way. We remove the client socket's input and output buffers, turn off all select-like events, and make sure the socket is closed. This effectively destroys the client's connection.

sub client_error {
    my ( $kernel, $client ) = @_[ KERNEL, ARG0 ];

    delete $inbuffer{$client};
    delete $outbuffer{$client};

    $kernel->select($client);
    close $client;
}

And it's done.

Casiano Rodríguez León
Licencia de Creative Commons
Programación Distribuida y Mejora del Rendimiento
por Casiano Rodríguez León is licensed under a Creative Commons Reconocimiento 3.0 Unported License.

Permissions beyond the scope of this license may be available at http://campusvirtual.ull.es/ocw/course/view.php?id=44.
2012-06-19