#!/usr/bin/perl

#
#  This is part of Cleo batch system project.
#  (C) Sergey Zhumatiy (serg@parallel.ru) 1999-2006
#
#
# You can redistribute and/or modify this program
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# See the GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#

#  Cleo monitor (works on the node)
#
#  Usage: cleo-mon [path/to/config-file]
#
#  Default config - /etc/cleo-mon.conf
#
use lib '/usr/libexec/cleo';

use strict;
use Exporter;
use Fcntl;
use IO::Handle;
use IO::Select;
use IO::Socket;
use IO::File;

#use IO::Pipe;
use POSIX;
use Sys::Syslog;

use Cleo::Conn;

use vars qw($VERSION @ISA @EXPORT);

# Set the program version at compile time.
BEGIN {
  $VERSION = 5.22;    #(b)

  # NOTE(review): nothing inside this block sets $@, so this only fires if
  # $@ was left set by something compiled earlier -- confirm it is needed.
  die $@ if ($@);
}

sub qlog($);

#############################################
#
#  Task info encapsulation
#
# Methods:
#   new
#   printout
#   kill
#   free
#   check_pid
#   add_pid
#   del_pid
#   check_deads
#   is_head
#   set_head
#   get_pids
#   mark_dead
#   is_dead
#   set_attr
#   get_attr
#
#############################################
{

  package Task;

  # Bookkeeping for one task: the set of pids that belong to it, an optional
  # "head" pid, a dead flag and a free-form attribute hash.
  #
  # Internal layout:
  #   {pids} - hash pid => 1
  #   {attr} - hash attribute name => value ('user' and 'temp_dir' preset)
  #   {dead} - 0/1
  #   {head} - head pid (only after set_head)

  # constructor
  #
  # ret: new Task object with no pids, user 'nobody', temp_dir '/tmp'
  sub new {
    my $self = {
      pids => {},
      attr => {
        user     => 'nobody',
        temp_dir => '/tmp',
      },
      dead => 0,
    };

    return bless($self, __PACKAGE__);
  }

  # returns string with printable internals
  #
  # The pid list and the "name / value" attribute pairs are built as strings;
  # the previous code applied the numeric '/' operator to them, which died
  # with a division by zero as soon as an attribute value was not a number.
  sub printout {
    my $self = shift;

    my $pids = join(', ', sort {$a <=> $b} keys(%{$self->{pids}}));
    my $attrs = join(
      ";\n  ",
      map {
        my $val = $self->{attr}->{$_};
        "$_ / " . (defined($val) ? $val : '')
      } sort(keys(%{$self->{attr}}))
    );

    return "Task\n pids: $pids\n"
      . " dead: $self->{dead}\n"
      . " attrs:\n$attrs\n";
  }

  # check if pid belongs to this task
  #
  # arg - testing pid
  # ret: 1 if pid belongs to task/ 0 - if not
  sub check_pid {
    my $self = shift;
    return exists($self->{pids}->{$_[0]}) ? 1 : 0;
  }

  # adds pid to task
  #
  # arg: pid (ignored if it is <= 1 or main::is_deprecated_pid() accepts it)
  sub add_pid {
    my $self = shift;
    if ($_[0] > 1 and !main::is_deprecated_pid($_[0])) {
      main::qlog("[TASK] Added pid $_[0]\n");
      $self->{pids}->{$_[0]} = 1;
    }
  }

  # kill all task processes with signal
  #
  # arg (opt): signal (def=SIG_TERM)
  # ret: result of the builtin kill (number of processes signalled)
  sub kill {
    my $self = shift;
    my $signal = (defined($_[0]) and $_[0] ne '') ? $_[0] : 'TERM';

    main::qlog("[TASK] Kill pids $signal:"
        . join(';', keys(%{$self->{pids}}), "\n"))
      if (main::get_setting('log_kills') != 0);

    # CORE:: makes it explicit that the builtin is meant, not this method
    return CORE::kill($signal, keys(%{$self->{pids}}));
  }

  # delete pid from this task
  #
  # arg: pid
  sub del_pid {
    my $self = shift;
    if ($_[0] > 1) {
      delete $self->{pids}->{$_[0]};
    }
  }

  # check if all task pids are alive
  #
  # arg: autodel (if nonzero, automatically delete dead pids)
  # ret: list of 'dead' pids
  #
  # A pid counts as dead when signal 0 cannot be delivered to it.
  sub check_deads {
    my $self = shift;
    my $ad   = shift;

    my @ret;
    foreach my $i (keys(%{$self->{pids}})) {
      next if CORE::kill(0, $i);
      push @ret, $i;
      delete $self->{pids}->{$i} if ($ad);
    }
    return @ret;
  }

  # remove all information about pids
  #
  sub free {
    my $self = shift;
    $self->{pids} = {};
  }

  # check, if pid is task head pid
  #
  # arg: pid
  # ret: 1 if pid is head task pid, 0 if not (always 0 while no head is set)
  sub is_head {
    my $self = shift;
    my $pid  = shift;

    return 0 unless defined($self->{head});
    return ($pid == $self->{head}) ? 1 : 0;
  }

  # set pid as task head pid (the pid is also added to the pid set)
  #
  # arg: pid (values below 1 are ignored)
  sub set_head {
    my $self = shift;
    my $pid  = shift;

    return if ($pid < 1);
    $self->{head} = $pid;
    $self->{pids}->{$pid} = 1;
  }

  # return list of all pids
  #
  # In scalar context this yields the number of pids, because keys() is
  # returned directly; do_reap relies on that.
  sub get_pids {
    my $self = shift;
    return keys(%{$self->{pids}});
  }

  # mark task as dead
  #
  sub mark_dead {
    my $self = shift;
    $self->{dead} = 1;
  }

  # check if task is dead
  #
  sub is_dead {
    my $self = shift;
    return $self->{dead};
  }

  # set attribute
  #
  # args: attribute name, value
  sub set_attr {
    my $self = shift;
    my $attr = shift;
    my $val  = shift;

    $self->{attr}->{$attr} = $val;
  }

  # get attribute
  #
  # arg: attribute name
  # ret: stored value (undef if never set)
  sub get_attr {
    my $self = shift;
    my $attr = shift;

    return $self->{attr}->{$attr};
  }

  # list available attributes
  #
  sub list_attrs {
    my $self = shift;
    return keys(%{$self->{attr}});
  }
}

{

  package XMLNODE;

  # Minimal tree node: optional name, optional value, ordered list of child
  # nodes and a cursor ({index}) for walking the children.

  # Create a node. A non-empty first argument becomes the node name and
  # also initialises the child cursor to 0.
  sub new(;$) {
    my $label = $_[0];
    my $node  = {};

    @{$node}{qw(name index)} = ($label, 0) if ($label ne '');
    $node->{nodes} = [];

    return bless($node, __PACKAGE__);
  }

  # Node name accessor.
  sub get_name {
    my ($node) = @_;
    return $node->{name};
  }

  # Replace the node name.
  sub set_name {
    my ($node, $label) = @_;
    $node->{name} = $label;
  }

  # Node value accessor.
  sub get_val {
    my ($node) = @_;
    return $node->{val};
  }

  # Replace the node value.
  sub set_val($) {
    my ($node, $value) = @_;
    $node->{val} = $value;
  }

  # Append a child to the end of the child list.
  sub add_node($) {
    my ($node, $child) = @_;
    push @{$node->{nodes}}, $child;
  }

  # Rewind the child cursor to the first child.
  sub to_first_node() {
    my ($node) = @_;
    $node->{index} = 0;
  }

  # Return the child under the cursor and advance the cursor.
  # Returns undef when the cursor was never initialised.
  sub next_node() {
    my ($node) = @_;
    return exists($node->{index})
      ? $node->{nodes}->[$node->{index}++]
      : undef;
  }

  # Number of children.
  sub count_nodes() {
    my ($node) = @_;
    my $children = $node->{nodes};
    return scalar(@{$children});
  }
}

use vars qw(@for_server $SRV $LST $srv_select $my_name
  $init_conn_time $init_conn_timeout $port $server $pipe_error
  %ran $all_pids %child_pids %server_recievers
  %useruid %groupid @reaped @reapcode
  @attach_collected
  $smart_port $global_rsh_command @attach_requests
  @pre_reaped1 @pre_reaped2 @pre_reapcode1 @pre_reapcode2
  $reaping $reaping2 $new_reaped $SH $shell_conn
  $rsh_from $rsh_hash $rsh_num $rsh_start_time @fake_rshells
  $reaper_child %error_codes @serv_buffer %kill_pids %kill_em
  $STATUS %delayed_kills $last_ran_check @delayed_attaches
  %global_settings %new_global_settings %def_global_settings
  %opt_types @deprec_pids @deprec_uids
  %tasks %delayed_task_kills $wrapper @wrap_hints
  $last_attach @nofilter_users_range %err_signals
  $attach_in_progress $attach_tmout);

#$attach_parent_mask $attach_exe_mask $attach_user

use vars qw(%_d_flush_nolog %_d_rcv_nolog $debug_level);      # debug purpose
use vars qw($_d_alarm_log $_debug_log_head $_debug_yahoo);    # debug purpose

# $DO_NOT_FORCE - when true, do_reap does not kill the remaining pids of a
#                 task whose head died while more than one pid is still left.
# $MAX_ERR_SIG  - not referenced in the part of the file reviewed here;
#                 presumably a limit on signal-related errors -- TODO confirm.
use vars qw($DO_NOT_FORCE $MAX_ERR_SIG);
$DO_NOT_FORCE = 1;                                            # debug!!!
$MAX_ERR_SIG  = 3;

sub attach_handler_second_stage( $ );
sub update_childs;
sub load_config( $;$ );
sub load_state;
sub get_setting( $ );
sub pack_value( $ );
sub unpack_value( $$;$ );
sub save_state();

# Probe whether O_LARGEFILE can be called. If the call dies, define a stub
# that returns 0 so that expressions like "O_LARGEFILE | O_WRONLY" used
# later in the file still work.
eval {
  # silence any global __DIE__ handler while probing
  local $SIG{__DIE__} = sub {return;};
  &O_LARGEFILE();
};
if ($@) {
  eval "sub O_LARGEFILE(){return 0;}";
}

sub _print_tasks {

  # Debug helper: write one log line per known task with its pid list.
  for my $task_name (keys(%tasks)) {
    my @task_pids = $tasks{$task_name}->get_pids();
    qlog("__TASK=$task_name: " . join(';', @task_pids, "\n"));
  }
}

sub print_stack {

  # Log up to four caller frames (file, line, subroutine), starting with the
  # caller of the sub that invoked print_stack.
  my $rep = '';
  for my $depth (1 .. 4) {
    my (undef, $filename, $line, $subroutine) = caller($depth);

    # caller() returns an empty list past the outermost frame; stop there
    # instead of logging empty "[: ]" entries.
    last unless defined($filename);
    $rep .= "$depth [${filename}:${line} ${subroutine}]; ";
  }
  qlog("STACK: $rep\n");
}

# Inclusive ranges stored as flat lists of (min, max) pairs.
# @deprec_pids / @deprec_uids are consulted by is_deprecated_pid: a pid inside
# a pid range, or owned by a uid inside a uid range, is never added to a task
# and is not sent TERM by kill_pid.
@deprec_pids          = (1, 8);    # format: min1,max1, min2,max2, ... minN,maxN
@deprec_uids          = (0, 100);  # see above
# ranges consulted by is_unwanted_pid (same pair format)
@nofilter_users_range = (0, 100);

# current debug verbosity; usr2_processor cycles it 0..3
$debug_level = 0;

# Built-in default values for the settings.
# Read in the part of the file reviewed here (through get_setting):
#   hard_kill_delay   - seconds added to "now" for the delayed kill in kill_pid
#   dead_cleanup_time - seconds added to "now" for the 'ttl' of a dead task
#   log_kills         - non-zero makes Task::kill log the pids it signals
#   debug_pc          - non-zero logs packing/unpacking of message values
# The remaining keys are used outside the reviewed part; their meaning is
# presumably what the names say -- TODO confirm.
%def_global_settings = (
  path_prepend            => '',
  path_append             => '',
  attach_tmout            => 60,
  smart_port              => 8855,
  global_rsh_command      => '/usr/bin/ssh',
  mon_save                => '/var/log/cleo-mon.save',
  init_conn_timeout       => 5,
  port                    => 5588,
  hard_kill_after_head    => 15,
  suexec_gid              => 65535,
  debug_pc                => 0,
  hard_kill_delay         => 60,
  last_ran_check_interval => 3600,
  log_kills               => 1,
  dead_cleanup_time       => 600,
  filter_users            => 1,
  pids_update_interval    => 15
);

# Type descriptors for the settings, one 4-element list per option.
# NOTE(review): attach_tmout, dead_cleanup_time and filter_users have default
# values above but no entry here -- confirm whether that is intended.
%opt_types = (

  #                  type:safe:cumulative:sections
  # types: n-umeric, t-ext, h-ash, l-ist (via space), L-ist (via comma)
  # sections: ''=all, 'q'=queues, 'g'=global, 'u'=users, 'p'=profiles,
  #           'U'=clusterusers, 'l'=local_user_file
  # sections are ignored here, but must be specified :)

  path_prepend            => ['t', 'y', '', 'g'],
  path_append             => ['t', 'y', '', 'g'],
  smart_port              => ['n', 'y', '', 'g'],
  global_rsh_command      => ['t', 'y', '', 'g'],
  mon_save                => ['t', 'y', '', 'g'],
  init_conn_timeout       => ['n', 'y', '', 'g'],
  hard_kill_after_head    => ['n', 'y', '', 'g'],
  hard_kill_delay         => ['n', 'y', '', 'g'],
  suexec_gid              => ['n', 'y', '', 'g'],
  debug_pc                => ['n', 'y', '', 'g'],
  last_ran_check_interval => ['n', 'y', '', 'g'],
  log_kills               => ['n', 'y', '', 'g'],
  pids_update_interval    => ['n', 'y', '', 'g'],
  port                    => ['n', 'y', '', 'g']
);

# Message types whose outgoing traffic is NOT logged by answer_to_server.
# Commented-out entries are types that are still logged.
%_d_flush_nolog = (

  #                 'init' =>1,
  'run'  => 1,
  'kill' => 1,

  #                 'stat' =>1,
  #                 'exit' =>1,
  'init_attach' => 1,
  'attach'      => 1,
  'ping'        => 1
);

# Message types whose incoming traffic is NOT logged by rcv_from_server and
# get_parsed_block (content line and dispatch trace).
%_d_rcv_nolog = (

  #                 'init' =>1,
  'run'  => 1,
  'kill' => 1,

  #                 'stat' =>1,
  #                 'exit' =>1,
  'init_attach' => 1,
  'attach'      => 1,
  'ping'        => 1
);

# debug switch; not read in the part of the file reviewed here
$_d_alarm_log = 0;

# Build %error_codes: errno value => 1 for every errno in 1..127 whose
# system message text is one of the connection-breaking errors listed below.
%error_codes = ();
{
  my %fatal_message = map {$_ => 1} (
    "Bad file descriptor",
    "File too large",
    "Broken pipe",
    "Machine is not on the network",
    "Communication error on send",
    "Protocol error",
    "Network is down",
    "Network is unreachable",
    "Network dropped connection on reset",
    "Software caused connection abort",
    "Connection reset by peer",
    "Cannot send after transport endpoint shutdown",
    "Connection timed out",
  );

  foreach my $errno (1 .. 127) {

    # assigning a number to $! makes its string value the message text
    $! = $errno;
    $error_codes{$errno} = 1 if ($fatal_message{"$!"});
  }
}

sub do_syslog( $ ) {

  # Send one message to syslog (ident "cleo-mon", facility daemon,
  # priority info).
  #
  # arg: message text
  #
  # The text is passed as an argument to a fixed '%s' format: syslog()
  # treats its second argument as a printf-style format, so a '%' inside
  # the message must not be interpreted.
  openlog('cleo-mon', 'pid', 'daemon');
  syslog('info', '%s', $_[0]);
  closelog();
}

sub qlog( $ ) {

  # Append one line to the log handle $STATUS, prefixed with the local
  # time and this monitor's name ($my_name), then flush the handle.
  #
  # arg: message text (written as is, no newline added)
  my $stamp = localtime(time);
  $STATUS->printf("\[%s\] %-8s: %s", $stamp, $my_name, $_[0]);
  $STATUS->flush();

  #  do_syslog( $_[0] );
}

#
#
#  Reopen log files
#
sub hup_processor() {

  # (Re)open the log file and store the handle in $STATUS.
  #
  # The file name comes from get_setting('log'), default
  # /var/log/cleo-mon.log. If it cannot be opened, /tmp/cleo-mon.log is
  # tried, then /dev/null; failures are reported through syslog.
  my ($io1, $io2, $io3, $log_file);

  # the handle may not be open yet, so ignore close errors
  eval {$STATUS->close();};
  eval {

    # trick to avoid reopen STDIN/OUT/ERR:
    # three placeholder handles are held open while the log is opened
    $io1 = IO::File->new();
    $io1->open('/dev/null');
    $io2 = IO::File->new();
    $io2->open('/dev/null');
    $io3 = IO::File->new();
    $io3->open('/dev/null');
  };

  # '||=' (logical default), not '|=': a bitwise string OR would merge the
  # configured path with the default one and produce a garbage file name.
  $log_file = get_setting('log');
  $log_file ||= "/var/log/cleo-mon.log";

  $STATUS = IO::File->new();
  unless ($STATUS->open($log_file, O_LARGEFILE | O_WRONLY | O_APPEND | O_CREAT))
  {
    do_syslog("Cannot open '$log_file' ($!). Try /tmp");
    unless (
      $STATUS->open(
        "/tmp/cleo-mon.log", O_LARGEFILE | O_WRONLY | O_APPEND | O_CREAT
      )
      ) {
      do_syslog("Cannot open /tmp/cleo-mon.log ($!). Try /dev/null");
      $STATUS->open("/dev/null", O_WRONLY);
    }
  }
  $STATUS->autoflush(1);

  # release the placeholders
  eval {
    $io1->close();
    $io2->close();
    $io3->close();
  };
}

#
#
#  Dump all internal data
#
sub usr1_processor() {

  # Dump internal state to /tmp/cleo-agent.dump: version, tasks,
  # connections, settings, delayed task kills, server/listener connection
  # state and attach bookkeeping.
  #
  # Any failure aborts the dump and is written to the log.
  #
  # NOTE(review): the dump goes to a fixed name in /tmp and is opened
  # without any symlink protection -- consider a private directory.
  my $io = IO::File->new();
  eval {
    $io->open('>/tmp/cleo-agent.dump')
      or die "cannot open /tmp/cleo-agent.dump: $!\n";
    $io->print("Version: $VERSION\n");

    # %tasks maps task name => Task object; printout() is a method of the
    # object, not of the key string
    for my $name (sort(keys(%tasks))) {
      $io->print($tasks{$name}->printout());
    }
    $io->print("Connections:\n");
    for my $name (sort(keys(%Cleo::Conn::_conn_by_h))) {
      $io->print(" $name: $Cleo::Conn::_conn_by_h{$name}\n");
    }
    $io->print("Settings:\n");
    for my $name (sort(keys(%global_settings))) {
      $io->print(" $name: $global_settings{$name}\n");
    }
    $io->print("Delayed kills:\n");
    for my $name (sort(keys(%delayed_task_kills))) {
      $io->print(" $name: $delayed_task_kills{$name}\n");
    }

    $io->print('SRV: '
        . $SRV->get_state() . ' ('
        . $SRV->get_peer() . '/'
        . $SRV->get_port()
        . ")\n");
    $io->print('LST: '
        . $LST->get_state() . ' ('
        . $LST->get_peer() . '/'
        . $LST->get_port()
        . ")\n");
    $io->print('attach_collected: ' . join(',', @attach_collected), "\n");
    $io->print("Last attach: $last_attach\n");
    $io->print("Attach in progress: $attach_in_progress\n");
  };
  my $err = $@;

  # close whatever was written, even after a failure
  eval {$io->close() if $io->opened();};
  qlog("State dump failed: $err") if ($err);
}

#
#
#  Change log level
#
sub usr2_processor() {

  # Step the debug level through 0 -> 1 -> 2 -> 3 -> 0 and log the result.
  my $next_level = $debug_level + 1;
  $debug_level = ($next_level > 3) ? 0 : $next_level;
  qlog("Debug level=$debug_level\n");
}

# SIGCHLD handler (it re-installs itself at the end).
#
# Collects every exited child with a non-blocking waitpid and stores its pid
# and exit status in one of two staging list pairs:
#   @pre_reaped1/@pre_reapcode1 - while $reaping2 is set
#   @pre_reaped2/@pre_reapcode2 - otherwise
# If do_reap is running ($reaping set) only $new_reaped is raised and the
# staged entries stay where they are; otherwise the second staging pair is
# moved to @reaped/@reapcode for the next do_reap call.
# Exceptions are caught and logged.
sub REAPER {
  eval {
    # reap all children that have already exited, without blocking
    while (($reaper_child = waitpid(-1, &WNOHANG)) > 0) {
      qlog "SIGCHLD: $reaper_child ($?)\n";
      if ($reaping2) {
        push @pre_reaped1,   $reaper_child;
        push @pre_reapcode1, $?;
      } else {
        push @pre_reaped2,   $reaper_child;
        push @pre_reapcode2, $?;
      }
    }
    qlog "Signal processing done\n";
    if ($reaping) {
      # do_reap is working on @reaped right now: just leave a notice
      qlog "Reaping while reaper.\n";
      $new_reaped = 1;
    } else {
      # hand the staged entries over; @pre_reaped1 is not touched here
      push @reaped,   @pre_reaped2;
      push @reapcode, @pre_reapcode2;
      @pre_reaped2   = ();
      @pre_reapcode2 = ();
    }
    qlog "1st reaper done\n";
    $SIG{CHLD} = \&REAPER;    # still loathe sysV
  };
  if ($@) {
    qlog "Reaper exeption: $@\n";
  }
}    # REAPER

# check if some processes are dead, but did not send sigchild
sub ran_check() {

  # Ask every task for pids that no longer exist (they are removed from the
  # task at the same time) and queue each of them for do_reap with exit
  # code 255.
  for my $task (values(%tasks)) {
    for my $gone_pid ($task->check_deads(1)) {

      # $reaping is raised around the queue update, as do_reap does
      $reaping = 1;
      push @reaped,   $gone_pid;
      push @reapcode, 255;
      $reaping = 0;
    }
  }
}

#
#  Do actual reaping tasks
#
#
sub do_reap {

  # Process the queue of dead pids (@reaped with exit codes in @reapcode).
  #
  # For every dead pid the owning task is looked up and the pid removed
  # from it. If the pid was the task head, or fewer than two pids remain,
  # the task is sent TERM, a delayed hard kill is scheduled, and the task is
  # marked dead with a 'ttl' attribute -- unless $DO_NOT_FORCE is set and
  # more than one pid remains, or the task is the one being attached.
  # Afterwards the entries staged by REAPER while this sub was running are
  # moved into the queue.
  #
  # $reaping is set for the whole run so that REAPER only stages new pids.
  my ($child, $tmp, $code, $count);

  eval {
    $reaping = 1;
    if (@reaped > 0) {
      update_pids();
      update_childs();
    }

  MAIN_REAPER:
    while ($child = shift @reaped) {
      $code = shift @reapcode;
      qlog("Reaping: pid=$child, code=$code\n");
      foreach $tmp (keys(%tasks)) {

        # is dead pid one of my tasks?
        next unless ($tasks{$tmp}->check_pid($child));

        $tasks{$tmp}->del_pid($child);
        next MAIN_REAPER if $tasks{$tmp}->is_dead;

        # is it task head or last pid?
        $count = scalar($tasks{$tmp}->get_pids());
        if ($tasks{$tmp}->is_head($child) or $count < 2) {

          qlog('Task head died ('
              . $tasks{$tmp}->get_attr('id') . '/'
              . $tasks{$tmp}->get_attr('owner') . '/'
              . $tasks{$tmp}->get_attr('user') . ' '
              . join(',', sort($tasks{$tmp}->get_pids())) . ")\n");

          # probably do not force kill other task pids
          next MAIN_REAPER if $DO_NOT_FORCE and $count > 1;

          # do not reap attaching task...
          # Task names are compared as strings: with '==' two non-numeric
          # names (or a name and an empty $attach_in_progress) both
          # evaluate to 0 and always compare equal.
          next MAIN_REAPER
            if (defined($attach_in_progress)
            and $attach_in_progress eq $tmp);

          qlog("Soft kill $tmp\n");
          $tasks{$tmp}->kill('TERM');
          $delayed_task_kills{$tmp} =
            time + $tasks{$tmp}->get_attr('hard_kill_after_head');
          $tasks{$tmp}->mark_dead;
          $tasks{$tmp}
            ->set_attr('ttl', time + get_setting('dead_cleanup_time'));
        }
        next MAIN_REAPER;
      }

      # pid not belongs to any task
      qlog("Dead pid $child not belongs to any task. Skip.\n");
    }

    if ($new_reaped) {
      qlog("New reaped\n");

      # Clear the notice first: if REAPER fires during the hand-over below
      # it raises the flag again and the next call picks its entries up.
      $new_reaped = 0;

      push @reaped,   @pre_reaped1;
      push @reapcode, @pre_reapcode1;
      @pre_reaped1   = ();
      @pre_reapcode1 = ();

      # while $reaping2 is set REAPER stages into the first pair, so the
      # second pair can be drained safely
      $reaping2 = 1;
      push @reaped,   @pre_reaped2;
      push @reapcode, @pre_reapcode2;
      @pre_reaped2   = ();
      @pre_reapcode2 = ();
      $reaping2      = 0;
    }
    $reaping = 0;
  };

  if ($@) {
    qlog("Reaper2 exeption: $@\n");

    # do not leave the flag raised, or REAPER would never again hand pids
    # over to the queue
    $reaping = 0;
  }
}    # do_reap

#
# Launch the program after waiting the time interval.
# Returns immediately.
#
# Args:
#      - time to wait
#      - command line
#      - uniq id
#      - [opt] uid
#      - [opt] gid
#
#
#######################################################################
sub launch( $$$;$$ ) {

  # Start $prog in a detached process after $time seconds; the caller
  # returns right after the first fork.
  #
  # Time_interval, command_line, uniq_id, [uid], [gid]
  #
  # While waiting, the detached process keeps a marker file
  # /tmp/q-launch.$id; if that file disappears the launch is cancelled.

  my ($time, $prog, $id, $uid, $gid) = @_;
  my ($i, $p);

  $p = fork();
  return unless defined $p;    # FAIL 8(
  return if ($p < 0);          # FAIL 8(
  if ($p) {                    # successful launch
    return;
  }

  # Created process

  $0 = "CLEO LAUNCH";
  $SIG{PIPE} = sub {$pipe_error = 1;};

  # NOTE(review): perlipc documents the string 'IGNORE' for ignoring a
  # signal; 'Ignore' is treated as a handler name. Changing it would also
  # change what the exec'ed program inherits -- confirm before touching.
  $SIG{CHLD} = 'Ignore';
  $SIG{USR1} = 'Ignore';
  $SIG{USR2} = 'Ignore';
  $SIG{HUP}  = 'Ignore';
  $SIG{ABRT} = 'Ignore';
  $SIG{TERM} = 'Ignore';
  $SIG{QUIT} = 'Ignore';
  $SIG{BUS}  = 'Ignore';
  $SIG{SEGV} = 'Ignore';
  $SIG{FPE}  = 'Ignore';
  $SIG{INT}  = 'Ignore';
  $SIG{ILL}  = 'Ignore';

  # try to create child process (daemonize):
  # up to 10 attempts, 0.1 s apart; the intermediate process exits and only
  # the new child continues
  for ($i = 0; $i < 10; ++$i) {
    select(undef, undef, undef, 0.1);
    $p = fork();
    next unless defined $p;
    next if $p < 0;
    last;
  }
  exit 0 unless defined $p;
  exit 0 if ($p != 0);

  if (1) {    # || POSIX::setsid()!=-1) {
    # NOTE(review): predictable file name in /tmp, created with a two-argument
    # open on a bareword handle.
    unlink "/tmp/q-launch.$id";    # delete possible symlink
    open X, ">/tmp/q-launch.$id" or exit(1);    # create 'lock-file'
    close X;

    # wait $time seconds, checking the marker once per second
    for (; $time > 0; --$time) {
      sleep 1;
      exit(0)
        unless -f "/tmp/q-launch.$id";

      # exit, if launch is not necessary
    }
    unlink "/tmp/q-launch.$id";                 # delete 'lock-file'
    qlog "LAUNCHING($p) '$prog'\n";

    #    eval { close $LST; };
    #    eval { close $To_server; };
    #    eval { close $From_server; };
    #    eval { close $SH; };
    $SRV->disconnect;

    # NOTE(review): this closes the bareword handle STATUS, while the log
    # handle used by qlog is the scalar $STATUS -- confirm which was meant.
    eval {close STATUS;};
    $ENV{PATH} = '/bin:/usr/bin:/usr/local/bin';

    # switch group and user ids if given
    # NOTE(review): the result of the id changes is not verified before exec.
    if ($gid) {
      $( = $) = $gid;
    }
    if ($uid) {
      $< = $> = $uid;
    }
    setpriority(0, $$, 0);
    exec($prog);    # THE CULMINATION!
  }

  # reached only if exec failed
  exit 1;
}    # launch

#
#   Makes this program a daemon
#
# arg: none
# ret - none
#
#####################################################################
sub daemonize() {

  # Fork (up to 10 attempts); the parent exits with 0 and the child calls
  # setsid and returns to the caller. If no fork succeeds the process logs
  # the failure and exits with 1. The setsid result is not checked.
  my $child;
  my $attempts_left = 10;

  while ($attempts_left-- > 0) {
    $child = fork();
    last if (defined($child) and $child >= 0);
  }

  if (!defined($child) or $child < 0) {
    qlog("Cannot daemonize! So die...\n");
    exit(1);
  }

  # parent: done
  exit(0) if ($child > 0);

  # child: detach from the controlling terminal
  POSIX::setsid();
  return;
}    # daemonize

sub is_deprecated_pid( $ ) {

  # Tell whether a pid is protected.
  #
  # arg: pid
  # ret: 1 if the pid lies inside one of the @deprec_pids ranges, or it is
  #      known in $all_pids and its uid lies inside one of the @deprec_uids
  #      ranges; 0 otherwise
  my $pid = $_[0];

  for (my $at = 0; $at <= $#deprec_pids; $at += 2) {
    my ($low, $high) = @deprec_pids[$at, $at + 1];
    return 1 if ($pid >= $low and $pid <= $high);
  }

  for (my $at = 0; $at <= $#deprec_uids; $at += 2) {
    next unless (exists($all_pids->{$pid}));
    my ($low, $high) = @deprec_uids[$at, $at + 1];
    my $owner = $all_pids->{$pid}->{uid};
    return 1 if ($owner >= $low and $owner <= $high);
  }

  return 0;
}

#
#  ret 1: is unwanted
#      0: wanted
#
#
sub is_unwanted_pid( $ ) {

  # Tell whether a process is unwanted on this node.
  #
  # arg: pid
  # ret: 0 - wanted: the owner's uid lies inside one of the
  #          @nofilter_users_range (min,max) pairs, or the pid/uid is not
  #          known in $all_pids (nothing to judge by)
  #      1 - unwanted
  #
  # The whitelist holds uids, so the uid of the process is looked up in
  # $all_pids; comparing the pid itself against uid ranges was wrong.
  my $pid = $_[0];

  return 0 unless (ref($all_pids) eq 'HASH' and exists($all_pids->{$pid}));
  my $uid = $all_pids->{$pid}->{uid};
  return 0 unless (defined($uid));

  # check whitelisted uids
  for (my $at = 0; $at <= $#nofilter_users_range; $at += 2) {
    my ($min, $max) = @nofilter_users_range[$at, $at + 1];
    return 0 if (($uid >= $min) and ($uid <= $max));
  }
  return 1;
}

#
#   Kill pid (send TERM, then KILL)
#
#   With argument 'all' kills all controlled pids (used for shutdown)
#
#####################################################################
sub kill_pid( $ ) {

  # Soft-kill a pid, or all tasks.
  #
  # arg: pid, or the string 'all'
  #
  # 'all': every task is sent TERM (the Task::kill default), a delayed hard
  #        kill is scheduled in %delayed_task_kills, and the task is marked
  #        dead with a 'ttl' attribute.
  # pid:   TERM is sent and a delayed kill is scheduled in %delayed_kills
  #        after 'hard_kill_delay' seconds. Protected pids (see
  #        is_deprecated_pid) are left alone completely; before, they were
  #        spared the TERM but still got a delayed kill scheduled.
  my $pid = $_[0];

  if ($pid eq 'all') {

    # kill all tasks
    foreach my $i (keys(%tasks)) {
      $tasks{$i}->kill;
      $delayed_task_kills{$i} =
        time + $tasks{$i}->get_attr('hard_kill_after_head');
      $tasks{$i}->mark_dead;
      $tasks{$i}->set_attr('ttl', time + get_setting('dead_cleanup_time'));
    }
    return;
  }

  return if (is_deprecated_pid($pid));

  kill 'TERM', $pid;
  $delayed_kills{$pid} = time + get_setting('hard_kill_delay');
}    # kill_pid

#
#   Returns a pid of task by owner and id
#
# arg: owner  - owner queue
#      id     - id of task in owner queue
# ret - list of tasks pids
#     - () if no task found
#
#####################################################################
#sub get_pids( $$ ) {
#  my ( $owner, $id ) = @_;
#
#  qlog "get_pids: $owner,$id\n";
#  if(exists($tasks{"$owner:$id"})){
#    return $tasks{"$owner:$id"}->get_pids();
#  }
#  return ();
#}                               # get_pids

{
  #####################################################################
  #
  # Gets one message block from the channel. A block is everything up to
  # and including the first "end\n".
  #
  # arg: f - the Cleo::Conn (must provide read and unread)
  #      s - accepted for compatibility, not used
  # ret - list of the lines of the block, newlines removed (the closing
  #       'end' line is included as the last element)
  #     - empty list if no complete block is available yet; what was read
  #       is pushed back with unread
  #     - undef if the channel could not be read
  #
  # NOTE(review): the terminator is matched anywhere in the text, so a
  # data line that merely ends in "end" also closes the block -- confirm
  # that packed values can never produce such a line.
  #####################################################################
  sub get_block($;$ ) {
    my ($h, $statmode) = @_;

    my $tmp = $h->read;

    if (!defined $tmp) {

      #error
      qlog("Channel is dead.[$!]\n");
      return undef;
    }

    # Use $1 only when the substitution really matched: after a failed
    # match $1 still holds the capture of some earlier, unrelated regex.
    if ($tmp =~ s{^(.*?end\n)}{}s) {
      my $message = $1;

      # another message follows. save it.
      $h->unread($tmp) if ($tmp ne '');
      return split(/\n/, $message);
    }

    # not full message yet.
    $h->unread($tmp) if ($tmp ne '');
    return ();
  }

};

#####################################################################
#
# Send an answer to the server (written to $SRV and flushed immediately)
#
# args: to
#       hash
#       type
#       id
#       [parameters] - like 'param1',$p1,'param2',$abc ...
#
#####################################################################
sub answer_to_server($$$$;@ ) {

  # Write one message to the server connection $SRV and flush it.
  #
  # args: to, hash, type, id, then optional name => value pairs
  #
  # Wire format: header "*<my_name>:<to>:<hash>", the type line, "id:",
  # default "status:" ('done') and "success:" (1) lines when the caller did
  # not supply them, every supplied pair as "name:value", and a closing
  # "end" line. All values go through pack_value.
  # Nothing is sent when $SRV is undefined or 'to' is empty.
  my ($to, $h, $type, $id, %params) = @_;

  # are we connected?
  return unless defined $SRV;

  if ($to eq '') {
    qlog("EMPTY TO!\n");
    print_stack();
    return;
  }

  my $quiet = $_d_flush_nolog{$type};

  # value shown in the log only; "[1]" marks the implicit default
  my $shown_success =
    defined($params{'success'}) ? $params{'success'} : "[1]";
  qlog(">> to=$to, type=$type, id=$id, hash=$h, success=$shown_success\n")
    unless ($quiet);

  $SRV->send("\*$my_name:$to:$h\n$type\n");
  $SRV->send('id:' . pack_value($id) . "\n");

  # default values...
  $SRV->send('status:' . pack_value('done') . "\n")
    unless (defined($params{'status'}));
  $SRV->send('success:' . pack_value(1) . "\n")
    unless (defined($params{'success'}));

  foreach my $key (keys(%params)) {
    my $packed = pack_value($params{$key});
    qlog("Packed ($key) as '$packed'\n") if get_setting('debug_pc');
    qlog("SENDING: $key: $packed\n") unless ($quiet);
    $SRV->send("$key:$packed\n");
  }

  $SRV->send("end\n");
  $SRV->flush;
}    # answer_to_server

#####################################################################
#
# Actually send all messages to server
#
# args: NONE
#
#####################################################################
#sub flush_to_server() {
#  my ( $to, $type, $hash, $i, $n, $cur, $k, $v, $e );

#  return unless defined $SRV;

#  for $cur (@for_server) {
#    ( $to, $type, $hash ) = ( $cur->{to}, $cur->{type}, $cur->{hash} );

#    qlog "_SENDING to master($to) $type/$cur->{hash}/$cur->{success}\n"
#      unless ( $_d_flush_nolog{$type} );

#    delete $cur->{to};
#    delete $cur->{type};
#    delete $cur->{hash};

#    $SRV->send("\*$my_name:$to:$hash\n$type\n");
#    while ( ( $k, $v ) = each( %{$cur} ) ) {
#      $e = pack_value($v);
#      qlog "Packed ($k) as '$e'\n" if get_setting('debug_pc');
#      qlog "SENDING: $k: $e\n" unless ( $_d_flush_nolog{$type} );
#      $SRV->send("$k:$e\n");
#    }

#    $SRV->send("end\n");
#  }
#  flush_server_channel();
#}                               # flush_to_server

#sub flush_server_channel() {
#    return unless $SRV;

#    $SRV->flush;
#}

#####################################################################
#
# Register a procedure for receiving messages of given type
#
# args: type
#       procedure ( it is called with four arguments:
#                   type, hash, from, \%args )
#
#####################################################################
sub register_mon_rcv( $$ ) {

  # Append a handler to the list of receivers of the given message type.
  my ($kind, $callback) = @_;
  my $receivers = ($server_recievers{$kind} ||= []);
  push @{$receivers}, $callback;
}    # register_mon_rcv

#####################################################################
#
# Unregister a procedure for receiving messages of given type
#
# args: type
#       procedure
#
#####################################################################
sub unregister_mon_rcv( $$ ) {

  # Remove the first occurrence of a handler from the receivers of the
  # given message type. Unknown types and unregistered handlers are
  # ignored.
  #
  # args: type, handler (the same code reference that was registered)
  my ($type, $handler) = @_;

  my $list = $server_recievers{$type};
  return unless (ref($list) eq 'ARRAY');

  for my $i (0 .. $#{$list}) {
    if ($list->[$i] eq $handler) {

      # length 1: actually remove the element (length 0 removed nothing)
      splice(@{$list}, $i, 1);
      last;
    }
  }
}    # unregister_mon_rcv

#####################################################################
#
# Receive messages from server and dispatch them (call handlers)
#
# args: NONE
# ret:  NONE
#
#####################################################################
sub rcv_from_server() {

  # Read every complete message available on $SRV and hand each one to all
  # handlers registered for its type.
  #
  # Does nothing unless $SRV is defined and its state is 'ok'. Reading
  # stops when get_parsed_block reports no more blocks ('') or an error
  # ('-'). All fields except 'success' are passed through unpack_value.
  # Handlers are called as handler(type, hash, from, \%args).
  my %args;

  return if (!defined $SRV or ($SRV->get_state ne 'ok'));

  while (1) {

    # Read the message block
    %args = ();
    my $hash = get_parsed_block($SRV, \%args);
    last if ($hash eq '' or $hash eq '-');

    # take the special fields out of the argument hash
    my ($from, $to, $type) = delete @args{qw(_from _to _type)};
    delete $args{_hash};

    foreach my $key (keys(%args)) {
      next if ($key eq 'success');
      my $unpacked;
      unpack_value(\$unpacked, $args{$key});
      $args{$key} = $unpacked;
      qlog("Unpacking: '$key' ($args{$key}) as '$unpacked,'\n")
        if get_setting('debug_pc');
    }

    next unless (ref($server_recievers{$type}) eq 'ARRAY');

    my $verbose = !$_d_rcv_nolog{$type};

    # walk by index and re-read the list size every time, so handlers
    # added or removed during dispatch are seen
    my $slot = 0;
    while ($slot < scalar(@{$server_recievers{$type}})) {
      my $candidate = $server_recievers{$type}[$slot];
      qlog("checking $type / $slot for code...\n") if ($verbose);
      if (ref($candidate) eq 'CODE') {
        qlog("Yes! call it! ($type,$hash,$from)\n") if ($verbose);
        $candidate->($type, $hash, $from, \%args);
      } else {
        qlog("No. Its " . ref($server_recievers{$type}[$slot]) . "\n");
      }
      ++$slot;
    }
  }    # messages reading loop
}    # rcv_from_server

#####################################################################
#
# Gets the block from handle and returns a hash with arguments
#  In result hash '_from','_to','_hash','_type' are special
#
# args: Cleo::Conn
#       return_hash_ref   (\%ret)
# ret:  ''   - no more blocks
#       '-'  - an error occured
#       other- the hash of this block
#
#####################################################################
sub get_parsed_block( $$ ) {

  # Read one block with get_block and split it into fields.
  #
  # args: connection, reference to the result hash
  # ret:  ''  - no block available
  #       '-' - malformed block (single line, or header without from/to)
  #       otherwise the hash field of the header
  #
  # The first line is the header "*from:to:hash", the second the message
  # type; every following "name:value" line is stored in the result hash,
  # other lines are skipped. '_from', '_to', '_hash' and '_type' are added.
  my ($handle, $out) = @_;

  my @lines = get_block($handle);
  return '' unless (@lines);

  if (scalar(@lines) == 1) {
    qlog("Warning! [$#lines] Strange end of message from master... Skipping.\n");
    qlog(join(";;", @lines) . "\n");
    return '-';
  }
  chomp @lines;

  # Check it...
  my $header = $lines[0];
  qlog("HEADER: '$header'\n") if ($_debug_log_head);
  my ($from, $to, $hash) =
    ($header =~ /^\*([^:]+)\s*:([^:]+)\s*:(\S+)$/);
  unless ($from && $to) {
    qlog("Warning! Strange message. No from or to. ($header) Skipping.\n");
    qlog(join(";;", @lines) . "\n");
    return '-';
  }

  # Get the type
  shift @lines;
  my $type = shift @lines;
  qlog("GOT: $type;\n");
  unless ($_d_rcv_nolog{$type}) {
    my $content = join('#', @lines);
    $content =~ s/\0/^/g;
    qlog("CONTENT: $content;\n");
  }

  foreach my $line (@lines) {
    my ($field, $value) = ($line =~ /^([^:]+)\s*:\s*(.*?)\s*$/)
      or next;
    $out->{$field} = $value;
  }

  @{$out}{qw(_from _to _hash _type)} = ($from, $to, $hash, $type);
  return $hash;
}    # get_parsed_block

#
#  returns uid by username/uid
#
sub get_uid( $ ) {

  # Map a user name to its uid using %useruid.
  # A purely numeric argument is returned unchanged; an unknown name gives
  # 65534 (nonprivileged user).
  my $who = $_[0];

  return $who if ($who =~ /^\d+$/);
  return exists($useruid{$who}) ? $useruid{$who} : 65534;
}

#
#  returns group id by group/gid
#
sub get_gid( $ ) {

  # Map a group name to its gid using %groupid.
  # A purely numeric argument is returned unchanged; an unknown name gives
  # 65534 (nonprivileged group).
  my $group = $_[0];

  return $group if ($group =~ /^\d+$/);
  return exists($groupid{$group}) ? $groupid{$group} : 65534;
}

#####################################################################
#
# Updates %useruid and %groupid.
#
# args: NONE
# ret:  NONE
#
#####################################################################
sub get_users() {

  # Rebuild %useruid (name => uid) and %groupid (name => gid) from the
  # system user and group databases.
  %groupid = ();
  %useruid = ();

  while (my @pw_entry = getpwent()) {
    $useruid{$pw_entry[0]} = $pw_entry[2];
  }
  endpwent();

  while (my @gr_entry = getgrent()) {
    $groupid{$gr_entry[0]} = $gr_entry[2];
  }
  endgrent();
}    # get_users

# #####################################################################
# #
# # Returns a list of children pids of given pid
# #
# # args: pid
# # ret:  \@pid_list
# #
# #####################################################################
# sub get_children_pids( $ ) {
#   my $pid = $_[0];
#   my @ret = ();

#   my ( $i, $process );

#   while ( ( $i, $process ) = each(%all_pids) ) {
#     push @ret, $i if ( $process->{ppid} == $pid );
#   }
#   return @ret;
# }                               # get_children_pids

#####################################################################
#
# Processes the output from childs
#
# args: NONE
# ret:  NONE
#
#####################################################################
sub read_from_childs() {

  # Currently a no-op: the whole body is commented out.
  # The disabled code read the stdout/stderr handles of the %ran entries
  # one byte at a time and logged whatever it collected.

  #   my ( $i, $c, $str );

  #   foreach $i ( values(%ran) ) {
  #     if ( defined( $i->{stdout} ) ) {
  #       undef $str;
  #       while ( sysread( $i->{stdout}, $c, 1 ) == 1 ) {
  #         $str .= $c;
  #       }
  #       if ( $str ne '' ) {
  #         qlog "GOT FROM CHILD '$str'\n";
  #       }
  #     }
  #     if ( defined( $i->{stderr} ) ) {
  #       undef $str;
  #       while ( sysread( $i->{stderr}, $c, 1 ) == 1 ) {
  #         $str .= $c;
  #       }
  #       if ( $str ne '' ) {
  #         qlog "GOT ERR FROM CHILD '$str'\n";
  #       }
  #     }
  #   }
}

# #####################################################################
# #
# #  Get all children of given pid
# #
# # args: pid
# # ret:  list of child pids
# #
# #####################################################################

# sub get_childs_pids( $ ) {
#   my $pid = $_[0];

#   my ( @all_childs, @cur_childs );

#   if ( exists( $child_pids{$pid} ) ) {
#     @cur_childs = @{ $child_pids{$pid} };
#   } else {
#     return ();
#   }
#   while ( scalar(@cur_childs) ) {
#     shift @cur_childs;
#     if ( exists( $child_pids{$_} ) ) {
#       push @cur_childs, $child_pids{$_};
#       push @all_childs, $child_pids{$_};
#     }
#   }
#   return @all_childs;
# }

#####################################################################
#
#  Gets pids by exe_mask, parent_mask and user
#
# args: parent_mask
#       exe_mask
#       user
# ret:  list of found pids
#
#####################################################################

sub collect_pids( $$$ ) {

  # Find processes by command line, parent and owner.
  #
  # args: parent_mask - regex for the parent's command line ('' = none)
  #       exe_mask    - regex for the process command line
  #       user        - user name or uid of the owner
  # ret:  list of pids whose command line matches exe_mask, whose uid is
  #       that of the user and -- if at least one process matched
  #       parent_mask -- whose parent is one of those processes.
  #       Empty list if exe_mask is empty or is not a valid regex.
  #
  # The process table is refreshed with update_pids first. A mask that is
  # not a valid regex is logged instead of being dropped silently.
  my ($parent_mask, $exe_mask, $user) = @_;
  my (%is_parent, @ret);

  update_pids();
  my $uid = get_uid($user);

  if ($parent_mask ne '') {
    eval {

      # compile once; dies here if the mask is not a valid regex
      my $parent_re = qr/$parent_mask/;
      foreach my $p (keys(%{$all_pids})) {
        $is_parent{$p} = 1
          if ($all_pids->{$p}->{cmdline} =~ $parent_re);
      }
    };
    qlog("collect_pids: parent mask '$parent_mask' failed: $@\n") if ($@);
  }

  if ($exe_mask ne '') {
    eval {
      my $exe_re = qr/$exe_mask/;
      foreach my $p (keys(%{$all_pids})) {
        my $proc = $all_pids->{$p};
        next unless ($proc->{cmdline} =~ $exe_re);

        # no matching parent at all means "do not filter by parent"
        next unless (!%is_parent or exists($is_parent{$proc->{ppid}}));
        next unless ($proc->{uid} == $uid);
        push @ret, $p;
      }
    };
    qlog("collect_pids: exe mask '$exe_mask' failed: $@\n") if ($@);
  }
  return @ret;
}

#####################################################################
#
#  Gets pids by exe_mask, parent_mask and user
#
# args: parent_mask
#       exe_mask
#       user
# ret:  list of found pids
#
#####################################################################

sub filter_tasks( $$$ ) {

  # Same contract as collect_pids: returns the pids whose command line
  # matches exe_mask, that belong to the given user and (when a parent
  # matched parent_mask) have such a parent.
  #
  # The body used to be a verbatim copy of collect_pids; it now delegates
  # so that the two cannot drift apart.
  my ($parent_mask, $exe_mask, $user) = @_;

  return collect_pids($parent_mask, $exe_mask, $user);
}

#####################################################################
#
# Updates {childs} arrays of all %ran entries
#
# args: none
#
# NOTE: call update_pids before!!!
#
#####################################################################

sub update_childs( ) {
  my ($p, $i, $j, %new_childs, %all_my_pids, %check_pids, $flag, $adding);

  $flag = 0;
  eval {
    local $SIG{__DIE__} = sub {;};

    # remember all tasks pids
    %all_my_pids = ();
    foreach $i (keys(%tasks)) {

      #      qlog "111 ($i)\n";
      foreach $p ($tasks{$i}->get_pids()) {

        #        qlog "222 ($i/$p)\n";
        # value = task name!
        $all_my_pids{$p} = $i;
      }
    }

    # find direct(!) childs of our tasks pids
    foreach $p (keys(%{$all_pids})) {
      next if (exists($all_my_pids{$p}));
      next if ($all_pids->{$p}->{inspected} != 0);

      if (exists($all_my_pids{$all_pids->{$p}->{ppid}})) {

        # remember it with task name!
        $new_childs{$p} = $all_my_pids{$all_pids->{$p}->{ppid}};
        $all_pids->{$p}->{inspected} = 1;
      } else {

        # ok, remeber for later checks
        $check_pids{$p} = 1;
      }
    }

    # check unwanted processes last_attach, etc...
    foreach $p (keys(%check_pids)) {
      $all_pids->{$p}->{inspected} = 1;

      # skip dedicated processes
      next if (is_deprecated_pid($p));

      my $not_checked = 1;

      if ($attach_in_progress ne '') {

        # belongs to attaching task?
        if ($all_pids->{$p}->{uid} ==
          get_uid($tasks{$attach_in_progress}->get_attr('user'))) {

          # attach!
          qlog
"Attaching pid: $p ($all_pids->{$p}->{name} / $all_pids->{$p}->{uid}) to $attach_in_progress\n";
          $tasks{$attach_in_progress}->add_pid($p);
          $not_checked = 0;
        }
      }

      # now check if task may attach to last_attach...
      if ($not_checked and exists($tasks{$last_attach})) {
        if ($all_pids->{$p}->{uid} ==
          get_uid($tasks{$last_attach}->get_attr('user'))) {

          # add process to last_attach!!!
          $tasks{$last_attach}->add_pid($p);
          qlog
"Unexpected pid: $p ($all_pids->{$p}->{name} / $all_pids->{$p}->{uid}). Attach to $last_attach.\n";
          $not_checked = 0;
        }
      }
      if ($not_checked > 0) {
        if (is_unwanted_pid($p)) {
          process_unwanted_pid($p);
        } else {

#qlog "Not processed ".get_uid($tasks{$attach_in_progress}->get_attr('user'))." == $all_pids->{$p}->{uid})\n";
          qlog
"Error! I don't know how to process $p ($all_pids->{$p}->{name} / $all_pids->{$p}->{uid})\n";
          qlog "In progress now: $attach_in_progress / "
            . get_uid($tasks{$attach_in_progress}->get_attr('user')) . "\n"
            if ($attach_in_progress ne '');
          qlog "Last attach: $last_attach / "
            . get_uid($tasks{$last_attach}->get_attr('user')) . "\n"
            if ($last_attach ne '');
        }
      }
    }

    # find other (grand-, grand-grand-, ...) childs
    $adding = 1;
    while ($adding) {
      $adding = 0;
      foreach $p (keys(%check_pids)) {
        if (exists($new_childs{$all_pids->{$p}->{ppid}})) {

          # remember it with task name!
          $new_childs{$p} = $new_childs{$all_pids->{$p}->{ppid}};
          delete $check_pids{$p};
          $adding = 1;
        }
      }
    }

    # check hints from wrapper
    foreach $i (@wrap_hints) {

      # if this task is available, remember pid
      if (exists($tasks{$i->[2]})) {

        # add head pid
        $new_childs{$i->[0]} = $i->[2];

        # add process group (negative)
        $new_childs{$i->[1]} = $i->[2];
        qlog "Added hint $i->[0] / $i->[2]\n";
      } else {

        # Illegal task! Kill'em ALL :-E
        # arg is negative, so we're killing pgroup
        qlog "Kill illegal task: $i->[0] $i->[2]\n";
        kill_pid($i->[1]);
      }
    }
    @wrap_hints = ();

    # now %new_childs{pids} contains full list of new tasks names
    # every value eq task name...

    # add them to tasks!
    foreach $p (keys(%new_childs)) {
      if (exists $tasks{$new_childs{$p}}) {
        $tasks{$new_childs{$p}}->add_pid($p);
        qlog "Added: $p to $new_childs{$p}\n";
        $flag = 1;
      } else {
        qlog "ERROR! No such task: $new_childs{$p} (for pid $p)\n";
      }
    }
  };
  if ($@) {
    qlog "Update childs: '$@' (p=$p; new_childs: "
      . join(',', keys(%new_childs)) . ")\n";
  }
  save_state() if $flag;
}    # ~update_childs

#####################################################################
#
#  Updates a table with parent-child dependencies and pids info
#
# args: none
#
#####################################################################

#
#  Rescans /proc and rebuilds the global $all_pids table.
#
#  For every numeric /proc entry that is not yet known, a record with
#  uid/euid, gid/egid, ppid, name (from /proc/PID/status) and cmdline
#  (from /proc/PID/cmdline, NULs turned into spaces) is created.
#  Records of pids that are already known are carried over unchanged.
#
#  Every pid that was in the old table but is no longer in /proc, and is
#  not already listed in @reaped, is appended to @reaped with exit code
#  127 in @reapcode and logged as DEAD.
#
#  Lexical handles and a lexical line variable are used so that callers'
#  $_ and any global PROC/P handles are left untouched.
#
sub update_pids() {
  my ($p, $new_pids);

  opendir(my $proc_dir, '/proc') or return;
  $new_pids = {};
  foreach $p (readdir($proc_dir)) {

    # do not recheck processes
    if (exists($all_pids->{$p})) {
      $new_pids->{$p} = $all_pids->{$p};
      next;
    }

    # skip trash
    next if ($p !~ /^\d+$/);

    # the process may have exited since readdir - just skip it then
    open(my $status, '<', "/proc/$p/status") or next;
    while (defined(my $line = <$status>)) {
      if ($line =~ /^Uid:\s+(\d+)\s+(\d+)/) {
        $new_pids->{$p}->{uid}  = $1;
        $new_pids->{$p}->{euid} = $2;
      } elsif ($line =~ /^Gid:\s+(\d+)\s+(\d+)/) {
        $new_pids->{$p}->{gid}  = $1;
        $new_pids->{$p}->{egid} = $2;
      } elsif ($line =~ /^PPid:\s+(\d+)/) {
        $new_pids->{$p}->{ppid} = $1;
      } elsif ($line =~ /^Name:\s+(\S+)/) {
        $new_pids->{$p}->{name} = $1;
      }
    }
    close $status;

    open(my $cmdline, '<', "/proc/$p/cmdline") or next;
    $new_pids->{$p}->{cmdline} = join(' ', <$cmdline>);
    $new_pids->{$p}->{cmdline} =~ s/\0/ /g;
    close $cmdline;
  }
  closedir($proc_dir);

  # Report vanished pids. @reaped is re-read for every pid on purpose:
  # it is a global that may grow while we are looping.
  foreach $p (keys(%{$all_pids})) {
    next if (exists $new_pids->{$p});
    next if (grep {$_ == $p} @reaped);
    push @reaped,   $p;
    push @reapcode, 127;
    qlog "DEAD $p ($all_pids->{$p}->{cmdline})\n";
  }
  $all_pids = $new_pids;
}

#
#  Do actions on unwanted pid.
#
#  Arg: pid.
#
#
sub process_unwanted_pid( $ ) {
  my ($pid) = @_;

  # 'filter_users' setting: 0 - ignore, 1 - only warn, anything else - kill
  my $mode = get_setting('filter_users');
  return if ($mode == 0);

  my $warning =
    "Unwanted pid $pid ($all_pids->{$pid}->{name} / $all_pids->{$pid}->{uid}).\n";

  if ($mode == 1) {

    # warn only
    qlog $warning;
  } else {

    # warn and kill
    qlog $warning;
    qlog "KILL $pid!\n";
    kill_pid($pid);
  }
}

#####################################################################
#
#  Substitute pseudo-variables by actual values
#  (actually - only $node)
#
# args: text
#       struct
#
#####################################################################

# Expands "$name" tokens inside a string, in place.
#
# args: text  - REFERENCE to the string to modify
#       child - accepted but not used by this implementation
#
# The substitution is evaluated twice (/ee): the first pass turns "$name"
# into the text '$_name', the second pass evaluates that text as Perl code.
# The only such variable declared here is the lexical $_node, so "$node"
# is replaced by the current value of $my_name.
# NOTE(review): any other "$name" token presumably evaluates to an empty
# string (no matching $_name variable is visible here) - confirm.
sub subst_task_prop( $$ ) {
  my ($_node) = $my_name;    # target of the "$node" pseudo-variable
  my ($text, $child) = @_;

  $$text =~ s/\$([\w\d_]+)/'$_'.$1/gee;
}

#################################################################################
##
##
##           LOCAL ACTIONS
##
#################################################################################

{

  # Times of the last periodic runs; private to local_checks().
  my $last_pids_update;
  my $last_users_update;

  #
  #  Periodic housekeeping, called on every pass of the main loop.
  #
  #  - reads at most one hint line from the wrapper fifo into @wrap_hints
  #  - every 'pids_update_interval' seconds: refreshes the pid tables and
  #    performs the due delayed hard kills of pids and of tasks
  #  - refreshes the users list once an hour
  #  - performs the due kill_pid requests
  #  - finishes an attach whose timeout has expired
  #  - runs ran_check() every 'last_ran_check_interval' seconds
  #  - removes dead tasks, reads from childs, flushes connections, reaps
  #
  #  args: none
  #
  sub local_checks() {
    my $t = time;
    my $i;

    my $save_needed = 0;

    # try to read any hints about new tasks.
    # The line must be stored explicitly: only 'while (<FH>)' assigns to
    # $_, a plain 'if (<FH>)' reads the line and throws it away.
    my $hint = <$wrapper>;
    if (defined $hint) {
      chomp $hint;

      # pid sid queue:id
      if ($hint =~ /(\d+) (-?\d+) (\S+)/) {
        qlog "Readed hint: '$hint' \n";
        push @wrap_hints, [$1, $2, $3];
      }
    }

    if ($t > $last_pids_update + get_setting('pids_update_interval')) {
      update_pids();
      update_childs();

      #
      # hard pid kills
      #
      for $i (keys(%delayed_kills)) {
        if ($delayed_kills{$i} <= $t) {
          qlog "Delayed hard kill pid $i\n";
          if (kill(0, $i) and !is_deprecated_pid($i)) {
            kill 'KILL', -$i if $i > 0;
            kill 'KILL', $i;
          }
          delete $delayed_kills{$i};
        }
      }

      #       #
      #       # soft kill pids
      #       #
      #       for $i ( keys(%kill_em) ) {
      #         if ( $kill_em{$i} <= $t ) {
      #           qlog "Delayed kill of $i\n";
      #           kill_pid($i);
      #           delete $kill_em{$i};
      #         }
      #       }
      #
      # hard kill tasks
      #
      foreach $i (keys(%delayed_task_kills)) {
        if ($delayed_task_kills{$i} <= $t) {
          if (exists $tasks{$i}) {
            qlog "Delayed task kill: $i\n";
            $tasks{$i}->kill('KILL');
            unless (
              deldir(
                $tasks{$i}->get_attr('tmpdir'),
                $tasks{$i}->get_attr('user')
              )
              ) {
              qlog 'Cannot delete temp dir '
                . $tasks{$i}->get_attr('tmpdir') . ' by '
                . $tasks{$i}->get_attr('user') . "\n";
            }
            answer_to_server(
              'main',     0,
              'finished', $tasks{$i}->get_attr('id'),
              'code',     127,
              'com_line', $tasks{$i}->get_attr('com_line'),
              'owner',    $tasks{$i}->get_attr('owner'),
              'is_rsh',   $tasks{$i}->get_attr('is_rsh')
            );    #,
                  #'pid',      $i );
            $tasks{$i}->mark_dead();
            $tasks{$i}
              ->set_attr('ttl', time + get_setting('dead_cleanup_time'));
          }
          delete $delayed_task_kills{$i};

          #delete $tasks{$i};
          $save_needed = 1;
        }
      }

      $last_pids_update = $t;
    }

    #
    # update users list every hour
    #
    if ($t > $last_users_update + 3600) {
      get_users();
      $last_users_update = $t;
    }

    #
    # HARD kill pids
    #
    # {time} is the moment the kill becomes due (request time + wait_secs),
    # so the request is executed once that moment has been reached.
    for $i (keys(%kill_pids)) {
      if ($kill_pids{$i}->{time} <= $t) {
        kill_pid_action($i);
        delete $kill_pids{$i};
      }
    }

#!    #
#!    #  Check for delayed init attaches
#!    #
#!    if( $delayed_requests{init_attach}->{timeout} > $t ) {
#!
#!      # request timed out
#!
#!      $delayed_requests{init_attach}->{blocked} = 0;
#!      $delayed_requests{timeout} = 0;
#!      qlog
#!        "INIT_ATTACH timed out. No attach request was sent. Forget, try next.\n";
#!    }

    #!    if ( @delayed_attaches
    #!         and ( $delayed_requests{init_attach}->{blocked} == 0 ) ) {
    #!      my $args = shift @delayed_attaches;
    #!      qlog "Delayed init_attach ($args->[0])\n";
    #!      init_attach_real_handler(@$args);
    #!    }

    #!    #
    #!    # Do attach actions
    #!    #
    #!    if (@attach_requests) {
    #!      for ( $i = 0; $i < @attach_requests; ) {
    #!        attach_handler_second_stage( $attach_requests[$i] );
    #!        if ( $attach_requests[$i]->{tmout} < $t ) {
    #!          answer_to_server(
    #!                           $attach_requests[$i]->{from},
    #!                           $attach_requests[$i]->{hash},
    #!                           'attach',
    #!                           $attach_requests[$i]->{id},
    #!                           'success',
    #!                           '1' );
    #!          splice( @attach_requests, $i, 1 );
    #!          $delayed_requests{init_attach}->{blocked} = 0;
    #!          $delayed_requests{init_attach}->{timeout} = 0;
    #!        } else {
    #!          ++$i;
    #!        }
    #!      }
    #!    }

    # is attach progress timed out?
    if ($attach_tmout > 0 and $attach_tmout < time) {
      qlog "Finishing attach for $attach_in_progress.\n";
      if (exists($tasks{$attach_in_progress})) {
        do_attach(
          $attach_in_progress,
          $tasks{$attach_in_progress}->get_attr('user'),
          $tasks{$attach_in_progress}->get_attr('owner')
        );

        # save last attach data...
        $last_attach = $attach_in_progress;

      }

      # cancel attaching
      $attach_in_progress = '';
      $attach_tmout       = 0;
    }

    #
    #  Check childs, who didn't send sigchild
    #
    if ($t > $last_ran_check + get_setting('last_ran_check_interval')) {
      ran_check();
      $last_ran_check = $t;
    }

    # delete delayed killed tasks
    foreach $i (keys(%tasks)) {

      # is marked as dead?
      if ($tasks{$i}->is_dead) {

        # NOTE(review): 'ttl' is set to "time + dead_cleanup_time", so this
        # test is true while the ttl is still in the FUTURE - a dead task
        # with a ttl is removed on the first pass, not after the cleanup
        # time. Left unchanged on purpose; confirm the intended semantics
        # before inverting it (int_info_handler re-arms the ttl of empty
        # tasks on every request).
        if ($tasks{$i}->get_attr('ttl') > $t) {
          delete $tasks{$i};
        }
      }
    }

    #
    #  read, send, check deads
    #
    read_from_childs();

    #flush_server_channel();
    Cleo::Conn::allflush;
    do_reap();

    save_state() if $save_needed;
  }
};

# restart service
#
#  Restarts the service.
#
#  Forks twice (the intermediate processes exit), drops the server
#  connection and the STATUS handle in the surviving process and then
#  replaces it with a fresh /usr/sbin/cleo-mon.
#  Never returns: every path ends in exit() or exec().
#
sub restart() {
  my $i;
  my $p = fork();
  exit(1) unless defined $p;    # FAIL 8(
  exit(1) if ($p < 0);          # FAIL 8(

  if ($p) {                     # Parent.
    exit(0);
  }

  # try to daemonize: up to 10 attempts to fork once more
  for ($i = 0; $i < 10; ++$i) {
    select(undef, undef, undef, 0.1);
    $p = fork();
    next unless defined $p;
    next if $p < 0;
    last;
  }
  exit 1 unless defined $p;
  exit 1 if ($p != 0);

  # $SRV is undefined while no server is connected (the main loop undefs
  # it after a lost connection); an unguarded method call would die here
  # and the new monitor would never be started.
  $SRV->disconnect if defined $SRV;
  eval {close STATUS;};

  exec('/usr/sbin/cleo-mon');    # THE CULMINATION!

  # reached only if exec failed
  exit 1;
}

# process error signal (e.g. SIGSEGV)
# Arg: signal name. Logs the signal, counts it per name in %err_signals
# and restarts the monitor once the count exceeds $MAX_ERR_SIG.
sub err_signal( $ ) {
  my ($signal) = @_;

  qlog "Error! I got singal $signal...\n";

  my $seen = ++$err_signals{$signal};
  if ($seen > $MAX_ERR_SIG) {
    qlog "Too many ERROR singals. Restarting.\n";
    restart();
  }
}

##############################################################################
##############################################################################
##############################################################################
##
##
##           THE MAIN PROGRAMM
##
##############################################################################
##############################################################################
##############################################################################

# Main program: installs signal handlers, parses the command line
# (-f = stay in foreground, any other argument = config file path), loads
# config and saved state, opens the wrapper fifo and the listening socket,
# registers the message handlers, writes the pid file and then serves
# the server connection in an endless loop.
eval {
  my (
    $parent, $s,      @s,      $pid,       $i, $foreground,
    $f,      %answer, $config, $conf_load, $log_file
  );

  $SIG{CHLD} = \&REAPER;
  $SIG{HUP}  = \&hup_processor;
  $SIG{USR1} = \&usr1_processor;
  $SIG{USR2} = \&usr2_processor;
  $SIG{TERM} = sub {qlog "GOT SIGTERM\n"; exit_handler(0, 0, 0, 0);};
  $SIG{QUIT} = sub {err_signal('QUIT');};
  $SIG{BUS}  = sub {err_signal('BUS');};
  $SIG{SEGV} = sub {err_signal('SEGV');};
  $SIG{FPE}  = sub {err_signal('FPE');};
  $SIG{ILL}  = sub {err_signal('ILL');};
  $SIG{XCPU} = sub {err_signal('XCPU');};

  $SIG{PIPE}    = sub {qlog "Error! PIPE...\n";};
  $SIG{ALRM}    = sub {qlog "Error! Unhadled alarm\n"; die "alarm\n";};
  $SIG{__DIE__} = sub {qlog "A!!!!!!!!!! I'm dying: '$_[0]'\n"; print_stack;};

  $foreground = 0;

  $config = '/etc/cleo-mon.conf';
  while ($ARGV[0]) {
    if ($ARGV[0] =~ /^-(\w)/) {
      if ($1 eq 'f') {
        $foreground = 1;
      }
    } else {
      $config = $ARGV[0];
    }
    shift;
  }

  unless ($foreground) {
    open(STDIN,  '</dev/null');
    open(STDOUT, '>/dev/null');
    open(STDERR, '>/dev/null');

    #  close STDIN;
    #  close STDOUT;
    #  close STDERR;
  }

  $server         = 'localhost';
  $last_ran_check = 0;

  # start from defaults, then overlay whatever the config file provided
  %global_settings = %def_global_settings;
  if (!load_config($config)) {
    $conf_load = 1;

    #qlog "Loaded config from $config\n";
    foreach $i (keys(%new_global_settings)) {
      $global_settings{$i} = $new_global_settings{$i};
    }
  }

  if ($foreground) {
    $STATUS = *STDOUT;
  } else {
    hup_processor();
  }

  $my_name = get_setting('hostname');
  if ($my_name eq '') {
    $my_name = `uname -n`;
    chomp $my_name;
  }

  setpriority(0, $$, -19);

  # disable OOM-killer for us...
  # The handle must be closed right away: otherwise the value stays in
  # the output buffer and is not written while the daemon is running.
  if (open(OOM, ">/proc/$$/oom_adj")) {
    print OOM "-17\n";
    close OOM;
  }

  qlog "Started $my_name\n";

  qlog "Failed load conf file '$config'. Use defaults\n" unless $conf_load;

  load_state();

  qlog("Checking old ran tasks...\n");
  ran_check();
  qlog("Done checking old ran tasks...\n");

  # create wrapper pipe
  mkfifo('/tmp/cleo-wrapper', 0722);

  # try to open it
  $wrapper = new IO::File;
  no strict;

  # probe for O_CLOEXEC; define it as 0 when Fcntl does not provide it
  eval {
    local $SIG{__DIE__} = sub {return;};
    &O_CLOEXEC();
  };
  if ($@) {
    eval "sub O_CLOEXEC(){return 0;}";
  }
  if ($wrapper->open('/tmp/cleo-wrapper', O_RDONLY | O_CLOEXEC | O_NONBLOCK)) {
    qlog "Wrapper socket opened\n";
  } else {
    $wrapper->open('< /dev/null');
  }
  use strict;

  # create listen socket
  $port = get_setting('port');
  $LST = Cleo::Conn->new_listen($port, 16);
  unless (defined $LST) {
    qlog "Cannot create listening socket. Exit.\n";
    exit(1);
  }

  if ($LST->listen) {
    qlog "Cannot create listening socket on $port. Exit.\n";
    exit(1);
  }

  daemonize() unless $foreground;

  if ($my_name eq '' or $my_name eq '(none)') {

    # empty name!
    $0 = 'cleo-mon WAITING FOR HOSTNAME';

    # Re-read the hostname on every pass, otherwise this wait could
    # never end.
    # NOTE(review): a '(none)' name is still accepted as-is here, exactly
    # as before - confirm whether it should be waited out too.
    while ($my_name eq '') {
      sleep 10;
      $my_name = `uname -n`;
      chomp $my_name;
    }
  }

  $0 = 'cleo-mon';

  #  $port = get_setting('smart_port');
  #  $SH   = make_listen_socket($port)
  #    or die "Cannot make listen socket on smart port '$port'\n";

  register_mon_rcv('init',          \&init_handler);
  register_mon_rcv('ping',          \&ping_handler);
  register_mon_rcv('run',           \&run_handler);
  register_mon_rcv('run_first',     \&run_first_handler);
  register_mon_rcv('kill',          \&kill_handler);
  register_mon_rcv('kill_pid',      \&kill_pid_handler);
  register_mon_rcv('stat',          \&stat_handler);
  register_mon_rcv('exit',          \&exit_handler);
  register_mon_rcv('init_attach',   \&init_attach_handler);
  register_mon_rcv('attach',        \&attach_handler);
  register_mon_rcv('cancel_attach', \&cancel_attach_handler);
  register_mon_rcv('internal_info', \&int_info_handler);
  register_mon_rcv('signal',        \&signal_handler);

  # write the pid file, falling back to /tmp
  unless (open(PID, ">/var/run/qmon.pid"))
  {    # or die "Cannot write pid to /var/run/qmon.pid!\n";
    open(PID, ">/tmp/qmon.pid");
  }
  print PID $$;
  close PID;

  #####################################################################
  #
  # The main loop
  #
  #####################################################################
  for (;;) {

    #
    #  Check new connections from server, if needed
    #

    if (!defined $SRV or ($SRV->get_state ne 'ok')) {
      if (defined $SRV) {
        qlog "Lost server connection\n";
        $SRV->disconnect;
        undef $SRV;
      }
      if ($LST->get_state eq 'dead') {
        $LST->listen;
      }
      if ($LST->get_state eq 'listen') {
        $SRV = $LST->accept;
        if (defined $SRV) {
          qlog "Connection from server " . $SRV->get_peer() . "!\n";
          $init_conn_time = time;
        }
      }
    }

    #
    #  Check new messages from server
    #
    rcv_from_server();

    #
    #  Check new shells
    #
    #    $shell_conn = $SH->accept();
    #    if ($shell_conn) {
    #      update_pids();
    #      update_childs();
    #      eval {
    #        qlog "New rshell!\n";
    #        $SIG{ALRM} = sub { die "rsh"; };
    #        alarm 2;
    #        @s = <$shell_conn>;
    #        close $shell_conn;
    #        $s = shift @s;
    #        if ( $s =~ /(\d+)/ ) {
    #          $pid = $1;
    #          qlog "PID=$pid !\n";
    #          push @fake_rshells, $pid;
    #          $parent = 0;
    #        RSH_LOOP:
    #          foreach $i ( keys(%tasks) ) {
    #            if($tasks{$i}->check_pid($pid)){
    #              qlog "Belongs to task $i\n";
    #              last RSH_LOOP;
    #            }
    #          }
    #        }
    #
    #        $s = shift(@s);
    #        qlog "Got line: '$s'\n";
    #
    #        unless ($parent) {
    #
    #          alarm 0;
    #          die "FAKE RSH - no parent!!!($s)\n";
    #        }
    #
    #        if ( $s eq '' ) {
    #          qlog "empty rsh answer!\n";
    #          alarm 0;
    #          return;
    #        }
    #        $s .= "\n" if ( substr( $s, -1, 1 ) ne "\n" );
    #
    #        %answer             = ();
    #        $answer{user}       = $tasks{$i}->get_attr('user');
    #        $answer{id}         = $tasks{$i}->get_attr('id');
    #        $answer{owner}      = $tasks{$i}->get_attr('owner');
    #        $answer{stdout}     = $tasks{$i}->get_attr('stdout');
    #        $answer{stderr}     = $tasks{$i}->get_attr('stderr');
    #        $answer{stdin}      = $tasks{$i}->get_attr('stdin');
    #        $answer{outfile}    = $tasks{$i}->get_attr('outfile');
    #        $answer{rsh_string} = $s;
    #
    #        $answer{pid} = $pid;
    #
    #        qlog "Success!\n";
    #        answer_to_server( 'main', 0, 'run_sh', 1, %answer );
    #      };
    #      alarm 0;
    #      qlog "Ops: $@\n" if $@;
    #    }

    #
    #  Make all local checks
    #
    local_checks();

    #
    #  Send answers to server
    #
    #flush_to_server();
    Cleo::Conn::allflush;
    select(undef, undef, undef, 0.1);
  }
};    # ~eval

# Reached only when the eval above was left by an exception.
qlog "Monitor has die. Reason:$@\n";
print_stack;
close STATUS;
die "Ooops... I cant belive it! ($@) [$ENV{PATH}]\n";

#####################################################################
#####################################################################
#####################################################################
#####################################################################
#####################################################################

#####################################################################
#
#  Handlers
#
#####################################################################

#
#   INIT         HANDLER
#
#####################################################################
# Handles the 'init' message: copies every received argument except
# 'port' and 'auth' into %global_settings (logging each one) and resets
# the connection-init timestamp.
#
# args: type, hash, from, args (hash reference)
sub init_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;

  qlog "HANDLER: INIT\n";

  #  qlog "Creating connection to $server:$args->{port}\n";
  for my $name (keys(%$args)) {
    unless ($name eq 'port' or $name eq 'auth') {
      $global_settings{$name} = $args->{$name};
      qlog "Setted parameter '$name' to '$args->{$name}'\n";
    }
  }

  #  $To_server = IO::Socket::INET->new(
  #                                     PeerAddr => $server,
  #                                     PeerPort => $args->{port},
  #                                     Proto    => 'tcp' );
  #  unless ($To_server) {
  #    qlog "Cannot create socket: $!\n";
  #    return;
  #  }
  #  qlog "Success\n";

  # Reset timeout
  $init_conn_time = 0;
}

#
#   PING         HANDLER
#
#####################################################################
# Handles the 'ping' message: answers the sender with ('ping', 1).
# The request itself is logged only when $debug_level is above 1.
sub ping_handler( $$$$ ) {
  my (undef, $hash, $from) = @_;

  if ($debug_level > 1) {
    qlog "HANDLER: PING\n";
  }
  answer_to_server($from, $hash, 'ping', 1);
}

#
#   RUN_FIRST          HANDLER
#
#####################################################################
# Handles the 'run_first' message.
#
# args: type, hash, from, args (hash reference with id, owner and,
#       optionally, file_mask)
#
# With a non-empty file_mask: lists the files by the mask, starts the
# task, waits 3 seconds, lists the files again and sends the first entry
# of the second listing (scanned from its end) that was not in the first
# one back to the server, uuencoded with newlines removed, in the 'file'
# field. Answers with status 0 and a 'reason' if the task cannot be
# started or no such file can be read.
#
# Without file_mask: starts the task, answers with status 1/0 and then
# refreshes the pid tables.
sub run_first_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;

  my (%answer, @files, @files2, $pid);

  qlog "HANDLER: RUN_FIRST (id=$args->{id}, owner=$args->{owner})\n";

  $answer{id}    = $args->{id};
  $answer{owner} = $args->{owner};

  if ($args->{file_mask} ne '') {
    statfiles($args->{file_mask}, \@files);
    $pid = execute_task($args, \%answer);

    # same failure test as everywhere else: 0 is not a valid child pid
    if ($pid < 1) {
      $answer{reason} = 'cannot execute';
      answer_to_server($from, $hash, 'run_first', 0, %answer);
      return;
    }
    sleep 3;
    statfiles($args->{file_mask}, \@files2);

    # Multiset difference "new listing minus old listing": every old
    # entry cancels exactly one equal new entry.
    my %old_count;
    foreach my $old (@files) {
      $old_count{$old}++;
    }
    my $found_file = '';
    foreach my $candidate (reverse @files2) {
      if ($old_count{$candidate}) {
        $old_count{$candidate}--;
        next;
      }
      $found_file = $candidate;
      last;
    }

    if ($found_file ne '') {
      if (open(my $fh, '<', $found_file)) {
        my $l = pack('u', join('', <$fh>));
        close $fh;
        $l =~ s/\n//g;
        $answer{file} = $l;
        answer_to_server($from, $hash, 'run_first', 1, %answer);
        return;
      }
    }
    $answer{reason} = 'no file by mask';
    answer_to_server($from, $hash, 'run_first', 0, %answer);
  } else {

    #  use 'smart' rsh replacements
    eval {
      $pid = execute_task($args, \%answer);
      if ($pid < 1) {
        qlog "Exec failed!\n";
        alarm 0;
        $answer{reason} = 'exec failed';
        answer_to_server($from, $hash, 'run_first', 0, %answer);
      } else {
        answer_to_server($from, $hash, 'run_first', 1, %answer);
      }

      #       $rsh_num=$args->{nproc}-($args->{count_first}?1:0);
      #       $rsh_hash=$hash;
      #       $rsh_from=$from;
    };

    # do not lose an exception silently
    qlog "RUN_FIRST: exception while starting the task: $@\n" if $@;
    alarm 0;
    update_pids();
    update_childs();
  }
}

#
#   RUN          HANDLER
#
#####################################################################
# Handles the 'run' message: starts a task and registers it in %tasks
# under the name "owner:id".
#
# args: type, hash, from, args (hash reference with id, owner, user,
#       rsh_pid and, optionally, second_run)
#
# When args->{second_run} equals this node's name, the task must already
# be registered, otherwise a failure ('no such task!') is answered.
# If the task cannot be executed, a failure ('cannot execute') is
# answered and nothing is registered. On success the answer carries the
# pid of the started process.
sub run_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;

  my (%answer, $pid);

  qlog "HANDLER: RUN (id=$args->{id}, owner=$args->{owner})\n";

  $answer{id}    = $args->{id};
  $answer{owner} = $args->{owner};

  $args->{node} = $my_name;

  my $t          = "$args->{owner}:$args->{id}";
  my $second_run = ($args->{second_run} eq $my_name);

  if ($second_run and !exists($tasks{$t})) {
    $answer{reason} = 'no such task!';
    answer_to_server($from, $hash, 'run', 0, %answer);
    qlog
      "NO SUCH TASK! ($args->{id},$args->{owner},rsh_pid=$args->{rsh_pid})\n";
    return;
  }

  $pid = execute_task($args, \%answer);
  if ($pid < 1) {
    $answer{reason} = 'cannot execute';
    answer_to_server($from, $hash, 'run', 0, %answer);

    # Stop here: going on would register a task with an invalid head pid
    # and send a second, "successful" answer for the same request.
    return;
  }

  # successfully runned task. fill attributes
  $answer{pid} = $pid;
  $tasks{$t}   = Task::new();
  $tasks{$t}->set_head($pid);
  $tasks{$t}->set_attr('rsh_pid', $args->{rsh_pid});
  $tasks{$t}->set_attr('user',    $args->{user});
  $tasks{$t}->set_attr('id',      $args->{id});
  $tasks{$t}
    ->set_attr('hard_kill_after_head', get_setting('hard_kill_after_head'));

  if ($second_run) {
    qlog "New TASK RUNNED!($args->{id})[$pid/$args->{rsh_pid}]\n";
  } else {
    qlog "TASK RUNNED! [$pid]($args->{rsh_pid}/$args->{id})\n";
  }

  # give the new process a moment, then pick up its children
  select(undef, undef, undef, 0.1);
  update_pids();
  update_childs();
  answer_to_server($from, $hash, 'run', 1, %answer);
}    # run_handler

#
#   KILL         HANDLER
#
#   kills by id and owner.
#
#####################################################################
# Handles the 'kill' message: soft-kills the task "owner:id".
#
# args: type, hash, from, args (hash reference with id, owner, user, task)
#
# An attach still in progress for that task is completed and closed
# first. A task not yet marked dead gets TERM, is scheduled for a delayed
# hard kill after its 'hard_kill_after_head' seconds and is marked dead.
# The state is saved in every case.
sub kill_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;

  qlog "HANDLER: KILL (id=$args->{id}, owner=$args->{owner}, "
    . "user=$args->{user}, task=$args->{task})\n";
  update_pids();
  update_childs();

  my $name = "$args->{owner}:$args->{id}";

  if (!exists($tasks{$name})) {
    answer_to_server($from, $hash, 'kill', $args->{id}, 'success', 0, 'reason',
      'No such task!');
  } else {

    # the task is being attached right now - finish that first
    if ($attach_in_progress eq $name) {
      $args->{user} = $tasks{$name}->get_attr('user')
        if ($args->{user} eq '');

      do_attach($name, $args->{user}, $args->{owner});
      qlog "Cancel attach $name\n";
      $last_attach        = $attach_in_progress;
      $attach_in_progress = '';
      $attach_tmout       = 0;
    }

    # soft kill now, hard kill later
    if (!$tasks{$name}->is_dead) {
      $tasks{$name}->kill('TERM');
      $delayed_task_kills{$name} =
        time + $tasks{$name}->get_attr('hard_kill_after_head');
      $tasks{$name}->mark_dead;
    }
    answer_to_server($from, $hash, 'kill', $args->{id}, 'success', 1);
  }

  save_state();
  qlog "SOFT KILL finished\n";
}    # kill_handler

#
#   KILL_PID     HANDLER
#
#   kills by pid
#
#####################################################################
# Handles the 'kill_pid' message.
#
# args: type, hash, from, args (hash reference with id, pid, owner, user,
#       task and, optionally, wait_secs)
#
# With a pid: records a request in %kill_pids (due time = now + wait_secs,
# wait_secs defaulting to 0, plus the data needed to answer later).
# Without a pid: answers success at once. The state is saved either way.
sub kill_pid_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;

  qlog "HANDLER: KILL_PID (id=$args->{id}, pid=$args->{pid}, "
    . "owner=$args->{owner}, user=$args->{user}, task=$args->{task})\n";

  #  qlog "KILL_PID: " . join( ':', %$args ) . ";\n";
  my $target = $args->{pid};
  if (!$target) {
    answer_to_server($from, $hash, 'kill_pid', $args->{id}, 'success', 1);
  } else {
    $args->{wait_secs} = 0 unless defined $args->{wait_secs};

    # fill the four fields of the request record in one go
    @{$kill_pids{$target}}{qw(time hash from id)} =
      (time + $args->{wait_secs}, $hash, $from, $args->{id});
  }
  save_state();
}    # kill_pid_handler

# Arg: pid. Kills the pid and reports success to whoever requested the
# kill, using the request data stored in %kill_pids for that pid.
sub kill_pid_action( $ ) {
  my ($pid) = @_;

  kill_pid($pid);

  my $reply_to   = $kill_pids{$pid}->{from};
  my $reply_hash = $kill_pids{$pid}->{hash};
  my $request_id = $kill_pids{$pid}->{id};
  answer_to_server($reply_to, $reply_hash, 'kill_pid', $request_id, 'success',
    1);
}

#
#   EXIT         HANDLER
#
#   exits the monitor.
#
#####################################################################
# Handles the 'exit' message (and SIGTERM): saves the state, kills
# everything via kill_pid('all') and terminates the monitor with exit
# code 0. The arguments are not used.
sub exit_handler( $$$$ ) {
  qlog "Exiting...\n";

  # persist first, then kill
  save_state();
  kill_pid('all');

  qlog "Shutdown.\n";
  exit 0;
}    # exit_handler

#
#   STAT         HANDLER
#
#####################################################################
# Handles the 'stat' message. Not implemented: always answers the sender
# with status 0 and an explanatory reason.
sub stat_handler( $$$$ ) {
  my (undef, $hash, $from) = @_;

  my $why = 'Not implemented yet';
  answer_to_server($from, $hash, 'stat', 0, 'reason', $why);
}    # stat_handler

#
#   INTERNAL_INFO         HANDLER
#
#####################################################################
# Handles the 'internal_info' message.
#
# Refreshes $my_name from `uname -n`, then reports one
# "id:.. owner:.. is_rsh:.. pid:.." record per task that has at least one
# pid, followed by "ver:$VERSION"; the records are joined with '#' and
# sent in the 'val' field. Missing owner/id attributes are replaced by
# '_none_'/0 and stored back into the task. Tasks without pids are marked
# dead, given a 'ttl' and left out of the report.
sub int_info_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;
  my @report;

  # update node name...
  chomp($my_name = `uname -n`);

  foreach my $name (keys(%tasks)) {
    my $task = $tasks{$name};

    #    $ran{$i}->{is_rsh} = 0 unless ( $ran{$i}->{is_rsh} );
    my @pids = $task->get_pids();
    my $pid = @pids ? $pids[0] : 0;

    # empty task?
    if ($pid == 0) {
      qlog "Warning! Empty task found ($name)! Delayed detele info...\n";
      $task->mark_dead();
      $task->set_attr('ttl', time + get_setting('dead_cleanup_time'));
      next;
    }

    my $is_rsh = $task->get_attr('is_rsh');
    if ($is_rsh eq '') {
      $is_rsh = 0;
    }

    my $owner = $task->get_attr('owner');
    if ($owner eq '') {
      $task->set_attr('owner', '_none_');
      $owner = '_none_';
    }

    my $id = $task->get_attr('id');
    if ($id eq '') {
      $task->set_attr('id', 0);
      $id = '0';
    }

    push @report, "id:$id owner:$owner is_rsh:$is_rsh pid:$pid";
  }
  push @report, "ver:$VERSION";

  my $val = join('#', @report);
  answer_to_server($from, $hash, 'internal_info', 1, 'val', $val);
}    # int_info_handler

#
#
#  Attach to predefined task collected pids.
#
#  arg: - task id
#       - user
#       - owner
#
##########################################################
# Attaches newly appeared processes to a task.
#
# args: id       - task name ("owner:id")
#       user     - user name passed to collect_pids()
#       owner    - task owner
#       temp_dir - optional; created (mode 0700) only when the task has to
#                  be created here
#
# If the task is not registered yet it is created first. Then the pid
# tables are refreshed and pids are collected by the task's 'parent_mask'
# and 'exe_mask' attributes (an empty exe mask means '.*'). When more pids
# were collected than are in @attach_collected, every collected pid that
# is not in @attach_collected, does not belong to any task and is not a
# "CLEO LAUNCH" process is added to the task. The state is saved at the end.
sub do_attach( $$$;$ ) {
  my ($id, $user, $owner, $temp_dir) = @_;
  my (@collected, @new_coll, @new_attached);
  my ($p, $pmask, $emask);

  qlog "DO_ATTACH: id=$id, user=$user, owner=$owner\n";

  @collected = @attach_collected;

  if (!defined $tasks{$id}) {
    qlog "Warning! Task '$id' does not exists yet (init_attach failed?)\n";
    $tasks{$id} = Task::new();
    $tasks{$id}->set_attr('user', $user);

    # the part after the first ':' is the task id; never use a stale $1
    # left by some earlier match if the name has no ':' at all
    $tasks{$id}->set_attr('id', ($id =~ /:(.*)/) ? $1 : $id);
    $tasks{$id}->set_attr('owner', $owner);
    $tasks{$id}
      ->set_attr('hard_kill_after_head', get_setting('hard_kill_after_head'));
    if ($temp_dir ne '') {
      launch(0, "mkdir $temp_dir",      "mkdir-$id", $user);
      launch(0, "chmod 0700 $temp_dir", "chmod-$id", $user);
    }
  }

  $pmask = $tasks{$id}->get_attr('parent_mask');
  $emask = $tasks{$id}->get_attr('exe_mask');

  # String comparison is required here: with '==' any mask that does not
  # start with a non-zero number equals '' numerically and would be thrown
  # away.
  if (!defined($emask) or $emask eq '') {$emask = '.*';}
  update_pids();
  update_childs();
  @new_coll = collect_pids($pmask, $emask, $user);
  if (@new_coll > @collected) {
    qlog "Collected($pmask,$emask,$user): " . scalar(@new_coll) . "\n";
    qlog "Was collected: " . scalar(@collected) . "\n";

    my %was_collected;
    @was_collected{@collected} = ();

  ATTACH_LOOP:
    foreach $p (@new_coll) {

      # skip cleo launching processes
      next ATTACH_LOOP if ($all_pids->{$p}->{name} =~ /^CLEO LAUNCH/);

      # skip already runned tasks
      foreach my $task (values(%tasks)) {
        next ATTACH_LOOP if ($task->check_pid($p));
      }

      #skip already counted
      next if exists $was_collected{$p};

      # take in account
      push @new_attached, $p;
      qlog "Attach to $id successfull: $p ($all_pids->{$p}->{cmdline})\n";

      #      unless ( $args->{all} ) {
      #        qlog "Exit attaching!\n";
      #        last;
      #      }
    }
  }

  #  $args->{collected} = \@new_coll;

  # all new pids were collected
  #return unless (@new_attached);

  # add pids to the task
  foreach $p (@new_attached) {
    $tasks{$id}->add_pid($p);

    #qlog "ATT: $p: $ran{$p}->{id}/$ran{$p}->{owner}/$ran{$p}->{name};\n";
    #return unless ( $args->{all} );
  }
  save_state();
}

#
#   INIT_ATTACH         HANDLER
#
#####################################################################
# Handles the 'init_attach' message: opens an attach window for the task
# "owner:id".
#
# args: type, hash, from, args (hash reference with id, owner, user,
#       rsh_pid and, optionally, parent_mask, exe_mask, temp_dir, tmout)
#
# An attach still in progress for another registered task is completed
# first. Then the task is (re)created in %tasks, its temp dir is made
# when requested, the attach timeout is armed (args->{tmout} or the
# 'attach_tmout' setting) and the pids matching the masks right now are
# remembered in @attach_collected. An empty exe mask means '.*'.
sub init_attach_handler( $$$$ ) {
  my ($type, $hash, $from, $args) = @_;
  my ($attach_parent_mask, $attach_exe_mask);

  $attach_parent_mask = $args->{parent_mask};
  $attach_exe_mask    = $args->{exe_mask};

  # String comparison is required here: with '==' any mask that does not
  # start with a non-zero number equals '' numerically and would be
  # replaced by '.*'.
  if (!defined($attach_exe_mask) or $attach_exe_mask eq '') {
    $attach_exe_mask = '.*';
  }

  qlog "HANDLER: INIT_ATTACH (id=$args->{id}, "
    . "owner=$args->{owner}, user=$args->{user})\n";
  if (($attach_in_progress ne '') and (exists($tasks{$attach_in_progress}))) {

    qlog
      "Warning: Another attach in progress ($attach_in_progress). Switch!!!\n";

    # finish attach to old task
    do_attach(
      $attach_in_progress,
      $tasks{$attach_in_progress}->get_attr('user'),
      $tasks{$attach_in_progress}->get_attr('owner')
    );
  }

  # create task for attaching.
  my $t = "$args->{owner}:$args->{id}";
  $tasks{$t} = Task::new();
  $tasks{$t}->set_attr('user',    $args->{user});
  $tasks{$t}->set_attr('rsh_pid', $args->{rsh_pid});
  $tasks{$t}->set_attr('id',      $args->{id});
  $tasks{$t}->set_attr('owner',   $args->{owner});
  $tasks{$t}
    ->set_attr('hard_kill_after_head', get_setting('hard_kill_after_head'));

  if ($args->{temp_dir} ne '') {
    launch(0, "mkdir $args->{temp_dir}",      "mkdir-$t", $args->{user});
    launch(0, "chmod 0700 $args->{temp_dir}", "chmod-$t", $args->{user});
  }

  # remember new task
  $attach_in_progress = $t;
  $attach_tmout =
    time + ($args->{tmout} > 0 ? $args->{tmout} : get_setting('attach_tmout'));

  # expand pseudo-variables in the masks, then take the baseline snapshot
  # of matching pids
  my %x = ('node' => $my_name);
  subst_task_prop(\$attach_exe_mask,    \%x);
  subst_task_prop(\$attach_parent_mask, \%x);
  @attach_collected =
    collect_pids($attach_parent_mask, $attach_exe_mask, $args->{user});
  qlog "Collected: "
    . scalar(@attach_collected)
    . " PIDS: "
    . join(';', @attach_collected) . "\n";

  answer_to_server($from, $hash, 'init_attach', 0, 'success', '1');
  return;
}    # init_attach_handler

#
#   ATTACH         HANDLER
#
#####################################################################
sub attach_handler( $$$$ ) {

  # Handles the ATTACH request for task "owner:id": makes sure the task
  # exists, finishes a different attach that is still pending, marks this
  # task as the attach in progress and (re)arms the attach timeout.
  #
  # Args:
  #   $type - request type (not used here)
  #   $hash - request hash, handed back through answer_to_server()
  #   $from - request source, handed back through answer_to_server()
  #   $args - hash ref; keys read: id, owner, user, tmout
  my ($type, $hash, $from, $args) = @_;
  my $t;

  $t = "$args->{owner}:$args->{id}";

  qlog "HANDLER: ATTACH '$t' $args->{user} ($args->{tmout})\n";

  if (!defined $tasks{$t}) {
    qlog "Warning! Task '$t' does not exists yet (init_attach failed?)\n";
    $tasks{$t} = Task::new();
    $tasks{$t}->set_attr('user',  $args->{user});
    $tasks{$t}->set_attr('id',    $args->{id});
    $tasks{$t}->set_attr('owner', $args->{owner});
    $tasks{$t}
      ->set_attr('hard_kill_after_head', get_setting('hard_kill_after_head'));
  }

  # Another attach is still pending?  Task keys are "owner:id" strings,
  # so they have to be compared with 'ne'; numeric '!=' treated nearly
  # all keys as 0 and this branch was practically never taken.  The
  # pending task must also still exist, otherwise get_attr() would be
  # called on an undefined value.
  if (  ($attach_in_progress ne '')
    and ($attach_in_progress ne $t)
    and exists($tasks{$attach_in_progress})) {

    #finish that attach
    qlog
"Warning! Task $attach_in_progress was not attache yet. Finishing attach.\n";
    do_attach(
      $attach_in_progress,
      $tasks{$attach_in_progress}->get_attr('user'),
      $tasks{$attach_in_progress}->get_attr('owner')
    );

  }
  $attach_in_progress = $t;

  # a non-positive tmout falls back to the configured attach_tmout
  $attach_tmout =
    time + ($args->{tmout} > 0 ? $args->{tmout} : get_setting('attach_tmout'));

  answer_to_server($from, $hash, 'attach', $args->{id}, 'success', '1');

}    # attach_handler

#
#   CANCEL_ATTACH         HANDLER
#
#####################################################################
sub cancel_attach_handler( $$$$ ) {

  # Handles the CANCEL_ATTACH request: drops the task whose attach is in
  # progress together with the collected pid snapshot, then answers
  # 'success' => '1'.  Only $hash and $from are used for the answer;
  # $args supplies id/owner/user for the log line.
  my ($type, $hash, $from, $args) = @_;

  qlog( "HANDLER: CANCEL_ATTACH (id=$args->{id}, "
      . "owner=$args->{owner}, user=$args->{user})\n");
  qlog("No attach in progress...\n") if ($attach_in_progress eq '');

  # forget the pending task first, then reset the attach bookkeeping
  delete $tasks{$attach_in_progress};
  $attach_in_progress = '';
  @attach_collected   = ();

  answer_to_server($from, $hash, 'cancel_attach', 0, 'success', '1');
  return;
}    # cancel_attach_handler

#
#   SIGNAL (FREEZE)      HANDLER
#
#####################################################################
sub signal_handler( $$$$ ) {

  # Handles the SIGNAL request: sends signal $args->{val} to task
  # "owner:id" unless the task is already marked dead, and answers the
  # server.  The answer carries 'success' => 1 in both cases; a missing
  # task only adds a 'reason' field.
  my ($type, $hash, $from, $args) = @_;

  qlog( "HANDLER: SIGNAL (id=$args->{id}, owner=$args->{owner}, "
      . "user=$args->{user}, task=$args->{task}, val=$args->{val})\n");

  # refresh process information before touching the task
  update_pids();
  update_childs();

  my $task_key = join(':', $args->{owner}, $args->{id});

  if (!exists($tasks{$task_key})) {
    answer_to_server($from, $hash, 'signal', $args->{id}, 'success', 1,
      'reason', 'No such task!');
  } else {

    # the task is known: deliver the signal only while it is alive
    if (!$tasks{$task_key}->is_dead) {
      qlog("DO signal '$args->{val}' on task '$task_key'\n");
      $tasks{$task_key}->kill($args->{val});
    }
    answer_to_server($from, $hash, 'signal', $args->{id}, 'success', 1);
  }

  save_state();
  qlog("SIGNAL finished\n");
}    # signal_handler

# Starts a task: forks, and in the child switches uid, prepares the
# environment, redirects stdin/stdout/stderr and exec()s the command line.
#
# Args:
#   $args   - hash ref describing the task; keys read here: suexec_gid,
#             dir, user, group, pre_exec, just_exec, owner, id, rsh_pid,
#             com_line, temp_dir, env, stdin, stdout, stderr, outfile
#   $answer - hash ref; {error} receives a message on failure and is
#             undef'ed on success
#
# Returns (in the parent) the child's pid on success, -1 on failure.
# The child never returns: it either exec()s or exits with status 1.
#
# NOTE(review): $the_suexec_gid, $user and $group are computed below but
# never read inside this sub; only the uid is switched in the child.
# Confirm whether a gid switch is expected here.
sub execute_task($$ ) {
  my ($args, $answer) = @_;

  my ($the_suexec_gid, $user, $group, $pipe_stdin, $pipe_stdout, $pipe_stderr,
    $pid, $e, $v, $g, $t);

# Bare block used as a "try" region: 'last RUN_H_CRADDLE' jumps to the
# error report after the block.
RUN_H_CRADDLE:
  {    # ^
    if (defined $args->{suexec_gid}) {
      $args->{suexec_gid} =~ /^(\d+)/;
      $the_suexec_gid = $1;
    } else {
      $the_suexec_gid = get_setting("suexec_gid");
    }

    # the error text is set in advance; it is cleared on success only
    $answer->{error} = "Cannot chdir to $args->{dir}";
    chdir $args->{dir} or last RUN_H_CRADDLE;
    $user = get_uid($args->{user});
    foreach $g (split(/\s+/, $args->{group})) {
      $group .= get_gid($g) . ' ';
    }

    # Optional pre-exec command, limited to 10 seconds by alarm().
    # NOTE(review): neither $@ nor the exit status of system() is
    # checked, so "Succeed" is logged whenever system() returns.
    if ($args->{pre_exec} =~ /\S/) {
      eval {
        local $SIG{ALRM} = sub {die "q_pre timeout\n";};
        alarm 10;
        qlog "alarm 10\n" if ($_d_alarm_log);
        system("$args->{pre_exec}");
        qlog "pre-exec ($args->{pre_exec}) Succeed\n";
        alarm 0;
        qlog "alarm 0\n" if ($_d_alarm_log);
      };
    }
    if ($args->{just_exec} =~ /\S/) {
      launch(10, $args->{just_exec}, "$args->{owner}.$args->{id}");
    }

    qlog "forking!\n";
    $pid = fork();
    unless (defined $pid) {
      $answer->{error} = "Cannot fork.";
      last RUN_H_CRADDLE;
    }
    if ($pid < 0) {
      $answer->{error} = "Cannot fork.";
      last RUN_H_CRADDLE;
    }

    if ($pid > 0) {    # parent
      qlog "My child->$pid ($args->{owner}/$args->{id})\n";

      # register the new task under the key "owner:id"
      $t = "$args->{owner}:$args->{id}";
      $tasks{$t} = Task::new();
      $tasks{$t}->set_head($pid);
      $tasks{$t}->set_attr('rsh_pid', $args->{rsh_pid});
      $tasks{$t}->set_attr('id',      $args->{id});
      $tasks{$t}->set_attr('user',    $args->{user});
      $tasks{$t}->set_attr('owner',   $args->{owner});
      $tasks{$t}
        ->set_attr('hard_kill_after_head', get_setting('hard_kill_after_head'));
      $tasks{$t}->set_attr('group',    $args->{group});
      $tasks{$t}->set_attr('com_line', $args->{com_line});
      $tasks{$t}->set_attr('temp_dir', $args->{temp_dir});
      $tasks{$t}->set_attr('env',      $args->{env});
      $tasks{$t}->set_attr('dir',      $args->{dir});

      undef $answer->{error};
      save_state();
      return $pid;
    } else {    # child

      $my_name .= "[ch]";
      qlog "Child ($$)!\n";

      # Change process group
      #
      #      setpgrp($$,$$);

      # switch real and effective uid to the task's user
      $< = $> = get_uid($args->{user});

      # Override temp dir
      # NOTE: when the directory already exists, temp_dir becomes /tmp,
      # the mkdir below then fails on /tmp and the second warning is
      # logged as well.
      if (-e $args->{temp_dir}) {
        qlog
"Warning! Temp dir '$args->{temp_dir}' already exists! Reset to /tmp\n";
        $args->{temp_dir} = "/tmp";
      }
      unless (mkdir($args->{temp_dir}, 0700)) {
        qlog
"Warning! Cannot create temp dir '$args->{temp_dir}'! Reset to /tmp\n";
        $args->{temp_dir} = "/tmp";
      }
      $ENV{TEMP_DIR} = $args->{temp_dir};

      # Change path...
      $ENV{PATH} =
          get_setting('path_prepend') . ':'
        . $ENV{PATH} . ':'
        . get_setting('path_append');
      $ENV{P4_RSHCOMMAND} =
        get_setting('global_rsh_command');    #'/home/root/q4/rsh';

      #qlog "PATH=$ENV{PATH};\n";

      qlog "Use uid=$<,$>;gid=$(,$)\n";

      chdir($args->{dir}) or chdir('/tmp');

      # export the task environment; entries must look like NAME=value
      foreach $e (@{$args->{env}}) {
        unless ($e =~ /^(\S+)\s*=\s*(.*)$/) {
          qlog "ERROR! Bad env: '$e'\n";
          next;
        }
        $ENV{$1} = $2;
        qlog "ENV '$1'=$2.\n";
      }

      # Execute!
      qlog "Executable: '$args->{com_line}'\n";

      #      eval { close $LST; };
      #      eval { close $SH; };
      #      eval { close $From_server; };
      #      eval { close $To_server; };
      $SRV->disconnect;

      #      close $STATUS;
      #      $STATUS=new IO::File(">/tmp/qqq");

      # stdin: a named file, or /dev/null when the name is empty or '-'
      qlog "Opening stdin ($args->{stdin})\n";
      if (($args->{stdin} =~ /\S/) && ($args->{stdin} ne '-')) {
        qlog "Opening stdin ($args->{stdin})!!!!\n";

        # NOTE(review): tr with an empty replacement list and no /d
        # modifier only counts characters, so nothing is removed from
        # the file name here (same for stderr/stdout below).
        $args->{stdin} =~ tr/\|\`\&\#\$\@\<\>//;

        #sysopen(STDIN,"$args->{stdin}",O_RDONLY)
        #  or qlog "Cannot open input file ($args->{stdin})\n";
        # NOTE(review): the result of POSIX::open is not checked before
        # dup2 (same for all redirections below).
        my $fd = POSIX::open("$args->{stdin}", O_RDONLY | O_CREAT);
        POSIX::dup2($fd, 0);
        qlog "opened stdin ($args->{stdin})\n";

        #        usr1_processor();
      } else {

#qlog "redirect stdin to pipe ".(0+$pipe_stdin->fileno)."/".(0+fileno(STDIN))."\n";
        qlog "redirect stdin to null\n";

        # redirect to /dev/null
        my $fd = POSIX::open('/dev/null', O_RDONLY);
        POSIX::dup2($fd, 0);

        #        open(STDIN,'</dev/null');
        #          if(!defined (POSIX::dup2(INULL->fileno,0))){
        #            qlog "Cannot redirect stdin to pipe [$!]\n";
        #          }
      }
      qlog "Use3\n";

      # stderr: own file, else the common outfile, else /dev/null
      if (($args->{stderr} =~ /\S/) && ($args->{stderr} ne '-')) {
        $args->{stderr} =~ tr/\|\`\&\#\$\@\<\>//;

#        sysopen(STDERR,"$args->{stderr}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
#          or die "Cannot open error file ($args->{stderr})\n";
        my $fd = POSIX::open("$args->{stderr}",
          O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE);
        POSIX::dup2($fd, 2);
        qlog "Opened ($args->{stderr}) for stderr\n";
      } else {
        if ($args->{outfile} =~ /\S/) {

#          sysopen(STDERR,"$args->{outfile}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
#            or qlog "Cannot open errors file ($args->{outfile})\n";
          my $fd = POSIX::open("$args->{outfile}",
            O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE);
          POSIX::dup2($fd, 2);
          qlog "Opened errors file ($args->{outfile})\n";
        } else {

          # redirect to /dev/null
          #          open(STDERR,'>/dev/null');
          my $fd = POSIX::open('/dev/null',
            O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE);
          POSIX::dup2($fd, 2);
          qlog "stderr -> /dev/null\n";

        }
      }

      #      qlog  "Use2\n";

      # stdout: own file, else the common outfile, else /dev/null
      if (($args->{stdout} =~ /\S/) && ($args->{stdout} ne '-')) {
        $args->{stdout} =~ tr/\|\`\&\#\$\@\<\>//;

#        sysopen(STDOUT,"$args->{stdout}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
#          or die "Cannot open stdout file ($args->{stdout})\n";
        my $fd = POSIX::open("$args->{stdout}",
          O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE);
        POSIX::dup2($fd, 1);
        qlog "Opened ($args->{stdout}) for stdout\n";
      } else {
        if ($args->{outfile} =~ /\S/) {

#          sysopen(STDOUT,"$args->{outfile}",O_WRONLY|O_APPEND|O_CREAT|O_LARGEFILE)
#            or qlog "Cannot open output file ($args->{outfile}) [$!]\n";
          my $fd = POSIX::open("$args->{outfile}",
            O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE);
          POSIX::dup2($fd, 1);
          qlog "Opened ($args->{outfile}) for outfile\n";
        } else {

          # redirect to /dev/null (the log text below still says "pipe")
          qlog "Redirect output to null\n";

          #          open(STDOUT,'>/dev/null');
          my $fd = POSIX::open('/dev/null',
            O_WRONLY | O_APPEND | O_CREAT | O_LARGEFILE);
          POSIX::dup2($fd, 1);

          #          POSIX::dup2(ONULL->fileno,1);
          #          POSIX::dup2($pipe_stdout->fileno(),fileno(STDOUT));
          #          fcntl(STDOUT,F_SETFL,fcntl(STDOUT,F_GETFL,0)|O_WRONLY);
          qlog "redirect out to pipe\n";
        }
      }

      no strict;
      qlog "Execute...\n";
      eval {close $STATUS;};
      setpriority(0, $$, 0);
      exec($args->{com_line});

      # reached only when exec() itself failed
      qlog "print \"Cannot execute '$args->{com_line}' on $my_name\n";
      exit(1);
    }
  }

  # reached through 'last RUN_H_CRADDLE' (chdir or fork failure)
  qlog "Error: $answer->{error}\n";
  return -1;
}

sub statfiles($$ ) {

  # Appends to @$list the names of the plain files of a directory whose
  # names match a regular expression.
  #
  # Args:
  #   $mask - "<dir>/<regexp>": everything after the last slash is a perl
  #           regexp that has to match the whole file name
  #   $list - array ref receiving the matching names (without the dir)
  #
  # Side effect: the current directory of the process is changed to
  # <dir>.  Nothing is appended when the mask is malformed, the directory
  # cannot be entered or read, or the regexp does not compile.
  my ($mask, $list) = @_;

  # A mask that does not split into dir and pattern cannot be used; bail
  # out instead of working with capture variables of an earlier match.
  my ($path, $filemask) = $mask =~ m{(/.*)/([^/]+)}
    or return;
  chdir($path) or return;

  # Compile the pattern as data.  It used to be pasted into a string
  # eval, where a crafted mask could run arbitrary perl code; the block
  # eval only guards against a pattern that does not compile.
  my $name_re = eval { qr{^($filemask)$} };
  return unless defined $name_re;

  opendir(my $dir, ".") or return;
  foreach my $entry (readdir($dir)) {
    next unless -f $entry;
    my ($hit) = $entry =~ $name_re;
    push @$list, $entry if (defined($hit) and $hit ne '');
  }
  return closedir($dir);
}

#
#  Writes one XML element, named by the first argument, to the SAV
#  handle. The second argument is the value: a scalar, or a reference
#  to an array or a hash.
#
#############################################################
sub save_xml($$ ) {

  # Writes one element " <name>\n...</name>\n" to the already opened SAV
  # file handle.
  #
  # Args:
  #   1 - element name
  #   2 - value: array ref  -> <array><el>..</el>..</array>
  #              hash ref   -> <hash><key>value</key>..</hash>
  #              scalar with \0, \n or \r -> <packed>uuencoded</packed>
  #              other scalar -> <scalar>value</scalar>
  #       REF, CODE and GLOB references are skipped silently.
  #
  # NOTE(review): values and hash keys are written unescaped, so a '<'
  # inside them yields a file that presumably cannot be parsed back.
  my ($name, $sav) = @_;
  my $kind = ref($sav);

  return if ($kind eq 'REF' or $kind eq 'CODE' or $kind eq 'GLOB');

  print SAV " <$name>\n";
  if ($kind eq 'ARRAY') {
    if (scalar(@{$sav}) > 0) {
      print SAV " <array><el>" . join('</el><el>', @{$sav}) . '</el></array>';
    } else {
      print SAV ' <array></array>';
    }
  } elsif ($kind eq 'HASH') {

    # The old code used a single-quoted template and the unrelated %sav,
    # so the literal text '<$_>$sav{$_}</$_>' was written for every key.
    my $body = '';
    foreach my $key (keys(%{$sav})) {
      $body .= "<$key>$sav->{$key}</$key>";
    }
    print SAV " <hash>$body</hash>";
  } elsif ($sav =~ y/\0\n\r//) {

    # uuencode values that would break the line oriented format
    my $p = pack('u', $sav);
    $p =~ y/\n\r//d;
    print SAV "<packed>$p</packed>";
  } else {
    print SAV "<scalar>$sav</scalar>";
  }
  print SAV "</$name>\n";
}    #~save_xml

#
#  Decodes the REST of a saved line (the value part only).
#  Args: rest of line
#  Returns the decoded scalar, or a reference to a hash or an array.
#
###########################################################
sub load_line( $ ) {

  # Decodes a value of the old line oriented state format.  The first
  # two characters select the encoding:
  #   "\0A" - "\0"-separated array, returned as array ref
  #   "\0H" - "\0"-separated key/value list, returned as hash ref
  #   "\0U" - uuencoded scalar
  # anything else is returned unchanged.
  my $marker = substr($_[0], 0, 2);

  if ($marker eq "\0A") {
    my @items = split("\0", substr($_[0], 2));
    qlog("Loaded array: " . join(';', @items) . "\n");
    return \@items;
  }

  if ($marker eq "\0H") {
    my %pairs = split("\0", substr($_[0], 2));
    qlog("Loaded hash: " . join(';', %pairs) . "\n");
    return \%pairs;
  }

  if ($marker eq "\0U") {
    my $decoded = unpack('u', substr($_[0], 2));
    qlog("Unpacked: $decoded\n");
    return $decoded;
  }

  # plain scalar: hand back a copy
  my $plain = $_[0];
  return $plain;
}    #~load_line

#
#  Saves the monitor state
#
#########################################################
sub save_state() {

  # Writes all tasks of %tasks as XML to the file named by the
  # 'mon_save' setting; the previous file is renamed to "<file>.bak"
  # first.  A task stored under an empty key is not written: it is marked
  # dead and gets a 'ttl' attribute for delayed cleanup.
  #
  # The bareword handle SAV is kept on purpose: save_xml() prints to it.
  my ($file, $id, $attr);

  $file = get_setting('mon_save');
  if (!rename($file, "$file.bak")) {
    qlog "Cannot to create backup of '$file'\n";
  }

  # three-argument open: the file name is never parsed for mode symbols
  if (!open(SAV, '>', $file)) {
    qlog "Cannot save status to '$file'\n";
    return;
  }

  # log line number and sub name of the caller's caller
  qlog "Save status to '$file' " . (caller(1))[2] . "  " . (caller(1))[3] . "\n";

  print SAV "<xml>\n";
  foreach $id (keys(%tasks)) {
    if ($id eq '') {
      qlog "Warning! Empty task found. Delayed delete...\n";

      #      delete $tasks{$id};
      $tasks{$id}->mark_dead();
      $tasks{$id}->set_attr('ttl', time + get_setting('dead_cleanup_time'));
      next;
    }

    qlog "save $id\n";
    print SAV "<task>\n <task_id>$id</task_id>\n";
    my @pids = $tasks{$id}->get_pids();
    save_xml('pids', \@pids);
    foreach $attr ($tasks{$id}->list_attrs()) {
      save_xml($attr, $tasks{$id}->get_attr($attr));
    }
    print SAV "</task>\n";
  }
  print SAV "</xml>\n";

  # buffered write errors (e.g. a full disk) surface only at close time
  if (close(SAV)) {
    qlog "Saving done\n";
  } else {
    qlog "Error while saving status to '$file': $!\n";
  }
}

#
#  Loads the monitor state
#
#########################################################
sub old_load_state() {

  # Loads the monitor state written in the old "key: value" line format
  # into %ran (keyed by pid).  The first line of the file is skipped,
  # a line starting with '#' closes the current entry.
  my ($key, $val, %e);

  $val = get_setting('mon_save');
  my $in;
  unless (open($in, '<', $val)) {
    qlog "Cannot LOAD status from '$val'\n";
    return;
  }

  <$in>;    # skip the first line
  %e   = ();
  %ran = ();
  qlog "Loading state ($val)\n";

  # a lexical line variable keeps the caller's $_ intact
  while (defined(my $line = <$in>)) {
    chomp $line;
    if ($line =~ /^\#/) {    # new entry
      if ($e{pid} > 0) {
        %{$ran{$e{pid}}} = %e;
        qlog "$e{pid} loaded\n";
      } else {
        qlog "Warning! Bad pid! (" . join(';', %e) . "\n";
      }
      %e = ();
      next;
    }

    # Skip lines that are not "key: value"; without this check the
    # captures of an earlier match were stored once more.
    unless ($line =~ m/^(\S+):\s(.*)/) {
      qlog "Warning! Bad line in saved state: '$line'\n";
      next;
    }
    ($key, $val) = ($1, $2);
    $e{$key} = load_line($val);
  }

  # the last entry has no closing '#' line
  if ($e{pid} > 0) {
    %{$ran{$e{pid}}} = %e;
    qlog "$e{pid} loaded\n";
  } else {
    qlog "Warning! No pid! (" . join(';', %e) . "\n";
  }

  close $in;
  qlog "Loading done\n";
  update_pids();
  update_childs();
}

sub load_array( $ ) {

  # Collects the values of all child nodes of the given <array> node.
  # Arg: node object providing to_first_node() and next_node()
  # Returns a reference to the list of child values, in node order.
  my ($container) = @_;
  my @items;

  $container->to_first_node();
  for (
    my $element = $container->next_node();
    defined($element);
    $element = $container->next_node()
    ) {
    push @items, $element->get_val();
  }
  return \@items;
}

sub load_hash( $ ) {

  # Builds a hash from the child nodes of the given <hash> node: the
  # node name is the key, the node value is the value.
  # Arg: node object providing to_first_node() and next_node()
  # Returns a reference to the resulting hash.
  my ($container) = @_;
  my %pairs;

  $container->to_first_node();
  for (
    my $entry = $container->next_node();
    defined($entry);
    $entry = $container->next_node()
    ) {
    $pairs{$entry->get_name()} = $entry->get_val();
  }
  return \%pairs;
}

sub load_state() {

  # Loads the monitor state written by save_state(): reads the file
  # named by the 'mon_save' setting, parses it with a minimal tag parser
  # into a tree of XMLNODE objects and rebuilds %tasks from every <task>
  # element.  Tasks without task_id, owner, id or user are ignored.
  # Finishes with update_pids() and update_childs().
  my (
    $state,   $name, $top,  @stack,   $val,
    $topnode, $task, $attr, $attr_content,
    $info,    $task_id,     $value
  );

  $val = get_setting('mon_save');
  my $in;
  unless (open($in, '<', $val)) {
    qlog "Cannot LOAD status from '$val'\n";
    return;
  }

  # the lines are glued together with one space between them
  $state = join(' ', <$in>);
  close $in;

  $stack[0] = XMLNODE::new('root');
  $top = 0;
  while (1) {

    #    qlog "DEBUG: parsing '$state'\n";
    if ($state =~ s/^\s*<([\w:.\/-]+)>//) {
      $name = $1;

      # closing?
      if ($name =~ s/^\///) {

        # never pop the artificial root node: a stray closing tag on the
        # top level would otherwise drive $top below zero
        if ($top > 0 and $name eq $stack[$top]->get_name()) {
          undef $stack[$top];    # it is saved in upper node info
          --$top;
        } else {

          # error
          qlog "Error closing: '$name' found, but '"
            . $stack[$top]->get_name()
            . "' expected.\n";
        }
        next;
      }

      # new element

      my $node = XMLNODE::new($name);
      $stack[$top]->add_node($node);
      $stack[++$top] = $node;
    }

    # not <> => data
    elsif ($state =~ s/^\s*([^<]+)</</) {
      $stack[$top]->set_val($1);
    }

    # end of text or ERROR
    else {
      if ($state !~ /^\s*$/) {
        qlog "Error: cannot parse '$state'\n";
      }
      last;
    }
  }

  # Loaded data into xml tree...
  # Now decode it!

  $top = 0;
  $stack[0]->to_first_node();
  $topnode = $stack[0]->next_node();
  if (!defined $topnode) {
    qlog "Warning! Cannot load saved state! Ignore...\n";
    return;
  }
  if ($topnode->get_name ne 'xml') {
    qlog "ERROR! Bad saved state: no <xml> top node\n";
    return;
  }

  # foreach all tasks...
  $topnode->to_first_node();
  while (defined($task = $topnode->next_node())) {
    if ($task->get_name() eq 'task') {

      $task->to_first_node();
      $info = Task::new;

      $task_id = '';
      while (defined($attr = $task->next_node())) {

        $value = $attr->get_val();
        qlog "attr: " . $attr->get_name() . "; " . $attr->get_val() . "\n";

        #is it complicated?
        if ($attr->count_nodes() > 0) {
          $attr->to_first_node();
          $attr_content = $attr->next_node();
          if ($attr_content->get_name eq 'array') {
            $value = load_array($attr_content);
          } elsif ($attr_content->get_name eq 'hash') {
            $value = load_hash($attr_content);
          } elsif ($attr_content->get_name eq 'packed') {

            # save_xml() writes the uuencoded text directly inside
            # <packed>, i.e. as the value of this node.  The old code
            # always asked for a child node and died on the undefined
            # result; a child is consulted only when one really exists.
            if ($attr_content->count_nodes() > 0) {
              $attr_content->to_first_node();
              $val = $attr_content->next_node();
              $val = $val->get_val();
            } else {
              $val = $attr_content->get_val();
            }

            $value = unpack('u', $val);
          } elsif ($attr_content->get_name eq 'scalar') {
            $val = $attr_content->get_val();

            $value = $val;
          } else {
            $value = $attr_content->get_val();
          }
        }

        # task_id, pids and is_dead are stored in the Task object
        # itself, everything else becomes a task attribute
        if ($attr->get_name eq 'task_id') {
          $task_id = $value;
        } elsif ($attr->get_name eq 'pids') {
          foreach my $p (@$value) {
            $info->add_pid($p);
          }
        } elsif ($attr->get_name eq 'is_dead') {
          if ($value > 0) {
            $info->mark_dead();
          }
        } else {
          $info->set_attr($attr->get_name(), $value);
        }
      }
      if ($task_id eq '') {
        qlog "ERROR: Loaded task with empty id! Ignored.\n";
        next;
      }
      if ( $info->get_attr('owner') eq ''
        or $info->get_attr('id')   eq ''
        or $info->get_attr('user') eq '') {
        qlog "ERROR: Bad task loaded: owner="
          . $info->get_attr('owner') . ' id='
          . $info->get_attr('id')
          . ' user='
          . $info->get_attr('user')
          . ". Ignored.\n";
        next;
      }

      # add info to tasks
      $tasks{$task_id} = $info;
    }

    # if task name is NOT 'task'...
    else {
      qlog 'Warning! Non-task info on top level ('
        . $task->get_name()
        . "). Ignored.\n";
    }
  }

  # all tasks loaded...

  qlog "Loading done\n";
  update_pids();
  update_childs();
}

#
#  Loads config file.
#  args: 1 - file name
#        2 - (opt) safety (1 - unsafe load, 0 - safe)
#
#  ret:  0 if success, 1 if fail to open file
#
################################################################
sub load_config( $;$ ) {

  # Reads "name = value" lines from a config file and stores them in
  # %new_global_settings through assign_new_value() (section 'g').
  # Blank lines and lines starting with '#' are skipped; malformed lines
  # and rejected options are logged and skipped.
  #
  # Args:
  #   $file   - config file name
  #   $unsafe - optional; passed to assign_new_value() as the unsafe flag
  # Returns 0 on success, 1 when the file cannot be opened.
  my ($file, $unsafe) = @_;
  my ($var, $val);

  # Lexical handle and three-argument open: the file name is never
  # parsed for mode characters and no global handle is left behind.
  open(my $in, '<', $file) or return 1;

  # a lexical line variable keeps the caller's $_ intact
  while (defined(my $line = <$in>)) {
    chomp $line;
    next if ($line =~ /^\s*\#/);
    next if ($line =~ /^\s*$/);

    unless ($line =~ m/^\s*(\S+)\s*\=\s*(.*)$/) {
      qlog "Bad string in config file: $line\n";
      next;
    }
    ($var, $val) = ($1, $2);
    $val =~ s/^\s+//;
    $val =~ s/\s+$//;
    if (assign_new_value(\%new_global_settings, $var, $val, $unsafe, 'g')) {
      qlog "Bad option in config file: '$line'\n";
    }
  }
  close $in;
  return 0;
}

#
#  Assigns new value to hash element according %opt_types
#
#  args: 1 hash reference
#        2 hash key
#        3 value to be assigned
#        4 unsafe flag (0 - safe assign, 1 - forced)
#        5 (opt) section name (a letter)
#
#  ret:  returns 0 if succeed, nonzero if not
#
###########################################################
sub assign_new_value( $$$$;$ ) {

  # Stores $val under $hash->{$key}, converted as %opt_types prescribes
  # for that key: [type, safe flag, cumulative flag, allowed sections].
  # Returns 0 when the value was accepted, 1 when the option is unknown,
  # not allowed in safe mode or in this section, or has a bad value.
  my ($hash, $key, $val, $unsafe, $s) = @_;

  # unknown option
  return 1 unless (exists $opt_types{$key});

  my ($type, $safe, $cumul, $section) = @{$opt_types{$key}};

  # not allowed in safe mode / in this section
  return 1 if (!$unsafe && ($safe ne 'y'));
  return 1 if (($s ne '') and ($section ne '') and ($section !~ m/$s/));

  # text
  if ($type eq 't') {
    if ($cumul) {
      $hash->{$key} .= $val;
    } else {
      $hash->{$key} = $val;
    }
    return 0;
  }

  # numeric: only the leading digits are kept
  if ($type eq 'n') {
    if ($val =~ /^(\d+)/) {
      $hash->{$key} = $1;
      return 0;
    }
    qlog("Warning! Bad value for $key (must be numeric, but '$val' found)\n");
    return 1;
  }

  # hash: "subkey value"
  if ($type eq 'h') {
    if ($val =~ /^(\S+)\s+(.*)$/) {
      $hash->{$key}->{$1} = $2;
      return 0;
    }
    qlog("Warning! Bad value for $key (must be hash, but '$val' found)\n");
    return 1;
  }

  # lists: 'l' - via spaces, 'L' - via comma, semicolon or whitespace,
  # '@' - via comma
  my %separator = (
    'l'  => qr/\ +/,
    'L'  => qr/[\s\;\,]/,
    "\@" => qr/\,/,
  );
  if (exists $separator{$type}) {
    my @items = split($separator{$type}, $val);
    if ($cumul) {
      push @{$hash->{$key}}, @items;
    } else {
      @{$hash->{$key}} = @items;
    }
    if ($type eq 'L') {
      qlog("LIST '$key' :" . join(';', @{$hash->{$key}}) . ";\n");
    }
  }

  # any other type: nothing is stored, still reported as success
  return 0;
}

sub get_setting( $ ) {

  # Returns the value of the named global setting, or undef when
  # %global_settings has no such key.
  my $sname = shift;

  return exists($global_settings{$sname}) ? $global_settings{$sname} : undef;
}

#
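#  deletes dir recursively as user
#  returns: 1 if the dir is gone after removal (also for an empty arg),
#           0 if it still exists, 2 if there was no such dir,
#           3 if removal of a protected system dir was refused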
#
# Removes directory "/$arg" recursively with "/bin/rm -rf", started
# through launch().
#
# Args:
#   $arg - directory name; trailing slashes, a set of shell special
#          characters and "/../" sequences are stripped from it first
#   $u   - optional user; when set, launch() gets two extra arguments
#          built from get_uid/get_gid
#
# Return codes as implemented: 1 - empty argument, or the directory no
# longer exists after removal; 0 - the directory still exists;
# 2 - "/$arg" is not a directory; 3 - refused to remove a system dir.
sub deldir($;$ ) {    #delete directory recursively
  my ($arg, $u) = @_;
  my $file;

  $arg =~ s!/+$!!;    # remove trailing slash(es)

  $arg =~ tr{\;\*\&\\~\?\<\>\|\'\"\`}{}d;    # remove unwanted symbols
  $arg =~ s!/\.\./!!g;                       # remove updirs

  qlog "DELDIR '$arg'\n";
  unless ($arg) {
    qlog "Invalid arg for deldir: '$arg'\n";
    return 1;
  }

  # never touch the basic system directories
  if ( ($arg eq '/')
    or ($arg eq '/usr')
    or ($arg eq '/bin')
    or ($arg eq '/sbin')
    or ($arg eq '/var')
    or ($arg eq '/etc')) {
    qlog "DANGER!!! try to remove $arg\n";
    return 3;
  }
  unless (-d "/$arg") {
    qlog "No such dir '$arg'\n";
    return 2;
  }

  # quotes were stripped from $arg above, so single quoting is enough
  $file = "/bin/rm -rf '/$arg'";

  {
    local $ENV{PATH} = '/bin:/usr/bin:/usr/local/bin';
    if ($u) {

      # NOTE(review): "get_uid {$u}" / "get_gid {$u}" use braces instead
      # of parentheses; this looks like a typo for get_uid($u) and
      # get_gid($u) - confirm against launch() and how perl parses it.
      launch(0, $file, '', get_uid {$u}, get_gid {$u});
    } else {
      launch(0, $file, '');
    }
  }

  # report whether the directory survived the removal
  return 0 if (-e "/$arg");
  return 1;
}    # deldir

sub pack_value( $ ) {

  # Serializes a value into the "\0"-tagged string format understood by
  # unpack_value():
  #   array ref -> "\0A" <packed elements> "\0E"
  #   hash ref  -> "\0H" (<key> <packed value>)... "\0E"
  #   scalar containing \0, \n or \r -> "\0U" <uuencoded> "\0E"
  #   any other value -> "\0S" <value> "\0E"
  # REF, CODE and GLOB references cannot be packed: undef is returned.
  my ($value) = @_;
  my $kind = ref($value);

  if ($kind eq 'ARRAY') {
    return "\0A" . join('', map { pack_value($_) } @{$value}) . "\0E";
  }

  if ($kind eq 'HASH') {
    return "\0H"
      . join('', map { $_ . pack_value($value->{$_}) } keys(%{$value}))
      . "\0E";
  }

  return undef if ($kind eq 'REF' or $kind eq 'CODE' or $kind eq 'GLOB');

  # characters that would clash with the framing force uuencoding
  if ($value =~ y/\0\n\r//) {
    my $encoded = pack('u', $value);
    $encoded =~ s/\n//g;
    return "\0U${encoded}\0E";
  }

  return "\0S" . $value . "\0E";
}

#
#  Unpacks string encoded by pack_value
#  Args:
#         1 - reference to result variable (must be scalar!)
#         2 - string to decode
#         3 - (optional) index to start from
#  Ret:
#         new index in source string (just after final \0[E])
#
#######################################################
sub unpack_value( $$;$ ) {

  # Decodes one value produced by pack_value().
  #
  # Args:
  #   $res   - reference to the scalar that receives the result (an array
  #            ref, a hash ref or a plain scalar; undef on an end mark or
  #            on a malformed value)
  #   $val   - string to decode
  #   $index - optional start position inside $val (default 0)
  # Returns the position just behind the decoded value.  For a value that
  # does not start with "\0" the position is returned unchanged.
  my ($res, $val, $index) = @_;
  my ($tmp, $i2, $my_res, $next);

  $index ||= 0;
  undef $$res;
  $tmp = substr($val, $index, 1);
  qlog "UPCK: '" . substr($val, $index) . "'\n"
    if get_setting('debug_pc');
  if ($tmp eq "\0") {    # complex type
    ++$index;
    $tmp = substr($val, $index, 1);
    ++$index;
    if ($tmp eq 'E') {    #end mark
      return $index;
    } elsif ($tmp eq "A") {    #array
      @$my_res = ();
      for (;;) {
        if (substr($val, $index, 2) eq "\0E") {    #end
          $index += 2;
          last;
        }
        $next = unpack_value(\$tmp, $val, $index);

        # A nested call that consumed nothing means truncated or
        # malformed input; without this check the loop never ended.
        if ($next <= $index) {
          qlog "Warning! Unterminated or malformed array in pos $index\n";
          last;
        }
        $index = $next;
        push @$my_res, $tmp;
      }
      qlog "Unpacked array\n" if get_setting('debug_pc');
    } elsif ($tmp eq "H") {    #hash
      my $key;
      %$my_res = ();
      for (;;) {
        if (substr($val, $index, 2) eq "\0E") {    #end
          $index += 2;
          last;
        }
        $i2 = index($val, "\0", $index);

        # No "\0" left: the hash is truncated.  Going on used to loop
        # forever at the end of the string.
        if ($i2 < 0) {
          qlog "Warning! Unterminated hash in pos $index\n";
          last;
        }
        $key = substr($val, $index, $i2 - $index);

        $index = unpack_value(\$tmp, $val, $i2);
        $my_res->{$key} = $tmp;
      }
      qlog "Unpacked hash " . join(';', keys(%$my_res)) . "\n"
        if get_setting('debug_pc');
    } elsif ($tmp eq "U") {    #uuencode
      $i2 = index($val, "\0E", $index);
      $i2 = length($val) if ($i2 < 0);    #Ooops! Not found terminator
      $my_res = unpack('u', substr($val, $index, $i2 - $index));
      $index = $i2 + 2;
      qlog "Unpacked uu '$my_res'\n" if get_setting('debug_pc');
    } elsif ($tmp eq "S") {               #simple scalar
      $i2 = index($val, "\0E", $index);
      $i2 = length($val) if ($i2 < 0);    #Ooops! Not found terminator
      $my_res = substr($val, $index, $i2 - $index);
      $index = $i2 + 2;
      qlog "Unpacked '$my_res'\n" if get_setting('debug_pc');
    } else {
      qlog "Warning! error while decoding in pos $index '$val'\n";
      $my_res = '';
    }
  } else {    # no "\0" tag at all: nothing is consumed
    qlog "Malformed scalar! (" . substr($val, $index, -1) . "\n";
  }
  $$res = $my_res;
  return $index;
}

__END__
#
#  PART of INIT_ATTACH HANDLER, which does actual work
#
#  OLD
#
####################################################################
sub init_attach_real_handler_OLD($$$$$$) {
  # NOTE: this sub is placed after __END__, so perl never compiles it;
  # it is kept as a reference to the old delayed-attach implementation.
  # It collected the pids matching the masks, answered 'init_attach'
  # and blocked further init_attach requests until the timeout.
  my ( $hash, $from, $attach_parent_mask, $attach_exe_mask, $attach_user,
       $tmout )
    = @_;
  my (%x,$t);

  qlog
    "REAL init_attach  ($attach_parent_mask/$attach_exe_mask/$attach_user)\n";

  %x = ( 'node' => $my_name );
  subst_task_prop( \$attach_exe_mask,    \%x );
  subst_task_prop( \$attach_parent_mask, \%x );
  @attach_collected =
    collect_pids( $attach_parent_mask, $attach_exe_mask, $attach_user );
  qlog
    "init_attach successfull ($attach_parent_mask/$attach_exe_mask/$attach_user)"
      . scalar(@attach_collected) . "\n";
  qlog "PIDS: " . join( ';', @attach_collected ) . "\n";

  answer_to_server( $from, $hash, 'init_attach', 0, 'success', '1' );
  $delayed_requests{init_attach}->{timeout} = time + $tmout - 1;
  $delayed_requests{init_attach}->{blocked} = 1;
}

#
#   INIT_ATTACH         OLD HANDLER
#
#####################################################################
sub init_attach_handler_OLD( $$$$ ) {
  # NOTE: this sub is placed after __END__, so perl never compiles it;
  # it is kept as a reference only.  It queued the request while another
  # init_attach was blocked, otherwise created the task and did the work.
  my ( $type, $hash, $from, $args ) = @_;

  $attach_parent_mask = $args->{parent_mask};
  $attach_exe_mask    = $args->{exe_mask};
  $attach_user        = $args->{user};

  qlog "HANDLER: INIT_ATTACH1 (id=$args->{id}, ".
       "owner=$args->{owner}, user=$args->{user})\n";
#  qlog "init_attach  ($attach_parent_mask/$attach_exe_mask/$attach_user)\n";
  if ( $delayed_requests{init_attach}->{blocked} ) {

    # some attach is in progress... wait.

    qlog "Another attach in progress. Switch!!!\n";

    push @delayed_attaches,
      [
       $hash,            $from,        $attach_parent_mask,
       $attach_exe_mask, $attach_user, $args->{tmout} ];
    return;
  }

  my $t="$args->{owner}:$args->{id}";
  $tasks{$t}=Task::new();

  # NOTE(review): calls init_attach_real_handler, while the sub in this
  # part of the file is named init_attach_real_handler_OLD.
  init_attach_real_handler( $hash, $from, $attach_parent_mask,
                            $attach_exe_mask, $attach_user, $args->{tmout} );
  return;
}                               # init_attach_handler_OLD

#
#   OLD ATTACH         HANDLER
#
#####################################################################
sub attach_handler_OLD( $$$$ ) {
  # NOTE: this sub is placed after __END__, so perl never compiles it;
  # it is kept as a reference only.  It copied the request arguments,
  # queued them in @attach_requests and ran the second stage after a
  # one second sleep.
  my ( $type, $hash, $from, $args ) = @_;
  my @collected;
  my %args2 = %$args;

  qlog "HANDLER: ATTACH id=$args->{id}, owner=$args->{owner}, ".
       "user=$args->{user}, tmout=$args->{tmout}, all=$args->{all}\n";

  push @collected, @attach_collected;
  $args2{collected}= \@collected;
  $args2{from}     = $from;
  $args2{hash}     = $hash;
  $args2{id}       = $args->{id};
  $args2{owner}    = $args->{owner};
  $args2{user}     = $args->{user};
  $args2{user}     = $attach_user if ($args2{user} eq '');
  # default timeout is 60 seconds; stored as an absolute time
  $args2{tmout}    = 60 if ( $args->{tmout} < 1 );
  $args2{tmout}    += time;

  push @attach_requests, \%args2;

  sleep 1;
  # NOTE(review): calls attach_handler_second_stage, while the sub in
  # this part of the file is named attach_handler_second_stage_OLD.
  attach_handler_second_stage( \%args2 );
}                               # attach_handler_OLD

#
#   ATTACH         HANDLER (SECOND STAGE)
#
#####################################################################
sub attach_handler_second_stage_OLD( $ ) {
  # NOTE: this sub is placed after __END__, so perl never compiles it;
  # it is kept as a reference only.  It collected the pids matching the
  # attach masks again and added the ones that were neither collected
  # before nor owned by a known task to task "owner:id".
  my $args = $_[0];
  my ( @new_coll, @new_attached, $collected, $p, $times, $i, $t );

  $collected = $args->{collected};

  #  return if ( scalar(@$attached) > 0 and $args->{all} == 0 );


  $t="$args->{owner}:$args->{id}";

  qlog "ATT2: '$t' ($args->{user})\n";

  if(!defined $tasks{$t}){
      qlog "Warning! Task '$t' does not exists yet (init_attach failed?)\n";
      $tasks{$t}=Task::new();
  }
#  $tasks{$t}->set_head($new_pid);
  $tasks{$t}->set_attr('rsh_pid',$args->{rsh_pid});
  $tasks{$t}->set_attr('id',$args->{id});
  $tasks{$t}->set_attr('user',$args->{user});
  $tasks{$t}->set_attr('owner',$args->{owner});
  $tasks{$t}->set_attr('hard_kill_after_head',get_setting('hard_kill_after_head'));

  if($args->{temp_dir} ne ''){
    launch(0,"mkdir $args->{temp_dir}", "mkdir-$t", $args->{user});
    launch(0,"chmod 0700 $args->{temp_dir}", "chmod-$t", $args->{user});
  }

  update_pids();
  update_childs();
  @new_coll =
    collect_pids( $attach_parent_mask, $attach_exe_mask, $attach_user );
  # only look for new pids when the fresh snapshot has grown
  if ( @new_coll > @$collected ) {
    qlog "Collected($attach_parent_mask,$attach_exe_mask,$attach_user): "
      . scalar(@new_coll) . "\n";
    qlog "Was collected: " . scalar(@$collected) . "\n";
  ATTACH_LOOP:
    foreach $p (@new_coll) {

      foreach $i (values(%tasks)){
        # skip already runned tasks
        next ATTACH_LOOP if($i->check_pid($p));
        # skip cleo launching processes
        next ATTACH_LOOP if($all_pids->{$p}->{name} =~ /^CLEO LAUNCH/);
      }

      #skip already counted
      next if grep { $_ eq $p } @$collected;

      # take in account
      push @new_attached, $p;
      qlog "Attach to $args->{owner}:$args->{id} successfull - $p".
           " ($all_pids->{$p}->{cmdline})\n";

      unless ( $args->{all} ) {
        qlog "Exit attaching!\n";
        last;
      }
    }
  }
  $args->{collected} = \@new_coll;

  # all new pids were collected
  return unless (@new_attached);

  # add pids to the task
  foreach $p (@new_attached) {
    $tasks{$t}->add_pid($p);

#    qlog "ATT: $p: $ran{$p}->{id}/$ran{$p}->{owner}/$ran{$p}->{name};\n";
    # NOTE: without the 'all' flag the sub returns here, before
    # save_state() is reached
    return unless ( $args->{all} );
  }
  save_state();

  #  answer_to_server($from,$hash,'attach',$args->{id},'success','1');
} # attach_handler_second_stage_OLD

