diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index 15e2289100..dba7938f4f 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,638 +1,638 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory"] __copyright__ = "Copyright 2014-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse import os import random import socket import sys import time from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). It's possible we could fix this with type annotations, # but those were introduced with python 3.5 and we only support python 3.4. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} self._nodes = [] # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["IPagent"] = "IPaddr2" self["DoFencing"] = True self["ClobberCIB"] = False self["CIBfilename"] = None self["CIBResource"] = False self["LogWatcher"] = LogKind.ANY self["node-limit"] = 0 self["scenario"] = "random" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._target = "localhost" self._seed_random() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def _seed_random(self, seed=None): """ Initialize the random number generator. Arguments: seed -- Use this to see the random number generator, or use the current time if None. """ if not seed: seed = int(time.time()) self["RandSeed"] = seed self.random_gen.seed(str(seed)) def dump(self): """Print the current environment.""" keys = [] for key in list(self.data.keys()): keys.append(key) keys.sort() for key in keys: s = "Environment[%s]" % key self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key]))) def keys(self): """Return a list of all environment keys stored in this instance.""" return list(self.data.keys()) def __contains__(self, key): """Return True if the given key exists in the environment.""" if key == "nodes": return True return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" if str(key) == "0": raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead") if key == "nodes": return self._nodes if key == "Name": return self._get_stack_short() return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "Stack": self._set_stack(value) elif key == "node-limit": self.data[key] = value self._filter_nodes() elif key == "nodes": self._nodes = [] for node in value: # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: n = node.strip() socket.gethostbyname_ex(n) self._nodes.append(n) except: self._logger.log("%s not found in DNS... aborting" % node) raise self._filter_nodes() else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def get(self, key, default=None): """Return the value for key if key is in the environment, else default.""" if key == "nodes": return self._nodes return self.data.get(key, default) def _set_stack(self, name): """Normalize the given cluster stack name.""" if name in ["corosync", "cs", "mcp"]: self.data["Stack"] = "corosync 2+" else: raise ValueError("Unknown stack: %s" % name) def _get_stack_short(self): """Return the short name for the currently set cluster stack.""" if "Stack" not in self.data: return "unknown" if self.data["Stack"] == "corosync 2+": return "crm-corosync" LogFactory().log("Unknown stack: %s" % self["stack"]) raise ValueError("Unknown stack: %s" % self["stack"]) def _detect_systemd(self): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self): """Detect the syslog variant in use on the target node.""" if "syslogd" not in self.data: if self["have_systemd"]: # Systemd (_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) self["syslogd"] = lines[0].strip() else: # SYS-V (_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) self["syslogd"] = lines[0].strip() if "syslogd" not in self.data or not self["syslogd"]: # default self["syslogd"] = "rsyslog" def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl disable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s off" % service) return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl enable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s on" % service) return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service) return rc == 0 # SYS-V (rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service) return rc == 0 def _detect_at_boot(self): """Detect if the cluster starts at boot.""" if "at-boot" not in self.data: self["at-boot"] = self.service_is_enabled(self._target, "corosync") \ - or self.service_is_enabled(self._target, "pacemaker") + or self.service_is_enabled(self._target, "pacemaker") def _detect_ip_offset(self): """Detect the offset for IPaddr resources.""" if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) return # pylint thinks self["IPBase"] is a list, not a string, which causes it # to error out because a list doesn't have split(). # pylint: disable=no-member if int(self["IPBase"].split('.')[3]) >= 240: self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s" % (self["IPBase"], self["IPBase"].split('.')[3])) self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) def _filter_nodes(self): """ Filter the list of cluster nodes. If --limit-nodes is given, keep that many nodes from the front of the list of cluster nodes and drop the rest. """ if self["node-limit"] > 0: if len(self["nodes"]) > self["node-limit"]: # pylint thinks self["node-limit"] is a list even though we initialize # it as an int in __init__ and treat it as an int everywhere. # pylint: disable=bad-string-format-type self._logger.log("Limiting the number of nodes configured=%d (max=%d)" % (len(self["nodes"]), self["node-limit"])) while len(self["nodes"]) > self["node-limit"]: self["nodes"].pop(len(self["nodes"]) - 1) def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" self._target = random.Random().choice(self["nodes"]) exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser self._detect_systemd() self._detect_syslog() self._detect_at_boot() self._detect_ip_offset() def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0]) grp1 = parser.add_argument_group("Common options") grp1.add_argument("-g", "--dsh-group", "--group", metavar="GROUP", dest="group", help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)") grp1.add_argument("-l", "--limit-nodes", type=int, default=0, metavar="MAX", help="Only use the first MAX cluster nodes supplied with --nodes") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", metavar="NODES", help="List of cluster nodes separated by whitespace") grp1.add_argument("--stack", default="corosync", metavar="STACK", help="Which cluster stack is installed") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes") grp2.add_argument("--at-boot", "--cluster-starts-at-boot", choices=["1", "0", "yes", "no"], help="Does the cluster software start at boot time?") grp2.add_argument("--facility", "--syslog-facility", default="daemon", metavar="NAME", help="Which syslog facility to log to") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named test") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--experimental-tests", action="store_true", help="Include experimental tests") grp4.add_argument("--loop-minutes", type=int, default=60, help="") grp4.add_argument("--no-loop-tests", action="store_true", help="Don't run looping/time-based tests") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--oprofile", metavar="NODES", help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--qarsh", action="store_true", help="Use QARSH to access nodes instead of SSH") grp4.add_argument("--schema", metavar="SCHEMA", default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION, help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--set", action="append", metavar="ARG", default=[], help="Set key=value pairs (can be specified multiple times)") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--valgrind-procs", metavar="PROCS", default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd", help="Run valgrind against the given space-separated list of processes") grp4.add_argument("--valgrind-tests", action="store_true", help="Include tests using valgrind") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. They get a default from the add_argument # calls, only do one thing, and they do not have any side effects. self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["Stack"] = args.stack self["SyslogFacility"] = args.facility self["TruncateLog"] = args.truncate self["at-boot"] = args.at_boot in ["1", "yes"] self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["experimental-tests"] = args.experimental_tests self["iterations"] = args.iterations self["loop-minutes"] = args.loop_minutes self["loop-tests"] = not args.no_loop_tests self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["node-limit"] = args.limit_nodes self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["valgrind-procs"] = args.valgrind_procs self["valgrind-tests"] = args.valgrind_tests self["warn-inactive"] = args.warn_inactive # Nodes and groups are mutually exclusive, so their defaults cannot be # set in their add_argument calls. Additionally, groups does more than # just set a value. Here, set nodes first and then if a group is # specified, override the previous nodes value. if args.nodes: self["nodes"] = args.nodes.split(" ") else: self["nodes"] = [] if args.group: self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group) LogFactory().add_file(self["OutputFile"], "CTS") dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group) if os.path.isfile(dsh_file): self["nodes"] = [] with open(dsh_file, "r", encoding="utf-8") as f: for line in f: stripped = line.strip() if not stripped.startswith('#'): self["nodes"].append(stripped) else: print("Unknown DSH group: %s" % args.dsh_group) # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.cib_filename: self["CIBfilename"] = args.cib_filename else: self["CIBfilename"] = None if args.choose: self["scenario"] = "sequence" self["tests"].append(args.choose) if args.fencing: if args.fencing in ["0", "no"]: self["DoFencing"] = False else: self["DoFencing"] = True if args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % ( os.environ['OS_REGION_NAME'], os.environ['OS_TENANT_NAME'], os.environ['OS_AUTH_URL'], os.environ['OS_USERNAME'], os.environ['OS_PASSWORD'] ) elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["LogWatcher"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.oprofile: self["oprofile"] = args.oprofile.split(" ") else: self["oprofile"] = [] if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True if args.qarsh: self._rsh.enable_qarsh() for kv in args.set: (name, value) = kv.split("=") self[name] = value print("Setting %s = %s" % (name, value)) class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance diff --git a/python/pacemaker/_cts/test.py b/python/pacemaker/_cts/test.py index 86bffc5683..d67abf7bca 100644 --- a/python/pacemaker/_cts/test.py +++ b/python/pacemaker/_cts/test.py @@ -1,594 +1,594 @@ """ A module providing base classes. These classes are used for defining regression tests and groups of regression tests. Everything exported here should be considered an abstract class that needs to be subclassed in order to do anything useful. Various functions will raise NotImplementedError if not overridden by a subclass. """ __copyright__ = "Copyright 2009-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+)" __all__ = ["Test", "Tests"] import io import os import re import shlex import signal import subprocess import sys import time from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError from pacemaker._cts.process import pipe_communicate from pacemaker.buildoptions import BuildOptions from pacemaker.exitstatus import ExitStatus def find_validator(rng_file): """ Return the command line used to validate XML output. If no validator is found, return None. """ if os.access("/usr/bin/xmllint", os.X_OK): if rng_file is None: return ["xmllint", "-"] return ["xmllint", "--relaxng", rng_file, "-"] return None def rng_directory(): """Return the directory containing RNG schema files.""" if "PCMK_schema_directory" in os.environ: return os.environ["PCMK_schema_directory"] if os.path.exists("%s/cts-fencing.in" % sys.path[0]): return "xml" return BuildOptions.SCHEMA_DIR class Pattern: """A class for checking log files for a given pattern.""" def __init__(self, pat, negative=False, regex=False): """ Create a new Pattern instance. Arguments: pat -- The string to search for negative -- If True, pat must not be found in any input regex -- If True, pat is a regex and not a substring """ self._pat = pat self.negative = negative self.regex = regex def __str__(self): return self._pat def match(self, line): """Return True if this pattern is found in the given line.""" if self.regex: return re.search(self._pat, line) is not None return self._pat in line class Test: """ The base class for a single regression test. A single regression test may still run multiple commands as part of its execution. """ def __init__(self, name, description, **kwargs): """ Create a new Test instance. This method must be provided by all subclasses, which must call Test.__init__ first. Arguments: description -- A user-readable description of the test, helpful in identifying what test is running or has failed. name -- The name of the test. Command line tools use this attribute to allow running only tests with the exact name, or tests whose name matches a given pattern. This should be unique among all tests. Keyword arguments: force_wait -- logdir -- The base directory under which to create a directory to store output and temporary data. timeout -- How long to wait for the test to complete. verbose -- Whether to print additional information, including verbose command output and daemon log files. """ self.description = description self.executed = False self.name = name self.force_wait = kwargs.get("force_wait", False) self.logdir = kwargs.get("logdir", "/tmp") self.timeout = kwargs.get("timeout", 2) self.verbose = kwargs.get("verbose", False) self._cmds = [] self._patterns = [] self._daemon_location = None self._daemon_output = "" self._daemon_process = None self._result_exitcode = ExitStatus.OK self._result_txt = "" # # PROPERTIES # @property def exitcode(self): """ Return the final exitcode of the Test. If all commands pass, this property will be ExitStatus.OK. Otherwise, this property will be the exitcode of the first command to fail. """ return self._result_exitcode @exitcode.setter def exitcode(self, value): self._result_exitcode = value @property def logpath(self): """ Return the path to the log for whatever daemon is being tested. Note that this requires all subclasses to set self._daemon_location before accessing this property or an exception will be raised. """ return os.path.join(self.logdir, "%s.log" % self._daemon_location) # # PRIVATE METHODS # def _kill_daemons(self): """Kill any running daemons in preparation for executing the test.""" raise NotImplementedError("_kill_daemons not provided by subclass") def _match_log_patterns(self): """ Check test output for expected patterns. Set self.exitcode and self._result_txt as appropriate. Not all subclass will need to do this. """ if len(self._patterns) == 0: return n_failed_matches = 0 n_negative_matches = 0 output = self._daemon_output.split("\n") for pat in self._patterns: positive_match = False for line in output: if pat.match(line): if pat.negative: n_negative_matches += 1 if self.verbose: print("This pattern should not have matched = '%s" % pat) break positive_match = True break if not pat.negative and not positive_match: n_failed_matches += 1 print("Pattern Not Matched = '%s'" % pat) if n_failed_matches > 0 or n_negative_matches > 0: msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches." self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches) self.exitcode = ExitStatus.ERROR def _new_cmd(self, cmd, args, exitcode, **kwargs): """ Add a command to be executed as part of this test. Arguments: cmd -- The program to run. args -- Commands line arguments to pass to cmd, as a string. exitcode -- The expected exit code of cmd. This can be used to run a command that is expected to fail. Keyword arguments: stdout_match -- If not None, a string that is expected to be present in the stdout of cmd. This can be a regular expression. no_wait -- Do not wait for cmd to complete. stdout_negative_match -- If not None, a string that is expected to be missing in the stdout of cmd. This can be a regualr expression. kill -- A command to be run after cmd, typically in order to kill a failed process. This should be the entire command line including arguments as a single string. validate -- If True, the output of cmd will be passed to xmllint for validation. If validation fails, XmlValidationError will be raised. check_rng -- If True and validate is True, command output will additionally be checked against the api-result.rng file. check_stderr -- If True, the stderr of cmd will be included in output. env -- If not None, variables to set in the environment """ self._cmds.append( { "args": args, "check_rng": kwargs.get("check_rng", True), "check_stderr": kwargs.get("check_stderr", True), "cmd": cmd, "expected_exitcode": exitcode, "kill": kwargs.get("kill", None), "no_wait": kwargs.get("no_wait", False), "stdout_match": kwargs.get("stdout_match", None), "stdout_negative_match": kwargs.get("stdout_negative_match", None), "validate": kwargs.get("validate", True), "env": kwargs.get("env", None), } ) def _start_daemons(self): """Start any necessary daemons in preparation for executing the test.""" raise NotImplementedError("_start_daemons not provided by subclass") # # PUBLIC METHODS # def add_cmd(self, cmd, args, validate=True, check_rng=True, check_stderr=True, env=None): """Add a simple command to be executed as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, validate=validate, check_rng=check_rng, check_stderr=check_stderr, env=env) def add_cmd_and_kill(self, cmd, args, kill_proc): """Add a command and system command to be executed as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, kill=kill_proc) def add_cmd_check_stdout(self, cmd, args, match, no_match=None, env=None): """Add a simple command with expected output to be executed as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, stdout_match=match, stdout_negative_match=no_match, env=env) def add_cmd_expected_fail(self, cmd, args, exitcode=ExitStatus.ERROR): """Add a command that is expected to fail to be executed as part of this test.""" self._new_cmd(cmd, args, exitcode) def add_cmd_no_wait(self, cmd, args): """Add a simple command to be executed (without waiting) as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, no_wait=True) def add_log_pattern(self, pattern, negative=False, regex=False): """Add a pattern that should appear in the test's logs.""" self._patterns.append(Pattern(pattern, negative=negative, regex=regex)) def _signal_dict(self): """Return a dictionary mapping signal numbers to their names.""" # FIXME: When we support python >= 3.5, this function can be replaced with: # signal.Signals(self.daemon_process.returncode).name return { getattr(signal, _signame): _signame - for _signame in dir(signal) - if _signame.startswith("SIG") and not _signame.startswith("SIG_") + for _signame in dir(signal) + if _signame.startswith("SIG") and not _signame.startswith("SIG_") } def clean_environment(self): """Clean up the host after executing a test.""" if self._daemon_process: if self._daemon_process.poll() is None: self._daemon_process.terminate() self._daemon_process.wait() else: rc = self._daemon_process.returncode signame = self._signal_dict().get(-rc, "RET=%s" % rc) msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)." self._result_txt = msg % (self.name, self._daemon_location, signame) self.exitcode = ExitStatus.ERROR self._daemon_process = None self._daemon_output = "" # the default for utf-8 encoding would error out if e.g. memory corruption # makes fenced output any kind of 8 bit value - while still interesting # for debugging and we'd still like the regression-test to go over the # full set of test-cases with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile: for line in logfile.readlines(): self._daemon_output += line if self.verbose: print("Daemon Output Start") print(self._daemon_output) print("Daemon Output End") def print_result(self, filler): """Print the result of the last test execution.""" print("%s%s" % (filler, self._result_txt)) def run(self): """Execute this test.""" i = 1 self.start_environment() if self.verbose: print("\n--- START TEST - %s" % self.name) self._result_txt = "SUCCESS - '%s'" % (self.name) self.exitcode = ExitStatus.OK for cmd in self._cmds: try: self.run_cmd(cmd) except ExitCodeError as e: print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode'])) self.set_error(i, cmd) break except OutputNotFoundError as e: print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e)) self.set_error(i, cmd) break except OutputFoundError as e: print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e)) self.set_error(i, cmd) break except XmlValidationError as e: print("Step %d FAILED - xmllint failed: %s" % (i, e)) self.set_error(i, cmd) break if self.verbose: print("Step %d SUCCESS" % (i)) i += 1 self.clean_environment() if self.exitcode == ExitStatus.OK: self._match_log_patterns() print(self._result_txt) if self.verbose: print("--- END TEST - %s\n" % self.name) self.executed = True def run_cmd(self, args): """Execute a command as part of this test.""" cmd = shlex.split(args['args']) cmd.insert(0, args['cmd']) if self.verbose: print("\n\nRunning: %s" % " ".join(cmd)) # FIXME: Using "with" here breaks fencing merge tests. # pylint: disable=consider-using-with if args['env']: new_env = os.environ.copy() new_env.update(args['env']) test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=new_env) else: test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if args['kill']: if self.verbose: print("Also running: %s" % args['kill']) # Typically, the kill argument is used to detect some sort of # failure. Without yielding for a few seconds here, the process # launched earlier that is listening for the failure may not have # time to connect to pacemaker-execd. time.sleep(2) subprocess.Popen(shlex.split(args['kill'])) if not args['no_wait']: test.wait() else: return ExitStatus.OK output = pipe_communicate(test, check_stderr=args['check_stderr']) if self.verbose: print(output) if test.returncode != args['expected_exitcode']: raise ExitCodeError(test.returncode) if args['stdout_match'] is not None and \ re.search(args['stdout_match'], output) is None: raise OutputNotFoundError(output) if args['stdout_negative_match'] is not None and \ re.search(args['stdout_negative_match'], output) is not None: raise OutputFoundError(output) if args['validate']: if args['check_rng']: rng_file = "%s/api/api-result.rng" % rng_directory() else: rng_file = None cmd = find_validator(rng_file) if not cmd: raise XmlValidationError("Could not find validator for %s" % rng_file) if self.verbose: print("\nRunning: %s" % " ".join(cmd)) with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator: output = pipe_communicate(validator, check_stderr=True, stdin=output) if self.verbose: print(output) if validator.returncode != 0: raise XmlValidationError(output) return ExitStatus.OK def set_error(self, step, cmd): """Record failure of this test.""" msg = "FAILURE - '%s' failed at step %d. Command: %s %s" self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args']) self.exitcode = ExitStatus.ERROR def start_environment(self): """Prepare the host for executing a test.""" if os.path.exists(self.logpath): os.remove(self.logpath) self._kill_daemons() self._start_daemons() logfile = None init_time = time.time() update_time = init_time while True: # FIXME: Eventually use 'with' here, which seems complicated given # everything happens in a loop. # pylint: disable=consider-using-with time.sleep(0.1) if not self.force_wait and logfile is None \ and os.path.exists(self.logpath): logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1") if not self.force_wait and logfile is not None: for line in logfile.readlines(): if "successfully started" in line: return now = time.time() if self.timeout > 0 and (now - init_time) >= self.timeout: if not self.force_wait: print("\tDaemon %s doesn't seem to have been initialized within %fs." "\n\tConsider specifying a longer '--timeout' value." % (self._daemon_location, self.timeout)) return if self.verbose and (now - update_time) >= 5: print("Waiting for %s to be initialized: %fs ..." % (self._daemon_location, now - init_time)) update_time = now class Tests: """The base class for a collection of regression tests.""" def __init__(self, **kwargs): """ Create a new Tests instance. This method must be provided by all subclasses, which must call Tests.__init__ first. Keywork arguments: force_wait -- logdir -- The base directory under which to create a directory to store output and temporary data. timeout -- How long to wait for the test to complete. verbose -- Whether to print additional information, including verbose command output and daemon log files. """ self.force_wait = kwargs.get("force_wait", False) self.logdir = kwargs.get("logdir", "/tmp") self.timeout = kwargs.get("timeout", 2) self.verbose = kwargs.get("verbose", False) self._tests = [] def exit(self): """Exit (with error status code if any test failed).""" for test in self._tests: if not test.executed: continue if test.exitcode != ExitStatus.OK: sys.exit(ExitStatus.ERROR) sys.exit(ExitStatus.OK) def print_list(self): """List all registered tests.""" print("\n==== %d TESTS FOUND ====" % len(self._tests)) print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION")) print("%35s - %s" % ("--------------------", "--------------------")) for test in self._tests: print("%35s - %s" % (test.name, test.description)) print("==== END OF LIST ====\n") def print_results(self): """Print summary of results of executed tests.""" failures = 0 success = 0 print("\n\n======= FINAL RESULTS ==========") print("\n--- FAILURE RESULTS:") for test in self._tests: if not test.executed: continue if test.exitcode != ExitStatus.OK: failures += 1 test.print_result(" ") else: success += 1 if failures == 0: print(" None") print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures)) def run_single(self, name): """Run a single named test.""" for test in self._tests: if test.name == name: test.run() break def run_tests(self): """Run all tests.""" for test in self._tests: test.run() def run_tests_matching(self, pattern): """Run all tests whose name matches a pattern.""" for test in self._tests: if test.name.count(pattern) != 0: test.run() diff --git a/python/pacemaker/_cts/tests/remotedriver.py b/python/pacemaker/_cts/tests/remotedriver.py index df7f18078a..c24fe7f24d 100644 --- a/python/pacemaker/_cts/tests/remotedriver.py +++ b/python/pacemaker/_cts/tests/remotedriver.py @@ -1,548 +1,542 @@ """Base classes for CTS tests.""" __all__ = ["RemoteDriver"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import time import subprocess import tempfile from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.tests.stoptest import StopTest from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable class RemoteDriver(CTSTest): """ A specialized base class for cluster tests that run on Pacemaker Remote nodes. This builds on top of CTSTest to provide methods for starting and stopping services and resources, and managing remote nodes. This is still just an abstract class -- specific tests need to implement their own specialized behavior. """ def __init__(self, cm): """ Create a new RemoteDriver instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "RemoteDriver" self._corosync_enabled = False self._pacemaker_enabled = False self._remote_node = None self._remote_rsc = "remote-rsc" self._start = StartTest(cm) self._startall = SimulStartLite(cm) self._stop = StopTest(cm) self.reset() def reset(self): """Reset the state of this test back to what it was before the test was run.""" self.failed = False self.fail_string = "" self._pcmk_started = False self._remote_node_added = False self._remote_rsc_added = False self._remote_use_reconnect_interval = self._env.random_gen.choice([True, False]) def fail(self, msg): """Mark test as failed.""" self.failed = True # Always log the failure. self._logger.log(msg) # Use first failure as test status, as it's likely to be most useful. if not self.fail_string: self.fail_string = msg def _get_other_node(self, node): """ Get the first cluster node out of the environment that is not the given node. Typically, this is used to find some node that will still be active that we can run cluster commands on. """ for othernode in self._env["nodes"]: if othernode == node: # we don't want to try and use the cib that we just shutdown. # find a cluster node that is not our soon to be remote-node. continue return othernode def _del_rsc(self, node, rsc): """ Delete the given named resource from the cluster. The given `node` is the cluster node on which we should *not* run the delete command. """ othernode = self._get_other_node(node) (rc, _) = self._rsh(othernode, "crm_resource -D -r %s -t primitive" % rsc) if rc != 0: self.fail("Removal of resource '%s' failed" % rsc) def _add_rsc(self, node, rsc_xml): """ Add a resource given in XML format to the cluster. The given `node` is the cluster node on which we should *not* run the add command. """ othernode = self._get_other_node(node) (rc, _) = self._rsh(othernode, "cibadmin -C -o resources -X '%s'" % rsc_xml) if rc != 0: self.fail("resource creation failed") def _add_primitive_rsc(self, node): """ Add a primitive heartbeat resource for the remote node to the cluster. The given `node` is the cluster node on which we should *not* run the add command. """ rsc_xml = """ -""" % { - "node": self._remote_rsc -} +""" % {"node": self._remote_rsc} self._add_rsc(node, rsc_xml) if not self.failed: self._remote_rsc_added = True def _add_connection_rsc(self, node): """ Add a primitive connection resource for the remote node to the cluster. The given `node` is the cluster node on which we should *not* run the add command. """ rsc_xml = """ -""" % { - "node": self._remote_node, "server": node -} +""" % {"node": self._remote_node, "server": node} if self._remote_use_reconnect_interval: # Set reconnect interval on resource rsc_xml += """ """ % self._remote_node rsc_xml += """ -""" % { - "node": self._remote_node -} +""" % {"node": self._remote_node} self._add_rsc(node, rsc_xml) if not self.failed: self._remote_node_added = True def _disable_services(self, node): """Disable the corosync and pacemaker services on the given node.""" self._corosync_enabled = self._env.service_is_enabled(node, "corosync") if self._corosync_enabled: self._env.disable_service(node, "corosync") self._pacemaker_enabled = self._env.service_is_enabled(node, "pacemaker") if self._pacemaker_enabled: self._env.disable_service(node, "pacemaker") def _enable_services(self, node): """Enable the corosync and pacemaker services on the given node.""" if self._corosync_enabled: self._env.enable_service(node, "corosync") if self._pacemaker_enabled: self._env.enable_service(node, "pacemaker") def _stop_pcmk_remote(self, node): """Stop the Pacemaker Remote service on the given node.""" for _ in range(10): (rc, _) = self._rsh(node, "service pacemaker_remote stop") if rc != 0: time.sleep(6) else: break def _start_pcmk_remote(self, node): """Start the Pacemaker Remote service on the given node.""" for _ in range(10): (rc, _) = self._rsh(node, "service pacemaker_remote start") if rc != 0: time.sleep(6) else: self._pcmk_started = True break def _freeze_pcmk_remote(self, node): """Simulate a Pacemaker Remote daemon failure.""" self._rsh(node, "killall -STOP pacemaker-remoted") def _resume_pcmk_remote(self, node): """Simulate the Pacemaker Remote daemon recovering.""" self._rsh(node, "killall -CONT pacemaker-remoted") def _start_metal(self, node): """ Set up a Pacemaker Remote configuration. Remove any existing connection resources or nodes. Start the pacemaker_remote service. Create a connection resource. """ # Cluster nodes are reused as remote nodes in remote tests. If cluster # services were enabled at boot, in case the remote node got fenced, the # cluster node would join instead of the expected remote one. Meanwhile # pacemaker_remote would not be able to start. Depending on the chances, # the situations might not be able to be orchestrated gracefully any more. # # Temporarily disable any enabled cluster serivces. self._disable_services(node) # make sure the resource doesn't already exist for some reason self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_rsc) self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_node) if not self._stop(node): self.fail("Failed to shutdown cluster node %s" % node) return self._start_pcmk_remote(node) if not self._pcmk_started: self.fail("Failed to start pacemaker_remote on node %s" % node) return # Convert node to baremetal now that it has shutdown the cluster stack pats = [] watch = self.create_watch(pats, 120) watch.set_watch() pats.extend([ self.templates["Pat:RscOpOK"] % ("start", self._remote_node), self.templates["Pat:DC_IDLE"] ]) self._add_connection_rsc(node) with Timer(self._logger, self.name, "remoteMetalInit"): watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) def migrate_connection(self, node): """Move the remote connection resource to any other available node.""" if self.failed: return pats = [ self.templates["Pat:RscOpOK"] % ("migrate_to", self._remote_node), self.templates["Pat:RscOpOK"] % ("migrate_from", self._remote_node), self.templates["Pat:DC_IDLE"] ] watch = self.create_watch(pats, 120) watch.set_watch() (rc, _) = self._rsh(node, "crm_resource -M -r %s" % self._remote_node, verbose=1) if rc != 0: self.fail("failed to move remote node connection resource") return with Timer(self._logger, self.name, "remoteMetalMigrate"): watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) def fail_rsc(self, node): """ Cause the dummy resource running on a Pacemaker Remote node to fail. Verify that the failure is logged correctly. """ if self.failed: return watchpats = [ self.templates["Pat:RscRemoteOpOK"] % ("stop", self._remote_rsc, self._remote_node), self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node), self.templates["Pat:DC_IDLE"] ] watch = self.create_watch(watchpats, 120) watch.set_watch() self.debug("causing dummy rsc to fail.") self._rsh(node, "rm -f /var/run/resource-agents/Dummy*") with Timer(self._logger, self.name, "remoteRscFail"): watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched) def fail_connection(self, node): """ Cause the remote connection resource to fail. Verify that the node is fenced and the connection resource is restarted on another node. """ if self.failed: return watchpats = [ self.templates["Pat:Fencing_ok"] % self._remote_node, self.templates["Pat:NodeFenced"] % self._remote_node ] watch = self.create_watch(watchpats, 120) watch.set_watch() # freeze the pcmk remote daemon. this will result in fencing self.debug("Force stopped active remote node") self._freeze_pcmk_remote(node) self.debug("Waiting for remote node to be fenced.") with Timer(self._logger, self.name, "remoteMetalFence"): watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) return self.debug("Waiting for the remote node to come back up") self._cm.ns.wait_for_node(node, 120) pats = [] watch = self.create_watch(pats, 240) watch.set_watch() pats.append(self.templates["Pat:RscOpOK"] % ("start", self._remote_node)) if self._remote_rsc_added: pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node)) # start the remote node again watch it integrate back into cluster. self._start_pcmk_remote(node) if not self._pcmk_started: self.fail("Failed to start pacemaker_remote on node %s" % node) return self.debug("Waiting for remote node to rejoin cluster after being fenced.") with Timer(self._logger, self.name, "remoteMetalRestart"): watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) def _add_dummy_rsc(self, node): """Add a dummy resource that runs on the Pacemaker Remote node.""" if self.failed: return # verify we can put a resource on the remote node pats = [] watch = self.create_watch(pats, 120) watch.set_watch() pats.extend([ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node), self.templates["Pat:DC_IDLE"] ]) # Add a resource that must live on remote-node self._add_primitive_rsc(node) # force that rsc to prefer the remote node. (rc, _) = self._cm.rsh(node, "crm_resource -M -r %s -N %s -f" % (self._remote_rsc, self._remote_node), verbose=1) if rc != 0: self.fail("Failed to place remote resource on remote node.") return with Timer(self._logger, self.name, "remoteMetalRsc"): watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) def test_attributes(self, node): """Verify that attributes can be set on the Pacemaker Remote node.""" if self.failed: return # This verifies permanent attributes can be set on a remote-node. It also # verifies the remote-node can edit its own cib node section remotely. (rc, line) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % self._remote_node, verbose=1) if rc != 0: self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line)) return (rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % self._remote_node, verbose=1) if rc != 0: self.fail("Failed to get remote-node attribute") return (rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % self._remote_node, verbose=1) if rc != 0: self.fail("Failed to delete remote-node attribute") def cleanup_metal(self, node): """ Clean up the Pacemaker Remote node configuration previously created by _setup_metal. Stop and remove dummy resources and connection resources. Stop the pacemaker_remote service. Remove the remote node itself. """ self._enable_services(node) if not self._pcmk_started: return pats = [] watch = self.create_watch(pats, 120) watch.set_watch() if self._remote_rsc_added: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_rsc)) if self._remote_node_added: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_node)) with Timer(self._logger, self.name, "remoteMetalCleanup"): self._resume_pcmk_remote(node) if self._remote_rsc_added: # Remove dummy resource added for remote node tests self.debug("Cleaning up dummy rsc put on remote node") self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_rsc) self._del_rsc(node, self._remote_rsc) if self._remote_node_added: # Remove remote node's connection resource self.debug("Cleaning up remote node connection resource") self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_node) self._del_rsc(node, self._remote_node) watch.look_for_all() if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) self._stop_pcmk_remote(node) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() if self._remote_node_added: # Remove remote node itself self.debug("Cleaning up node entry for remote node") self._rsh(self._get_other_node(node), "crm_node --force --remove %s" % self._remote_node) def _setup_env(self, node): """ Set up the environment to allow Pacemaker Remote to function. This involves generating a key and copying it to all nodes in the cluster. """ self._remote_node = "remote-%s" % node # we are assuming if all nodes have a key, that it is # the right key... If any node doesn't have a remote # key, we regenerate it everywhere. if self._rsh.exists_on_all("/etc/pacemaker/authkey", self._env["nodes"]): return # create key locally (handle, keyfile) = tempfile.mkstemp(".cts") os.close(handle) subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # sync key throughout the cluster for n in self._env["nodes"]: self._rsh(n, "mkdir -p --mode=0750 /etc/pacemaker") self._rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % n) self._rsh(n, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey") self._rsh(n, "chmod 0640 /etc/pacemaker/authkey") os.unlink(keyfile) def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" if not CTSTest.is_applicable(self): return False for node in self._env["nodes"]: (rc, _) = self._rsh(node, "which pacemaker-remoted >/dev/null 2>&1") if rc != 0: return False return True def start_new_test(self, node): """Prepare a remote test for running by setting up its environment and resources.""" self.incr("calls") self.reset() ret = self._startall(None) if not ret: return self.failure("setup failed: could not start all nodes") self._setup_env(node) self._start_metal(node) self._add_dummy_rsc(node) return True def __call__(self, node): """Perform this test.""" raise NotImplementedError @property def errors_to_ignore(self): """Return list of errors which should be ignored.""" return [ r"""is running on remote.*which isn't allowed""", r"""Connection terminated""", r"""Could not send remote""" ]