Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/cts/cts-cli.in b/cts/cts-cli.in
index 6491991f32..62bf15cf70 100644
--- a/cts/cts-cli.in
+++ b/cts/cts-cli.in
@@ -1,686 +1,731 @@
#!@PYTHON@
"""Regression tests for Pacemaker's command line tools."""
# pylint doesn't like the module name "cts-cli" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
from contextlib import contextmanager
from functools import partial
from multiprocessing import Pool, cpu_count
import os
+import pathlib
import re
from shutil import copyfile
import signal
import subprocess
import sys
from tempfile import mkstemp
import types
# These imports allow running from a source checkout after running `make`.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
# The default list of tests to run, in the order they should be run
default_tests = ["access_render", "daemons", "dates", "error_codes", "tools",
"crm_mon", "acls", "validity", "upgrade", "rules", "feature_set"]
other_tests = ["agents"]
# The directory containing this program
test_home = os.path.dirname(os.path.realpath(__file__))
# Arguments to pass to valgrind
VALGRIND_ARGS = ["-q", "--gen-suppressions=all", "--show-reachable=no", "--leak-check=full",
"--trace-children=no", "--time-stamp=yes", "--num-callers=20",
"--suppressions=%s/valgrind-pcmk.suppressions" % test_home]
def apply_substitutions(s):
"""Apply text substitutions to an input string and return it."""
substitutions = {
"test_home": test_home,
}
return s.format(**substitutions)
def current_cib():
"""Return the complete current CIB."""
with environ({"CIB_user": "root"}):
return subprocess.check_output(["cibadmin", "-Q"], encoding="utf-8")
def run_cmd_list(cmds):
"""
Run one or more shell commands.
cmds can be:
* A string
* A Python function
* A list of the above
Raises subprocess.CalledProcessError on error.
"""
if cmds is None:
return
if isinstance(cmds, (str, types.FunctionType)):
cmds = [cmds]
for c in cmds:
if isinstance(c, types.FunctionType):
c()
else:
subprocess.run(apply_substitutions(c), stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, universal_newlines=True, check=True)
def sanitize_output(s):
"""
Replace content in the output expected to change between test runs.
This is stuff like version numbers, timestamps, source line numbers,
build options, system names and messages, etc.
"""
# A list of tuples of regular expressions and their replacements.
replacements = [
(r' default="[^"]*"', r' default=""'),
(r' version="[^"]*"', r' version=""')
]
new_output = []
for line in s:
for (pattern, repl) in replacements:
line = re.sub(pattern, repl, line)
new_output.append(line)
return new_output
@contextmanager
def environ(env):
"""
Run code in an environment modified with the provided dict.
This context manager augments the current process environment with the provided
dict, allowing code to be constructed like so:
e = {"CIB_user": "xyx"}
with environ(e):
...
When the context manager exits, the previous environment will be restored.
It is possible to remove an environment key (whether it was in the environment by
default, or given with a nested call to this context) by passing None for the
value. Additionally, this context manager accepts None for the env parameter,
in which case nothing will be done.
Finally, note that values in env will be passed to apply_substitutions before
being set in the environment.
"""
if env is None:
env = {}
original_env = {}
else:
original_env = os.environ.copy()
for k, v in env.items():
if v is None:
os.environ.pop(k)
else:
os.environ[k] = apply_substitutions(v)
try:
yield
finally:
for k, v in original_env.items():
if v is None:
os.environ.pop(k)
else:
os.environ[k] = v
+class StdinCmd:
+ """
+ A class for defining a command that should be run later.
+
+ subprocess.Popen (and its various helper functions) start running the command
+ immediately, which doesn't work if we want to provide the command when a Test
+ is created, but delay its execution until the environment is defined when the
+ Test is run.
+
+ This class allows us to do that.
+ """
+
+ def __init__(self, cmd):
+ """Create a new StdinCmd instance.
+
+ Arguments:
+ cmd -- The command string to run later. This string will be passed
+ to apply_substitutions before being executed.
+ """
+ self._cmd = cmd
+
+ def run(self):
+ """Run this command, returning a subprocess.Popen object."""
+ return subprocess.Popen(apply_substitutions(self._cmd), shell=True,
+ encoding="utf-8", stdout=subprocess.PIPE)
+
+
class Test:
"""A base class for defining a single command line regression test."""
def __init__(self, desc, cmd, expected_rc=ExitStatus.OK, update_cib=False,
setup=None, teardown=None, stdin=None, env=None):
"""
Create a new Test instance.
Arguments:
desc -- A short human-readable description of this test
cmd -- The command to run for this test, as a string. This string
will be passed to apply_substitutions before being executed.
Keyword arguments:
expected_rc -- The expected return value of cmd
update_cib -- If True, the resulting CIB will be printed after
performing the test
setup -- A shell command to be run in the same environment as
cmd, immediately before the test. Valid types are:
a string, a Python function, or a list of the above
teardown -- Like setup, but runs immediately after the test
stdin -- If not None, the text to feed to cmd as its stdin
env -- If not None, a dict of values to be added to the test
environment. This will be added when the test is run
and will override anything given to the TestGroup.
"""
self.desc = desc
self.cmd = cmd
self.expected_rc = expected_rc
self.update_cib = update_cib
self._setup = setup
self._teardown = teardown
self._stdin = stdin
if env is None:
self._env = {}
else:
self._env = env
self._output = None
@property
def output(self):
"""Return the test's detailed output."""
return self._output
def _log_end_test(self, rc):
"""Log a message when a test ends."""
if isinstance(rc, ExitStatus):
rc_str = str(rc)
else:
if rc < 0:
rc = abs(rc)
rc_str = signal.strsignal(rc)
else:
rc = ExitStatus(rc)
rc_str = str(rc)
self._output.append("=#=#=#= End test: %s - %s (%d) =#=#=#=" % (self.desc, rc_str, rc))
def _log_start_test(self):
"""Log a message when a test starts."""
self._output.append("=#=#=#= Begin test: %s =#=#=#=" % self.desc)
def _log_test_failed(self, app, rc):
"""Log a message when a test fails."""
self._output.append("* Failed (rc=%.3d): %-23s - %s" % (rc, app, self.desc))
def _log_test_passed(self, app):
"""Log a message when a test passes."""
self._output.append("* Passed: %-21s - %s" % (app, self.desc))
def _run_setup_teardown(self, cmd, app):
"""
Run any setup or teardown command required by this test.
On success (or if no command is present), return True. On failure,
return False and log the stdout/stderr of the command for debugging.
Arguments:
cmd -- The setup/teardown command(s) to run
app -- The base name of the test command, for logging purposes
"""
try:
run_cmd_list(cmd)
return True
except subprocess.CalledProcessError as exn:
rc = exn.returncode
self._output.extend(exn.stderr.splitlines())
self._output.extend(exn.stdout.splitlines())
self._log_test_failed(app, rc)
return False
def run(self, group, env=None, valgrind=False):
"""
Run this test.
Basic output is printed to stdout, while detailed output is available
in the self.output property after this function has been run. Return
True if the return code matches self.expected_rc, and False otherwise.
Arguments:
group -- The name of the group this test is a part of, for logging purposes
Keyword arguments:
env -- If not None, a dict of values to be added to the test environment
"""
self._output = []
cmd = apply_substitutions(self.cmd)
app = cmd.split(" ")[0]
test_id = "%s(%s)" % (app, group)
print("* Running: %-31s - %s" % (test_id, self.desc))
self._log_start_test()
# Add any environment variables specified in Test.__init__
if env is None:
env = self._env
else:
env = env.update(self._env)
with environ(env):
# Run the setup hook, if any
if not self._run_setup_teardown(self._setup, app):
return False
+ # Define basic arguments for all forms of running this test.
+ kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE,
+ "shell": True, "universal_newlines": True, "check": False}
+ stdin_p = None
+
+ # Handle the stdin= parameter.
+ if isinstance(self._stdin, StdinCmd):
+ stdin_p = self._stdin.run()
+ kwargs["stdin"] = stdin_p.stdout
+ elif isinstance(self._stdin, pathlib.Path):
+ kwargs["input"] = self._stdin.read_text()
+ else:
+ kwargs["input"] = self._stdin
+
if valgrind:
cmd = "valgrind %s %s" % (" ".join(VALGRIND_ARGS), cmd)
# Run the test command
- cmd_p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- shell=True, universal_newlines=True, check=False,
- input=self._stdin)
+ # We handle the "check" argument above in the kwargs dict.
+ # pylint: disable-msg=subprocess-run-check
+ cmd_p = subprocess.run(cmd, **kwargs)
rc = cmd_p.returncode
+ if stdin_p is not None:
+ stdin_p.stdout.close()
+
self._output.extend(cmd_p.stderr.splitlines())
self._output.extend(cmd_p.stdout.splitlines())
# Run the teardown hook, if any
if not self._run_setup_teardown(self._teardown, app):
return False
if self.update_cib:
self._output.append("=#=#=#= Current cib after: %s =#=#=#=" % self.desc)
self._output.extend(current_cib().splitlines())
self._log_end_test(rc)
if rc == self.expected_rc:
self._log_test_passed(app)
return True
self._log_test_failed(app, rc)
return False
class RegressionTest:
"""A base class for testing a single command line tool."""
def __init__(self):
"""Create a new RegressionTest instance."""
self._identical = None
self._successes = None
self._failures = None
self._tempfile = None
self._output = None
@property
def failures(self):
"""Return the number of member tests that failed."""
return self._failures
@property
def identical(self):
"""Return whether the expected output matches the actual output."""
return self._identical
@property
def name(self):
"""
Return the name of this regression test.
This should be a unique, very short, single word name without any special
characters. It must match the name of some word in the default_tests
list because it may be given with the -r option on the command line
to select only certain tests to run.
All subclasses must define this property.
"""
raise NotImplementedError
@property
def results_file(self):
"""Return the location where the regression test results are stored."""
return self._tempfile
@property
def successes(self):
"""Return the number of member tests that succeeded."""
return self._successes
@property
def summary(self):
"""Return a list of all Passed/Failed lines for tests in this regression test."""
retval = []
for line in self._output:
if line.startswith("* Failed") or line.startswith("* Passed"):
retval.append(line)
return retval
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
return []
def cleanup(self):
"""Remove the temp file where test output is stored."""
os.remove(self._tempfile)
self._tempfile = None
def diff(self, verbose=False):
"""
Compare the results of this regression test to the expected results.
Arguments:
verbose -- If True, the diff will be written to stdout
"""
args = ["diff", "-wu", "%s/cli/regression.%s.exp" % (test_home, self.name), self.results_file]
try:
if verbose:
subprocess.run(args, check=True)
else:
subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=True)
self._identical = True
except subprocess.CalledProcessError:
self._identical = False
def process_results(self, verbose):
"""If actual output differs from expected output, print the actual output."""
if self.identical:
self.cleanup()
return
print(" %s" % self.results_file)
if verbose:
print("======================================================")
with open(self.results_file, encoding="utf-8") as f:
print(f.read())
print("======================================================")
def run(self, valgrind=False):
"""
Run all Test instances that are a part of this regression test.
Additionally, record their stdout and stderr in the self.output property
and the total number of tests that passed and failed.
"""
self._failures = 0
self._successes = 0
self._output = []
for t in self.tests:
rc = t.run(self.name, valgrind=valgrind)
if rc:
self._successes += 1
else:
self._failures += 1
self._output.extend(t.output)
self._output = sanitize_output(self._output)
def write(self):
"""
Write test results to a temporary file and set self.results to its location.
If self.run() has not yet been called, or there is otherwise no output,
self.results will be None
"""
if not self._output:
self._tempfile = None
return
s = "\n".join(self._output).encode()
s += b"\n"
(fp, self._tempfile) = mkstemp(prefix="cts-cli.%s." % self.name)
os.write(fp, s)
os.close(fp)
class DaemonsRegressionTest(RegressionTest):
"""A class for testing command line options of pacemaker daemons."""
@property
def name(self):
"""Return the name of this regression test."""
return "daemons"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
return [
Test("Get CIB manager metadata", "pacemaker-based metadata"),
Test("Get controller metadata", "pacemaker-controld metadata"),
Test("Get fencer metadata", "pacemaker-fenced metadata"),
Test("Get scheduler metadata", "pacemaker-schedulerd metadata"),
]
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Command line tool regression tests",
epilog="Default tests: %s\n"
"Other tests: agents (must be run in an installed environment)" %
" ".join(default_tests))
parser.add_argument("-j", "--jobs", metavar="JOBS", default=cpu_count() - 1, type=int,
help="The number of tests to run simultaneously")
parser.add_argument("-p", "--path", metavar="DIR", action="append",
help="Look for executables in DIR (may be specified multiple times)")
parser.add_argument("-r", "--run-only", metavar="TEST", choices=default_tests + other_tests,
action="append",
help="Run only specified tests (may be specified multiple times)")
parser.add_argument("-s", "--save", action="store_true",
help="Save actual output as expected output")
parser.add_argument("-v", "--valgrind", action="store_true",
help="Run all commands under valgrind")
parser.add_argument("-V", "--verbose", action="store_true",
help="Display any differences from expected output")
args = parser.parse_args()
if args.path is None:
args.path = []
return args
def setup_environment(valgrind):
"""Set various environment variables needed for operation."""
if valgrind:
os.environ["G_SLICE"] = "always-malloc"
# Ensure all command output is in portable locale for comparison
os.environ["LC_ALL"] = "C"
# Log test errors to stderr
os.environ["PCMK_stderr"] = "1"
# Because we will change the value of PCMK_trace_functions and then reset it
# back to some initial value at various points, it's easiest to assume it is
# defined but empty by default
if "PCMK_trace_functions" not in os.environ:
os.environ["PCMK_trace_functions"] = ""
def path_prepend(p):
"""Add another directory to the front of $PATH."""
old = os.environ["PATH"]
os.environ["PATH"] = "%s:%s" % (p, old)
def setup_path(opts_path):
"""Set the PATH environment variable appropriately for the tests."""
srcdir = os.path.dirname(test_home)
# Add any search paths given on the command line
for p in opts_path:
path_prepend(p)
if os.path.exists("%s/tools/crm_simulate" % srcdir):
print("Using local binaries from: %s" % srcdir)
path_prepend("%s/tools" % srcdir)
for daemon in ["based", "controld", "fenced", "schedulerd"]:
path_prepend("%s/daemons/%s" % (srcdir, daemon))
print("Using local schemas from: %s/xml" % srcdir)
os.environ["PCMK_schema_directory"] = "%s/xml" % srcdir
else:
path_prepend(BuildOptions.DAEMON_DIR)
os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR
def _run_one(valgrind, r):
"""Run and return a TestGroup object."""
# See comments in run_regression_tests.
r.run(valgrind=valgrind)
return r
def run_regression_tests(regs, jobs, valgrind=False):
"""Run the given tests and return the modified objects."""
executed = []
with Pool(processes=jobs) as pool:
# What we really want to do here is:
# pool.map(lambda r: r.run(),regs)
#
# However, multiprocessing uses pickle somehow in its operation, and python
# doesn't want to pickle a lambda (nor a nested function within this one).
# Thus, we need to use the _run_one wrapper at the file level just to call
# run(). Further, if we don't return the modified object from that and then
# return the list of modified objects here, it looks like the rest of the
# program will use the originals, before this was ever run.
executed = pool.map(partial(_run_one, valgrind), regs)
return executed
def results(regs, save, verbose):
"""Print the output from each regression test, returning the number whose output differs."""
output_differs = 0
if verbose:
print("\n\nResults")
for r in regs:
r.write()
r.diff()
if not r.identical:
output_differs += 1
if save:
dest = "%s/cli/regression.%s.exp" % (test_home, r.name)
copyfile(r.results_file, dest)
return output_differs
def summary(regs, output_differs, verbose):
"""Print the summary output for the entire test run."""
test_failures = 0
test_successes = 0
for r in regs:
test_failures += r.failures
test_successes += r.successes
print("\n\nSummary")
# First, print all the Passed/Failed lines from each Test run.
for r in regs:
print("\n".join(r.summary))
# Then, print information specific to each result possibility. Basically,
# if there were failures then we print the output differences, leave the
# failed output files in place, and exit with an error. Otherwise, clean up
# anything that passed.
if test_failures > 0 and output_differs > 0:
print("%d test failed; see output in:" % test_failures)
for r in regs:
r.process_results(verbose)
return ExitStatus.ERROR
if test_failures > 0:
print("%d tests failed" % test_failures)
for r in regs:
r.process_results(verbose)
return ExitStatus.ERROR
if output_differs:
print("%d tests passed but output was unexpected; see output in:" % test_successes)
for r in regs:
r.process_results(verbose)
return ExitStatus.DIGEST
print("%d tests passed" % test_successes)
for r in regs:
r.cleanup()
return ExitStatus.OK
regression_classes = [
DaemonsRegressionTest,
]
def main():
"""Run command line regression tests as specified by arguments."""
opts = build_options()
setup_environment(opts.valgrind)
setup_path(opts.path)
# Filter the list of all regression test classes to include only those that
# were requested on the command line. If empty, this defaults to default_tests.
if not opts.run_only:
opts.run_only = default_tests
regs = []
for cls in regression_classes:
obj = cls()
if obj.name in opts.run_only:
regs.append(obj)
regs = run_regression_tests(regs, max(1, opts.jobs), valgrind=opts.valgrind)
output_differs = results(regs, opts.save, opts.verbose)
rc = summary(regs, output_differs, opts.verbose)
sys.exit(rc)
if __name__ == "__main__":
main()

File Metadata

Mime Type
text/x-diff
Expires
Mon, Apr 21, 3:03 PM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1663938
Default Alt Text
(24 KB)

Event Timeline