#!@PYTHON@ """ Command-line interface to Pacemaker's Cluster Test Suite (CTS) """ # pylint doesn't like the module name "cts-lab" which is an invalid complaint for this file # This also disables various other invalid names - it thinks scenario and match are constants # that should have all caps names, and that cm and n are too short. # pylint: disable=invalid-name __copyright__ = "Copyright 2001-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import signal import sys from pacemaker._cts.CTS import CtsLab from pacemaker._cts.cmcorosync import Corosync2 from pacemaker._cts.audits import audit_list from pacemaker._cts.logging import LogFactory from pacemaker._cts.scenarios import AllOnce, Boot, BootCluster, LeaveBooted, RandomTests, Sequence from pacemaker._cts.tests import test_list # These are globals so they can be used by the signal handler. scenario = None LogFactory().add_stderr() def sig_handler(signum, _frame): """ Handle the given signal number """ LogFactory().log("Interrupted by signal %d" % signum) if scenario: scenario.summarize() if signum == 15: if scenario: scenario.teardown() sys.exit(1) def plural_s(n): """ Return a string suffix depending on whether or not n is > 1 """ if n == 1: return "" return "S" if __name__ == '__main__': environment = CtsLab(sys.argv[1:]) iters = environment["iterations"] tests = [] # Set the signal handler signal.signal(15, sig_handler) signal.signal(10, sig_handler) # Create the Cluster Manager object cm = None if environment["Stack"] == "corosync 2+": cm = Corosync2() else: LogFactory().log("Unknown stack: %s" % environment["stack"]) sys.exit(1) if environment["TruncateLog"]: if environment["OutputFile"] is None: LogFactory().log("Ignoring truncate request because no output file specified") else: LogFactory().log("Truncating %s" % environment["OutputFile"]) with open(environment["OutputFile"], "w", encoding="utf-8") as outputfile: outputfile.truncate(0) audits = audit_list(cm) if environment["Listtests"]: tests = test_list(cm, audits) LogFactory().log("Total %d tests" % len(tests)) for test in tests: LogFactory().log(test.name) sys.exit(0) elif len(environment["tests"]) == 0: tests = test_list(cm, audits) else: chosen = environment["tests"] for test_case in chosen: match = None for test in test_list(cm, audits): if test.name == test_case: match = test if not match: LogFactory().log("--choose: No applicable/valid tests chosen") sys.exit(1) else: tests.append(match) # Scenario selection if environment["scenario"] == "all-once": iters = len(tests) scenario = AllOnce(cm, [ BootCluster(cm, environment) ], audits, tests) elif environment["scenario"] == "sequence": scenario = Sequence(cm, [ BootCluster(cm, environment) ], audits, tests) elif environment["scenario"] == "boot": scenario = Boot(cm, [ LeaveBooted(cm, environment)], audits, []) else: scenario = RandomTests(cm, [ BootCluster(cm, environment) ], audits, tests) LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING %r TEST%s" % (iters, plural_s(iters))) LogFactory().log("Stack: %s (%s)" % (environment["Stack"], environment["Name"])) LogFactory().log("Schema: %s" % environment["Schema"]) LogFactory().log("Scenario: %s" % scenario.__doc__) LogFactory().log("CTS Exerciser: %s" % environment["cts-exerciser"]) LogFactory().log("CTS Logfile: %s" % environment["OutputFile"]) LogFactory().log("Random Seed: %s" % environment["RandSeed"]) LogFactory().log("Syslog variant: %s" % environment["syslogd"].strip()) LogFactory().log("System log files: %s" % environment["LogFileName"]) if "IPBase" in environment: LogFactory().log("Base IP for resources: %s" % environment["IPBase"]) LogFactory().log("Cluster starts at boot: %d" % environment["at-boot"]) environment.dump() rc = environment.run(scenario, iters) sys.exit(rc)