Wheels (part 1)

The POE based server we just completed is still a bunch of work. What's worse, most of the work never changes from one server to the next. Listening to a server socket and accepting connections from it is a well-established science. Likewise, performing buffered operations on non-blocking sockets is largely the same everywhere. Reinventing these wheels for every server gets old very fast.

We created a group of classes under the POE::Wheel namespace to encapsulate these standard algorithms. Each Wheel contains some initialization code to set up event generators, and each implements the handlers for those events.

Wheels' creation and destruction are very important parts of their operation. Upon creation, they plug their handlers into the session that instantiated them. During destruction, those handlers are unplugged and the event generators are shut down. This close binding prevents one session from giving a wheel to another.

POE::Kernel does not manage wheels for you, so it's important that they be kept somewhere safe. They are most commonly stored in their sessions' heaps.

The events generated by wheels are usually at a higher level than the ones they handle internally. For example the ``input'' events emitted by POE::Wheel::ReadWrite include parsed things, not raw chunks of bytes. This is because POE::Wheel::ReadWrite parses data as well as just reading or writing it.

POE::Wheel::ListenAccept encapsulates the concept of listening on a server socket and accepting connections from it. It takes three parameters: A server socket to listen on, the name of an event to generate when a connection has been accepted, and the name of an event to generate when an error has occurred.

In this sample, a ListenAccept wheel is created to listen on a previously created server socket. When connections arrive, it emits ``event_accepted'' with the accepted client socket in ARG0. If any errors occur, it emits ``event_error'' with some information about the problem. We assume that handlers for these events have been defined and implemented elsewhere.

$_[HEAP]->{server} = POE::Wheel::ListenAccept->new
  ( Handle => $server_socket,
    AcceptEvent => "event_accepted",
    ErrorEvent  => "event_error",
  );

POE::Wheel::ReadWrite implements common algorithms necessary to perform buffered I/O on non-blocking sockets. It is a baroque beast, and we'll only discuss a few of the many parameters it accepts.

In this sample, a ReadWrite wheel will work with a previously accepted client socket. It parses input into lines by default, so every ``client_input'' event represents one line of input. The ``client_error'' events it emits represent the occasional error.

$_[HEAP]->{client} = POE::Wheel::ReadWrite->new
  ( Handle => $client_socket,
    InputEvent => "client_input",
    ErrorEvent => "client_error",
  );

That example is a little misleading. Subsequent ReadWrite wheels would clobber earlier ones, resulting in destroyed connections. The upcoming program will do it right.

Speaking of the example, here it is. Its full listing is in the file listing.evolution.listenaccept.

First we load the modules we'll need. The last line contains some nonstandard magic. Rather than importing symbols into the current package, the parameters to POE.pm are additional modules to load. The POE:: package will be prepended to them, saving a little typing.

#!/usr/bin/perl

use warnings;
use strict;

use POSIX;
use IO::Socket;
use POE qw(Wheel::ListenAccept Wheel::ReadWrite);

If that ``use POE'' line is too weird, it's perfectly acceptable to replace it with the following four lines.

use POE::Kernel;
use POE::Session;
use POE::Wheel::ListenAccept;
use POE::Wheel::ReadWrite;

Next we have the server's main loop. Again, we start the server session, run everything, and exit when things are done. It's nearly identical to the previous example, but some minor changes have been made in the event and handler names.

POE::Session->create
  ( inline_states =>
      { _start => \&server_start,
        server_accepted => \&server_accepted,
        server_error    => \&server_error,
        client_input    => \&client_input,
        client_error    => \&client_error,
      }
  );

POE::Kernel->run();
exit;

Now we handle the server's _start event by creating the server socket and starting a ListenAccept wheel to manage it. As before, we don't keep a copy of the server socket, but we do need to hold onto the ListenAccept wheel. Otherwise the wheel would destruct when it falls off the end of the function, and our server would be very short-lived.

sub server_start {
    my $server = IO::Socket::INET->new
      ( LocalPort => 12345,
        Listen => 10,
        Reuse  => "yes",
      ) or die "can't make server socket: $@\n";

    $_[HEAP]->{server} = POE::Wheel::ListenAccept->new
      ( Handle => $server,
        AcceptEvent => "server_accepted",
        ErrorEvent  => "server_error",
      );
}

ListenAccept will emit a ``server_accepted'' event for every connection it accepts. Each of these events contains a newly accepted client socket in ARG0. The next function, server_accepted(), wraps each socket in a POE::Wheel::ReadWrite instance.

sub server_accepted {
    my $client_socket = $_[ARG0];

    my $wheel = POE::Wheel::ReadWrite->new
      ( Handle => $client_socket,
        InputEvent => "client_input",
        ErrorEvent => "client_error",
      );
    $_[HEAP]->{client}->{ $wheel->ID() } = $wheel;
}

As we alluded to before, server_accepted() takes advantage of every wheel's unique ID to keep them from clobbering each other. Otherwise each new connection would destroy the wheel belonging to the previous one.

Next we handle ReadWrite's input events with client_input(). By default, ReadWrite parses input into lines and emits an input event for each one. Those events include two arguments apiece: the line parsed from the input, and the ID of the wheel that parsed it.

The client_input handler uses the wheel ID to match the input back to its wheel. Once the proper wheel has been established, its put() method is called to buffer the input for writing back to the client. The ReadWrite wheel handles all the buffering and flushing for us.

sub client_input {
    my ( $heap, $input, $wheel_id ) = @_[ HEAP, ARG0, ARG1 ];
    $heap->{client}->{$wheel_id}->put($input);
}

Finally we handle client and server errors with client_error() and server_error(), respectively. We simply delete the corresponding wheel. This destroys any buffers associated with the wheel, then shuts down the appropriate socket.

sub client_error {
    my ( $heap, $wheel_id ) = @_[ HEAP, ARG3 ];
    delete $heap->{client}->{$wheel_id};
}

sub server_error {
    delete $_[HEAP]->{server};
}

There are a couple important points to note, though.

If we had kept a copy of any of these sockets, they would not have closed when their wheels were let go. The extra references we held would have kept them active, and we would have been responsible for destroying them ourselves.

If server_error() ever occurs, possibly because we've run out of file handles to create sockets with, the server socket will shut down but existing client connections will continue. In applications where the clients should also shut down, we would just delete $_[HEAP]-$ >$ {client} as well.

Full listing:

 1	#!/usr/bin/perl
 2	use warnings;
 3	use strict;
 4	use IO::Socket;
 5	use POE qw(Wheel::ListenAccept Wheel::ReadWrite);
 6	### Start the server session.  Map events to the functions that will
 7	### handle them.  Run all the sessions until they stop, and then exit.
 8	POE::Session->create(
 9	  inline_states => {
10	    _start          => \&server_start,
11	    server_accepted => \&server_accepted,
12	    server_error    => \&server_error,
13	    client_input    => \&client_input,
14	    client_error    => \&client_error,
15	  }
16	);
17	POE::Kernel->run();
18	exit;
19	### Initialize the newly created server.  Create the server socket,
20	### and then create the wheel to listen on it and accept connections.
21	sub server_start {
22	  my $server = IO::Socket::INET->new(
23	    LocalPort => 12345,
24	    Listen    => 10,
25	    Reuse     => "yes",
26	  ) or die "can't make server socket: $@\n";
27	  $_[HEAP]->{server} = POE::Wheel::ListenAccept->new(
28	    Handle      => $server,
29	    AcceptEvent => "server_accepted",
30	    ErrorEvent  => "server_error",
31	  );
32	}
33	### Handle new connections from the ListenAccept wheel.  Create
34	### ReadWrite wheels to interact with them.  Store them by each
35	### wheel's unique ID so they don't clobber each other.
36	sub server_accepted {
37	  my $client_socket = $_[ARG0];
38	  my $wheel         = POE::Wheel::ReadWrite->new(
39	    Handle     => $client_socket,
40	    InputEvent => "client_input",
41	    ErrorEvent => "client_error",
42	  );
43	  $_[HEAP]->{client}->{$wheel->ID()} = $wheel;
44	}
45	### Handle input from a ReadWrite wheel.  Echo it back to the client.
46	### Each wheel event comes with the wheel's ID, so we can match the
47	### input back to the wheel for resending.
48	sub client_input {
49	  my ($heap, $input, $wheel_id) = @_[HEAP, ARG0, ARG1];
50	  $heap->{client}->{$wheel_id}->put($input);
51	}
52	### Handle client errors.  Delete the ReadWrite wheel associated with
53	### the client.
54	sub client_error {
55	  my ($heap, $wheel_id) = @_[HEAP, ARG3];
56	  delete $heap->{client}->{$wheel_id};
57	}
58	### Handle server socket errors.  Delete the ListenAccept wheel,
59	### shutting down the server.
60	sub server_error {
61	  delete $_[HEAP]->{server};
62	}

Casiano Rodríguez León
Licencia de Creative Commons
Programación Distribuida y Mejora del Rendimiento
por Casiano Rodríguez León is licensed under a Creative Commons Reconocimiento 3.0 Unported License.

Permissions beyond the scope of this license may be available at http://campusvirtual.ull.es/ocw/course/view.php?id=44.
2012-06-19