#!/usr/bin/python3.13
"""Dummy fence agent for testing."""

__copyright__ = "Copyright 2012-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

import io
import os
import re
import sys
import time
import random
import atexit
import getopt
import contextlib

# Version of this agent: printed for the version option and emitted as the
# "version" attribute of the metadata <resource-agent> element
AGENT_VERSION = "4.1.0"
# Emitted as the content of the metadata <version> element
OCF_VERSION = "1.0"
# Emitted as the "shortdesc" attribute of the metadata <resource-agent> element
SHORT_DESC = "Dummy fence agent"
# Emitted as the content of the metadata <longdesc> element
LONG_DESC = """fence_dummy is a fake fencing agent which reports success
based on its mode (pass|fail|random) without doing anything."""

# Table of every option the agent understands, keyed by the parameter name
# accepted on standard input and shown in the metadata. Fields of each entry:
#   getopt    - short option letter; a trailing ":" means it takes an argument
#   longopt   - long option name (optional; "quiet" has none)
#   help      - line printed in the usage message; an empty string hides the
#               option from the usage message
#   required  - value of the "required" attribute in the metadata
#   shortdesc - metadata description; entries without one are left out of the
#               metadata entirely
#   default   - (optional) default value advertised in the metadata
#   order     - sort key for the usage and metadata listings
#
# Short options used: dfhmnoqsvDFHMNRUV
ALL_OPT = {
    "quiet": {
        "getopt": "q",
        "help": "",
        "order": 50
    },
    "verbose": {
        "getopt": "v",
        "longopt": "verbose",
        "help": "-v, --verbose                  Verbose mode",
        "required": "0",
        "shortdesc": "Verbose mode",
        "order": 51
    },
    "debug": {
        "getopt": "D:",
        "longopt": "debug-file",
        "help": "-D, --debug-file=[debugfile]   Debugging to output file",
        "required": "0",
        "shortdesc": "Write debug information to given file",
        "order": 52
    },
    "version": {
        "getopt": "V",
        "longopt": "version",
        "help": "-V, --version                  Display version information and exit",
        "required": "0",
        "shortdesc": "Display version information and exit",
        "order": 53
    },
    "help": {
        "getopt": "h",
        "longopt": "help",
        "help": "-h, --help                     Display this help and exit",
        "required": "0",
        "shortdesc": "Display help and exit",
        "order": 54
    },
    "action": {
        "getopt": "o:",
        "longopt": "action",
        "help": "-o, --action=[action]          Action: validate-all, status, list, reboot (default), off or on",
        "required": "1",
        "shortdesc": "Fencing Action",
        "default": "reboot",
        "order": 1
    },
    "nodename": {
        "getopt": "N:",
        "longopt": "nodename",
        "help": "-N, --nodename                 Node name of fence target (ignored)",
        "required": "0",
        "shortdesc": "The node name of fence target (ignored)",
        "order": 2
    },
    "mode": {
        "getopt": "M:",
        "longopt": "mode",
        "required": "0",
        "help": "-M, --mode=(pass|fail|random)  Exit status to return for non-monitor operations",
        "shortdesc": "Whether fence operations should always pass, always fail, or fail at random",
        "order": 3
    },
    "monitor_mode": {
        "getopt": "m:",
        "longopt": "monitor_mode",
        "help": "-m, --monitor_mode=(pass|fail|random) Exit status to return for monitor operations",
        "required": "0",
        "shortdesc": "Whether monitor operations should always pass, always fail, or fail at random",
        "order": 3
    },
    "random_sleep_range": {
        "getopt": "R:",
        "required": "0",
        "longopt": "random_sleep_range",
        "help": "-R, --random_sleep_range=[seconds] Sleep between 1 and [seconds] before returning",
        "shortdesc": "Wait randomly between 1 and [seconds]",
        "order": 3
    },
    "mock_dynamic_hosts": {
        "getopt": "H:",
        "longopt": "mock_dynamic_hosts",
        "help": "-H, --mock_dynamic_hosts=[list] What to return when dynamically queried for possible targets",
        "required": "0",
        "shortdesc": "A list of hosts we can fence",
        "order": 3
    },
    "delay": {
        "getopt": "f:",
        "longopt": "delay",
        "help": "-f, --delay [seconds]          Wait X seconds before fencing is started",
        "required": "0",
        "shortdesc": "Wait X seconds before fencing is started",
        "default": "0",
        "order": 3
    },
    "monitor_delay": {
        "getopt": "d:",
        "longopt": "monitor_delay",
        "help": "-d, --monitor_delay [seconds]  Wait X seconds before monitor completes",
        "required": "0",
        "shortdesc": "Wait X seconds before monitor completes",
        "default": "0",
        "order": 3
    },
    "off_delay": {
        "getopt": "F:",
        "longopt": "off_delay",
        "help": "-F, --off_delay [seconds]      Wait additional X seconds before off action",
        "required": "0",
        "shortdesc": "Wait additional X seconds before off action",
        "default": "0",
        "order": 3
    },
    "plug": {
        "getopt": "n:",
        "longopt": "plug",
        "help": "-n, --plug=[id]                Physical plug number on device (ignored)",
        "required": "1",
        "shortdesc": "Ignored",
        "order": 4
    },
    # Shares its short and long option with "plug"; having both entries means
    # either parameter name is accepted on standard input
    "port": {
        "getopt": "n:",
        "longopt": "plug",
        "help": "-n, --plug=[id]                Physical plug number on device (ignored)",
        "required": "1",
        "shortdesc": "Ignored",
        "order": 4
    },
    "switch": {
        "getopt": "s:",
        "longopt": "switch",
        "help": "-s, --switch=[id]              Physical switch number on device (ignored)",
        "required": "0",
        "shortdesc": "Ignored",
        "order": 4
    },
    "uuid": {
        "getopt": "U:",
        "longopt": "uuid",
        "help": "-U, --uuid                     UUID of the VM to fence (ignored)",
        "required": "0",
        "shortdesc": "Ignored",
        "order": 4
    }
}


def agent():
    """Return the final path component of the name this script was run as."""
    invoked_as = sys.argv[0]
    (_, name) = os.path.split(invoked_as)
    return name


def fail_usage(message):
    """Exit with the given message followed by a pointer to '-h'.

    The text is handed to SystemExit as its exit argument, exactly as
    sys.exit() would do; this function never returns.
    """
    text = "%s\nPlease use '-h' for usage" % message
    raise SystemExit(text)


def show_docs(options, auto_unfence, no_reboot, no_on):
    """Serve the options that only display information, then exit.

    Checked in this order: "-h" prints the usage message, the "metadata"
    action (compared case-insensitively) prints the agent metadata, and "-V"
    prints the agent version. Each of them exits with status 0. If none of
    them was given, the function returns normally.

    The auto_unfence, no_reboot and no_on flags are only passed through to
    metadata().
    """
    available = options["device_opt"]

    if "-h" in options:
        usage(available)
        sys.exit(0)

    if options.get("-o", "").lower() == "metadata":
        # A "<this script>.fail" marker file suppresses the metadata output
        # once: the marker is removed and nothing is printed
        marker = __file__ + ".fail"

        if os.path.exists(marker):
            os.remove(marker)
        else:
            metadata(available, options, auto_unfence, no_reboot, no_on)

        sys.exit(0)

    if "-V" in options:
        print(AGENT_VERSION)
        sys.exit(0)


def sorted_options(avail_opt):
    """Return (name, ALL_OPT entry) pairs ordered by each entry's "order".

    The sort is stable, so options sharing an "order" value keep the relative
    position they had in avail_opt.
    """
    pairs = ((name, ALL_OPT[name]) for name in avail_opt)
    return sorted(pairs, key=lambda pair: pair[1]["order"])


def usage(avail_opt):
    """Print the usage message for the given options to standard output.

    Options are listed in their internally specified order; those whose help
    text is empty are left out.
    """
    print("Usage:")
    print("\t%s [options]" % agent())
    print("Options:")

    help_texts = (spec["help"] for (_, spec) in sorted_options(avail_opt))
    for text in help_texts:
        if text:
            print("   %s" % text)


def metadata(avail_opt, options, auto_unfence, no_reboot, no_on):
    """Print the agent's XML metadata to standard output.

    Arguments:
    avail_opt    -- names of the options to describe (keys of ALL_OPT)
    options      -- parsed options; a value given for an option that has no
                    built-in default is advertised as that option's default
    auto_unfence -- advertise the "on" action with automatic="1" instead of
                    on_target="1"
    no_reboot    -- leave the "reboot" action out
    no_on        -- leave the "on" action out

    Options without a "shortdesc" are not listed.
    """
    # This log is just for testing handling of stderr output
    print("asked for fence_dummy metadata", file=sys.stderr)

    print("""<?xml version="1.0" ?>
<resource-agent name="%s" shortdesc="%s" version="%s">
  <version>%s</version>
  <longdesc>%s</longdesc>
  <parameters>""" % (agent(), SHORT_DESC, AGENT_VERSION, OCF_VERSION, LONG_DESC))

    # Matches the leading part of a help line, up to and including the long
    # option, so that the description after it can be dropped
    option_part = re.compile(r"^(.*--\S+)\s+", re.IGNORECASE | re.S)

    for (name, spec) in sorted_options(avail_opt):
        if "shortdesc" not in spec:
            continue

        print('    <parameter name="%s" unique="0" required="%s">' %
              (name, spec["required"]))

        getopt_spec = spec["getopt"]
        # Key under which a value would be stored for an option taking an
        # argument ("D:" -> "-D") ...
        value_key = "-%s" % getopt_spec[:-1]
        # ... and under which a plain flag would be stored ("v" -> "-v")
        flag_key = "-%s" % getopt_spec

        if "default" in spec:
            default = 'default="%s"' % spec["default"]
        elif options.get(value_key) is not None:
            current = options[value_key]
            try:
                default = 'default="%s"' % current
            except TypeError:
                # @todo/@note: Currently there is no clean way how to handle lists
                # we can create a string from it but we can't set it on command line
                default = 'default="%s"' % str(current)
        elif flag_key in options:
            default = 'default="true"'
        else:
            default = ""

        # Keep only the option part of the help text, not the description
        mixed = spec["help"]
        found = option_part.search(mixed)
        if found is not None:
            mixed = found.group(1)
        mixed = mixed.replace("<", "&lt;").replace(">", "&gt;")
        print('      <getopt mixed="%s" />' % mixed)

        # Options that take an argument are strings, plain flags are booleans
        if ":" in getopt_spec:
            print('      <content type="string" %s />' % default)
        else:
            print('      <content type="boolean" %s />' % default)

        print('      <shortdesc lang="en">%s</shortdesc>' % spec["shortdesc"])
        print('    </parameter>')

    print('  </parameters>\n  <actions>')
    if not no_on:
        attr_name = 'automatic' if auto_unfence else 'on_target'
        print('    <action name="on" %s="1" />' % attr_name)
    print('    <action name="off" />')
    if not no_reboot:
        print('    <action name="reboot" />')
    print('    <action name="status" />')
    print('    <action name="monitor" />')
    print('    <action name="metadata" />')
    print('    <action name="list" />')
    print('  </actions>')
    print('</resource-agent>')


def option_longopt(option):
    """Return the option's long name in the form getopt expects.

    A "=" is appended when the option takes an argument, i.e. when its short
    "getopt" specification ends with ":".
    """
    spec = ALL_OPT[option]
    suffix = "=" if spec["getopt"].endswith(":") else ""
    return spec["longopt"] + suffix


def opts_from_command_line(argv, avail_opt):
    """Parse command-line arguments into a dict keyed by short option.

    Arguments:
    argv      -- argument list to parse, without the program name
    avail_opt -- names of the options to accept (keys of ALL_OPT)

    Returns a dict mapping each option that was given, in its short form
    (for example "-o"), to its argument, or to an empty string for options
    that take none. Long options are translated to their short equivalents.

    Exits through fail_usage() if avail_opt names an option that is not in
    ALL_OPT, or if argv cannot be parsed.
    """
    # Prepare the short and long option specifications for getopt
    short_specs = []
    longopt_list = []
    for name in avail_opt:
        if name not in ALL_OPT:
            fail_usage("Parse error: unknown option '%s'" % name)

        short_specs.append(ALL_OPT[name]["getopt"])
        if "longopt" in ALL_OPT[name]:
            longopt_list.append(option_longopt(name))

    try:
        (pairs, _) = getopt.gnu_getopt(argv, "".join(short_specs),
                                       longopt_list)
    except getopt.GetoptError as error:
        fail_usage("Parse error: %s" % error.msg)

    # Long option -> short option, built once. Entries that share a long
    # option ("plug" and "port") also share the short one, so it does not
    # matter which of them ends up in the table.
    long_to_short = {
        "--%s" % rec["longopt"]: "-%s" % rec["getopt"][0]
        for rec in ALL_OPT.values()
        if rec.get("longopt") is not None
    }

    # Collapse repeated spellings first (the last value given for a spelling
    # wins), then store everything under the short option, which is what the
    # rest of the agent looks up.
    opt = {}
    for (given, value) in dict(pairs).items():
        if not given.startswith("--"):
            opt[given] = value
        elif given in long_to_short:
            opt[long_to_short[given]] = value

    # There is deliberately no "-n" -> "-m" copy here: in this agent "-m" is
    # monitor_mode, so copying the (ignored) plug into it would silently
    # override the requested monitor result, and input read from standard
    # input never did so. A "-T" shortcut is not handled either, because no
    # option defines "T" and getopt rejects it before it could get here.
    return opt


def opts_from_stdin(avail_opt):
    """Parse "name=value" lines from standard input into an options dict.

    Blank lines and lines starting with "#" are skipped. Names that are not
    in avail_opt are reported on standard error and ignored. Options taking
    an argument are stored under their short option with the given value;
    flags are stored as "1", and only when the value is one of 1, yes, on or
    true (case-insensitive).
    """
    truthy = ["1", "yes", "on", "true"]
    parsed = {}

    for raw in sys.stdin.readlines():
        entry = raw.strip()
        if not entry or entry.startswith("#"):
            continue

        # Everything after the first "=" is the value; without any "=" the
        # value is empty
        (key, _, value) = entry.partition("=")

        # Compatibility Layer (with what? probably not needed for fence_dummy)
        if key == "option":
            key = "action"

        if key not in avail_opt:
            print("Parse error: Ignoring unknown option '%s'" % entry,
                  file=sys.stderr)
            continue

        getopt_spec = ALL_OPT[key]["getopt"]
        if getopt_spec.endswith(":"):
            parsed["-%s" % getopt_spec[0]] = value
        elif value.lower() in truthy:
            parsed["-%s" % getopt_spec] = "1"

    return parsed


def process_input(avail_opt):
    """Set the standard locale environment and return the parsed options.

    Options come from the command line when any arguments were given, and
    from standard input otherwise.
    """
    # Set standard environment
    for variable in ("LANG", "LC_ALL"):
        os.putenv(variable, "C")

    cli_args = sys.argv[1:]
    if cli_args:
        return opts_from_command_line(cli_args, avail_opt)

    return opts_from_stdin(avail_opt)


def atexit_handler():
    """Close standard output, exiting with an error message on failure.

    Both the Python-level stream and file descriptor 1 are closed; an OS-level
    error from either step ends the process through sys.exit() with a message
    naming the agent.
    """
    try:
        sys.stdout.close()
        os.close(1)
    except OSError:
        complaint = "%s failed to close standard output" % agent()
        sys.exit(complaint)


def success_mode(options, option, default_value):
    """Return the exit code selected by the given mode option.

    The mode is options[option] if present, default_value otherwise: "pass"
    yields 0, "fail" yields 1, and anything else yields 0 or 1 at random.
    """
    mode = options.get(option, default_value)

    if mode == "pass":
        return 0
    if mode == "fail":
        return 1
    return random.randint(0, 1)


def write_options(options):
    """Append a timestamped dump of all options to the debug file.

    The file is named by options["-D"]. Each dump is a "### <timestamp> ###"
    line, one "name=value" line per option in sorted order, and a closing
    "###" line. I/O errors while opening or writing the file are ignored.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    with contextlib.suppress(IOError), \
            io.open(options["-D"], "at", encoding="utf-8") as log:
        log.write("### %s ###\n" % time.strftime(stamp_format))
        for (name, value) in sorted(options.items()):
            log.write("%s=%s\n" % (name, value))
        log.write("###\n")


def main():
    """Run the dummy fencing agent and exit with the simulated result.

    After parsing the options and serving any informational request, the
    requested action (default "reboot") decides the exit code:
    monitor       -- the monitor_mode option ("-m"), passing by default
    list          -- 0 with the mock host list ("-H") printed, 1 without one
    validate-all  -- 0 only if a delay ("-f") of at most 10 was given
    anything else -- the mode option ("-M"), random by default

    Except for validate-all, the options are first dumped to the debug file
    ("-D") and the delay ("-f") and random sleep ("-R") are applied.
    """
    # Meta-data can't take parameters, so we simulate different meta-data
    # behavior based on the executable name (which can be a symbolic link).
    # No suffix is a suffix of another, so at most one flag ends up set.
    invoked_as = sys.argv[0]
    auto_unfence = invoked_as.endswith("_auto_unfence")
    no_reboot = invoked_as.endswith("_no_reboot")
    no_on = invoked_as.endswith("_no_on")

    device_opt = ALL_OPT.keys()

    # Make sure standard output gets closed on exit, then read the input
    atexit.register(atexit_handler)
    options = process_input(device_opt)
    options["device_opt"] = device_opt
    show_docs(options, auto_unfence, no_reboot, no_on)

    action = options.get("-o", "reboot")
    validating = action == "validate-all"

    if not validating:
        # dump input to file
        if "-D" in options:
            write_options(options)

        if "-f" in options:
            delay = int(options["-f"])
            print("delay sleep for %d seconds" % delay, file=sys.stderr)
            time.sleep(delay)

        # random sleep for testing
        if "-R" in options:
            upper = int(options["-R"])
            nap = random.randint(1, upper)
            print("random sleep for %d seconds" % nap, file=sys.stderr)
            time.sleep(nap)

    if action == "monitor":
        if "-d" in options:
            time.sleep(int(options["-d"]))
        exitcode = success_mode(options, "-m", "pass")

    elif action == "list":
        print("fence_dummy action (list) called", file=sys.stderr)
        if "-H" in options:
            print(options["-H"])
            exitcode = 0
        else:
            print("dynamic hostlist requires mock_dynamic_hosts to be set",
                  file=sys.stderr)
            exitcode = 1

    elif validating:
        # Valid only when a delay was given and it does not exceed 10
        if "-f" in options and int(options["-f"]) <= 10:
            exitcode = 0
        else:
            exitcode = 1

    else:
        # "off" may wait a little longer; otherwise it behaves like any
        # other fencing action
        if action == "off" and "-F" in options:
            time.sleep(int(options["-F"]))
        exitcode = success_mode(options, "-M", "random")

    # Ensure we generate some error output on failure exit.
    if exitcode == 1:
        print("simulated %s failure" % action, file=sys.stderr)

    sys.exit(exitcode)


# Run the agent only when this file is executed as a script, not when it is
# imported as a module
if __name__ == "__main__":
    main()

# vim: set filetype=python:
