diff --git a/cts/cts-cli.in b/cts/cts-cli.in index ae9d19e37c..0b39af8476 100644 --- a/cts/cts-cli.in +++ b/cts/cts-cli.in @@ -1,763 +1,767 @@ #!@PYTHON@ """Regression tests for Pacemaker's command line tools.""" # pylint doesn't like the module name "cts-cli" which is an invalid complaint for this file # but probably something we want to continue warning about elsewhere # pylint: disable=invalid-name # pacemaker imports need to come after we modify sys.path, which pylint will complain about. # pylint: disable=wrong-import-position __copyright__ = "Copyright 2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse from contextlib import contextmanager from functools import partial from multiprocessing import Pool, cpu_count import os import pathlib import re from shutil import copyfile import signal import subprocess import sys from tempfile import NamedTemporaryFile, mkstemp import types # These imports allow running from a source checkout after running `make`. if os.path.exists("@abs_top_srcdir@/python"): sys.path.insert(0, "@abs_top_srcdir@/python") # pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@": sys.path.insert(0, "@abs_top_builddir@/python") from pacemaker._cts.errors import XmlValidationError from pacemaker._cts.validate import validate from pacemaker.buildoptions import BuildOptions from pacemaker.exitstatus import ExitStatus # The default list of tests to run, in the order they should be run default_tests = ["access_render", "daemons", "dates", "error_codes", "tools", "crm_mon", "acls", "validity", "upgrade", "rules", "feature_set"] other_tests = ["agents"] # The directory containing this program test_home = os.path.dirname(os.path.realpath(__file__)) # Arguments to pass to valgrind VALGRIND_ARGS = ["-q", "--gen-suppressions=all", "--show-reachable=no", "--leak-check=full", "--trace-children=no", "--time-stamp=yes", "--num-callers=20", "--suppressions=%s/valgrind-pcmk.suppressions" % test_home] def apply_substitutions(s): """Apply text substitutions to an input string and return it.""" substitutions = { "test_home": test_home, } return s.format(**substitutions) def current_cib(): """Return the complete current CIB.""" with environ({"CIB_user": "root"}): return subprocess.check_output(["cibadmin", "-Q"], encoding="utf-8") def run_cmd_list(cmds): """ Run one or more shell commands. cmds can be: * A string * A Python function * A list of the above Raises subprocess.CalledProcessError on error. """ if cmds is None: return if isinstance(cmds, (str, types.FunctionType)): cmds = [cmds] for c in cmds: if isinstance(c, types.FunctionType): c() else: subprocess.run(apply_substitutions(c), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True, check=True) def sanitize_output(s): """ Replace content in the output expected to change between test runs. This is stuff like version numbers, timestamps, source line numbers, build options, system names and messages, etc. """ # A list of tuples of regular expressions and their replacements. replacements = [ (r' default="[^"]*"', r' default=""'), (r' version="[^"]*"', r' version=""') ] new_output = [] for line in s: for (pattern, repl) in replacements: line = re.sub(pattern, repl, line) new_output.append(line) return new_output @contextmanager def environ(env): """ Run code in an environment modified with the provided dict. This context manager augments the current process environment with the provided dict, allowing code to be constructed like so: e = {"CIB_user": "xyx"} with environ(e): ... When the context manager exits, the previous environment will be restored. It is possible to remove an environment key (whether it was in the environment by default, or given with a nested call to this context) by passing None for the value. Additionally, this context manager accepts None for the env parameter, in which case nothing will be done. Finally, note that values in env will be passed to apply_substitutions before being set in the environment. """ if env is None: env = {} original_env = {} else: original_env = os.environ.copy() for k, v in env.items(): if v is None: os.environ.pop(k) else: os.environ[k] = apply_substitutions(v) try: yield finally: for k, v in original_env.items(): if v is None: os.environ.pop(k) else: os.environ[k] = v class StdinCmd: """ A class for defining a command that should be run later. subprocess.Popen (and its various helper functions) start running the command immediately, which doesn't work if we want to provide the command when a Test is created, but delay its execution until the environment is defined when the Test is run. This class allows us to do that. """ def __init__(self, cmd): """Create a new StdinCmd instance. Arguments: cmd -- The command string to run later. This string will be passed to apply_substitutions before being executed. """ self._cmd = cmd def run(self): """Run this command, returning a subprocess.Popen object.""" return subprocess.Popen(apply_substitutions(self._cmd), shell=True, encoding="utf-8", stdout=subprocess.PIPE) class Test: """A base class for defining a single command line regression test.""" def __init__(self, desc, cmd, expected_rc=ExitStatus.OK, update_cib=False, setup=None, teardown=None, stdin=None, env=None): """ Create a new Test instance. Arguments: desc -- A short human-readable description of this test cmd -- The command to run for this test, as a string. This string will be passed to apply_substitutions before being executed. Keyword arguments: expected_rc -- The expected return value of cmd update_cib -- If True, the resulting CIB will be printed after performing the test setup -- A shell command to be run in the same environment as cmd, immediately before the test. Valid types are: a string, a Python function, or a list of the above teardown -- Like setup, but runs immediately after the test stdin -- If not None, the text to feed to cmd as its stdin env -- If not None, a dict of values to be added to the test environment. This will be added when the test is run and will override anything given to the TestGroup. """ self.desc = desc self.cmd = cmd self.expected_rc = expected_rc self.update_cib = update_cib self._setup = setup self._teardown = teardown self._stdin = stdin if env is None: self._env = {} else: self._env = env self._output = None @property def output(self): """Return the test's detailed output.""" return self._output def _log_end_test(self, rc): """Log a message when a test ends.""" if isinstance(rc, ExitStatus): rc_str = str(rc) else: if rc < 0: rc = abs(rc) rc_str = signal.strsignal(rc) else: rc = ExitStatus(rc) rc_str = str(rc) self._output.append("=#=#=#= End test: %s - %s (%d) =#=#=#=" % (self.desc, rc_str, rc)) def _log_start_test(self): """Log a message when a test starts.""" self._output.append("=#=#=#= Begin test: %s =#=#=#=" % self.desc) def _log_test_failed(self, app, rc): """Log a message when a test fails.""" self._output.append("* Failed (rc=%.3d): %-23s - %s" % (rc, app, self.desc)) def _log_test_passed(self, app): """Log a message when a test passes.""" self._output.append("* Passed: %-21s - %s" % (app, self.desc)) # pylint: disable=unused-argument def _validate_hook(self, rc, _stdout, _stderr, valgrind=False): """Validate test output.""" self._log_end_test(rc) return rc def _run_setup_teardown(self, cmd, app): """ Run any setup or teardown command required by this test. On success (or if no command is present), return True. On failure, return False and log the stdout/stderr of the command for debugging. Arguments: cmd -- The setup/teardown command(s) to run app -- The base name of the test command, for logging purposes """ try: run_cmd_list(cmd) return True except subprocess.CalledProcessError as exn: rc = exn.returncode self._output.extend(exn.stderr.splitlines()) self._output.extend(exn.stdout.splitlines()) self._log_test_failed(app, rc) return False def run(self, group, env=None, valgrind=False): """ Run this test. Basic output is printed to stdout, while detailed output is available in the self.output property after this function has been run. Return True if the return code matches self.expected_rc, and False otherwise. Arguments: group -- The name of the group this test is a part of, for logging purposes Keyword arguments: env -- If not None, a dict of values to be added to the test environment """ self._output = [] cmd = apply_substitutions(self.cmd) app = cmd.split(" ")[0] test_id = "%s(%s)" % (app, group) print("* Running: %-31s - %s" % (test_id, self.desc)) self._log_start_test() # Add any environment variables specified in Test.__init__ if env is None: env = self._env else: env = env.update(self._env) with environ(env): # Run the setup hook, if any if not self._run_setup_teardown(self._setup, app): return False # Define basic arguments for all forms of running this test. kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE, "shell": True, "universal_newlines": True, "check": False} stdin_p = None # Handle the stdin= parameter. if isinstance(self._stdin, StdinCmd): stdin_p = self._stdin.run() kwargs["stdin"] = stdin_p.stdout elif isinstance(self._stdin, pathlib.Path): kwargs["input"] = self._stdin.read_text() else: kwargs["input"] = self._stdin if valgrind: cmd = "valgrind %s %s" % (" ".join(VALGRIND_ARGS), cmd) # Run the test command # We handle the "check" argument above in the kwargs dict. # pylint: disable-msg=subprocess-run-check cmd_p = subprocess.run(cmd, **kwargs) rc = cmd_p.returncode if stdin_p is not None: stdin_p.stdout.close() self._output.extend(cmd_p.stderr.splitlines()) self._output.extend(cmd_p.stdout.splitlines()) # Run the teardown hook, if any if not self._run_setup_teardown(self._teardown, app): return False if self.update_cib: self._output.append("=#=#=#= Current cib after: %s =#=#=#=" % self.desc) self._output.extend(current_cib().splitlines()) self._validate_hook(rc, cmd_p.stdout, cmd_p.stderr, valgrind=valgrind) if rc == self.expected_rc: self._log_test_passed(app) return True self._log_test_failed(app, rc) return False class ValidatingTest(Test): """A Test subclass that additionally runs test results through xmllint.""" + def __init__(self, desc, cmd, **kwargs): + """Create a new ValidatingTest instance.""" + Test.__init__(self, desc + " (XML)", cmd, **kwargs) + def _validate_hook(self, rc, stdout, stderr, valgrind=False): """Validate test output with xmllint.""" # Do not validate if running under valgrind, even if told to do so. Valgrind # will output a lot more stuff that is not XML, so it wouldn't validate # anyway. if valgrind: return Test._validate_hook(self, rc, stdout, stderr, valgrind=valgrind) try: validate(stdout) # We only care about the return code from validation if there was an error, # which will be dealt with below. Here, we want to log the original return # code from the test itself. self._log_end_test(rc) return 0 except XmlValidationError as e: self._output.append("=#=#=#= End test: %s - Failed to validate (%d) =#=#=#=" % (self.desc, e.exit_code)) self._output.extend(e.output.splitlines()) return e.exit_code class RegressionTest: """A base class for testing a single command line tool.""" def __init__(self): """Create a new RegressionTest instance.""" self._identical = None self._successes = None self._failures = None self._tempfile = None self._output = None @property def failures(self): """Return the number of member tests that failed.""" return self._failures @property def identical(self): """Return whether the expected output matches the actual output.""" return self._identical @property def name(self): """ Return the name of this regression test. This should be a unique, very short, single word name without any special characters. It must match the name of some word in the default_tests list because it may be given with the -r option on the command line to select only certain tests to run. All subclasses must define this property. """ raise NotImplementedError @property def results_file(self): """Return the location where the regression test results are stored.""" return self._tempfile @property def successes(self): """Return the number of member tests that succeeded.""" return self._successes @property def summary(self): """Return a list of all Passed/Failed lines for tests in this regression test.""" retval = [] for line in self._output: if line.startswith("* Failed") or line.startswith("* Passed"): retval.append(line) return retval @property def tests(self): """A list of Test instances to be run as part of this regression test.""" return [] def cleanup(self): """Remove the temp file where test output is stored.""" os.remove(self._tempfile) self._tempfile = None def diff(self, verbose=False): """ Compare the results of this regression test to the expected results. Arguments: verbose -- If True, the diff will be written to stdout """ args = ["diff", "-wu", "%s/cli/regression.%s.exp" % (test_home, self.name), self.results_file] try: if verbose: subprocess.run(args, check=True) else: subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) self._identical = True except subprocess.CalledProcessError: self._identical = False def process_results(self, verbose): """If actual output differs from expected output, print the actual output.""" if self.identical: self.cleanup() return print(" %s" % self.results_file) if verbose: print("======================================================") with open(self.results_file, encoding="utf-8") as f: print(f.read()) print("======================================================") def run(self, valgrind=False): """ Run all Test instances that are a part of this regression test. Additionally, record their stdout and stderr in the self.output property and the total number of tests that passed and failed. """ self._failures = 0 self._successes = 0 self._output = [] for t in self.tests: rc = t.run(self.name, valgrind=valgrind) if rc: self._successes += 1 else: self._failures += 1 self._output.extend(t.output) self._output = sanitize_output(self._output) def write(self): """ Write test results to a temporary file and set self.results to its location. If self.run() has not yet been called, or there is otherwise no output, self.results will be None """ if not self._output: self._tempfile = None return s = "\n".join(self._output).encode() s += b"\n" (fp, self._tempfile) = mkstemp(prefix="cts-cli.%s." % self.name) os.write(fp, s) os.close(fp) class DaemonsRegressionTest(RegressionTest): """A class for testing command line options of pacemaker daemons.""" @property def name(self): """Return the name of this regression test.""" return "daemons" @property def tests(self): """A list of Test instances to be run as part of this regression test.""" return [ Test("Get CIB manager metadata", "pacemaker-based metadata"), Test("Get controller metadata", "pacemaker-controld metadata"), Test("Get fencer metadata", "pacemaker-fenced metadata"), Test("Get scheduler metadata", "pacemaker-schedulerd metadata"), ] def build_options(): """Handle command line arguments.""" parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description="Command line tool regression tests", epilog="Default tests: %s\n" "Other tests: agents (must be run in an installed environment)" % " ".join(default_tests)) parser.add_argument("-j", "--jobs", metavar="JOBS", default=cpu_count() - 1, type=int, help="The number of tests to run simultaneously") parser.add_argument("-p", "--path", metavar="DIR", action="append", help="Look for executables in DIR (may be specified multiple times)") parser.add_argument("-r", "--run-only", metavar="TEST", choices=default_tests + other_tests, action="append", help="Run only specified tests (may be specified multiple times)") parser.add_argument("-s", "--save", action="store_true", help="Save actual output as expected output") parser.add_argument("-v", "--valgrind", action="store_true", help="Run all commands under valgrind") parser.add_argument("-V", "--verbose", action="store_true", help="Display any differences from expected output") args = parser.parse_args() if args.path is None: args.path = [] return args def setup_environment(valgrind): """Set various environment variables needed for operation.""" if valgrind: os.environ["G_SLICE"] = "always-malloc" # Ensure all command output is in portable locale for comparison os.environ["LC_ALL"] = "C" # Log test errors to stderr os.environ["PCMK_stderr"] = "1" # Because we will change the value of PCMK_trace_functions and then reset it # back to some initial value at various points, it's easiest to assume it is # defined but empty by default if "PCMK_trace_functions" not in os.environ: os.environ["PCMK_trace_functions"] = "" def path_prepend(p): """Add another directory to the front of $PATH.""" old = os.environ["PATH"] os.environ["PATH"] = "%s:%s" % (p, old) def setup_path(opts_path): """Set the PATH environment variable appropriately for the tests.""" srcdir = os.path.dirname(test_home) # Add any search paths given on the command line for p in opts_path: path_prepend(p) if os.path.exists("%s/tools/crm_simulate" % srcdir): print("Using local binaries from: %s" % srcdir) path_prepend("%s/tools" % srcdir) for daemon in ["based", "controld", "fenced", "schedulerd"]: path_prepend("%s/daemons/%s" % (srcdir, daemon)) print("Using local schemas from: %s/xml" % srcdir) os.environ["PCMK_schema_directory"] = "%s/xml" % srcdir else: path_prepend(BuildOptions.DAEMON_DIR) os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR def _run_one(valgrind, r): """Run and return a TestGroup object.""" # See comments in run_regression_tests. r.run(valgrind=valgrind) return r def run_regression_tests(regs, jobs, valgrind=False): """Run the given tests and return the modified objects.""" executed = [] with Pool(processes=jobs) as pool: # What we really want to do here is: # pool.map(lambda r: r.run(),regs) # # However, multiprocessing uses pickle somehow in its operation, and python # doesn't want to pickle a lambda (nor a nested function within this one). # Thus, we need to use the _run_one wrapper at the file level just to call # run(). Further, if we don't return the modified object from that and then # return the list of modified objects here, it looks like the rest of the # program will use the originals, before this was ever run. executed = pool.map(partial(_run_one, valgrind), regs) return executed def results(regs, save, verbose): """Print the output from each regression test, returning the number whose output differs.""" output_differs = 0 if verbose: print("\n\nResults") for r in regs: r.write() r.diff() if not r.identical: output_differs += 1 if save: dest = "%s/cli/regression.%s.exp" % (test_home, r.name) copyfile(r.results_file, dest) return output_differs def summary(regs, output_differs, verbose): """Print the summary output for the entire test run.""" test_failures = 0 test_successes = 0 for r in regs: test_failures += r.failures test_successes += r.successes print("\n\nSummary") # First, print all the Passed/Failed lines from each Test run. for r in regs: print("\n".join(r.summary)) # Then, print information specific to each result possibility. Basically, # if there were failures then we print the output differences, leave the # failed output files in place, and exit with an error. Otherwise, clean up # anything that passed. if test_failures > 0 and output_differs > 0: print("%d test failed; see output in:" % test_failures) for r in regs: r.process_results(verbose) return ExitStatus.ERROR if test_failures > 0: print("%d tests failed" % test_failures) for r in regs: r.process_results(verbose) return ExitStatus.ERROR if output_differs: print("%d tests passed but output was unexpected; see output in:" % test_successes) for r in regs: r.process_results(verbose) return ExitStatus.DIGEST print("%d tests passed" % test_successes) for r in regs: r.cleanup() return ExitStatus.OK regression_classes = [ DaemonsRegressionTest, ] def main(): """Run command line regression tests as specified by arguments.""" opts = build_options() setup_environment(opts.valgrind) setup_path(opts.path) # Filter the list of all regression test classes to include only those that # were requested on the command line. If empty, this defaults to default_tests. if not opts.run_only: opts.run_only = default_tests regs = [] for cls in regression_classes: obj = cls() if obj.name in opts.run_only: regs.append(obj) regs = run_regression_tests(regs, max(1, opts.jobs), valgrind=opts.valgrind) output_differs = results(regs, opts.save, opts.verbose) rc = summary(regs, output_differs, opts.verbose) sys.exit(rc) if __name__ == "__main__": main()