diff --git a/cts/cts-lab.in b/cts/cts-lab.in index 31d3a63f6e..01bf9aa3bd 100644 --- a/cts/cts-lab.in +++ b/cts/cts-lab.in @@ -1,131 +1,136 @@ #!@PYTHON@ """ Command-line interface to Pacemaker's Cluster Test Suite (CTS) """ +# pylint doesn't like the module name "cts-lab" which is an invalid complaint for this file +# This also disables various other invalid names - it thinks scenario and match are constants +# that should have all caps names, and that cm and n are too short. +# pylint: disable=invalid-name + __copyright__ = "Copyright 2001-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" -import sys, signal, os - -pdir = os.path.dirname(sys.path[0]) -sys.path.insert(0, pdir) # So that things work from the source directory +import signal +import sys -try: - from pacemaker._cts.CTS import CtsLab - from pacemaker._cts.cmcorosync import Corosync2 - from pacemaker._cts.audits import audit_list - from pacemaker._cts.logging import LogFactory - from pacemaker._cts.scenarios import AllOnce, Boot, BootCluster, LeaveBooted, RandomTests, Sequence - from pacemaker._cts.tests import test_list -except ImportError as e: - sys.stderr.write("abort: %s\n" % e) - sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" % - ' '.join(sys.path)) - sys.exit(1) +from pacemaker._cts.CTS import CtsLab +from pacemaker._cts.cmcorosync import Corosync2 +from pacemaker._cts.audits import audit_list +from pacemaker._cts.logging import LogFactory +from pacemaker._cts.scenarios import AllOnce, Boot, BootCluster, LeaveBooted, RandomTests, Sequence +from pacemaker._cts.tests import test_list # These are globals so they can be used by the signal handler. scenario = None LogFactory().add_stderr() -def sig_handler(signum, frame) : - LogFactory().log("Interrupted by signal %d"%signum) - if scenario: scenario.summarize() - if signum == 15 : - if scenario: scenario.teardown() +def sig_handler(signum, _frame): + """ Handle the given signal number """ + + LogFactory().log("Interrupted by signal %d" % signum) + + if scenario: + scenario.summarize() + + if signum == 15: + if scenario: + scenario.teardown() + sys.exit(1) -def plural_s(n, uppercase=False): +def plural_s(n): + """ Return a string suffix depending on whether or not n is > 1 """ + if n == 1: return "" - elif uppercase: - return "S" - else: - return "s" + return "S" -if __name__ == '__main__': - Environment = CtsLab(sys.argv[1:]) - NumIter = Environment["iterations"] - Tests = [] +if __name__ == '__main__': + environment = CtsLab(sys.argv[1:]) + iters = environment["iterations"] + tests = [] # Set the signal handler signal.signal(15, sig_handler) signal.signal(10, sig_handler) # Create the Cluster Manager object cm = None - if Environment["Stack"] == "corosync 2+": + + if environment["Stack"] == "corosync 2+": cm = Corosync2() - else: - LogFactory().log("Unknown stack: "+Environment["stack"]) + LogFactory().log("Unknown stack: %s" % environment["stack"]) sys.exit(1) - if Environment["TruncateLog"]: - if Environment["OutputFile"] is None: + if environment["TruncateLog"]: + if environment["OutputFile"] is None: LogFactory().log("Ignoring truncate request because no output file specified") else: - LogFactory().log("Truncating %s" % Environment["OutputFile"]) - with open(Environment["OutputFile"], "w") as outputfile: + LogFactory().log("Truncating %s" % environment["OutputFile"]) + + with open(environment["OutputFile"], "w", encoding="utf-8") as outputfile: outputfile.truncate(0) - Audits = audit_list(cm) + audits = audit_list(cm) + + if environment["Listtests"]: + tests = test_list(cm, audits) + LogFactory().log("Total %d tests" % len(tests)) + + for test in tests: + LogFactory().log(test.name) - if Environment["ListTests"]: - Tests = test_list(cm, Audits) - LogFactory().log("Total %d tests"%len(Tests)) - for test in Tests : - LogFactory().log(str(test.name)); sys.exit(0) - elif len(Environment["tests"]) == 0: - Tests = test_list(cm, Audits) + elif len(environment["tests"]) == 0: + tests = test_list(cm, audits) else: - Chosen = Environment["tests"] - for TestCase in Chosen: - match = None + chosen = environment["tests"] + for test_case in chosen: + match = None - for test in test_list(cm, Audits): - if test.name == TestCase: - match = test + for test in test_list(cm, audits): + if test.name == test_case: + match = test - if not match: - LogFactory().log("--choose: No applicable/valid tests chosen") - sys.exit(1) - else: - Tests.append(match) + if not match: + LogFactory().log("--choose: No applicable/valid tests chosen") + sys.exit(1) + else: + tests.append(match) # Scenario selection - if Environment["scenario"] == "all-once": - NumIter = len(Tests) - scenario = AllOnce( - cm, [ BootCluster(cm, Environment) ], Audits, Tests) - elif Environment["scenario"] == "sequence": - scenario = Sequence( - cm, [ BootCluster(cm, Environment) ], Audits, Tests) - elif Environment["scenario"] == "boot": - scenario = Boot(cm, [ LeaveBooted(cm, Environment)], Audits, []) + if environment["scenario"] == "all-once": + iters = len(tests) + scenario = AllOnce(cm, [ BootCluster(cm, environment) ], audits, tests) + elif environment["scenario"] == "sequence": + scenario = Sequence(cm, [ BootCluster(cm, environment) ], audits, tests) + elif environment["scenario"] == "boot": + scenario = Boot(cm, [ LeaveBooted(cm, environment)], audits, []) else: - scenario = RandomTests( - cm, [ BootCluster(cm, Environment) ], Audits, Tests) + scenario = RandomTests(cm, [ BootCluster(cm, environment) ], audits, tests) - LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TEST" + plural_s(NumIter, True) + " ") - LogFactory().log("Stack: %s (%s)" % (Environment["Stack"], Environment["Name"])) - LogFactory().log("Schema: %s" % Environment["Schema"]) + LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING %r TEST%s" % (iters, plural_s(iters))) + LogFactory().log("Stack: %s (%s)" % (environment["Stack"], environment["Name"])) + LogFactory().log("Schema: %s" % environment["Schema"]) LogFactory().log("Scenario: %s" % scenario.__doc__) - LogFactory().log("CTS Exerciser: %s" % Environment["cts-exerciser"]) - LogFactory().log("CTS Logfile: %s" % Environment["OutputFile"]) - LogFactory().log("Random Seed: %s" % Environment["RandSeed"]) - LogFactory().log("Syslog variant: %s" % Environment["syslogd"].strip()) - LogFactory().log("System log files: %s" % Environment["LogFileName"]) - if "IPBase" in Environment: - LogFactory().log("Base IP for resources: %s" % Environment["IPBase"]) - LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"]) - - Environment.dump() - rc = Environment.run(scenario, NumIter) + LogFactory().log("CTS Exerciser: %s" % environment["cts-exerciser"]) + LogFactory().log("CTS Logfile: %s" % environment["OutputFile"]) + LogFactory().log("Random Seed: %s" % environment["RandSeed"]) + LogFactory().log("Syslog variant: %s" % environment["syslogd"].strip()) + LogFactory().log("System log files: %s" % environment["LogFileName"]) + + if "IPBase" in environment: + LogFactory().log("Base IP for resources: %s" % environment["IPBase"]) + + LogFactory().log("Cluster starts at boot: %d" % environment["at-boot"]) + + environment.dump() + rc = environment.run(scenario, iters) sys.exit(rc)