#!/usr/bin/python3.13 -s

"""Detect compact-binary-merger gravitational-wave signals in low latency. This
program takes a fixed template bank and performs matched filtering on a
continuous stream of strain data in fixed short blocks of time. It is capable
of reading from low latency data directories on the LDG clusters. Work is
parallelized through the use of MPI, so this script should typically be
launched with mpirun.

See https://arxiv.org/abs/1805.11174 for an overview."""

import sys
import argparse, numpy, pycbc, logging, cProfile, h5py, lal, json
import os.path
import itertools
import platform
import subprocess
import multiprocessing
from multiprocessing.dummy import threading
from matplotlib import use
use('agg')
from shutil import which
from mpi4py import MPI as mpi
from astropy.time import Time
from pycbc.pool import BroadcastPool
from pycbc import fft, version, waveform, scheme, makedir
from pycbc.types import MultiDetOptionAction, MultiDetMultiColonOptionAction
from pycbc.filter import LiveBatchMatchedFilter, compute_followup_snr_series
from pycbc.filter import followup_event_significance
from pycbc.strain import StrainBuffer
from pycbc.events.ranking import newsnr
from pycbc.events.coinc import LiveCoincTimeslideBackgroundEstimator as Coincer
from pycbc.events.single import LiveSingle
from pycbc.io.gracedb import CandidateForGraceDB
from pycbc.io.hdf import recursively_save_dict_contents_to_group
from pycbc.types import positive_int
import pycbc.waveform.bank
from pycbc.vetoes.sgchisq import SingleDetSGChisq
from pycbc.waveform.waveform import props
from pycbc.population import live_pastro_utils as livepau
from pycbc import mchirp_area
from pycbc.detector import ppdets
from pycbc.filter import resample
from pycbc.psd import estimate
from pycbc.psd import variation
from pycbc.live import snr_optimizer
from pycbc import conversions as conv

# Use cached class-based FFTs in the resample and estimate module
resample.USE_CACHING_FOR_LFILTER = True
estimate.USE_CACHING_FOR_WELCH_FFTS = True
estimate.USE_CACHING_FOR_INV_SPEC_TRUNC = True

try:
    from setproctitle import setproctitle
except ImportError:
    def setproctitle(title):
        pass


def ptof(p, ft):
    """Convert p-value to FAR via foreground time `ft`.
    """
    return numpy.log1p(-p) / -ft

def ftop(f, ft):
    """Convert FAR to p-value via foreground time `ft`.
    """
    return 1 - numpy.exp(-ft * f)

def combine_ifar_pvalue(ifar, pvalue, livetime):
    """Convert original IFAR to p-value and combine with followup p-value.
    """
    from scipy.stats import combine_pvalues
    # NB units of ifar and livetime must be the same
    _, pnew = combine_pvalues([ftop(1. / ifar, livetime), pvalue])
    nifar = 1. / ptof(pnew, livetime)
    # take max of original IFAR and combined IFAR & apply trials factor
    return numpy.maximum(ifar, nifar) / 2.


class LiveEventManager(object):
    def __init__(self, args, bank):
        self.low_frequency_cutoff = args.low_frequency_cutoff
        self.bank = bank
        # all interferometers involved in the analysis, whether for generating
        # candidates or doing localization only
        self.ifos = set(args.channel_name.keys())
        # interferometers used for localization only
        self.skymap_only_ifos = set(args.skymap_only_ifos or [])
        # subset of the interferometers allowed to produce candidates
        self.trigg_ifos = self.ifos - self.skymap_only_ifos

        # Figure out what we are supposed to process within the pool of MPI processes
        self.comm = mpi.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        if self.rank > 0:
            # We are a matched-filtering process, so nothing else is needed
            return

        self.path = args.output_path
        self.mc_area_args = mchirp_area.from_cli(args, parser)
        self.padata = livepau.PAstroData(args.p_astro_spec, args.bank_file)
        self.use_date_prefix = args.day_hour_output_prefix
        self.ifar_upload_threshold = args.ifar_upload_threshold
        self.pvalue_lookback_time = args.pvalue_lookback_time
        self.pvalue_livetime = args.pvalue_combination_livetime
        self.gracedb_server = args.gracedb_server
        self.gracedb_search = args.gracedb_search
        self.gracedb_labels = args.gracedb_labels
        self.gracedb_testing = not args.enable_production_gracedb_upload
        self.enable_gracedb_upload = args.enable_gracedb_upload
        self.run_snr_optimization = args.run_snr_optimization
        self.snr_opt_label = args.snr_opt_label
        self.snr_opt_options = args.snr_opt_extra_opts
        self.gracedb = None

        # Keep track of which events have been uploaded
        self.last_few_coincs_uploaded = []

        # Give this a nominal value, it is required but not used
        self.fu_cores = None
        if self.run_snr_optimization:
            # preestimate the number of CPU cores that we can afford giving
            # to followup processes without slowing down the main search
            bg_cores = len(tuple(itertools.combinations(self.trigg_ifos, 2)))
            analysis_cores = 1 + bg_cores
            if platform.system() != 'Darwin':
                available_cores = len(os.sched_getaffinity(0))
                self.fu_cores = available_cores - analysis_cores
                if self.fu_cores <= 0:
                    logging.warning(
                        'Insufficient number of CPU cores (%d) to '
                        'run search and trigger followups. Uploaded '
                        'triggers will momentarily increase the lag',
                        available_cores
                    )
                    self.fu_cores = 1
            else:
                # To enable mac testing, this is just set to 1
                self.fu_cores = 1

        if args.enable_embright_has_massgap:
            if args.embright_massgap_max < self.mc_area_args['mass_bdary']['ns_max']:
                parser.error('MAX_GAP value cannot be lower than MAX_NS limit')
            self.mc_area_args['embright_mg_max'] = args.embright_massgap_max

    def commit_results(self, results):
        logging.info('Committing triggers')
        self.comm.gather(results, root=0)

    def barrier(self):
        self.comm.Barrier()

    def barrier_status(self, status):
        return self.comm.allreduce(status, op=mpi.LAND)

    def gather_results(self):
        """ Collect results from the mpi subprocesses and collate them into
        contiguous sets of arrays.
        """

        if self.rank != 0:
            raise RuntimeError('Not root process')

        logging.info('Gathering triggers')
        all_results = self.comm.gather(None, root=0)
        data_ends = [a[1] for a in all_results if a is not None]
        results = [a[0] for a in all_results if a is not None]

        combined = {}
        for ifo in results[0]:
            # check if any of the results returned invalid
            try:
                for r in results:
                    if r[ifo] is False:
                        raise ValueError
            except ValueError:
                continue
            combined[ifo] = {}
            for key in results[0][ifo]:
                combined[ifo][key] = numpy.concatenate([r[ifo][key] for r in results])

        return combined, data_ends[0]

    def compute_followup_data(self, ifos, triggers,
                              followup_ifos=None, recalculate_ifar=False):
        """Figure out which of the followup detectors are usable, and compute
        SNR time series for all the available detectors.
        """
        out = {}
        followup_ifos = [] if followup_ifos is None else followup_ifos

        template_id = triggers[f'foreground/{ifos[0]}/template_id']
        htilde = self.bank[template_id]

        coinc_times = {
            ifo: triggers[f'foreground/{ifo}/end_time'] for ifo in ifos
        }

        # Get the SNR series for the ifos that made the initial coinc
        for ifo in ifos:
            # NOTE we only check the state/DQ of followup IFOs here.
            # IFOs producing the coincidence are assumed to also
            # produce valid SNR series.
            snr_series = compute_followup_snr_series(
                self.data_readers[ifo],
                htilde,
                coinc_times[ifo],
                check_state=False
            )

            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}

        # Determine if the other ifos can contribute to the coincident event,
        # if so then update the info in `triggers` appropriately
        for ifo in followup_ifos:
            pvalue_info = followup_event_significance(
                ifo,
                self.data_readers[ifo],
                self.bank,
                template_id,
                coinc_times,
                lookback=self.pvalue_lookback_time
            )
            if pvalue_info is None:
                continue
            out[ifo] = {'snr_series': pvalue_info['snr_series']}
            self.get_followup_info(
                ifos[0],
                ifo,
                triggers,
                pvalue_info,
                recalculate_ifar=recalculate_ifar and ifo not in self.skymap_only_ifos
            )

        # the SNR time series sample rate can vary slightly due to
        # rounding errors, so force all of them to be identical
        fix_delta_t = None
        for ifo in out:
            if 'snr_series' not in out[ifo]:
                continue
            if fix_delta_t is None:
                fix_delta_t = out[ifo]['snr_series']._delta_t
            else:
                out[ifo]['snr_series']._delta_t = fix_delta_t

        return out

    def get_followup_info(
        self,
        coinc_ifo,
        ifo,
        triggers,
        pvalue_info,
        recalculate_ifar=False
    ):
        peak_time = pvalue_info['peak_time']
        pv = pvalue_info['pvalue']
        pv_sat = pvalue_info['pvalue_saturated']

        # Copy the common fields from the other detector;
        # ignore fields that contain detector-specific data
        fields_to_ignore = set([
            'end_time', 'snr', 'stat', 'coa_phase',
            'chisq', 'chisq_dof', 'sg_chisq', 'sigmasq'
        ])
        for key in set(triggers):
            if f'foreground/{coinc_ifo}/' not in key:
                continue
            _, _, name = key.split('/')
            if name in fields_to_ignore:
                continue
            triggers[f'foreground/{ifo}/{name}'] = triggers[key]

        # Set the detector-specific fields for which we have data
        snr_series_peak = pvalue_info['snr_series'].at_time(
            peak_time,
            nearest_sample=True
        )
        base = f'foreground/{ifo}/'
        triggers[base + 'end_time'] = float(peak_time)
        triggers[base + 'snr'] = triggers[base + 'stat'] = abs(snr_series_peak)
        triggers[base + 'coa_phase'] = numpy.angle(snr_series_peak)
        triggers[base + 'sigmasq'] = pvalue_info['sigma2']
        if recalculate_ifar:
            # Calculate new ifar
            triggers['foreground/ifar'] = combine_ifar_pvalue(
                triggers['foreground/ifar'],
                pv,
                self.pvalue_livetime
            )
            triggers['foreground/ifar_saturated'] |= pv_sat
            triggers[f'foreground/pvalue_{ifo}'] = pv
            triggers[f'foreground/pvalue_{ifo}_saturated'] = pv_sat

    def setup_optimize_snr(
        self, results, live_ifos, triggering_ifos, fname
    ):
        """Setup the network SNR optimization process for a
        candidate event. See arXiv:2008.07494 for details.

        Parameters
        ----------
        results : dict
            Information about the candidate.
        live_ifos : list
            List of detectors with usable data.
        triggering_ifos : list
            List of detectors that originally identified the candidate,
            providing reliabile estimates of the merger time. Must not be a
            superset of `live_ifos`.
        fname : str
            File name where the candidate information has been saved.
            Used to name other files produced as part of setting up the
            SNR optimization.
        """
        template_id = results[f'foreground/{live_ifos[0]}/template_id']
        p = props(self.bank.table[template_id])
        p.pop('approximant')
        apr = self.bank.approximant(template_id)
        min_buffer = self.bank.minimum_buffer + 0.5
        buff_size = \
            pycbc.waveform.get_waveform_filter_length_in_time(apr, **p)

        tlen = self.bank.round_up((buff_size + min_buffer) \
            * self.bank.sample_rate)
        flen = int(tlen / 2 + 1)
        delta_f = self.bank.sample_rate / float(tlen)
        cmd = f'timeout {args.snr_opt_timeout} '
        exepath = which('pycbc_optimize_snr')
        cmd += exepath + ' '

        # Add SNR optimization options to the command
        if self.snr_opt_options is not None:
            cmd += self.snr_opt_options

        # Add data files according to the event we are optimizing
        data_fils_str = '--data-files '
        psd_fils_str = '--psd-files '
        for ifo in live_ifos:
            curr_fname = fname.replace(
                '.xml.gz', f'_{ifo}_data_overwhitened.hdf'
            )
            curr_data = self.data_readers[ifo].overwhitened_data(delta_f)
            curr_data.save(curr_fname)
            data_fils_str += f'{ifo}:{curr_fname} '
            curr_fname = fname.replace('.xml.gz', f'_{ifo}_psd.hdf')
            curr_psd = curr_data.psd
            curr_psd.save(curr_fname)
            psd_fils_str += f'{ifo}:{curr_fname} '
        cmd += data_fils_str
        cmd += psd_fils_str

        curr_fname = fname.replace('.xml.gz', '_attributes.hdf')
        with h5py.File(curr_fname, 'w') as hdfp:
            for ifo in triggering_ifos:
                curr_time = results[f'foreground/{ifo}/end_time']
                hdfp[f'coinc_times/{ifo}'] = curr_time
            f_end = self.bank.end_frequency(template_id)
            if f_end is None or f_end >= (flen * delta_f):
                f_end = (flen-1) * delta_f
            hdfp['flen'] = flen
            hdfp['delta_f'] = delta_f
            hdfp['f_end'] = f_end
            hdfp['sample_rate'] = self.bank.sample_rate
            hdfp['flow'] = self.bank.table[template_id].f_lower
            hdfp['mass1'] = self.bank.table[template_id].mass1
            hdfp['mass2'] = self.bank.table[template_id].mass2
            hdfp['spin1z'] = self.bank.table[template_id].spin1z
            hdfp['spin2z'] = self.bank.table[template_id].spin2z
            hdfp['template_duration'] = \
                self.bank.table[template_id].template_duration
            hdfp['ifar'] = results['foreground/ifar']
            if 'p_terr' in results:
                hdfp['p_terr'] = results['p_terr']

            for ifo in args.channel_name:
                hdfp[f'channel_names/{ifo}'] = args.channel_name[ifo]

            recursively_save_dict_contents_to_group(hdfp,
                                                    'mc_area_args/',
                                                    self.mc_area_args)

        cmd += f'--params-file {curr_fname} '
        cmd += f'--approximant {apr} '
        cmd += f'--gracedb-server {self.gracedb_server} '
        cmd += f'--gracedb-search {self.gracedb_search} '

        labels = self.snr_opt_label
        labels += ' '.join(self.gracedb_labels or [])
        cmd += f'--gracedb-labels {labels} '

        if not self.gracedb_testing:
            cmd += '--production '
        cmd += '--verbose '

        # set up a place for storing the SNR optimization results and log
        out_dir_path = os.path.join(os.path.dirname(fname), 'optimize_snr')
        makedir(out_dir_path)
        cmd += f'--output-path {out_dir_path} '

        if self.enable_gracedb_upload:
            cmd += '--enable-gracedb-upload '

        if self.fu_cores:
            cmd += f'--cores {self.fu_cores} '

        if args.processing_scheme:
            # we will use the cores for multiple workers of the
            # optimization routine, so we force the processing scheme
            # to a single core here.  This may be enforcing some
            # assumptions about the optimal way to do ffts on the
            # machine. However, the dominant cost of pycbc_optimize_snr
            # is expected to be in waveform generation, which is
            # unlikely to benefit from a processing scheme with more
            # than 1 thread anyway.
            opt_scheme = args.processing_scheme.split(':')[0]
            cmd += f'--processing-scheme {opt_scheme}:1 '

        # Save the command which would be used:
        snroc_fname = os.path.join(out_dir_path, 'snr_optimize_command.txt')
        with open(snroc_fname, 'w') as snroc_file:
            snroc_file.write(cmd)

        return cmd, out_dir_path

    def run_optimize_snr(self, cmd, out_dir_path, attribute_fname, gid):
        """
        Run the optimize_snr instance setup up by setup_optimize_snr

        Parameters
        ----------
        cmd: str
            Command to tell subprocess what to run for the
            optimize_snr process
        out_dir_path: os.path or str
            Place for storing the SNR optimization results and log
        attribute_fname: str
            The filename of the attributes of the candidate
        gid: str
            GraceDB ID of the candidate
        """
        if gid is not None:
            with h5py.File(attribute_fname, 'a') as hdfp:
                hdfp['gid'] = gid
        log_fname = os.path.join(out_dir_path, 'optimize_snr.log')

        logging.info('running %s with log to %s', cmd, log_fname)
        with open(log_fname, "w") as logfile:
            subprocess.Popen(
                cmd, shell=True, stdout=logfile, stderr=logfile
            )

    def create_gdb(self):
        """
        Create a GraceDB session that will persist throughout the execution
        of PyCBC Live.
        """
        from ligo.gracedb.rest import GraceDb

        logging.info('Creating GraceDB session')
        # Set up dict for args that reload the certificate if it will expire in the
        # reload_buffer time. These args are documented here:
        # https://ligo-gracedb.readthedocs.io/en/latest/api.html#ligo.gracedb.rest.GraceDb
        # Because we do not change any of the request session values when running the
        # code, it should remain thread safe.
        gdbargs = {'reload_certificate': True, 'reload_buffer': 300}
        if self.gracedb_server:
            gdbargs['service_url'] = self.gracedb_server
        self.gracedb = GraceDb(**gdbargs)

    def upload_in_thread(self, event, fname, comment, cmd, out_dir_path,
                         upload_checks, optimize_snr_checks):
        gid = None
        if upload_checks:
            gid = event.upload(
                fname,
                gracedb_server=self.gracedb_server,
                testing=self.gracedb_testing,
                extra_strings=[comment],
                search=self.gracedb_search,
                labels=self.gracedb_labels
            )

        if optimize_snr_checks:
            logging.info('Optimizing SNR for event above threshold ..')
            self.run_optimize_snr(
                cmd,
                out_dir_path,
                fname.replace('.xml.gz', '_attributes.hdf'),
                gid
            )

    def check_coincs(self, ifos, coinc_results, psds):
        """ Perform any followup and save zerolag triggers to a coinc xml file
        """
        # Check for hardware injection
        for ifo in ifos:
            if self.data_readers[ifo].near_hwinj():
                coinc_results['HWINJ'] = True
                break

        if 'foreground/ifar' not in coinc_results:
            return

        logging.info('computing followup data for coinc')
        coinc_ifos = coinc_results['foreground/type'].split('-')
        followup_ifos = set(ifos) - set(coinc_ifos)
        followup_ifos = list(followup_ifos | self.skymap_only_ifos)

        double_ifar = coinc_results['foreground/ifar']
        if double_ifar < args.ifar_double_followup_threshold:
            coinc_results['foreground/NO_FOLLOWUP'] = True
            return

        sld = self.compute_followup_data(
            coinc_ifos,
            coinc_results,
            followup_ifos=followup_ifos,
            recalculate_ifar=True
        )

        event = CandidateForGraceDB(
            coinc_ifos,
            ifos,
            coinc_results,
            gracedb=self.gracedb,
            padata=self.padata,
            bank=self.bank,
            psds=psds,
            skyloc_data=sld,
            low_frequency_cutoff=self.low_frequency_cutoff,
            channel_names=args.channel_name,
            mc_area_args=self.mc_area_args,
        )

        fname = os.path.join(
            self.get_candidate_path(event.merger_time),
            f'coinc-{event.merger_time:.3f}.xml.gz'
        )
        logging.info('Coincident candidate! Saving as %s', fname)

        # Which IFOs were active?
        live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]

        # Verbally explain some details not obvious from the other info
        comment = (
            'Trigger produced as a {} coincidence.<br />'
            'Two-detector ranking statistic: {}<br />'
            'Detectors used for FAR calculation: {}.<br />'
            'Detectors used for localization: {}.<br />'
            'Detectors used only for localization: {}.'
        )
        comment = comment.format(
            ppdets(coinc_ifos),
            args.ranking_statistic,
            ppdets(set(ifos) - self.skymap_only_ifos),
            ppdets(live_ifos),
            ppdets(set(live_ifos) & self.skymap_only_ifos)
        )

        ifar = coinc_results['foreground/ifar']
        upload_checks = self.enable_gracedb_upload and self.ifar_upload_threshold < ifar
        optimize_snr_checks = self.run_snr_optimization and self.ifar_upload_threshold < ifar

        # Keep track of the last few coincs uploaded in order to
        # prevent singles being uploaded as well for coinc events
        self.last_few_coincs_uploaded.append(event.merger_time)
        # Only need to keep a few (10) events
        self.last_few_coincs_uploaded = self.last_few_coincs_uploaded[-10:]

        # Save the event
        if not upload_checks:
            event.save(fname)

        # Save the event info/data and what _would_ be run for SNR optimizer,
        # even if not running it - do this before the thread so no
        # data buffers move on in a possible interim period

        # Tell SNR optimized event about p_terr
        if hasattr(event, 'p_terr') and event.p_terr is not None:
            coinc_results['p_terr'] = event.p_terr

        # Do the optimizer setup
        cmd, out_dir_path = self.setup_optimize_snr(
            coinc_results,
            live_ifos,
            coinc_ifos,
            fname,
        )

        # Now start the thread to run the SNR optimizer
        if upload_checks or optimize_snr_checks:
            thread_args = (
                event, fname, comment, cmd, out_dir_path,
                upload_checks, optimize_snr_checks
            )
            gdb_upload_thread = threading.Thread(target=self.upload_in_thread,
                                                 args=thread_args)
            gdb_upload_thread.start()

    def check_singles(self, results, psds):
        active = [k for k in results if results[k] is not None]
        for ifo in active:
            single = sngl_estimator[ifo].check(
                results[ifo],
                self.data_readers[ifo]
            )

            if single is None:
                continue

            sifar = single['foreground/ifar']
            logging.info(f'Found {ifo} single with ifar {sifar}')

            followup_ifos = [i for i in active if i is not ifo]
            followup_ifos = list(set(followup_ifos) | self.skymap_only_ifos)
            # Don't recompute ifar considering other ifos
            sld = self.compute_followup_data(
                [ifo],
                single,
                followup_ifos=followup_ifos,
                recalculate_ifar=False
            )
            # Apply a trials factor of the number of active detectors
            single['foreground/ifar'] = sifar / len(active)

            event = CandidateForGraceDB(
                [ifo],
                active,
                single,
                gracedb=self.gracedb,
                padata=self.padata,
                bank=self.bank,
                psds=psds,
                skyloc_data=sld,
                low_frequency_cutoff=self.low_frequency_cutoff,
                channel_names=args.channel_name,
                mc_area_args=self.mc_area_args,
            )

            fname = os.path.join(
                self.get_candidate_path(event.merger_time),
                f'single-{ifo}-{event.merger_time:.3f}.xml.gz'
            )
            logging.info('Single-detector candidate! Saving as %s', fname)

            # Which IFOs were active?
            live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]

            # Verbally explain some details not obvious from the other info
            comment = (
                'Trigger produced as a {0} single.<br />'
                'Detectors used for FAR calculation: {0}.<br />'
                'Detectors used for localization: {1}.<br />'
                'Detectors used only for localization: {2}.'
            )
            comment = comment.format(
                ifo,
                ppdets(live_ifos),
                ppdets(set(live_ifos) & self.skymap_only_ifos)
            )

            # Has a coinc event at this time been uploaded recently?
            # If so, skip upload - Note that this means that we _always_
            # prefer an uploaded coincident event over a single
            coinc_tdiffs = abs(event.merger_time -
                               numpy.array(self.last_few_coincs_uploaded))
            nearby_coincs = coinc_tdiffs < 0.1

            if any(nearby_coincs):
                logging.info(
                    "Single-detector candidate at time %.3f was already "
                    "reported as coinc, skipping upload",
                    event.merger_time
                )

            upload_checks = args.enable_single_detector_upload and \
                self.ifar_upload_threshold < single['foreground/ifar'] and \
                not any(nearby_coincs)
            optimize_snr_checks = \
                self.ifar_upload_threshold < single['foreground/ifar'] and \
                not any(nearby_coincs) and \
                self.run_snr_optimization

            # Save the event
            if not upload_checks:
                event.save(fname)

            # Save the event info/data and what _would_ be run for SNR optimizer,
            # even if not running it - do this before the thread so no
            # data buffers move on in a possible interim period

            if any(nearby_coincs):
                # Don't even save the snr optimization stuff for singles
                # where there is already a coinc
                continue

            # Tell SNR optimized event about p_terr
            if hasattr(event, 'p_terr') and event.p_terr is not None:
                single['p_terr'] = event.p_terr

            # Do the optimizer setup
            cmd, out_dir_path = self.setup_optimize_snr(
                single,
                live_ifos,
                [ifo],
                fname,
            )

            if upload_checks or optimize_snr_checks:
                thread_args = (
                    event, fname, comment, cmd, out_dir_path,
                    upload_checks, optimize_snr_checks
                )
                gdb_upload_thread = threading.Thread(target=self.upload_in_thread,
                                                     args=thread_args)
                gdb_upload_thread.start()

    def get_out_dir_path(self, time_gps):
        """Compose and return the path to a directory to store files associated
        with times in a given day (specified via a GPS time).
        Create the directory if it does not exist.
        """
        path = self.path
        if self.use_date_prefix:
            time_dt = Time(time_gps, format='gps', scale='utc').to_datetime()
            path = os.path.join(
                path,
                f'{time_dt.year:04d}_{time_dt.month:02d}_{time_dt.day:02d}'
            )
        makedir(path)
        return path

    def get_candidate_path(self, merger_time):
        """Compose and return the path to a directory to store files associated
        with a particular candidate event. Create the directory if it does not
        exist.
        """
        path = os.path.join(
            self.get_out_dir_path(merger_time),
            f'candidate_{merger_time:.3f}_{pycbc.random_string(4)}'
        )
        makedir(path)
        return path

    def dump(self, results, name, store_psd=False, time_index=None,
             store_loudest_index=False, raw_results=None, gates=None):
        """Save the results from this time block to an hdf output file.
        """
        fname = os.path.join(
            self.get_out_dir_path(time_index),
            name + '.hdf'
        )

        with h5py.File(fname, 'w') as f:
            f.attrs['pycbc_version'] = version.git_verbose_msg
            f.attrs['command_line'] = sys.argv
            f.attrs['num_live_detectors'] = len(self.live_detectors)

            def h5py_unicode_workaround(stuff):
                # workaround for the fact that h5py cannot handle Unicode
                # numpy arrays that show up with Python 3
                if hasattr(stuff, 'dtype') and stuff.dtype.kind == 'U':
                    return [s.encode() for s in stuff]
                return stuff

            for ifo in results:
                for k in results[ifo]:
                    f[f'{ifo}/{k}'] = h5py_unicode_workaround(results[ifo][k])

            for key in raw_results:
                f[key] = h5py_unicode_workaround(raw_results[key])

            if store_loudest_index:
                for ifo in results:
                    if 'snr' in results[ifo]:
                        s = numpy.array(results[ifo]['snr'], ndmin=1)
                        c = numpy.array(results[ifo]['chisq'], ndmin=1)
                        nsnr = numpy.array(newsnr(s, c), ndmin=1) if len(s) > 0 else []

                        # loudest by newsnr
                        nloudest = numpy.argsort(nsnr)[::-1][0:store_loudest_index]

                        # loudest by snr
                        sloudest = numpy.argsort(s)[::-1][0:store_loudest_index]
                        f[ifo]['loudest'] = numpy.union1d(nloudest, sloudest)

            for ifo in (gates or {}):
                gate_dtype = [('center_time', float),
                              ('zero_half_width', float),
                              ('taper_width', float)]
                f[f'{ifo}/gates'] = numpy.array(gates[ifo], dtype=gate_dtype)

        for ifo in (store_psd or {}):
            if store_psd[ifo] is not None:
                store_psd[ifo].save(fname, group=f'{ifo}/psd')


def check_max_length(args, waveforms):
    """Check that the `--max-length` option is sufficient to accomodate the
    longest template in the bank and the PSD estimation options.
    """
    lengths = numpy.array([1.0 / wf.delta_f for wf in waveforms])
    psd_len = args.psd_segment_length * (args.psd_samples // 2 + 1)
    max_length = max(lengths.max() + args.pvalue_lookback_time, psd_len)
    if max_length > args.max_length:
        raise ValueError(
            '--max-length is too short for this template bank. '
            f'Use at least {max_length}.'
        )


parser = argparse.ArgumentParser(description=__doc__)
pycbc.waveform.bank.add_approximant_arg(parser)
parser.add_argument('--verbose', action='count')
parser.add_argument('--version', action='version', version=version.git_verbose_msg)
parser.add_argument('--bank-file', required=True,
                    help="Template bank file in XML or HDF format")
parser.add_argument('--low-frequency-cutoff', help="low frequency cutoff", type=int)
parser.add_argument('--sample-rate', help="output sample rate", type=int)
parser.add_argument('--chisq-bins', help="Number of chisq bins")
parser.add_argument('--analysis-chunk', type=int, required=True,
                        help="Amount of data to produce triggers in a block")

parser.add_argument('--snr-threshold', type=float,
                    help='SNR threshold for generating a trigger')
parser.add_argument('--snr-abort-threshold', type=float)

parser.add_argument('--channel-name', action=MultiDetMultiColonOptionAction,
                    required=True)
parser.add_argument('--state-channel', action=MultiDetMultiColonOptionAction,
                    help="Channel containing frame status information. Used "
                         "to determine when to analyze the hoft data. This somewhat"
                         " corresponds to CAT1 information")
parser.add_argument('--analyze-flags', action=MultiDetOptionAction, nargs='+',
                    help='The flags that must be in the "good" state to analyze data')
parser.add_argument('--idq-channel', action=MultiDetMultiColonOptionAction,
                    help="Channel to read iDQ timeseries from. iDQ timeseries are "
                         "required by certain ranking statistics.")
parser.add_argument('--idq-state-channel', action=MultiDetMultiColonOptionAction,
                    help="Channel containing information about the state of idq."
                         "Used to determine when idq is usable for statistics. ")
parser.add_argument('--idq-threshold', type=float,
                    help='Threshold used to veto triggers at times of '
                    'low iDQ False Alarm Probability')
parser.add_argument('--data-quality-channel',
                    action=MultiDetMultiColonOptionAction,
                    help="Channel containing data quality information. Used "
                         "to determine when hoft may be suspect and may be used to "
                         "veto triggers or not analyze a segment of data. This "
                         "roughly corresponds to CAT2 information")
parser.add_argument('--data-quality-flags', action=MultiDetOptionAction, nargs='+',
                    help='Flags used to determine when to throw triggers away. '
                         'For each detector, give a comma-separated list of flags.')
parser.add_argument('--data-quality-padding', type=float, default=0,
                    help='Time in seconds around a bad dq time to additionally remove '
                         'triggers')
parser.add_argument('--frame-src', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-type', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--force-update-cache', action='store_true')
parser.add_argument('--highpass-frequency', type=float,
                    help="Frequency to apply highpass filtering")
parser.add_argument('--highpass-reduction', type=float,
                    help="DB to reduce low frequencies")
parser.add_argument('--highpass-bandwidth', type=float,
                    help="Width of the highpass turnover region in Hz")
parser.add_argument('--psd-recalculate-difference', type=float, default=.01)
parser.add_argument('--psd-abort-difference', type=float, default=.20)
parser.add_argument('--psd-samples', type=int, required=True,
                    help="Number of PSD segments to use in the rolling estimate")
parser.add_argument('--psd-segment-length', type=int, required=True,
                    help="Length in seconds of each PSD segment")
parser.add_argument('--psd-inverse-length', type=float,
                    help="Length in time for the equivalent FIR filter")
parser.add_argument('--psd-recompute-length', type=positive_int, default=1,
                    help="If given only recompute the PSD after this number of "
                         "analysis chunks, whose length is set by the option "
                         "--analysis-chunk.")
parser.add_argument('--trim-padding', type=float, default=0.25,
                    help="Padding around the overwhitened analysis block")
parser.add_argument("--enable-bank-start-frequency", action='store_true',
                    help="Read the starting frequency of template waveforms"
                         " from the template bank")

parser.add_argument('--autogating-threshold', type=float, metavar='SIGMA',
                    help='If given, find and gate glitches '
                         'producing a deviation larger than '
                         'SIGMA in the whitened strain time '
                         'series.')
parser.add_argument('--autogating-pad', type=float, default=0.5,
                    metavar='SECONDS',
                    help='Ignore the given length of whitened '
                         'strain at the ends of a segment, to '
                         'avoid filters ringing.')
parser.add_argument('--autogating-cluster', type=float, default=1,
                    metavar='SECONDS',
                    help='Length of clustering window for '
                         'detecting glitches for autogating.')
parser.add_argument('--autogating-width', type=float, default=0.25,
                    metavar='SECONDS', help='Half-width of the gating window.')
parser.add_argument('--autogating-taper', type=float, metavar='SECONDS',
                    default=0.25,
                    help='Taper the strain before and after '
                         'each gating window over a duration '
                         'of SECONDS.')
parser.add_argument('--autogating-duration', type=float, default=16,
                    metavar='SECONDS',
                    help='Amount of data in seconds to apply autogating on.')
parser.add_argument('--autogating-psd-segment-length', type=float, default=2,
                    metavar='SECONDS',
                    help='Length in seconds of each segment used to estimate the PSD '
                    'with Welchs method and median averaging.')
parser.add_argument('--autogating-psd-stride', type=float, default=1,
                    metavar='SECONDS',
                    help='Length in seconds of the overlap between each segment used '
                    'to estimate the PSD with Welchs method and median averaging.')

parser.add_argument('--sync', action='store_true',
                    help="Imposes an MPI synchronization at each transfer of"
                         " single-detector triggers. Can help with debugging"
                         " and avoiding memory issues when running offline"
                         " analyses.")
parser.add_argument('--increment-update-cache', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-read-timeout', type=float, default=30)
parser.add_argument('--increment', type=int, default=8, metavar="T",
                    help="When generating the template waveforms, their"
                         " durations are binned such that T is their greatest"
                         " common divisor.")

parser.add_argument('--start-time', type=int, default=None,
                    help='Start the analysis at the given GPS time')
parser.add_argument('--end-time', type=int, default=numpy.inf,
                    help='Stop the analysis at the given GPS time')

parser.add_argument('--output-path', required=True,
                    help='Path to a directory to store results in')
parser.add_argument('--output-status', type=str, metavar='PATH',
                    help='If given, PyCBC Live will periodically write JSON '
                         'status info to PATH, so the analysis can be '
                         'monitored via Nagios')
parser.add_argument('--day-hour-output-prefix', action='store_true')
parser.add_argument('--store-psd', action='store_true')
parser.add_argument('--output-background', type=str, nargs='+',
                    help='Takes a period in seconds and a file path and dumps '
                         'the coinc backgrounds to that path with that period')
parser.add_argument('--output-background-n-loudest', type=int, default=10000,
                    help="If given an integer (assumed positive), it stores loudest n triggers"
                    "(not sorted) for each of the coinc background. If 0, all bkg will be dumped.")

parser.add_argument('--newsnr-threshold', type=float, default=0)
parser.add_argument('--max-batch-size', type=int, default=2**27)
parser.add_argument('--store-loudest-index', type=int, default=0)
parser.add_argument('--max-psd-abort-distance', type=float, default=numpy.inf,
                    help="Safety BNS horizon distance (in Mpc) above which a "
                    "detector's data is discarded.")
parser.add_argument('--min-psd-abort-distance', type=float, default=-numpy.inf,
                    help="Safety BNS horizon distance (in Mpc) below which a "
                    "detector's data is discarded.")
parser.add_argument('--max-triggers-in-batch', type=int, metavar="N",
                    help="Tells each matched-filtering process to only report "
                         "the loudest N single-detector triggers by SNR.")
parser.add_argument('--max-length', type=float,
                    help='Maximum duration of templates, used to set the data buffer size')

parser.add_argument('--enable-profiling', type=int, metavar='RANK',
                    help='Dump out profiling information from the MPI process'
                         ' with given rank at the end of program execution')

parser.add_argument('--enable-background-estimation', default=False, action='store_true')
parser.add_argument('--ifar-double-followup-threshold', type=float, required=True,
                    help='Inverse-FAR threshold to followup double coincs with'
                         'additional detectors')
parser.add_argument('--pvalue-lookback-time', type=float, default=150,
                    metavar='SECONDS',
                    help='Lookback time for the calculation of the p-value in '
                         'followup detectors.')
parser.add_argument('--pvalue-combination-livetime', type=float, required=True,
                    help="Livetime used for p-value combination with followup "
                         "detectors, in years")
parser.add_argument('--enable-gracedb-upload', action='store_true', default=False,
                    help='Upload triggers to GraceDB')
parser.add_argument('--enable-production-gracedb-upload', action='store_true', default=False,
                    help='Do not mark triggers uploaded to GraceDB as test '
                         'events. This option should *only* be enabled in '
                         'production analyses!')
parser.add_argument('--enable-single-detector-upload', action='store_true', default=False,
                    help='Upload single ifo events to GraceDB')
parser.add_argument('--gracedb-server', metavar='URL',
                    help='URL of GraceDB server API for uploading events. '
                         'If not provided, the default URL is used.')
parser.add_argument('--gracedb-search', type=str, default='AllSky',
                    help='String going into the "search" field of the GraceDB '
                         'events')
parser.add_argument('--gracedb-labels', metavar='LABEL', nargs='+',
                    help='Apply the given list of labels to events uploaded '
                         'to GraceDB.')
parser.add_argument('--ifar-upload-threshold', type=float, required=True,
                    help='Inverse-FAR threshold for uploading candidate '
                         'triggers to GraceDB, in years.')
parser.add_argument('--coinc-window-pad', type=float,
                    help="Amount of time allowed to form a coincidence in "
                         "addition to the time of flight in seconds.",
                    default=0.002)
parser.add_argument('--file-prefix', default='Live')

parser.add_argument('--round-start-time', type=int, metavar='X',
                    help="Round up the start time to the nearest multiple of X"
                         " seconds. This is useful for forcing agreement "
                         " with frame file production.")
parser.add_argument('--size-override', type=int, metavar='N',
                    help="Override the internal MPI size layout. "
                         " Useful for debugging and running a portion of a bank")
parser.add_argument('--fftw-planning-limit', type=float,
                    help="Time in seconds to allow for a plan to be created")
parser.add_argument('--run-snr-optimization', action='store_true',
                    help='Run spawned followup processes to maximize SNR for '
                         'any trigger uploaded to GraceDB')
parser.add_argument('--snr-opt-timeout', type=int, default=400, metavar='SECONDS',
                    help='Maximum allowed duration of followup process to maximize SNR')
parser.add_argument('--snr-opt-label', default='SNR_OPTIMIZED',
                    help='Label to apply to snr-optimized GraceDB uploads')
parser.add_argument('--snr-opt-extra-opts',
                    help='Extra options to pass to the optimizer subprocess. Example: '
                        '--snr-opt-extra-opts "--snr-opt-method differential_evolution '
                        '--snr-opt-di-maxiter 50 --snr-opt-di-popsize 100 '
                        '--snr-opt-seed 42 --snr-opt-include-candidate "')

parser.add_argument('--enable-embright-has-massgap', action='store_true', default=False,
                    help='Estimate HasMassGap probability for EMBright info. Lower limit '
                         'of the mass gap is equal to the maximum NS mass used for '
                         'the source classification.')
parser.add_argument('--embright-massgap-max', type=float, default=5.0, metavar='SOLAR MASSES',
                    help='Upper limit of the mass gap, used for estimating '
                         'HasMassGap probability.')
parser.add_argument('--skymap-only-ifos', nargs='+',
                    help="Detectors that only contribute in sky localization")
parser.add_argument('--psd-variation', action='store_true',
                    help="Run the psd variation code to produce psd variation "
                         "values for each single detector triggers found by "
                         "the search. Required when using a single detector "
                         "ranking statistic that includes psd variation.")
parser.add_argument("--statistic-refresh-rate", type=float,
                    help="How often to refresh the statistic object, "
                         "in seconds. If omitted, no refreshing is done.")

scheme.insert_processing_option_group(parser)
LiveSingle.insert_args(parser)
fft.insert_fft_option_group(parser)
Coincer.insert_args(parser)
SingleDetSGChisq.insert_option_group(parser)
mchirp_area.insert_args(parser)
livepau.insert_live_pastro_option_group(parser)

args = parser.parse_args()

scheme.verify_processing_options(args, parser)
fft.verify_fft_options(args, parser)
Coincer.verify_args(args, parser)

if args.output_background is not None and len(args.output_background) != 2:
    parser.error('--output-background takes two parameters: period and path')
if not args.enable_gracedb_upload and args.enable_single_detector_upload:
    parser.error('You are not allowed to enable single ifo upload without the '
                 '--enable-gracedb-upload option!')

# Configure the log messages so that they are prefixed by the timestamp, the
# hostname of the originating node and the MPI rank of the originating process
pycbc.init_logging(
    args.verbose,
    format='%(asctime)s {} {} %(message)s'.format(
        platform.node(), mpi.COMM_WORLD.Get_rank()
    )
)

ctx = scheme.from_cli(args)
fft.from_cli(args)

# Approximant guess of the total padding
valid_pad = args.analysis_chunk
total_pad = args.trim_padding * 2 + valid_pad
lfc = None if args.enable_bank_start_frequency else args.low_frequency_cutoff
bank = waveform.LiveFilterBank(
    args.bank_file,
    args.sample_rate,
    total_pad,
    low_frequency_cutoff=lfc,
    approximant=args.approximant,
    increment=args.increment
)
if bank.min_f_lower < args.low_frequency_cutoff:
    parser.error('--low-frequency-cutoff ({} Hz) must not be larger than the '
                 'minimum f_lower across all templates '
                 '({} Hz)'.format(args.low_frequency_cutoff, bank.min_f_lower))

evnt = LiveEventManager(args, bank)

logging.info('Analyzing data from detectors %s', ppdets(evnt.ifos))
logging.info('Using %s for localization only', ppdets(evnt.skymap_only_ifos))

analyze_singles = LiveSingle.verify_args(args, parser, evnt.trigg_ifos)

# include MPI rank and functional description into proctitle
task_name = 'root' if evnt.rank == 0 else 'filtering'
setproctitle(f'PyCBC Live rank {evnt.rank:d} [{task_name}]')

sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins)

if args.size_override:
    evnt.size = args.size_override

# make sure we can talk to GraceDB
if evnt.rank == 0 and args.enable_gracedb_upload:
    evnt.create_gdb()
    response = evnt.gracedb.ping()
    logging.info('GraceDB ping response: %s %s time elapsed: %s',
            response.status, response.reason, response.elapsed.total_seconds())

# I'm not the root, so do some actual filtering.
with ctx:
    try:
        # Import system wisdom.
        if args.fftw_import_system_wisdom:
            fft.fftw.import_sys_wisdom()

        # Read specified user-provided wisdom files
        if args.fftw_input_float_wisdom_file is not None:
            fft.fftw.import_single_wisdom_from_filename(args.fftw_input_float_wisdom_file)

        if args.fftw_input_double_wisdom_file is not None:
            fft.fftw.import_double_wisdom_from_filename(args.fftw_input_double_wisdom_file)

        if args.fftw_planning_limit:
            fft.fftw.set_planning_limit(args.fftw_planning_limit)
    except (ValueError, RuntimeError) as e:
        print(e)
        exit()

    if evnt.rank > 0:
        bank.table.sort(order='mchirp')
        waveforms = list(bank[evnt.rank-1::evnt.size-1])
        check_max_length(args, waveforms)
        mf = LiveBatchMatchedFilter(
            waveforms,
            args.snr_threshold,
            args.chisq_bins,
            sg_chisq,
            snr_abort_threshold=args.snr_abort_threshold,
            newsnr_threshold=args.newsnr_threshold,
            max_triggers_in_batch=args.max_triggers_in_batch,
            maxelements=args.max_batch_size
        )

    # Synchronize start time if not provided on the command line
    if not args.start_time:
        evnt.barrier()
        tnow = int(pycbc.gps_now()) if evnt.rank == 0 else None
        args.start_time = evnt.comm.bcast(tnow, root=0)

    if args.round_start_time:
        args.start_time = int(args.start_time / args.round_start_time + 1) *\
            args.round_start_time
        logging.info('Starting from: %s', args.start_time)

    # Initialize the data readers for all detectors. For rank 0, we need data
    # from all detectors, including the localization-only ones. For higher
    # ranks, we only need the detectors that can generate candidates.
    data_reader = {
        ifo: StrainBuffer.from_cli(ifo, args)
        for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos)
    }
    evnt.data_readers = data_reader

    # create single-detector background "estimators"
    if analyze_singles and evnt.rank == 0:
        sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo)
                          for ifo in evnt.trigg_ifos}
        for estim in sngl_estimator.values():
            estim.start_refresh_thread()

    # Create double coincident background estimator
    # for every pair of triggering interferometers
    if args.enable_background_estimation and evnt.rank == 0:
        ifo_combos = itertools.combinations(evnt.trigg_ifos, 2)
        estimators = []
        for combo in ifo_combos:
            logging.info('Will calculate %s background', ppdets(combo, "-"))
            estimators.append(Coincer.from_cli(
                args, len(bank), args.analysis_chunk, list(combo)
            ))

        my_coinc_id = 999999
        def set_coinc_id(i):
            global my_coinc_id
            my_coinc_id = i
            c = estimators[my_coinc_id]
            setproctitle(
                'PyCBC Live {} bg estimator'.format(ppdets(c.ifos, '-'))
            )

        def estimator_refresh_threads(_):
            c = estimators[my_coinc_id]
            c.start_refresh_thread()

        def get_coinc(results):
            c = estimators[my_coinc_id]
            r = c.add_singles(results)
            logging.info('Coincs %i: %s-%s: %s in cbuffer', my_coinc_id,
                         c.ifos[0], c.ifos[1], c.coincs.index)
            return r

        def output_background(_):
            estim = estimators[my_coinc_id]
            bg_time = conv.sec_to_year(estim.background_time)
            return estim.ifos, estim.coincs.data, bg_time

        coinc_pool = BroadcastPool(len(estimators))
        coinc_pool.allmap(set_coinc_id, range(len(estimators)))
        coinc_pool.broadcast(estimator_refresh_threads, None)

    logging.info('Starting')

    if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
        pr = cProfile.Profile()
        pr.enable()

    # main analysis loop
    data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time
    last_bg_dump_time = int(data_end())
    psd_count = {ifo:0 for ifo in evnt.ifos}

    # Create dicts to track whether the psd has been recalculated and to hold
    #  psd variation filters
    psd_recalculated = {
        ifo: True for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos)
    }
    psd_var_filts = {ifo: None for ifo in evnt.trigg_ifos}

    while data_end() < args.end_time:
        t1 = pycbc.gps_now()
        logging.info('Analyzing from %s', data_end())

        results = {}
        evnt.live_detectors = set()

        for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos):
            results[ifo] = False
            status = data_reader[ifo].advance(
                valid_pad,
                timeout=args.frame_read_timeout
            )
            if status and psd_count[ifo] == 0:
                status = data_reader[ifo].recalculate_psd()
                # If the psd has been recalculated then we need a new
                #  filter for psd variation calculation
                psd_recalculated[ifo] = True
                psd_count[ifo] = args.psd_recompute_length - 1
            elif not status:
                psd_count[ifo] = 0
            else:
                psd_count[ifo] -= 1

            status &= data_reader[ifo].check_psd_dist(
                args.min_psd_abort_distance,
                args.max_psd_abort_distance
            )

            if status is True:
                if ifo not in evnt.skymap_only_ifos:
                    evnt.live_detectors.add(ifo)
                    if evnt.rank > 0:
                        logging.info('Filtering %s', ifo)
                        results[ifo] = mf.process_data(data_reader[ifo])
            else:
                logging.info('Insufficient data for %s analysis', ifo)

        if evnt.rank > 0:
            evnt.commit_results((results, data_end()))
        else:
            psds = {ifo: data_reader[ifo].psd for ifo in data_reader
                         if data_reader[ifo].psd is not None}

            # Collect together the single detector triggers
            if evnt.size > 1:
                results, valid_end = evnt.gather_results()

            # veto detectors with different state between the root
            # and worker nodes (e.g. late frame files on one node only)
            detectors_with_results = set(results.keys())
            evnt.live_detectors.intersection_update(detectors_with_results)
            for ifo in detectors_with_results - evnt.live_detectors:
                results.pop(ifo)

            # Veto single detector triggers that fail the DQ vector
            # or have iDQ value below the threshold
            for ifo in results:
                if data_reader[ifo].dq is not None:
                    logging.info("Checking %s's DQ vector", ifo)
                    start = data_reader[ifo].start_time
                    times = results[ifo]['end_time']
                    idx = data_reader[ifo].dq.indices_of_flag(
                            start, valid_pad, times,
                            padding=data_reader[ifo].dq_padding)
                    logging.info('Keeping %d/%d %s triggers after DQ flags',
                                 len(idx), len(times), ifo)
                    for key in results[ifo]:
                        if len(results[ifo][key]):
                            results[ifo][key] = results[ifo][key][idx]
                if data_reader[ifo].idq is not None:
                    logging.info("Checking %s's iDQ information", ifo)
                    start = data_reader[ifo].start_time
                    times = results[ifo]['end_time']
                    idx = data_reader[ifo].idq.indices_of_flag(
                            start, valid_pad, times,
                            padding=data_reader[ifo].dq_padding)
                    logging.info('Keeping %d/%d %s triggers after iDQ',
                                 len(idx), len(times), ifo)
                    for key in results[ifo]:
                        if len(results[ifo][key]):
                            results[ifo][key] = results[ifo][key][idx]

            # Calculate and add the psd variation for the results
            if args.psd_variation:
                for ifo in results:
                    logging.info("Calculating PSD Variation Statistic for %s", ifo)

                    # A new filter is needed if the PSD has been recalculated
                    if psd_recalculated[ifo] is True:
                        psd_var_filts[ifo] = variation.live_create_filter(
                            data_reader[ifo].psd,
                            args.psd_segment_length,
                            int(args.sample_rate)
                        )
                        psd_recalculated[ifo] = False

                    psd_var_ts = variation.live_calc_psd_variation(
                        data_reader[ifo].strain,
                        psd_var_filts[ifo],
                        args.increment
                    )

                    psd_var_vals = variation.live_find_var_value(
                        results[ifo], psd_var_ts
                    )

                    results[ifo]['psd_var_val'] = psd_var_vals

            # Look for coincident triggers and do background estimation
            if args.enable_background_estimation:
                coinc_results = coinc_pool.broadcast(get_coinc, results)

                # Pick the best coinc in this chunk
                best_coinc = Coincer.pick_best_coinc(coinc_results)

                evnt.check_coincs(list(results.keys()), best_coinc, psds)

            # Check for singles
            if analyze_singles:
                evnt.check_singles(results, psds)

            gates = {ifo: data_reader[ifo].gate_params for ifo in data_reader}

            # map the results file to an hdf file
            prefix = '{}-{}-{}-{}'.format(''.join(sorted(evnt.ifos)),
                                          args.file_prefix,
                                          data_end() - args.analysis_chunk,
                                          valid_pad)

            evnt.dump(results, prefix, time_index=data_end(),
                      store_psd=(psds if args.store_psd else False),
                      store_loudest_index=args.store_loudest_index,
                      raw_results=best_coinc, gates=gates)

            # dump the background if needed
            if args.output_background and \
                    data_end() - last_bg_dump_time > float(args.output_background[0]):
                last_bg_dump_time = int(data_end())
                bg_dists = coinc_pool.broadcast(output_background, None)
                bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(
                    ''.join(sorted(evnt.trigg_ifos)), last_bg_dump_time
                )
                bg_fn = os.path.join(args.output_background[1], bg_fn)
                with h5py.File(bg_fn, 'w') as bgf:
                    for bg_ifos, bg_data, bg_time in bg_dists:
                        if args.output_background_n_loudest and \
                                args.output_background_n_loudest < (len(bg_data) - 1):
                            n_loudest = args.output_background_n_loudest
                            assert (n_loudest > 0), \
                                "We can only store positive int loudest triggers."
                            ds = bgf.create_dataset(
                                ','.join(sorted(bg_ifos)),
                                data=-numpy.partition(-bg_data, n_loudest)[:n_loudest],
                                compression='gzip'
                            )
                        else:
                            ds = bgf.create_dataset(
                                ','.join(sorted(bg_ifos)),
                                data=bg_data,
                                compression='gzip'
                            )
                        ds.attrs['background_time'] = bg_time
                    bgf.attrs['gps_time'] = last_bg_dump_time

            logging.info('Finished analyzing up to %s', data_end())

        if args.sync:
            evnt.barrier()
        tdiff = pycbc.gps_now() - t1
        lag = float(pycbc.gps_now() - data_end())
        logging.info('Took %1.2f, duty factor of %.2f, lag %.2f s, %d live detectors',
                     tdiff, tdiff / valid_pad, lag, len(evnt.live_detectors)
        )

        if args.output_status is not None and evnt.rank == 0:
            if lag > 120:
                status_intervals = [{'num_status': 2,
                                     'txt_status': 'CRITICAL: lag greater than 2 min',
                                     'start_sec': 0}]
            else:
                status_intervals = [{'num_status': 0,
                                     'txt_status': 'OK: No reported problems',
                                     'start_sec': 0},
                                    {'num_status': 1,
                                     'txt_status': 'WARNING: last report between 2 and 4 min ago',
                                     'start_sec': 120}]
            status_intervals.append({'num_status': 2,
                                     'txt_status': 'CRITICAL: last report more than 4 min ago',
                                     'start_sec': 240})
            status = {'author': 'Tito Dal Canton',
                      'email': 'tito.canton@ligo.org',
                      'created_gps': int(pycbc.gps_now()),
                      'status_intervals': status_intervals}
            try:
                with open(args.output_status, 'w') as status_fp:
                    json.dump(status, status_fp)
            except IOError:
                logging.error('I/O error writing status JSON file!')

if evnt.rank == 1:
    if args.fftw_output_float_wisdom_file:
        fft.fftw.export_single_wisdom_to_filename(args.fftw_output_float_wisdom_file)

    if args.fftw_output_double_wisdom_file:
        fft.fftw.export_double_wisdom_to_filename(args.fftw_output_double_wisdom_file)

if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
    pr.dump_stats(f'profiling_rank_{evnt.rank:03d}')

logging.info("Exiting as the end time has been reached")
