#!/usr/bin/perl -w
#
# Copyright (c) 2019 SUSE Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# Forward notifications to the source server
#

BEGIN {
  # Put the directory part of $0 in front of @INC so that modules located
  # next to this script can be loaded; fall back to the current directory
  # when $0 has no (or an empty) directory part.
  my $scriptdir = ($0 =~ m{(.*)/})[0] || '.';
  unshift(@INC, $scriptdir);
}

use POSIX;
use Fcntl qw(:DEFAULT :flock);
use XML::Structured ':bytes';
use Getopt::Long ();
use Time::HiRes;

use BSConfiguration;
use BSRPC ':https';
use BSUtil;

use strict;

# Directory layout used by this daemon. The root comes from
# $BSConfig::bsdir and falls back to /srv/obs when that is not set.
my $bsdir = $BSConfig::bsdir || "/srv/obs";
my ($rundir, $eventdir) = ("$bsdir/run", "$bsdir/events");

# our own event directory (queue files and the .ping fifo live here)
my $myeventdir = "$eventdir/notifyforward";

# Parse the command line options out of the argument list.
#
# Works on a private copy of the arguments, so the array of the caller is
# not changed. Returns a hash ref with the recognized options followed by
# the arguments that were not consumed. Prints the usage text and dies on
# invalid options.
sub parse_options {
  my @remaining = @_;
  my %opts;
  my @spec = ('stop|exit', 'restart', 'logfile=s');
  unless (Getopt::Long::GetOptionsFromArray(\@remaining, \%opts, @spec)) {
    print_usage();
    die("Invalid option(s)\n");
  }
  return (\%opts, @remaining);
}

# Print the usage text to STDOUT.
#
# The program name shown is the last path component of $0. The capture is
# taken from the match result itself instead of relying on $1, so a failed
# match (e.g. $0 ending in a slash) cannot print a stale or undefined
# capture; $0 is used verbatim in that case.
sub print_usage {
  my ($prog) = $0 =~ m{([^/]+)$};
  $prog = $0 unless defined $prog;
  print "Usage: $prog [options]

Options:
  --stop|--exit          - graceful shutdown daemon
  --restart              - restart daemon
  --logfile file         - redirect output to logfile

";
}

# Check for the exit/restart flag files in the run directory and act on them.
#
# If the .exit file exists, the run lock is released, the flag is removed
# and the process exits with status 0. If the .restart file exists, the run
# lock is released, the flag is removed and the program re-executes itself
# with the original @ARGV; it dies if the exec fails. Returns normally when
# neither file exists.
sub check_exitrestart {
  my $exitflag = "$rundir/bs_notifyforward.exit";
  my $restartflag = "$rundir/bs_notifyforward.restart";

  if (-e $exitflag) {
    close(RUNLOCK);
    unlink($exitflag);
    BSUtil::printlog("exiting...");
    exit(0);
  }

  return unless -e $restartflag;

  close(RUNLOCK);
  unlink($restartflag);
  BSUtil::printlog("restarting...");
  exec($0, @ARGV);
  die("$0: $!\n");
}

# Mark the queue line starting at byte offset $markoff as done.
#
#   $markfd  - handle of the queue file, opened for reading and writing
#   $markoff - byte offset of the first character of the line
#
# The first byte of the line is overwritten with '|', which makes the first
# field of the line empty; forwarddata() skips such lines.
# Dies if seeking fails or if the byte could not be written. Returns 1.
sub markdone {
  my ($markfd, $markoff) = @_;
  defined(sysseek($markfd, $markoff, Fcntl::SEEK_SET)) || die("sysseek $markoff: $!\n");
  # syswrite returns undef on error, so test for that before comparing
  # numerically (avoids an "uninitialized value" warning on the error path)
  my $written = syswrite($markfd, "|", 1);
  die("syswrite: $!\n") unless defined($written) && $written == 1;
  return 1;
}

# Keepalive state shared between forwardredis() calls. It is handed to
# BSRPC::rpc via the 'keepalive' parameter; this file only reads its 'start'
# and 'count' entries.
my $redis_keepalive = {};

# Send a batch of redis notifications to the source server with a single
# POST request and mark the corresponding queue lines as done.
#
#   $redisdata - array ref with the batched notifications
#   $markfd    - read/write handle of the queue file, passed on to markdone()
#   $markoffs  - array ref with the start offset of every batched queue line
#
# Dies if the request fails; no line is marked in that case and the
# keepalive state is dropped.
sub forwardredis {
  my ($redisdata, $markfd, $markoffs) = @_;
  # start with fresh keepalive state once 'count' has reached 64 or 'start'
  # is more than 300 seconds in the past
  %$redis_keepalive = () if $redis_keepalive->{'start'} && (($redis_keepalive->{'count'} || 0) >= 64 || $redis_keepalive->{'start'} + 300 < time());
  my $param = {
    'uri' => "$BSConfig::srcserver/notify/redis",
    'request' => 'POST',
    'timeout' => 300,
    'headers' => [ 'Content-Type: application/octet-stream' ],
    'data' => BSUtil::tostorable($redisdata),
    'keepalive' => $redis_keepalive,
  };
  eval { BSRPC::rpc($param, undef, 'keepalive=1') };
  if ($@) {
    # Save the error first: emptying the keepalive hash may run destructors,
    # and an eval inside a destructor would reset $@ and lose the message.
    my $err = $@;
    %$redis_keepalive = ();
    die($err);
  }
  # the request went through, so all lines of the batch are done
  markdone($markfd, $_) for @$markoffs;
  if ($redis_keepalive->{'count'}) {
    print "forwarded ".@$markoffs." redis notifications (keepalive count $redis_keepalive->{'count'})\n";
  } else {
    print "forwarded ".@$markoffs." redis notifications\n";
  }
}

# Forward all pending notifications read from $fd to the source server.
#
# $fd is a read handle on the queue.send file. Every line consists of
# '|'-separated, %XX-escaped fields; the first field is the notification
# type. A line whose first field is empty has already been forwarded and is
# skipped. Successfully forwarded lines are marked via a second read/write
# handle on the same file.
#
# Consecutive 'redis' notifications are collected and sent in batches,
# all other types are sent one by one as form-urlencoded POST requests.
# Dies on malformed lines and on any forwarding error.
sub forwarddata {
  my ($fd) = @_;

  # separate handle for overwriting the first byte of finished lines
  open(my $markfd, '+<', "$myeventdir/queue.send") || die("$myeventdir/queue.send: $!\n");

  my $nextoff = 0;	# byte offset of the line following the current one
  my @batch;		# pending redis notifications
  my @batchoffs;	# start offsets of the lines in @batch

  while (<$fd>) {
    my $lineoff = $nextoff;
    $nextoff += length($_);
    die("bad line\n") unless chop($_) eq "\n";
    my @fields = split('\|', $_);
    # empty or marked as done
    next unless @fields && $fields[0];
    s/%([a-fA-F0-9]{2})/chr(hex($1))/ge for @fields;
    my $type = shift @fields;
    my $isredis = $type eq 'redis';

    # batch redis notifications into chunks of 32 updates; a pending batch
    # is also flushed before any other notification type is sent
    if (@batch && (!$isredis || @batch >= 32)) {
      forwardredis(\@batch, $markfd, \@batchoffs);
      @batch = ();
      @batchoffs = ();
    }

    if ($isredis) {
      push @batch, \@fields;
      push @batchoffs, $lineoff;
    } else {
      my $param = {
        'uri' => "$BSConfig::srcserver/notify/$type",
        'request' => 'POST',
        'formurlencode' => 1,
        'timeout' => 300,
      };
      BSRPC::rpc($param, undef, @fields);
      markdone($markfd, $lineoff);
      print "forwarded a $type notification\n";
    }
  }

  # send what is left in the last batch
  forwardredis(\@batch, $markfd, \@batchoffs) if @batch;
  close $markfd;
}

# Time of the first failed attempt since the last success or the last
# critical log entry; undef while forwarding works.
my $noprogress;

# Run forwarddata() on the queue.send handle $fd and deal with the outcome.
#
# On success the queue.send file is removed, $fd is closed and undef is
# returned. On failure the error is printed as a warning, $fd is closed and
# the time of the next attempt (now + 60 seconds) is returned. A critical
# message is logged when failures have been going on for more than ten
# minutes.
sub doforward {
  my ($fd) = @_;

  eval { forwarddata($fd) };

  unless ($@) {
    # everything was forwarded
    $noprogress = undef;
    unlink("$myeventdir/queue.send");
    close($fd);
    return undef;
  }

  warn($@);
  close($fd);
  print "retrying in 60 seconds\n";
  my $failtime = time();
  $noprogress = $failtime unless $noprogress;
  if ($failtime - $noprogress > 10 * 60) {
    BSUtil::logcritical("no progress forwarding events since 10 minutes");
    # restart the interval so the message is not logged on every retry
    $noprogress = $failtime;
  }
  return $failtime + 60;
}

# Daemon startup: parse options, set up logging and privileges, take the
# run lock and open the ping fifo.

# copy @ARGV to keep it untouched in case of restart
# (check_exitrestart re-executes the program with the original @ARGV)
my ($options, @args) = parse_options(@ARGV);

BSUtil::mkdir_p_chown($bsdir, $BSConfig::bsuser, $BSConfig::bsgroup) || die("unable to create $bsdir\n");
# Open logfile if requested
BSUtil::openlog($options->{'logfile'}, $BSConfig::logdir, $BSConfig::bsuser, $BSConfig::bsgroup);
BSUtil::drop_privs_to($BSConfig::bsuser, $BSConfig::bsgroup);

# unbuffered output
$| = 1;
# do not get killed by SIGPIPE
$SIG{'PIPE'} = 'IGNORE';
# NOTE(review): presumably handles the --stop/--restart options by signalling
# an already running instance via the flag files and the ping fifo -- confirm
# in BSUtil::restartexit
BSUtil::restartexit($options, 'notifyforward', "$rundir/bs_notifyforward", "$myeventdir/.ping");
BSUtil::printlog("starting build service notifyforward");

# make sure only one instance is running: hold an exclusive, non-blocking
# lock on the lock file for as long as RUNLOCK stays open
mkdir_p($rundir);
open(RUNLOCK, '>>', "$rundir/bs_notifyforward.lock") || die("$rundir/bs_notifyforward.lock: $!\n");
flock(RUNLOCK, LOCK_EX | LOCK_NB) || die("notifyforward is already running!\n");
# set access and modification time of the lock file to now
utime undef, undef, "$rundir/bs_notifyforward.lock";

# create the ping fifo if it does not exist as a named pipe yet; the mode
# given to mkfifo is subject to the umask, so it is set again with chmod
mkdir_p($myeventdir);
if (! -p "$myeventdir/.ping") {
  POSIX::mkfifo("$myeventdir/.ping", 0666) || die("$myeventdir/.ping: $!");
  chmod(0666, "$myeventdir/.ping");
}

# NOTE(review): the fifo is opened read-write, presumably so that the open
# does not block while no writer is present -- confirm
sysopen(PING, "$myeventdir/.ping", POSIX::O_RDWR) || die("$myeventdir/.ping: $!");

# Earliest time (epoch seconds) at which forwarding is attempted again after
# a failure, as returned by doforward(); undef when no retry is pending.
my $retry;

# A queue.send file left over at startup has not been forwarded completely,
# so it is handled before anything else.
if (-e "$myeventdir/queue.send") {
  print "resuming transmission of old data\n";
  my $file;
  BSUtil::lockopen($file, '<', "$myeventdir/queue.send");
  $retry = doforward($file);
}

# Main loop: never returns, the process ends via check_exitrestart() or die.
while (1) {
  check_exitrestart();
  if ($retry) {
    # wait for the retry time in one second steps so that exit/restart
    # requests are still noticed
    my $now = time();
    if ($now < $retry) {
      sleep(1);
      next;
    }
    undef $retry;
  }
  BSUtil::drainping(\*PING);
  # finish a partly forwarded queue.send first
  if (-e "$myeventdir/queue.send") {
    my $file;
    BSUtil::lockopen($file, '<', "$myeventdir/queue.send");
    $retry = doforward($file);
    next if $retry;
  }
  if (-e "$myeventdir/queue") {
    # take over the current queue: it is renamed to queue.send while we hold
    # it open, new events then go to a new queue file
    my $file;
    BSUtil::lockopen($file, '<', "$myeventdir/queue");
    # queue.send was forwarded and removed above, so it must not exist here;
    # renaming over it would lose its events
    die("$myeventdir/queue.send already exists, refusing to overwrite it") if -e "$myeventdir/queue.send";
    rename("$myeventdir/queue", "$myeventdir/queue.send") || die("rename $myeventdir/queue $myeventdir/queue.send: $!\n");
    $retry = doforward($file);
    # NOTE(review): short pause, presumably to let further events accumulate
    # before the next round -- confirm
    Time::HiRes::sleep(.5);
  } else {
    print "waiting for an event...\n";
    BSUtil::waitping(\*PING);
  }
}

