#!/usr/bin/python3.13
"""Regression tests for Pacemaker's attribute daemon."""

# pylint doesn't like the module name "cts-attrd" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position

__copyright__ = "Copyright 2023-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

import argparse
import os
import subprocess
import sys
import tempfile

# Make the in-tree pacemaker Python package importable so this script can be
# run from a source checkout after running `make`.  This doesn't necessarily
# mean the tests will run successfully, but being able to see --help output
# can be useful.
if os.path.exists("/home/abuild/rpmbuild/BUILD/pacemaker-3.0.1+20251208.f7f28ab3-build/pacemaker-3.0.1+20251208.f7f28ab3/python"):
    sys.path.insert(0, "/home/abuild/rpmbuild/BUILD/pacemaker-3.0.1+20251208.f7f28ab3-build/pacemaker-3.0.1+20251208.f7f28ab3/python")

# The two directory literals compared with != below are identical in this copy
# of the script, so the condition is always false and this second insertion
# never runs here.
# NOTE(review): presumably these literals are filled in at build time and only
# differ when the source and build trees are separate -- confirm against the
# template this file is generated from.
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("/home/abuild/rpmbuild/BUILD/pacemaker-3.0.1+20251208.f7f28ab3-build/pacemaker-3.0.1+20251208.f7f28ab3/python") and "/home/abuild/rpmbuild/BUILD/pacemaker-3.0.1+20251208.f7f28ab3-build/pacemaker-3.0.1+20251208.f7f28ab3" != "/home/abuild/rpmbuild/BUILD/pacemaker-3.0.1+20251208.f7f28ab3-build/pacemaker-3.0.1+20251208.f7f28ab3":
    sys.path.insert(0, "/home/abuild/rpmbuild/BUILD/pacemaker-3.0.1+20251208.f7f28ab3-build/pacemaker-3.0.1+20251208.f7f28ab3/python")

from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
from pacemaker._cts.environment import set_cts_path
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests


class AttributeTest(Test):
    """Run one pacemaker-attrd regression test."""

    def __init__(self, name, description, **kwargs):
        """
        Set up a single attribute daemon test case.

        Arguments:
        name        -- Unique identifier for the test.  It can be given on the
                       command line to run just this test.
        description -- Human-readable summary of what the test checks.

        Any further keyword arguments are forwarded unchanged to Test.
        """
        super().__init__(name, description, **kwargs)

        self._daemon_location = "pacemaker-attrd"
        self._enable_corosync = True

    def _kill_daemons(self):
        """Pass the name of the daemon under test to killall()."""
        killall([self._daemon_location])

    def _start_daemons(self):
        """Launch the daemon under test, handing it this test's log path via -l."""
        daemon = self._daemon_location

        if self.verbose:
            print(f"Starting {daemon}")

        # The process handle is stored on the instance instead of being used
        # as a context manager, hence the pylint exemption.
        # pylint: disable=consider-using-with
        self._daemon_process = subprocess.Popen([daemon, "-s", "-l", self.logpath])


class AttributeTests(Tests):
    """Registry of every pacemaker-attrd regression test."""

    def __init__(self, **kwargs):
        """Initialize the collection; keyword arguments are passed on to Tests."""
        super().__init__(**kwargs)

        self._corosync = Corosync(self.verbose, self.logdir, "cts-attrd")

    def new_test(self, name, description):
        """Create an AttributeTest, register it in this collection, and return it."""
        created = AttributeTest(name, description,
                                verbose=self.verbose, logdir=self.logdir)
        self._tests.append(created)
        return created

    def setup_environment(self, use_corosync):
        """Start corosync, if requested, before any tests are executed."""
        if not use_corosync:
            return

        self._corosync.start(kill_first=True)

    def cleanup_environment(self, use_corosync):
        """Stop corosync, if it was requested, once the desired tests have run."""
        if not use_corosync:
            return

        self._corosync.stop()

    def build_basic_tests(self):
        """Register the basic tests: setting, querying, updating, and deleting attributes."""
        case = self.new_test("set_attr_1", "Set and query an attribute")
        case.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --output-as=xml",
                     stdout_match='name="AAA" value="111"')
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q",
                     stdout_match='name="AAA" host="[^"]+" value="111"',
                     validate=False)
        case.add_log_pattern(
            r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
            regex=True)

        # The daemon rejects a delay change for an attribute it does not know
        # about, but attrd_updater is not told about that failure -- so only
        # the log shows the error.
        case = self.new_test("set_attr_2", "Set an attribute's delay")
        case.add_cmd("attrd_updater", args="--name AAA -Y -d 5 --output-as=xml")
        case.add_log_pattern(
            r"Processed update-delay request from client .*: Error \(Attribute AAA does not exist\)",
            regex=True)

        case = self.new_test("set_attr_3", "Set and query an attribute's delay and value")
        case.add_cmd("attrd_updater", args="--name AAA -B 111 -d 5 --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --output-as=xml",
                     stdout_match='name="AAA" value="111"')
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q",
                     stdout_match='name="AAA" host="[^"]+" value="111"',
                     validate=False)
        case.add_log_pattern(
            r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111 \| from .* with 5s write delay",
            regex=True)

        case = self.new_test("set_attr_4", "Update an attribute that does not exist with a delay")
        case.add_cmd("attrd_updater", args="--name BBB -U 999 -d 10 --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name BBB -Q --output-as=xml",
                     stdout_match='name="BBB" value="999"')
        case.add_cmd("attrd_updater",
                     args="--name BBB -Q",
                     stdout_match='name="BBB" host="[^"]+" value="999"',
                     validate=False)
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 999 \| from .* with 10s write delay",
            regex=True)

        case = self.new_test("update_attr_1", "Update an attribute that already exists")
        case.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name BBB -U 333 --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name BBB -Q --output-as=xml",
                     stdout_match='name="BBB" value="333"')
        case.add_cmd("attrd_updater",
                     args="--name BBB -Q",
                     stdout_match='name="BBB" host="[^"]+" value="333"',
                     validate=False)
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
            regex=True)
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
            regex=True)

        # The expected log line still reports the 10s delay from the first
        # update even though the second update asks for 7s.
        case = self.new_test("update_attr_2", "Update an attribute using a delay other than its default")
        case.add_cmd("attrd_updater", args="--name BBB -U 777 -d 10 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name BBB -U 888 -d 7 --output-as=xml")
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: 777 -> 888 \| from .* with 10s write delay",
            regex=True)

        case = self.new_test("update_attr_delay_1", "Update the delay of an attribute that already exists")
        case.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name BBB -Y -d 5 --output-as=xml")
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
            regex=True)
        # Registered without regex=True, unlike the "Setting ..." patterns.
        case.add_log_pattern("Update attribute BBB delay to 5000ms (5)")

        case = self.new_test("update_attr_delay_2", "Update the delay and value of an attribute that already exists")
        case.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name BBB -B 333 -d 5 --output-as=xml")
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
            regex=True)
        case.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
        case.add_log_pattern(
            r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
            regex=True)

        case = self.new_test("missing_attr_1", "Query an attribute that does not exist")
        case.add_cmd("attrd_updater",
                     args="--name NOSUCH --output-as=xml",
                     expected_exitcode=ExitStatus.CONFIG)

        case = self.new_test("delete_attr_1", "Delete an existing attribute")
        case.add_cmd("attrd_updater", args="--name CCC -U 444 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name CCC -D --output-as=xml")
        case.add_log_pattern(
            r"Setting CCC\[.*\] in instance_attributes: \(unset\) -> 444",
            regex=True)
        case.add_log_pattern(
            r"Setting CCC\[.*\] in instance_attributes: 444 -> \(unset\)",
            regex=True)

        case = self.new_test("missing_attr_2", "Delete an attribute that does not exist")
        case.add_cmd("attrd_updater", args="--name NOSUCH2 -D --output-as=xml")

        case = self.new_test("attr_in_set_1", "Set and query an attribute in a specific set")
        case.add_cmd("attrd_updater", args="--name DDD -U 555 --set=foo --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name DDD -Q --output-as=xml",
                     stdout_match='name="DDD" value="555"')
        case.add_cmd("attrd_updater",
                     args="--name DDD -Q",
                     stdout_match='name="DDD" host="[^"]+" value="555"',
                     validate=False)
        case.add_log_pattern("Processed 1 private change for DDD (set foo)")

    def build_multiple_query_tests(self):
        """Register tests that set and query one attribute on several nodes."""
        # NOTE:  attrd does not check that a node exists when an attribute is
        # set or queried for it; it simply creates a new hash table for every
        # node it is asked about.  These tests rely on that.

        # Expected output, shared by the plain queries and by the queries run
        # with OCF_RESKEY_CRM_meta_on_node set in the environment.
        both_xml = r'<attribute name="AAA" value="111" host="cluster1"/>\n.*<attribute name="AAA" value="222" host="cluster2"/>'
        both_text = 'name="AAA" host="cluster1" value="111"\nname="AAA" host="cluster2" value="222"'
        node1_xml = '<attribute name="AAA" value="111" host="cluster1"/>'
        node1_text = 'name="AAA" host="cluster1" value="111"'
        node2_xml = '<attribute name="AAA" value="222" host="cluster2"/>'
        node2_text = 'name="AAA" host="cluster2" value="222"'

        case = self.new_test("multi_query_1", "Query an attribute set across multiple nodes")
        case.add_cmd("attrd_updater", args="--name AAA -U 111 --node cluster1 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name AAA -U 222 --node cluster2 --output-as=xml")

        # Queries with no node-related environment variable set
        case.add_cmd("attrd_updater",
                     args="--name AAA -QA --output-as=xml",
                     stdout_match=both_xml)
        case.add_cmd("attrd_updater",
                     args="--name AAA -QA",
                     stdout_match=both_text,
                     validate=False)
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --node=cluster1 --output-as=xml",
                     stdout_match=node1_xml)
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --node=cluster1",
                     stdout_match=node1_text,
                     validate=False)
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --node=cluster2 --output-as=xml",
                     stdout_match=node2_xml)
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --node=cluster2",
                     stdout_match=node2_text,
                     validate=False)

        # The same queries with OCF_RESKEY_CRM_meta_on_node=cluster1 in the
        # environment; a bare -Q is then expected to report cluster1.
        case.add_cmd("attrd_updater",
                     args="--name AAA -QA --output-as=xml",
                     stdout_match=both_xml,
                     env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
        case.add_cmd("attrd_updater",
                     args="--name AAA -QA",
                     stdout_match=both_text,
                     validate=False,
                     env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --output-as=xml",
                     stdout_match=node1_xml,
                     env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q",
                     stdout_match=node1_text,
                     validate=False,
                     env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --node=cluster2 --output-as=xml",
                     stdout_match=node2_xml,
                     env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --node=cluster2",
                     stdout_match=node2_text,
                     validate=False,
                     env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})

    def build_regex_tests(self):
        """Register tests that select attributes with a regular expression."""
        case = self.new_test("regex_update_1", "Update attributes using a regex")
        case.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name ABB -U 222 --output-as=xml")
        case.add_cmd("attrd_updater", args="-P 'A.*' -U 333 --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --output-as=xml",
                     stdout_match='name="AAA" value="333"')
        case.add_cmd("attrd_updater",
                     args="--name ABB -Q --output-as=xml",
                     stdout_match='name="ABB" value="333"')
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q",
                     stdout_match='name="AAA" host="[^"]+" value="333"',
                     validate=False)
        case.add_cmd("attrd_updater",
                     args="--name ABB -Q",
                     stdout_match='name="ABB" host="[^"]+" value="333"',
                     validate=False)
        case.add_log_pattern(
            r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
            regex=True)
        case.add_log_pattern(
            r"Setting ABB\[.*\] in instance_attributes: \(unset\) -> 222",
            regex=True)
        case.add_log_pattern(
            r"Setting ABB\[.*\] in instance_attributes: 222 -> 333",
            regex=True)
        case.add_log_pattern(
            r"Setting AAA\[.*\] in instance_attributes: 111 -> 333",
            regex=True)

        case = self.new_test("regex_delete_1", "Delete attributes using a regex")
        case.add_cmd("attrd_updater", args="--name XAX -U 444 --output-as=xml")
        case.add_cmd("attrd_updater", args="--name XBX -U 555 --output-as=xml")
        case.add_cmd("attrd_updater", args="-P 'X[A|B]X' -D --output-as=xml")
        case.add_log_pattern(
            r"Setting XAX\[.*\] in instance_attributes: \(unset\) -> 444",
            regex=True)
        case.add_log_pattern(
            r"Setting XBX\[.*\] in instance_attributes: \(unset\) -> 555",
            regex=True)
        case.add_log_pattern(
            r"Setting XBX\[.*\] in instance_attributes: 555 -> \(unset\)",
            regex=True)
        case.add_log_pattern(
            r"Setting XAX\[.*\] in instance_attributes: 444 -> \(unset\)",
            regex=True)

    def build_utilization_tests(self):
        """Register tests that exercise utilization attributes."""
        case = self.new_test("utilization_1", "Set and query a utilization attribute")
        case.add_cmd("attrd_updater", args="--name AAA -U ABC -z --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --output-as=xml",
                     stdout_match='name="AAA" value="ABC"')
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q",
                     stdout_match='name="AAA" host="[^"]+" value="ABC"',
                     validate=False)
        # The log is expected to name "utilization" where the other tests
        # expect "instance_attributes".
        case.add_log_pattern(
            r"Setting AAA\[.*\] in utilization: \(unset\) -> ABC",
            regex=True)

    def build_sync_point_tests(self):
        """Register tests that wait on sync points."""
        case = self.new_test("local_sync_point", "Wait for a local sync point")
        case.add_cmd("attrd_updater", args="--name AAA -U 123 --wait=local --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q --output-as=xml",
                     stdout_match='name="AAA" value="123"')
        case.add_cmd("attrd_updater",
                     args="--name AAA -Q",
                     stdout_match='name="AAA" host="[^"]+" value="123"',
                     validate=False)
        case.add_log_pattern(
            r"Alerting client .* for reached local sync point",
            regex=True)

        case = self.new_test("cluster_sync_point", "Wait for a cluster-wide sync point")
        case.add_cmd("attrd_updater", args="--name BBB -U 456 --wait=cluster --output-as=xml")
        case.add_cmd("attrd_updater",
                     args="--name BBB -Q --output-as=xml",
                     stdout_match='name="BBB" value="456"')
        case.add_cmd("attrd_updater",
                     args="--name BBB -Q",
                     stdout_match='name="BBB" host="[^"]+" value="456"',
                     validate=False)
        case.add_log_pattern(
            r"Alerting client .* for reached cluster sync point",
            regex=True)


def build_options():
    """
    Parse the command line and return the resulting options.

    Returns an argparse.Namespace with these attributes:
    list_tests       -- True if -l/--list-tests was given
    run_only_pattern -- The PATTERN given to -p/--run-only-pattern, or None
    run_only         -- The TEST given to -r/--run-only, or None
    verbose          -- True if -V/--verbose was given

    As with any argparse parser, invalid arguments or --help cause the process
    to exit (SystemExit) after printing the appropriate message.
    """
    # The epilog examples name tests that this script actually registers, so
    # they can be copied and run as-is.  RawDescriptionHelpFormatter keeps the
    # newlines and tabs in the epilog intact.
    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description="Run pacemaker-attrd regression tests",
                                     epilog="Example: Run only the test 'set_attr_1'\n"
                                            f"\t {sys.argv[0]} --run-only set_attr_1\n\n"
                                            "Example: Run only the tests with the string 'regex' present in them\n"
                                            f"\t {sys.argv[0]} --run-only-pattern regex")
    parser.add_argument("-l", "--list-tests", action="store_true",
                        help="Print out all registered tests")
    parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
                        help="Run only tests matching the given pattern")
    parser.add_argument("-r", "--run-only", metavar='TEST',
                        help="Run a specific test")
    parser.add_argument("-V", "--verbose", action="store_true",
                        help="Verbose output")

    return parser.parse_args()


def main():
    """
    Run attrd regression tests as specified by arguments.

    All tests are registered first.  With --list-tests they are printed and
    the process exits with ExitStatus.OK without starting corosync.  Otherwise
    corosync is started, the selected tests (all of them, those matching
    --run-only-pattern, or the single --run-only test) are run, the results
    are printed, and corosync is stopped again.

    If starting corosync raises TimeoutError, the process exits with
    ExitStatus.TIMEOUT.
    """
    set_cts_path()

    # Ensure all command output is in portable locale for comparison
    os.environ['LC_ALL'] = "C"

    opts = build_options()

    # NOTE(review): presumably this refuses to continue when a pacemaker-attrd
    # is already running on the host -- confirm in pacemaker._cts.process.
    exit_if_proc_running("pacemaker-attrd")

    # Create a temporary directory for log files (the directory and its
    # contents will automatically be erased when done)
    with tempfile.TemporaryDirectory(prefix="cts-attrd-") as logdir:
        tests = AttributeTests(verbose=opts.verbose, logdir=logdir)

        tests.build_basic_tests()
        tests.build_multiple_query_tests()
        tests.build_regex_tests()
        tests.build_utilization_tests()
        tests.build_sync_point_tests()

        if opts.list_tests:
            tests.print_list()
            sys.exit(ExitStatus.OK)

        print("Starting ...")

        try:
            tests.setup_environment(True)
        except TimeoutError:
            print("corosync did not start in time, exiting")
            sys.exit(ExitStatus.TIMEOUT)

        # From here on the environment has been set up, so always tear it
        # down again -- even if a test run raises or the user interrupts it.
        # Otherwise corosync would be left running on the host.
        try:
            if opts.run_only_pattern:
                tests.run_tests_matching(opts.run_only_pattern)
            elif opts.run_only:
                tests.run_single(opts.run_only)
            else:
                tests.run_tests()

            tests.print_results()
        finally:
            tests.cleanup_environment(True)

    # Called only after the temporary log directory has been removed.
    tests.exit()


# Run the regression tests only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()

# vim: set filetype=python:
