#!/usr/bin/perl -w
#
# Copyright (c) 2006, 2007 Michael Schroeder, Novell Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# The Job Dispatcher
#

BEGIN {
  # Directory part of the path this script was invoked as ('.' if none).
  my ($wd) = $0 =~ m-(.*)/- ;
  $wd ||= '.';
  # FIXME: currently the bs_dispatcher makes assumptions on being in a
  # properly set up working dir, e.g. with subdirs 'worker' and
  # 'build'.  Either that is cleaned up or this stays in, for the sake
  # of startproc and others being able to start the dispatcher without
  # knowing that it has to be started in the right directory....

  # The relative @INC entries below and the relative 'worker'/'build'
  # lookups done later only work from inside $wd, so fail loudly instead
  # of continuing in the wrong directory.
  chdir($wd) || die("chdir $wd: $!\n");
  unshift @INC,  "build";
  unshift @INC,  ".";
}

use POSIX;
use Data::Dumper;
use Digest::MD5 ();
use List::Util;
use Fcntl qw(:DEFAULT :flock);
use XML::Structured ':bytes';
use Storable;

use BSConfig;
use BSRPC;
use BSUtil;
use BSXML;

use strict;

# Factor the load of a project/repository/arch is multiplied with once it
# has no (more) source change jobs to dispatch. On the logarithmic 'adjust'
# scale used for the dispatch priorities (10 ** (-adjust / 10)) a factor
# of 3 corresponds to an adjustment of -4.77.
my $nosrcchangescale = 3;	# -4.77

# Set of package names taken from $BSConfig::powerpkgs. Jobs of these
# packages are moved to the front of their queue and are only offered to
# workers matching $BSConfig::powerhosts (if that is configured as well).
my %powerpkgs;

# (the variable is deliberately mentioned twice, presumably to keep perl -w
# from complaining about a name used only once -- TODO confirm)
if ($BSConfig::powerpkgs) {
  %powerpkgs = map {$_ => 1} @{$BSConfig::powerpkgs || []};
}

# Drop privileges to the configured build service user/group (if any).
# Both names are resolved to numeric ids first; an unknown name is fatal.
my $user = $BSConfig::bsuser;
my $group = $BSConfig::bsgroup;

!defined($user) || defined($user = (getpwnam($user))[2]) || die("unknown user\n");
!defined($group) || defined($group = (getgrnam($group))[2]) || die("unknown group\n");
if (defined $group) {
  # Assigning a plain number to $) only changes the effective gid and keeps
  # the supplementary groups of the user we were started as (usually root).
  # Repeating the gid makes perl also call setgroups() with just that
  # group, see perlvar. The group must be changed before the uid is.
  $) = "$group $group";
  $( = $group;
  die "setgid: $!\n" if ($) != $group);
}
if (defined $user) {
  ($>, $<) = ($user, $user);
  die "setuid: $!\n" if ($> != $user);
}

# Port number that is sent to the workers together with each job
# ("port=..." argument of the build request). It is taken from
# $BSConfig::reposerver if that ends in ":<digits>", otherwise the
# default of 5252 is used.
my $port = ($BSConfig::reposerver =~ /:(\d+)$/) ? $1 : 5252;        #'RR'

# Maps the host architecture of a worker (the part of the idle worker's
# file name in front of the first ':') to the list of job architectures
# that worker may be given. Host architectures missing here never get a job.
my %cando = (
# this code sucks and is on the list to be rewritten
# switch on next 3 lines if you want arm, mips, ppc and sh4 qemu emulated builds on a x86 worker
#  'i586'    => [          'i586',         'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el', 'mips', 'mipsel', 'mips64', 'mips64el', 'ppc', 'ppc64', 'sh4'],
#  'i686'    => [          'i586', 'i686', 'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el', 'mips', 'mipsel', 'mips64', 'mips64el', 'ppc', 'ppc64', 'sh4'],
#  'x86_64'  => ['x86_64', 'i586', 'i686', 'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el', 'mips', 'mipsel', 'mips64', 'mips64el', 'ppc', 'ppc64', 'sh4'],
# switch on next 3 lines if you want arm, mips, mipsel and sh4 (but no
# mips64, mips64el, ppc or ppc64) qemu emulated builds on a x86 worker
# (this is the variant that is currently active)
  'i586'    => [          'i586',         'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el', 'mips', 'mipsel',       'sh4'],
  'i686'    => [          'i586', 'i686', 'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el', 'mips', 'mipsel',       'sh4'],
  'x86_64'  => ['x86_64', 'i586', 'i686', 'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el', 'mips', 'mipsel',       'sh4'],
#
# native builds only from here on
  'ppc'     => [                                                                                        'ppc'                ],
  'ppc64'   => [                                                                                        'ppc', 'ppc64',      ],
  'armv4l'  => [                          'armv4l'                                                                           ],
  'armv5el' => [                          'armv4l', 'armv5el',                                                               ],
  'armv6el' => [                          'armv4l', 'armv5el', 'armv6el'                                                     ],
  'armv7el' => [                          'armv4l', 'armv5el', 'armv6el', 'armv7el'                                          ],
  'armv8el' => [                          'armv4l', 'armv5el', 'armv6el', 'armv7el', 'armv8el'                               ],
  'sh4'     => [                                                                                                        'sh4'],
# NOTE(review): entries with a ':linux32'/':linux64' suffix are used verbatim
# as job architecture (i.e. as directory name below the jobs dir) by this
# file; whether such directories exist cannot be told from here.
  'parisc'  => ['hppa', 'hppa64:linux64'],
  'parisc64'=> ['hppa64', 'hppa:linux32'],
  'ia64'    => ['ia64'],
  's390'    => ['s390'],
  's390x'   => ['s390x', 's390'],
  'sparc'   => ['sparcv8', 'sparc'],
  'sparc64' => ['sparc64v', 'sparc64', 'sparcv9v', 'sparcv9', 'sparcv8:linux32' , 'sparc:linux32'],
  'mips'    => ['mips'],
  'mips64'  => ['mips64', 'mips'],
  'local'   => ['local'],
);

# Decay constant (per second, negative) of the per project/repository/arch
# load; exp($decay * seconds) is the factor applied to a load value:
# 4h build will add .5 to the load
# 4h idle will halve the load
my $decay = log(.5)/(4*3600);

# run dir: lock file, exit/restart trigger files and the debug data
my $rundir = $BSConfig::rundir || "$BSConfig::bsdir/run";
# 'idle' and 'building' worker state files
my $workersdir = "$BSConfig::bsdir/workers";
# per-arch job files, plus the 'finished', 'load' and 'dispatchprios' files
my $jobsdir = "$BSConfig::bsdir/jobs";
# 'repository' and 'dispatch' event queues
my $eventdir = "$BSConfig::bsdir/events";

# NOTE(review): not referenced anywhere else in the code visible here
my $reporoot = "$BSConfig::bsdir/build";

# Calculate an id for the code found in a directory.
#
# All regular files of $dir and of "$dir/Build" whose names do not start
# with a dot are put into one list sorted by name. The id is the md5 sum
# over the lines "<md5 of file content>  <file name>\n" of that list.
#
# $cache is an optional hash ref mapping "mtime/size/inode" to the md5 of
# the content, so that unchanged files are not read again. Whenever a new
# entry had to be added, entries not seen in this run are purged.
#
# NOTE: a name that exists in both $dir and "$dir/Build" is listed twice
# and looked up in "$dir/Build" both times.
#
# Returns the id as hex string.
sub getcodemd5 {
  my ($dir, $cache) = @_;
  $cache ||= {};

  my @topnames = grep {!/^\./} ls($dir);
  my @buildnames = grep {!/^\./} ls("$dir/Build");
  my %inbuild = map {$_ => 1} @buildnames;

  my %seen;
  my $purge;
  my $listing = '';
  for my $name (sort(@topnames, @buildnames)) {
    my $path = $inbuild{$name} ? "$dir/Build/$name" : "$dir/$name";
    next unless -f $path;
    # re-use the stat data of the -f test above
    my @st = stat _;
    my $key = "$st[9]/$st[7]/$st[1]";
    $seen{$key} = 1;
    if (!$cache->{$key}) {
      $cache->{$key} = Digest::MD5::md5_hex(readstr($path));
      $purge = 1;
    }
    $listing .= "$cache->{$key}  $name\n";
  }
  if ($purge) {
    delete $cache->{$_} for grep {!$seen{$_}} keys %$cache;
  }
  return Digest::MD5::md5_hex($listing);
}

# content md5 caches handed to getcodemd5() for the 'worker' and the
# 'build' directory
my $workerdircache = {};
my $builddircache = {};

# "project/package/arch/<id>" => time the entry was recorded. <id> is the
# part of the idle worker's file name after the first ':' (or the 'job'
# field of a 'badhost' dispatch event). A job is not offered to a worker
# with a matching entry; entries are dropped after 24 hours.
my %badhost;

# Try to hand a job over to an idle worker.
#
#   $job      - name of the job file in "$jobsdir/$arch"
#   $idlename - name of the worker's file in "$workersdir/idle"
#   $arch     - architecture directory the job lives in
#
# The job is locked by creating its ":status" file, then the job data is
# sent to the worker with a PUT request. On success the worker is moved
# from the idle to the building list and the status is set to 'building'.
#
# Returns
#   'assigned' - the worker took the job
#   'badhost'  - the worker answered that it cannot build this package;
#                this is remembered in %badhost, the worker stays idle
#   undef      - anything else went wrong (lock not obtained, job or
#                worker gone, other rpc error; in the last case the worker
#                is also removed from the idle list)
sub assignjob {
  my ($job, $idlename, $arch) = @_;
  local *F;

  my $jobfile = "$jobsdir/$arch/$job";
  my $idlefile = "$workersdir/idle/$idlename";

  print "assignjob $arch/$job -> $idlename\n";
  my $jobstatus = {'code' => 'dispatching'};
  if (!BSUtil::lockcreatexml(\*F, "$jobsdir/$arch/.dispatch.$$", "$jobfile:status", $jobstatus, $BSXML::jobstatus)) {
    print "job lock failed!\n";
    return undef;
  }

  # give the job back: remove our status file, then drop the lock
  my $release = sub {
    unlink("$jobfile:status");
    close F;
  };

  # got the lock, re-check if job is still there
  unless (-e $jobfile) {
    $release->();
    print "job disappered!\n";
    return undef;
  }

  # prepare job data
  my $infoxml = readstr($jobfile);
  my $jobid = Digest::MD5::md5_hex($infoxml);
  my $info = XMLin($BSXML::buildinfo, $infoxml);

  my $workercode = getcodemd5('worker', $workerdircache);
  my $buildcode = getcodemd5('build', $builddircache);

  # get the worker data
  my $worker = readxml($idlefile, $BSXML::worker, 1);
  unless ($worker) {
    $release->();
    print "worker is gone!\n";
    return undef;
  }

  # send the job to the worker
  eval {
    BSRPC::rpc({
      'uri'     => "http://$worker->{'ip'}:$worker->{'port'}/build",
      'timeout' => 10,
      'request' => "PUT",
      'headers' => [ "Content-Type: text/xml" ],
      'data'    => $infoxml,
    }, undef, "port=$port", "workercode=$workercode", "buildcode=$buildcode");
  };
  if ($@) {
    my $err = $@;
    print "rpc error: $@";
    $release->();
    if ($err =~ /cannot build this package/) {
      # not the worker's fault, just remember that this combination does not work
      $badhost{"$info->{'project'}/$info->{'package'}/$info->{'arch'}/@{[(split(':', $idlename, 2))[1]]}"} = time();
      return 'badhost';
    }
    unlink($idlefile);	# broken client
    return undef;
  }
  unlink($idlefile);	# no longer idle

  $jobstatus = {
    'code' => 'building',
    'uri' => "http://$worker->{'ip'}:$worker->{'port'}",
    'starttime' => time(),
    'hostarch' => $worker->{'hostarch'},
    'jobid' => $jobid,
  };
  $jobstatus->{'workerid'} = $worker->{'workerid'} if defined $worker->{'workerid'};

  # put worker into building list
  $worker->{'job'} = $job;
  $worker->{'arch'} = $arch;
  mkdir_p("$workersdir/building");
  writexml("$workersdir/building/.$idlename", "$workersdir/building/$idlename", $worker, $BSXML::worker);

  # write new status and release lock
  writexml("$jobsdir/$arch/.$job:status", "$jobfile:status", $jobstatus, $BSXML::jobstatus);
  close F;
  return 'assigned';
}

# Send an event to the "/event" handler of the source server.
#
# $ev is the event as hash ref. Of the fields type, project, package,
# repository, arch and job the defined ones are sent (in that order) as
# "key=value" arguments. Errors raised by BSRPC::rpc are not caught here.
# Returns whatever BSRPC::rpc returns.
sub sendeventtosrcserver {
  my ($ev) = @_;
  my @args = map {"$_=$ev->{$_}"} grep {defined $ev->{$_}} qw(type project package repository arch job);
  return BSRPC::rpc({'uri' => "$BSConfig::srcserver/event", 'timeout' => 10}, undef, @args);
}

# do not buffer our output
$| = 1;
# do not get killed by a SIGPIPE
$SIG{'PIPE'} = 'IGNORE';
print "starting build service dispatcher\n";

# get lock
# The lock makes sure that just one dispatcher is running. RUNLOCK is never
# closed in this file, so the lock is kept for as long as the process lives.
mkdir_p($rundir);
open(RUNLOCK, '>>', "$rundir/bs_dispatch.lock") || die("$rundir/bs_dispatch.lock: $!\n");
flock(RUNLOCK, LOCK_EX | LOCK_NB) || die("dispatcher is already running!\n");
# set the time stamps of the lock file to now
utime undef, undef, "$rundir/bs_dispatch.lock";

# dispatch priorities as retrieved from "$jobsdir/dispatchprios"
my $dispatchprios;
# the priority entries indexed by project, plus ':all:' for the entries
# that are not bound to a project
my $dispatchprios_project;
# "mtime/size/inode" of the dispatchprios file the data was read from,
# used to detect changes of the file
my $dispatchprios_id = '';

# parsed job infos: $infocache{$arch}->{$job}
my %infocache;

while (1) {

  # Add the build times of the finished jobs to the stored load data.
  # "$jobsdir/finished" holds one line per job consisting of '|' separated,
  # %XX escaped fields; "$jobsdir/load" maps "project/repository/arch" to
  # [time of last update, load at that time].
  if (-s "$jobsdir/finished") {
    local *F;
    if (open(F, '<', "$jobsdir/finished")) {
      # remove the file right away, we keep reading from the open handle
      unlink("$jobsdir/finished");
      my $load = BSUtil::retrieve("$jobsdir/load", 1) || {};
      while (<F>) {
        # skip incomplete lines
        next unless /\n$/s;
        my @fields = split('\|', $_);
        for my $field (@fields) {
          $field =~ s/%([a-fA-F0-9]{2})/chr(hex($1))/ge;
        }
        my ($projid, $repoid, $arch, $packid, $start, $end, $result, $workerid, $hostarch) = @fields;
        next unless $start =~ /^[0-9]+$/s && $end =~ /^[0-9]+$/s;
        next if $end <= $start;
        my $prpa = "$projid/$repoid/$arch";
        my $l = ($load->{$prpa} ||= [0, 0]);
        # what this build adds to the load, as of its end time
        my $buildload = 1 - exp($decay * ($end - $start));
        if ($l->[0] < $end) {
          # the build ended after the last update:
          # decay the old load up to the end time, then add the new one
          $l->[1] = $l->[1] * exp($decay * ($end - $l->[0])) + $buildload;
          $l->[0] = $end;
        } else {
          # the build ended before the last update:
          # decay the new load up to the time of the last update
          $l->[1] += $buildload * exp($decay * ($l->[0] - $end));
        }
      }
      close F;
      BSUtil::store("$jobsdir/load.new", "$jobsdir/load", $load);
    }
  }

  # Re-read the dispatch priorities if "$jobsdir/dispatchprios" changed.
  # A change is detected by comparing the "mtime/size/inode" of the file.
  my @dispatchprios_s = stat("$jobsdir/dispatchprios");
  if (!@dispatchprios_s) {
    # no such file, so no priorities
    ($dispatchprios, $dispatchprios_project, $dispatchprios_id) = (undef, undef, '');
  } elsif ($dispatchprios_id ne join('/', @dispatchprios_s[9, 7, 1])) {
    $dispatchprios_id = join('/', @dispatchprios_s[9, 7, 1]);
    $dispatchprios = BSUtil::retrieve("$jobsdir/dispatchprios", 1);
    $dispatchprios_project = undef;
    if ($dispatchprios) {
      # create dispatchprios_project hash
      my @prios = @{$dispatchprios->{'prio'} || []};
      # first pass: one (empty) list for every project that is mentioned
      my %byproject;
      for my $prio (@prios) {
        $byproject{$prio->{'project'}} ||= [] if defined $prio->{'project'};
      }
      # second pass: fill the lists in the order of the entries. An entry
      # without project goes into the list of every project and into the
      # ':all:' list, which is for the projects not mentioned at all.
      my @everylist = (keys(%byproject), ':all:');
      for my $prio (@prios) {
        my @lists = defined($prio->{'project'}) ? ($prio->{'project'}) : @everylist;
        push @{$byproject{$_}}, $prio for @lists;
      }
      $dispatchprios_project = \%byproject;
    }
  }

  # Read the stored load data and turn every [time, load] entry into the
  # plain load value as of now, i.e. decayed by the time that has passed.
  my $load = BSUtil::retrieve("$jobsdir/load", 1) || {};
  my $now = time();
  for my $prpa (sort keys %$load) {
    my ($stamp, $value) = @{$load->{$prpa}}[0, 1];
    $value *= exp($decay * ($now - $stamp)) if $now > $stamp;
    $load->{$prpa} = $value;
  }
  
  # Get the idle workers and sort them into %idlearch, which maps a job
  # architecture to the idle workers that can build for it. The host
  # architecture of a worker is the part of its name up to the first ':'.
  my @idle = grep {!/^\./} ls("$workersdir/idle");
  my %idlearch;
  for my $idle (@idle) {
    my ($harch) = split(':', $idle, 2);
    push @{$idlearch{$_}}, $idle for @{$cando{$harch} || []};
  }

  #print "finding jobs\n";
  # %jobs:           "project/repository/arch" => list of dispatchable jobs
  # %maybesrcchange: set for those that may contain a source change job
  my %jobs;
  my %maybesrcchange;
  for my $arch (sort keys %idlearch) {
    my $ic = $infocache{$arch} || {};
    my @entries = grep {!/^\./} ls("$jobsdir/$arch");
    my %locked = map {$_ => 1} grep {/:status$/} @entries;
    my %notlocked = map {$_ => 1} grep {!$locked{$_}} @entries;

    # throw away the cached infos of jobs that are gone
    delete $ic->{$_} for grep {!$notlocked{$_}} keys %$ic;

    # adapt load: every existing job that has a status file counts as one
    for my $status (keys %locked) {
      (my $jn = $status) =~ s/:status$//;
      next unless $notlocked{$jn};
      $jn =~ s/-[0-9a-f]{32}$//s;
      my ($projid, $repoid, $packid) = split('::', $jn);
      next unless defined $packid;
      my $prpa = "$projid/$repoid/$arch";
      $load->{$prpa} ||= 0;
      $load->{$prpa} += 1;
    }

    # the jobs we can dispatch are the ones without a status file. The job
    # name is "project::repository::package", optionally followed by a '-'
    # and 32 hex digits.
    for my $job (grep {!/:(?:dir|status|new)$/ && !$locked{"$_:status"}} @entries) {
      (my $jn = $job) =~ s/-[0-9a-f]{32}$//s;
      my ($projid, $repoid) = split('::', $jn);
      my $prpa = "$projid/$repoid/$arch";
      push @{$jobs{$prpa}}, $job;
      # a job we have no info for yet might be a source change as well
      my $info = $ic->{$job};
      my $reason = $info ? $info->{'reason'} : undef;
      if (!$info || ($reason && ($reason eq 'new build' || $reason eq 'source change'))) {
        $maybesrcchange{$prpa} = 1;
      }
    }
  }

  #print "calculating scales\n";
  # Calculate the factor the load of each project/repository/arch is scaled
  # with. An adjustment of +10 divides the load by ten, so a positive
  # adjustment makes the jobs get dispatched earlier.
  my %scales;
  my @jobprpas = keys %jobs;
  for my $prpa (@jobprpas) {
    # use a small random load if there is none, so that the order of the
    # entries without load is random as well
    $load->{$prpa} = rand(.01) unless $load->{$prpa};

    my $adjust = 0;
    # static configuration: a flat list of (regexp, adjustment) pairs, the
    # adjustments of all matching pairs are summed up
    if ($BSConfig::dispatch_adjust) {
      my @pairs = @{$BSConfig::dispatch_adjust || []};
      for (my $i = 0; $i < @pairs; $i += 2) {
        my ($match, $adj) = @pairs[$i, $i + 1];
        $adjust += $adj if $prpa =~ /^$match/s;
      }
    }
    # dispatch priorities: the last matching entry replaces everything
    if ($dispatchprios) {
      my ($project, $repository, $arch) = split('/', $prpa, 3);
      for my $prio (@{$dispatchprios_project->{$project} || $dispatchprios_project->{':all:'} || []}) {
        next unless defined($prio->{'adjust'});
        next if defined($prio->{'project'}) && $prio->{'project'} ne $project;
        next if defined($prio->{'repository'}) && $prio->{'repository'} ne $repository;
        next if defined($prio->{'arch'}) && $prio->{'arch'} ne $arch;
        $adjust = 0 + $prio->{'adjust'};
      }
    }
    $scales{$prpa} = exp(-$adjust * (log(10.)/10.));
  }

  #print "writing debug data\n";
  # write debug data (only if there is something to dispatch)
  BSUtil::store("$rundir/.dispatch.data", "$rundir/dispatch.data", {
    'load' => $load,
    'scales' => \%scales,
    'jobs' => \%jobs,
    'powerpkgs' => \%powerpkgs,
  }) if @jobprpas;

  # entries whose load was already multiplied with $nosrcchangescale
  my %didsrcchange;
  # number of jobs assigned in this round
  my $assigned = 0;
  # "arch/job" of the power jobs we already added extra load for
  my %extraload;

  # the following helps a lot...
  #print "fast src change load adapt\n";
  # Entries that cannot contain a source change job get the penalty right
  # away, so that they are sorted correctly from the start.
  for my $prpa (grep {!$maybesrcchange{$_}} @jobprpas) {
    $didsrcchange{$prpa} = 1;
    $load->{$prpa} *= $nosrcchangescale;
  }

  # Process the project/repository/arch entries in the order of their scaled
  # load (scale * load), lowest first.
  @jobprpas = sort {$scales{$a} * $load->{$a} <=> $scales{$b} * $load->{$b}} @jobprpas;

  #print "assigning jobs\n";
  while (@jobprpas) {
    my $prpa = shift @jobprpas;
    my $arch = (split('/', $prpa))[2];
    # nothing to do if no worker is left for this arch or no job is left
    next unless @{$idlearch{$arch} || []};
    my @b = @{$jobs{$prpa} || []};
    next unless @b;

    #printf "%s %d %d\n", $prpa, $scales{$prpa} * $load->{$prpa}, scalar(@b);

    # scaled load of the entry that is next in the queue. As soon as our own
    # scaled load gets bigger than this, the queue has to be sorted again.
    my $nextload = @jobprpas ? $scales{$jobprpas[0]} * $load->{$jobprpas[0]} : undef;

    # sort all jobs, src change jobs first
    my @srcchange;
    my $ic = $infocache{$arch};
    $ic = $infocache{$arch} = {} unless $ic;
    for my $job (@b) {
      # read the job info if it is not cached, an unreadable job gets an empty info
      $ic->{$job} ||= readxml("$jobsdir/$arch/$job", $BSXML::buildinfo, 1) || {};
      my $info = $ic->{$job};
      if (!$info->{'readytime'}) {
	# fall back to the mtime of the job file
	my @s = stat("$jobsdir/$arch/$job");
	$info->{'readytime'} = $s[9];
      }
      if ($info->{'reason'} && ($info->{'reason'} eq 'new build' || $info->{'reason'} eq 'source change')) {
	# only count direct changes as source change, not changes because of
	# a change in a linked package
	if ($info->{'reason'} eq 'new build' || !$info->{'revtime'} || $info->{'readytime'} - $info->{'revtime'} < 24 * 3600) {
	  push @srcchange, $job;
	}
      }
    }
    # randomize, then order by 'needed' (highest first) and 'readytime'
    # (oldest first)
    @b = List::Util::shuffle(@b);
    @b = sort {($ic->{$b}->{'needed'} || 0) <=> ($ic->{$a}->{'needed'} || 0) || ($ic->{$a}->{'readytime'} || 0) <=> ($ic->{$b}->{'readytime'} || 0)} @b;
    # jobs of packages listed in %powerpkgs, only used if power hosts are configured
    my %powerjobs;
    if (%powerpkgs && $BSConfig::powerhosts) {
      for my $job (@b) {
	my $jn = $job;
	$jn =~ s/-[0-9a-f]{32}$//s;
	my ($projid, $repoid, $packid) = split('::', $jn);
	$powerjobs{$job} = 1 if $powerpkgs{$packid};
      }
      if (%powerjobs) {
	# bring em to front!
	@b = ((grep {$powerjobs{$_}} @b), (grep {!$powerjobs{$_}} @b));
      }
    }
    # this is done last, so source change jobs come even before power jobs
    my %srcchange = map {$_ => 1} @srcchange;
    if (@srcchange) {
      # bring em to front!
      @b = ((grep {$srcchange{$_}} @b), (grep {!$srcchange{$_}} @b));
    }

    # set if this entry has to be put back into the queue
    my $rerun;
    for my $job (@b) {
      # The first job that is not a source change makes the load of the entry
      # go up by $nosrcchangescale (just once per entry). If this moves us
      # behind the next entry, stop and re-sort before assigning the job.
      if (!$srcchange{$job} && !$didsrcchange{$prpa}) {
	$didsrcchange{$prpa} = 1;
	$load->{$prpa} *= $nosrcchangescale;
	if (defined($nextload) && $scales{$prpa} * $load->{$prpa} > $nextload) {
	  $rerun = 1;
	  last;
	}
      }
      # the candidate workers in random order
      my @idle = List::Util::shuffle(@{$idlearch{$arch} || []});
      last unless @idle;
      # NOTE(review): %poweridle is never used
      my %poweridle;
      if ($powerjobs{$job}) {
	# reduce to powerhosts
	# (workers where the part of the name after the first ':' matches
	# one of the $BSConfig::powerhosts regexps at its start)
	for my $idle (splice @idle) {
	  my $idlehost = (split(':', $idle, 2))[1];
	  push @idle, $idle if grep {$idlehost =~ /^$_/} @$BSConfig::powerhosts;
	}
      }
      my $tries = 0;
      my $haveassigned;
      # NOTE: this $arch shadows the $arch declared at the top of the loop
      my ($project, $repository, $arch) = split('/', $prpa, 3);
      for my $idle (@idle) {
	# stop if the job is gone
	last unless -e "$jobsdir/$arch/$job";
	# skip the workers that are known to be unable to build this package
        next if $badhost{"$project/$ic->{$job}->{'package'}/$arch/@{[(split(':', $idle, 2))[1]]}"};
	# once something was assigned in this round, do not try more than
	# five workers for a job
	last if $assigned && $tries >= 5;
	$tries++;
	my $res = assignjob($job, $idle, $arch);
	if (!$res) {
	  # failed, do not use this worker again in this round
	  my $harch = (split(':', $idle, 2))[0];
	  for (@{$cando{$harch} || []}) {
	    $idlearch{$_} = [ grep {$_ ne $idle} @{$idlearch{$_}} ];
	  }
	  next;
	}
	# the worker cannot build this package but is still idle, try the next one
	next if $res eq 'badhost';
	# assigned: the worker is busy now, the job is done
	my $harch = (split(':', $idle, 2))[0];
	for (@{$cando{$harch} || []}) {
	  $idlearch{$_} = [ grep {$_ ne $idle} @{$idlearch{$_}} ];
	}
	$assigned++;
	$jobs{$prpa} = [ grep {$_ ne $job} @{$jobs{$prpa}} ];
	$load->{$prpa} += 1;
	$haveassigned = 1;
	last;
      }
      # Tricky, still increase load so that we don't assign
      # too many non-powerjobs. But only do that once for each powerjob.
      if (!$haveassigned && $powerjobs{$job} && !$extraload{"$arch/$job"}) {
	$load->{$prpa} += 1;
	$extraload{"$arch/$job"} = 1;
      }
      # Check if load changes changed our order. If yes, re-sort and start over.
      if (defined($nextload) && $scales{$prpa} * $load->{$prpa} > $nextload) {
	$rerun = 1;
	last;
      }
    }
    if ($rerun) {
      # our load was changed so much that the order was changed. put us back
      # on the queue and re-sort.
      unshift @jobprpas, $prpa;
      @jobprpas = sort {$scales{$a} * $load->{$a} <=> $scales{$b} * $load->{$b}} @jobprpas;
    }
    # end the round after more than 20 assignments
    last if $assigned > 20;
  }
  # Forward the repository events to the source server. An event is only
  # removed if it could be delivered, otherwise it is retried next round.
  for my $evname (grep {!/^\./} ls("$eventdir/repository")) {
    my $ev = readxml("$eventdir/repository/$evname", $BSXML::event, 1);
    next unless $ev;
    eval { sendeventtosrcserver($ev) };
    if ($@) {
      warn($@);
      next;
    }
    unlink("$eventdir/repository/$evname");
  }

  # Process the dispatch events that are due. Again an event is kept if
  # handling it failed.
  for my $evname (grep {!/^\./} ls("$eventdir/dispatch")) {
    my $ev = readxml("$eventdir/dispatch/$evname", $BSXML::event, 1);
    next unless $ev;
    next if $ev->{'due'} && time() < $ev->{'due'};
    delete $ev->{'due'};
    eval {
      if ($ev->{'type'} eq 'built') {
        # resend to rep server
        # (nothing is done for this type, the event is just removed)
      } elsif ($ev->{'type'} ne 'badhost') {
        sendeventtosrcserver($ev);
      } else {
        # remember the combination so that it is not tried again
        print "badhost event: $ev->{'project'}/$ev->{'package'}/$ev->{'arch'}/$ev->{'job'}\n";
        $badhost{"$ev->{'project'}/$ev->{'package'}/$ev->{'arch'}/$ev->{'job'}"} = time();
      }
    };
    if ($@) {
      warn($@);
      next;
    }
    unlink("$eventdir/dispatch/$evname");
  }
  # wait a bit if there was nothing to assign, otherwise go on right away
  if ($assigned) {
    print "assigned $assigned jobs\n";
  } else {
    sleep(1);
  }

  # forget the badhost entries that are older than a day
  if (%badhost) {
    my $now = time();
    for my $bad (grep {$badhost{$_} + 24*3600 < $now} keys %badhost) {
      print "deleting badhost $bad\n";
      delete $badhost{$bad};
    }
  }

  # The dispatcher is controlled with trigger files in the run dir, which
  # are checked (and removed) once per round.
  if (-e "$rundir/bs_dispatch.exit") {
    unlink("$rundir/bs_dispatch.exit");
    print "exiting...\n";
    exit(0);
  }
  if (-e "$rundir/bs_dispatch.restart") {
    unlink("$rundir/bs_dispatch.restart");
    print "restarting...\n";
    # The BEGIN block changed into the directory of the script, so a
    # relative $0 with a directory part (e.g. "backend/bs_dispatch") does
    # not point to the script anymore. The script is in the current
    # directory in that case. An absolute $0 and a $0 without any
    # directory part are used as they are.
    my $self = $0;
    $self =~ s{^(?!/).*/}{./}s;
    exec($self);
    # only reached if the exec failed
    die("$self: $!\n");
  }
}
