#!/usr/bin/perl -w
#
# Copyright (c) 2019 SUSE Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# Forward notifications to the source server
#

BEGIN {
  my ($wd) = $0 =~ m-(.*)/- ;
  $wd ||= '.';
  unshift @INC,  "$wd";
}

use Data::Dumper;
use XML::Structured ':bytes';
use Getopt::Long ();
use Time::HiRes;

use BSConfiguration;
use BSUtil;
use BSFileQueue;
use BSRedis ':tls';

use strict;

my $bsdir = $BSConfig::bsdir || "/srv/obs";
my $rundir = "$bsdir/run";
my $eventdir = "$bsdir/events";


my $myeventdir = "$eventdir/redis";
my $runname = 'bs_redis';

sub parse_options {
  my %opts;
  if (!Getopt::Long::GetOptionsFromArray(\@_, \%opts,
    'stop|exit',
    'restart',
    'logfile=s',
  )) {
    print_usage();
    die("Invalid option(s)\n");
  }
  return (\%opts, @_);
}

sub print_usage {
  $0 =~ /([^\/]+$)/;
  print "Usage: $1 [options]

Options:
  --stop|--exit          - graceful shutdown daemon
  --restart              - restart daemon
  --logfile file         - redirect output to logfile

";
}

sub check_exitrestart {
  if (-e "$rundir/$runname.exit") {
    close(RUNLOCK);
    unlink("$rundir/$runname.exit");
    BSUtil::printlog("exiting...");
    exit(0);
  }
  if (-e "$rundir/$runname.restart") {
    close(RUNLOCK);
    unlink("$rundir/$runname.restart");
    BSUtil::printlog("restarting...");
    exec($0, @ARGV);
    die("$0: $!\n");
  }
}

my $lua_deleteresult = q{
  redis.call('DEL', KEYS[1], KEYS[2], KEYS[3])
  return 'OK'
};

my $lua_updateresult = q{
  local pkgs = {}
  local details = {}
  local olddetails = {}
  local i = 1 ; while 1 do
    local pkg = ARGV[i]
    if not pkg then break end
    local detail = ARGV[i + 1]
    if detail:sub(1,9) == "scheduled" or detail:sub(1,7) == "blocked" then
      local olddetail = redis.call('HGET', KEYS[1], pkg)
      if olddetail and olddetail:sub(1,9) ~= "scheduled" and olddetail:sub(1,7) ~= "blocked" then redis.call('HSET', KEYS[3], pkg, olddetail) end
      olddetails[pkg] = true
    end
    redis.call('HSET', KEYS[1], pkg, detail)
    pkgs[pkg] = true
    details[detail] = true
    i = i + 2
  end
  for _,k in ipairs(redis.call('HKEYS', KEYS[1])) do
    if not pkgs[k] then redis.call('HDEL', KEYS[1], k) end
  end
  for _,k in ipairs(redis.call('HKEYS', KEYS[2])) do
    if not details[k] then redis.call('HDEL', KEYS[2], k) end
  end
  for _,k in ipairs(redis.call('HKEYS', KEYS[3])) do
    if not olddetails[k] then redis.call('HDEL', KEYS[3], k) end
  end
  return 'OK'
};

my $lua_updateoneresult = q{
  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
  if ARGV[2]:sub(1,9) ~= "scheduled" and ARGV[2]:sub(1,7) ~= "blocked" then redis.call('HDEL', KEYS[3], ARGV[1]) end
  if ARGV[3] then redis.call('HDEL', KEYS[2], ARGV[3]) end
  return 'OK'
};

my $lua_updatejobstatus = q{
  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
  return 'OK'
};

my $lua_updatejobstatus_del = q{
  redis.call('HDEL', KEYS[1], ARGV[1])
  return 'OK'
};

my $lua_updatescmsync = q{
  local oldrepo = redis.call('HGET', KEYS[1], 'repo')
  if oldrepo then redis.call('SREM', KEYS[2] .. oldrepo, KEYS[1]) end
  redis.call('HSET', KEYS[1], 'repo', ARGV[1], 'branch', ARGV[2], 'trackingbranch', ARGV[3]);
  redis.call('SADD', KEYS[2] .. ARGV[1], KEYS[1])
  return 'OK'
};

my $lua_updatescmsync_del = q{
  local oldrepo = redis.call('HGET', KEYS[1], 'repo')
  if oldrepo then redis.call('SREM', KEYS[2] .. oldrepo, KEYS[1]) end
  redis.call('DEL', KEYS[1])
  return 'OK'
};

my $red;	# the redis handle

sub forwarddata {
  my ($fd) = @_;
  my $markfd = BSFileQueue::openmark("$myeventdir/queue.send");
  my $off = 0;
  my $cnt = 0;
  while (1) {
    my ($len, @line) = BSFileQueue::getnext($fd);
    last unless defined $len;
    if (!@line) {
      $off += $len;		# empty or marked as done
      next;
    }
    my $cmd = shift @line;
    my $prpa = shift @line;
    die unless $prpa;
    if ($cmd eq 'deleteresult') {
      unshift @line, 'EVAL', $lua_deleteresult, 3, "result.$prpa", "jobs.$prpa", "oldresult.$prpa";
    } elsif ($cmd eq 'updateresult') {
      die("odd number of arguments in $cmd\n") if scalar(@line) % 2;
      if (@line) {
        unshift @line, 'EVAL', $lua_updateresult, 3, "result.$prpa", "jobs.$prpa", "oldresult.$prpa";
      } else {
        unshift @line, 'EVAL', $lua_deleteresult, 3, "result.$prpa", "jobs.$prpa", "oldresult.$prpa";
      }
    } elsif ($cmd eq 'updateoneresult') {
      unshift @line, 'EVAL', $lua_updateoneresult, 3, "result.$prpa", "jobs.$prpa", "oldresult.$prpa";
    } elsif ($cmd eq 'updatejobstatus') {
      if (@line == 1) {
        unshift @line, 'EVAL', $lua_updatejobstatus_del, 1, "jobs.$prpa";
      } else {
        unshift @line, 'EVAL', $lua_updatejobstatus, 1, "jobs.$prpa";
      }
    } elsif ($cmd eq 'updatescmsync') {
      if (!defined($line[0])) {
        unshift @line, 'EVAL', $lua_updatescmsync_del, 2, "scmsync.$prpa", "scmsync_idx_repo.";
      } else {
	$line[1] = '' unless defined $line[1];
	$line[2] = '' unless defined $line[2];
        unshift @line, 'EVAL', $lua_updatescmsync, 2, "scmsync.$prpa", "scmsync_idx_repo.";
      }
    } else {
      die("unknown redis command '$cmd'\n");
    }
    #print "RUN @line\n";
    $red->run(@line);
    BSFileQueue::markdone($markfd, $off);
    $cnt++;
    $off += $len;
  }
  close $markfd;
  print "sent $cnt redis notifications\n";
}

my $noprogress;

sub doforward {
  my $fd = BSFileQueue::openqueue("$myeventdir/queue.send");
  eval { forwarddata($fd) };
  if ($@) {
    warn($@);
    close($fd);
    print "retrying in 60 seconds\n";
    my $now = time();
    $noprogress ||= $now;
    if ($now - $noprogress > 10 * 60) {
      BSUtil::logcritical("no progress sending redis events since 10 minutes");
      $noprogress = $now;
    }
    return $now + 60;
  }
  $noprogress = undef;
  unlink("$myeventdir/queue.send");
  close($fd);
  return undef;
}

sub critlogger {
  my ($logfile, $msg) = @_;
  return unless $logfile;
  my $logstr = sprintf "%s: %-7s %s\n", BSUtil::isotime(time), "[$$]", $msg;
  BSUtil::appendstr($logfile, $logstr);
}

# copy @ARGV to keep it untouched in case of restart
my ($options, @args) = parse_options(@ARGV);

BSUtil::mkdir_p_chown($bsdir, $BSConfig::bsuser, $BSConfig::bsgroup) || die("unable to create $bsdir\n");
# Open logfile if requested
BSUtil::openlog($options->{'logfile'}, $BSConfig::logdir, $BSConfig::bsuser, $BSConfig::bsgroup);
BSUtil::drop_privs_to($BSConfig::bsuser, $BSConfig::bsgroup);

$| = 1;
$SIG{'PIPE'} = 'IGNORE';
BSUtil::restartexit($options, 'redis', "$rundir/$runname", "$myeventdir/.ping");

my $critlogfile = "$BSConfig::logdir/redis.crit.log";
BSUtil::setcritlogger(sub { critlogger($critlogfile, $_[0]) });

mkdir_p($rundir);
BSUtil::openrunlock(\*RUNLOCK, "$rundir/$runname", 'redis');
BSUtil::printlog("starting build service redis forwarder");

mkdir_p($myeventdir);
BSUtil::openping(\*PING, "$myeventdir/.ping");

my $retry;

die("No redis server configured\n") unless $BSConfig::redisserver;
die("Redis server must be of scheme redis[s]://<server>[:port]\n") unless $BSConfig::redisserver =~ /^(rediss?):\/\/(?:(?:([^\/\@:]+):)?([^\/\@]*)\@)?([^\/:]+)(?::(\d+))?$/;

$red = BSRedis->new('server' => $4, 'port' => $5, 'user' => $2, 'password' => $3, 'tls' => ($1 eq 'rediss' ? 1 : 0)); 

if (-e "$myeventdir/queue.send") {
  print "resuming transmission of old data\n";
  $retry = doforward();
}

while (1) {
  check_exitrestart();
  if ($retry) {
    my $now = time();
    if ($now < $retry) {
      sleep(1);
      next;
    }
    undef $retry;
  }
  BSUtil::drainping(\*PING);
  if (-e "$myeventdir/queue.send") {
    $retry = doforward();
    next if $retry;
  }
  if (-e "$myeventdir/queue") {
    BSFileQueue::renamequeue("$myeventdir/queue", "$myeventdir/queue.send");
    $retry = doforward();
    Time::HiRes::sleep(.5);
  } else {
    print "waiting for an event...\n";
    BSUtil::waitping(\*PING);
  }
}

