#!/usr/bin/perl -w
#
# Copyright (c) 2019 SUSE Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# Forward notifications to the source server
#

BEGIN {
  # Make the modules that live next to this script loadable: use the
  # directory part of $0, or the current directory when $0 has none.
  my $libdir = '.';
  $libdir = $1 if $0 =~ m-(.*)/- && $1;
  unshift(@INC, $libdir);
}

use XML::Structured ':bytes';
use Getopt::Long ();
use Time::HiRes;

use BSConfiguration;
use BSRPC ':https';
use BSFileQueue;
use BSUtil;

use strict;

# Base directory of the build service; falls back to /srv/obs when
# $BSConfig::bsdir is not set.
my $bsdir = $BSConfig::bsdir || "/srv/obs";
# Directory for the run lock and the .exit/.restart flag files
# (see check_exitrestart and the BSUtil::openrunlock call below).
my $rundir = "$bsdir/run";
# Root of the event directories; this daemon uses the "notifyforward"
# subdirectory.
my $eventdir = "$bsdir/events";


# Directory holding our "queue" and "queue.send" files and the ".ping" file.
my $myeventdir = "$eventdir/notifyforward";
# Basename of the run lock and of the flag files in $rundir.
my $runname = 'bs_notifyforward';

# Parse the command line arguments passed in @_.
#
# Returns a list: a hash reference with the recognized options, followed
# by all arguments that were not consumed by the option parser.
# Prints the usage text and dies if an invalid option is encountered.
sub parse_options {
  my @remaining = @_;
  my %opts;
  my @optspec = (
    'stop|exit',
    'restart',
    'logfile=s',
  );
  my $ok = Getopt::Long::GetOptionsFromArray(\@remaining, \%opts, @optspec);
  unless ($ok) {
    print_usage();
    die("Invalid option(s)\n");
  }
  return (\%opts, @remaining);
}

# Print a short usage summary to the currently selected output handle.
# The program name shown is the last path component of $0; if $0 has
# no such component (e.g. it is empty or ends in a slash), $0 itself is
# shown instead.
sub print_usage {
  # Take the capture from the match result rather than reading $1
  # unconditionally: after a failed match $1 still holds whatever an
  # earlier, unrelated match left behind.
  my ($progname) = $0 =~ /([^\/]+$)/;
  $progname = $0 unless defined $progname;
  print "Usage: $progname [options]

Options:
  --stop|--exit          - graceful shutdown daemon
  --restart              - restart daemon
  --logfile file         - redirect output to logfile

";
}

# Act on a pending shutdown or restart request. A request is signalled
# by a "<runname>.exit" or "<runname>.restart" flag file in $rundir.
# In both cases the run lock is closed and the flag file removed first;
# then the process either exits or re-executes itself with the original
# command line. Does nothing if neither flag file exists.
sub check_exitrestart {
  my $flagbase = "$rundir/$runname";

  if (-e "$flagbase.exit") {
    close(RUNLOCK);
    unlink("$flagbase.exit");
    BSUtil::printlog("exiting...");
    exit(0);
  }

  if (-e "$flagbase.restart") {
    close(RUNLOCK);
    unlink("$flagbase.restart");
    BSUtil::printlog("restarting...");
    exec($0, @ARGV);
    # only reached if the exec failed
    die("$0: $!\n");
  }
}

# Connection state shared between successive forwardredis calls. It is
# handed to BSRPC::rpc as the 'keepalive' parameter; the 'start' and
# 'count' entries read below are presumably maintained by BSRPC
# (NOTE(review): confirm against BSRPC).
my $redis_keepalive = {};

# Send one batch of redis notifications to the source server and mark
# the corresponding queue entries as done.
#
#   $redisdata - array ref with the batched notifications
#   $markfd    - mark handle as returned by BSFileQueue::openmark
#   $markoffs  - array ref with the queue offsets of the batched entries
#
# Dies if the rpc fails; the keepalive state is reset in that case and
# no entry is marked as done.
sub forwardredis {
  my ($redisdata, $markfd, $markoffs) = @_;

  # drop the keepalive state once it was used for 64 requests or is
  # older than 300 seconds
  if ($redis_keepalive->{'start'}) {
    if ($redis_keepalive->{'count'} >= 64 || $redis_keepalive->{'start'} + 300 < time()) {
      %$redis_keepalive = ();
    }
  }

  my %request = (
    'uri' => "$BSConfig::srcserver/notify/redis",
    'request' => 'POST',
    'timeout' => 300,
    'headers' => [ 'Content-Type: application/octet-stream' ],
    'data' => BSUtil::tostorable($redisdata),
    'keepalive' => $redis_keepalive,
  );
  eval { BSRPC::rpc(\%request, undef, 'keepalive=1') };
  if (my $err = $@) {
    # do not reuse the keepalive state after a failure
    %$redis_keepalive = ();
    die($err);
  }

  for my $offset (@$markoffs) {
    BSFileQueue::markdone($markfd, $offset);
  }

  if ($redis_keepalive->{'count'}) {
    print "forwarded ".@$markoffs." redis notifications (keepalive count $redis_keepalive->{'count'})\n";
  } else {
    print "forwarded ".@$markoffs." redis notifications\n";
  }
}

# Forward all pending entries of the opened send queue $fd to the
# source server.
#
# Consecutive 'redis' entries are collected and sent in batches via
# forwardredis; every other entry is posted on its own to
# /notify/<type>. Each entry is marked as done in the queue once it was
# delivered. Dies on the first failing rpc.
sub forwarddata {
  my ($fd) = @_;

  my $markfd = BSFileQueue::openmark("$myeventdir/queue.send");
  my $off = 0;		# queue offset of the next entry

  my (@batch, @batchoffs);
  my $flushbatch = sub {
    forwardredis(\@batch, $markfd, \@batchoffs);
    @batch = ();
    @batchoffs = ();
  };

  while (1) {
    my ($len, @fields) = BSFileQueue::getnext($fd);
    last unless defined $len;

    my $entryoff = $off;	# where the current entry starts
    $off += $len;
    next unless @fields;	# empty or marked as done

    my $type = shift @fields;

    # batch redis notifications into chunks of 32 updates
    $flushbatch->() if @batch && ($type ne 'redis' || @batch >= 32);

    if ($type eq 'redis') {
      push @batch, \@fields;
      push @batchoffs, $entryoff;
      next;
    }

    my %request = (
      'uri' => "$BSConfig::srcserver/notify/$type",
      'request' => 'POST',
      'formurlencode' => 1,
      'timeout' => 300,
    );
    BSRPC::rpc(\%request, undef, @fields);
    BSFileQueue::markdone($markfd, $entryoff);
    print "forwarded a $type notification\n";
  }

  # send what is left of the last redis batch
  $flushbatch->() if @batch;
  close $markfd;
}

# Time of the first failed attempt since the last success (or since the
# last critical log message); undef while forwarding works.
my $noprogress;

# Try to transmit the contents of the send queue.
#
# Returns undef on success, in which case the send queue file has been
# removed. On failure the error is reported, a critical message is
# logged if there was no success for more than ten minutes, and the
# time of the next attempt (60 seconds from now) is returned.
sub doforward {
  my $sendqueue = "$myeventdir/queue.send";
  my $fd = BSFileQueue::openqueue($sendqueue);

  eval { forwarddata($fd) };
  my $err = $@;

  if (!$err) {
    undef $noprogress;
    unlink($sendqueue);
    close($fd);
    return undef;
  }

  warn($err);
  close($fd);
  print "retrying in 60 seconds\n";

  my $now = time();
  $noprogress = $now unless $noprogress;
  if ($now - $noprogress > 10 * 60) {
    BSUtil::logcritical("no progress forwarding events since 10 minutes");
    # start a new interval so the message is not repeated on every retry
    $noprogress = $now;
  }
  return $now + 60;
}

# Daemon startup. The order of the steps below matters.

# Parse a copy of @ARGV so that @ARGV itself stays untouched; it is
# passed to exec() again in check_exitrestart when a restart is requested.
my ($options, @args) = parse_options(@ARGV);

# Make sure the base directory exists and belongs to the build service user.
BSUtil::mkdir_p_chown($bsdir, $BSConfig::bsuser, $BSConfig::bsgroup) || die("unable to create $bsdir\n");
# Open the logfile if one was requested with --logfile
BSUtil::openlog($options->{'logfile'}, $BSConfig::logdir, $BSConfig::bsuser, $BSConfig::bsgroup);
# From here on run as the build service user/group.
BSUtil::drop_privs_to($BSConfig::bsuser, $BSConfig::bsgroup);

# Unbuffered STDOUT, and do not get killed by SIGPIPE.
$| = 1;
$SIG{'PIPE'} = 'IGNORE';
# Handle the --stop/--restart options.
# NOTE(review): presumably this signals the running instance via the flag
# and ping files and then exits - confirm against BSUtil::restartexit.
BSUtil::restartexit($options, 'notifyforward', "$rundir/$runname", "$myeventdir/.ping");

# Take the run lock; RUNLOCK is closed again in check_exitrestart.
# NOTE(review): presumably this prevents a second instance from running.
mkdir_p($rundir);
BSUtil::openrunlock(\*RUNLOCK, "$rundir/$runname", 'notifyforward');
BSUtil::printlog("starting build service notifyforward");

# Open the ping file the main loop waits on for new events.
mkdir_p($myeventdir);
BSUtil::openping(\*PING, "$myeventdir/.ping");

# Time of the next forwarding attempt after a failure, undef otherwise.
my $retry;

# A leftover send queue means a previous transmission was not completed.
if (-e "$myeventdir/queue.send") {
  print "resuming transmission of old data\n";
  $retry = doforward();
}

# Main loop: forward queued events until an exit or restart is requested.
for (;;) {
  check_exitrestart();

  # A previous attempt failed and the retry time is not reached yet:
  # idle in one second steps so that exit/restart requests are still
  # noticed quickly.
  if ($retry && time() < $retry) {
    sleep(1);
    next;
  }
  $retry = undef;

  BSUtil::drainping(\*PING);

  # finish an interrupted transmission before looking at new events
  if (-e "$myeventdir/queue.send") {
    $retry = doforward();
    next if $retry;
  }

  unless (-e "$myeventdir/queue") {
    print "waiting for an event...\n";
    BSUtil::waitping(\*PING);
    next;
  }

  # move the queue out of the way and transmit its contents
  BSFileQueue::renamequeue("$myeventdir/queue", "$myeventdir/queue.send");
  $retry = doforward();
  Time::HiRes::sleep(.5);
}

