#!/usr/bin/perl
##################################################
package ForkedUA;

# Mike Schilli <m@perlmeister.com>, 2001
##################################################

use warnings;
use strict;

use IO::Select;
use Tie::RefHash;
use Storable qw(freeze thaw);
use LWP::UserAgent;

my $DEBUG        = 0;

##################################################
sub new {
##################################################
    my ($class) = shift;

    my $self = { 
        ua        => LWP::UserAgent->new(),
        reqs      => [],
        status    => [],
        debug     => 0,
        responses => [],
        processes => 10,
        @_ };

    $DEBUG = $self->{debug};
    return bless($self, $class);
}

##################################################
sub debug { print(@_, "\n") if $DEBUG; }
sub ua    { return shift->{ua} }
##################################################

##################################################
sub register {
##################################################
    my($self, @requests) = @_;
    push(@{$self->{reqs}}, @requests);
}
 
##################################################
sub process {
##################################################
    my $self = shift;

    tie(my %handle_to_index, 'Tie::RefHash');

    my $watcher = IO::Select->new();

        # Alle Kinder hochfahren und Kommuni-
        # kationskanäle aufbauen
    foreach my $i (0..$self->{processes}-1) {
        my($down, $up, $pid) = $self->mk_child();

            # Auf die Überwachungsliste
        $watcher->add($up);

            # Zuordnung Handle => Kind-Index
        $handle_to_index{$up} = $i;

            # Status dieses Kindes
        $self->{status}->[$i] = {
            down     => $down,
            up       => $up,
            busy     => 0,
            pid      => $pid 
        };
    }

    my $busy  = 0;
    my $reqid = 0;

    while(@{$self->{reqs}} or $busy) {
        $busy = 0;

            # Wartende Kinder mit Arbeit versorgen
        for my $i (0..$#{$self->{status}}) {
            if($self->{status}->[$i]->{busy}) {
                    # Kind arbeitet schon
                $busy++;
            } else {
                next unless @{$self->{reqs}};
                    # Es gibt noch Arbeit
                my $req = shift @{$self->{reqs}};
                    # Arbeitsauftrag versenden
                pipe_send(
                    $self->{status}->[$i]->{down}, 
                    freeze($req));

                $self->{status}->[$i]->{busy} = 1;
                    # Index dieses Requests merken
                $self->{status}->[$i]->{reqid} = 
                                         $reqid++;
                $busy++;
            }
        }

            # Kinder-Ergebnisse einsammeln
        for my $up ($watcher->can_read()) {
            my $resp = thaw(pipe_recv($up));

                # Antwort => Ergebnisarray
            my $i = $handle_to_index{$up};
            my $rid = 
                  $self->{status}->[$i]->{reqid};
            $self->{responses}->[$rid] = $resp;

            $self->{status}->[$i]->{busy} = 0;
            $busy--;
        }
    }

        # Alle Kinder runterfahren
    foreach my $i (0..$self->{processes}-1) {
        pipe_send($self->{status}->[$i]->{down}, 
                  "");
            # Auf Kind warten
        waitpid($self->{status}->[$i]->{pid}, 0);
    }

    return (@{$self->{responses}});
}

##################################################
sub pipe_send {
##################################################
    my($fh, $message) = @_;

    my $bytes = sprintf "0x%08x", length($message);
    syswrite($fh, $bytes . $message);
}

##################################################
sub pipe_recv {
##################################################
    my($fh) = @_;

    die "Protocol corrupted" if 
        sysread($fh, my $bytes, 10) != 10;
    $bytes = hex($bytes);

    my $data = "";
    while($bytes != 0) {
        my $read = sysread($fh, my $chunk, $bytes);
        last unless defined $read;
        $bytes -= $read; $data .= $chunk;
    }
    return $data;
}

##################################################
sub mk_child {
##################################################
    my $self = shift;

    pipe my $down_read, my $down_write or 
        die "Cannot open Child-Parent pipe: $!";
    pipe my $up_read, my $up_write or 
        die "Cannot open Parent-Child pipe: $!";

    defined(my $pid = fork) or die "Can't fork\n";

    if($pid) {    # Vater => Kindseitige Kanäle
                  # schließen und zurückkehren
        close($down_read);
        close($up_write);
        return($down_write, $up_read, $pid);
    }

        # Hierher kommt nur das Kind
    close($down_write);
    close($up_read);

        # Endlose Arbeitsschleife für das Kind
    {   my $data = pipe_recv($down_read);

            # Kommando zum Beenden erhalten?
        if($data eq "") {
            debug "CHILD[$$] shutting down";
            close($down_read);
            close($up_write);
            exit 0;
        }

            # Auftrag eingegangen
        my $req = thaw $data;
        debug "CHILD[$$] received request for ", 
              $req->uri->as_string;

            # Auftrag ausführen
        my $resp = $self->{ua}->request($req);
        debug "CHILD[$$] completed request for ", 
              $req->uri->as_string;

            # Response-Objekt zum Vater senden
        pipe_send($up_write, freeze($resp));
            # Nächster Request
        redo;
    }
}

1;
