diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index fb6f92e741..8b3499e5fe 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,529 +1,523 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory", "set_cts_path"] __copyright__ = "Copyright 2014-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse from contextlib import suppress from glob import glob import os import random import shlex import socket import sys from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). # @TODO See if type annotations fix this. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["DoFencing"] = True self["CIBResource"] = False self["log_kind"] = None self["scenario"] = "random" # Hard-coded since there is only one supported cluster manager/stack self["Name"] = "crm-corosync" self["Stack"] = "corosync 2+" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def dump(self): """Print the current environment.""" for key in sorted(self.data.keys()): self._logger.debug(f"{f'Environment[{key}]':35}: {str(self[key])}") def __contains__(self, key): """Return True if the given key exists in the environment.""" return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "nodes": self.data["nodes"] = [] for node in value: node = node.strip() # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: # @TODO This only handles IPv4, use getaddrinfo() instead # (here and in _discover()) socket.gethostbyname_ex(node) self.data["nodes"].append(node) except socket.herror: self._logger.log(f"{node} not found in DNS... aborting") raise else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def _detect_systemd(self, node): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(node, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self, node): """Detect the syslog variant in use on the target node (if any).""" if "syslogd" in self.data: return if self["have_systemd"]: # Systemd (_, lines) = self._rsh(node, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) else: # SYS-V (_, lines) = self._rsh(node, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) with suppress(IndexError): self["syslogd"] = lines[0].strip() def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl disable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} off") return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl enable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} on") return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, f"systemctl is-enabled {service} | grep enabled") return rc == 0 # SYS-V (rc, _) = self._rsh(node, f"chkconfig --list | grep -e {service}.*on") return rc == 0 def _detect_at_boot(self, node): """Detect if the cluster starts at boot.""" if "at-boot" not in self.data: self["at-boot"] = self.service_is_enabled(node, "corosync") \ or self.service_is_enabled(node, "pacemaker") def _detect_ip_offset(self, node): """Detect the offset for IPaddr resources.""" if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(node, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(node, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") return last_part = self["IPBase"].split('.')[3] if int(last_part) >= 240: self._logger.log(f"Could not determine an offset for IPaddr resources. Upper bound is too high: {self['IPBase']} {last_part}") self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser node = self["nodes"][0] self._detect_systemd(node) self._detect_syslog(node) self._detect_at_boot(node) self._detect_ip_offset(node) def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog=f"{sys.argv[0]} -g virt1 -r --stonith ssh --schema pacemaker-2.0 500") grp1 = parser.add_argument_group("Common options") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", default="", metavar="NODES", help="List of cluster nodes separated by whitespace") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)") grp2.add_argument("--at-boot", "--cluster-starts-at-boot", choices=["1", "0", "yes", "no"], help="Does the cluster software start at boot time?") grp2.add_argument("--facility", "--syslog-facility", default="daemon", metavar="NAME", help="Which syslog facility to log to") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named tests, separated by whitespace") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--experimental-tests", action="store_true", help="Include experimental tests") grp4.add_argument("--loop-minutes", type=int, default=60, help="") grp4.add_argument("--no-loop-tests", action="store_true", help="Don't run looping/time-based tests") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--oprofile", default="", metavar="NODES", help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") - grp4.add_argument("--qarsh", - action="store_true", - help="Use QARSH to access nodes instead of SSH") grp4.add_argument("--schema", metavar="SCHEMA", default=f"pacemaker-{BuildOptions.CIB_SCHEMA_VERSION}", help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--set", action="append", metavar="ARG", default=[], help="Set key=value pairs (can be specified multiple times)") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. Most get a default from the add_argument # calls, they only do one thing, and they do not have any side effects. self["CIBfilename"] = args.cib_filename if args.cib_filename else None self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["SyslogFacility"] = args.facility self["TruncateLog"] = args.truncate self["at-boot"] = args.at_boot in ["1", "yes"] self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["experimental-tests"] = args.experimental_tests self["iterations"] = args.iterations self["loop-minutes"] = args.loop_minutes self["loop-tests"] = not args.no_loop_tests self["nodes"] = shlex.split(args.nodes) self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["oprofile"] = shlex.split(args.oprofile) self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["warn-inactive"] = args.warn_inactive # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.choose: self["scenario"] = "sequence" self["tests"].extend(shlex.split(args.choose)) self["iterations"] = len(self["tests"]) if args.fencing in ["0", "no"]: self["DoFencing"] = False elif args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") region = os.environ['OS_REGION_NAME'] tenant = os.environ['OS_TENANT_NAME'] auth = os.environ['OS_AUTH_URL'] user = os.environ['OS_USERNAME'] password = os.environ['OS_PASSWORD'] self["stonith-params"] = f"region={region},tenant={tenant},auth={auth},user={user},password={password}" elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") user = os.environ['RHEVM_USERNAME'] password = os.environ['RHEVM_PASSWORD'] server = os.environ['RHEVM_SERVER'] port = os.environ['RHEVM_PORT'] self["stonith-params"] = f"login={user},passwd={password},ipaddr={server},ipport={port},ssl=1,shell_timeout=10" if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile == "journal": self["LogAuditDisabled"] = True self["log_kind"] = LogKind.JOURNAL elif args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["log_kind"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True - if args.qarsh: - self._rsh.enable_qarsh() - self.random_gen.seed(args.seed) for kv in args.set: (name, value) = kv.split("=") self[name] = value print(f"Setting {name} = {value}") class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance def set_cts_path(extra=None): """Set the PATH environment variable appropriately for the tests.""" new_path = os.environ['PATH'] # Add any search paths given on the command line if extra is not None: for p in extra: new_path = f"{p}:{new_path}" cwd = os.getcwd() if os.path.exists(f"{cwd}/cts/cts-attrd.in"): # pylint: disable=protected-access print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR}") for d in glob(f"{BuildOptions._BUILD_DIR}/daemons/*/"): new_path = f"{d}:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}" print(f"Using local schemas from: {cwd}/xml") os.environ["PCMK_schema_directory"] = f"{cwd}/xml" else: print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {cwd})") new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}" os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR print(f'Using PATH="{new_path}"') os.environ['PATH'] = new_path diff --git a/python/pacemaker/_cts/remote.py b/python/pacemaker/_cts/remote.py index 4745ca2777..9605a139c1 100644 --- a/python/pacemaker/_cts/remote.py +++ b/python/pacemaker/_cts/remote.py @@ -1,290 +1,282 @@ """Remote command runner for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["RemoteExec", "RemoteFactory"] __copyright__ = "Copyright 2014-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import os from subprocess import Popen, PIPE from threading import Thread from pacemaker._cts.logging import LogFactory def convert2string(lines): """ Convert byte strings to UTF-8 strings. Lists of byte strings are converted to a list of UTF-8 strings. All other text formats are passed through. """ if isinstance(lines, bytes): return lines.decode("utf-8") if isinstance(lines, list): lst = [] for line in lines: if isinstance(line, bytes): line = line.decode("utf-8") lst.append(line) return lst return lines class AsyncCmd(Thread): """A class for doing the hard work of running a command on another machine.""" def __init__(self, node, command, proc=None, delegate=None): """ Create a new AsyncCmd instance. Arguments: node -- The remote machine to run on command -- The ssh command string to use for remote execution proc -- If not None, a process object previously created with Popen. Instead of spawning a new process, we will then wait on this process to finish and handle its output. delegate -- When the command completes, call the async_complete method on this object """ self._command = command self._delegate = delegate self._logger = LogFactory() self._node = node self._proc = proc Thread.__init__(self) def run(self): """Run the previously instantiated AsyncCmd object.""" out = None err = None if not self._proc: # pylint: disable=consider-using-with self._proc = Popen(self._command, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True) self._logger.debug(f"cmd: async: target={self._node}, pid={self._proc.pid}: {self._command}") self._proc.wait() if self._delegate: self._logger.debug(f"cmd: pid {self._proc.pid} returned {self._proc.returncode} to {self._delegate!r}") else: self._logger.debug(f"cmd: pid {self._proc.pid} returned {self._proc.returncode}") if self._proc.stderr: err = self._proc.stderr.readlines() self._proc.stderr.close() for line in err: self._logger.debug(f"cmd: stderr[{self._proc.pid}]: {line}") err = convert2string(err) if self._proc.stdout: out = self._proc.stdout.readlines() self._proc.stdout.close() out = convert2string(out) if self._delegate: self._delegate.async_complete(self._proc.pid, self._proc.returncode, out, err) class RemoteExec: """ An abstract class for remote execution. It runs a command on another machine using ssh and scp. """ def __init__(self, command, cp_command, silent=False): """ Create a new RemoteExec instance. Arguments: command -- The ssh command string to use for remote execution cp_command -- The scp command string to use for copying files silent -- Should we log command status? """ self._command = command self._cp_command = cp_command self._logger = LogFactory() self._silent = silent self._our_node = os.uname()[1].lower() def _fixcmd(self, cmd): """Perform shell escapes on certain characters in the input cmd string.""" return re.sub("\'", "'\\''", cmd) def _cmd(self, args): """Given a list of arguments, return the string that will be run on the remote system.""" sysname = args[0] command = args[1] if sysname is None or sysname.lower() in [self._our_node, "localhost"]: ret = command else: ret = f"{self._command} {sysname} '{self._fixcmd(command)}'" return ret def _log(self, args): """Log a message.""" if not self._silent: self._logger.log(args) def _debug(self, args): """Log a message at the debug level.""" if not self._silent: self._logger.debug(args) def call_async(self, node, command, delegate=None): """ Run the given command on the given remote system and do not wait for it to complete. Arguments: node -- The remote machine to run on command -- The command to run, as a string delegate -- When the command completes, call the async_complete method on this object Returns the running process object. """ aproc = AsyncCmd(node, self._cmd([node, command]), delegate=delegate) aproc.start() return aproc def __call__(self, node, command, synchronous=True, verbose=2): """ Run the given command on the given remote system. If you call this class like a function, this is what gets called. It's approximately the same as a system() call on the remote machine. Arguments: node -- The remote machine to run on command -- The command to run, as a string synchronous -- Should we wait for the command to complete? verbose -- If 0, do not log anything. If 1, log the command and its return code but not its output. If 2, additionally log command output. Returns a tuple of (return code, command output). """ rc = 0 result = None # pylint: disable=consider-using-with proc = Popen(self._cmd([node, command]), stdout=PIPE, stderr=PIPE, close_fds=True, shell=True) if not synchronous and proc.pid > 0 and not self._silent: aproc = AsyncCmd(node, command, proc=proc) aproc.start() return (rc, result) if proc.stdout: result = proc.stdout.readlines() proc.stdout.close() else: self._log("No stdout stream") rc = proc.wait() if verbose > 0: self._debug(f"cmd: target={node}, rc={rc}: {command}") result = convert2string(result) if proc.stderr: errors = proc.stderr.readlines() proc.stderr.close() for err in errors: self._debug(f"cmd: stderr: {err}") if verbose == 2: for line in result: self._debug(f"cmd: stdout: {line}") return (rc, result) def copy(self, source, target, silent=False): """ Perform a copy of the source file to the remote target. This function uses the cp_command provided when the RemoteExec object was created. Returns the return code of the cp_command. """ # @TODO Use subprocess module with argument array instead # (self._cp_command should be an array too) cmd = f"{self._cp_command} '{source}' '{target}'" rc = os.system(cmd) if not silent: self._debug(f"cmd: rc={rc}: {cmd}") return rc def exists_on_all(self, filename, hosts): """Return True if specified file exists on all specified hosts.""" for host in hosts: (rc, _) = self(host, f"test -r {filename}") if rc != 0: return False return True def exists_on_none(self, filename, hosts): """Return True if specified file does not exist on any specified host.""" for host in hosts: (rc, _) = self(host, f"test -r {filename}") if rc == 0: return False return True class RemoteFactory: """A class for constructing a singleton instance of a RemoteExec object.""" # Class variables # -n: no stdin, -x: no X11, # -o ServerAliveInterval=5: disconnect after 3*5s if the server # stops responding command = ("ssh -l root -n -x -o ServerAliveInterval=5 " "-o ConnectTimeout=10 -o TCPKeepAlive=yes " "-o ServerAliveCountMax=3 ") # -B: batch mode, -q: no stats (quiet) cp_command = "scp -B -q" instance = None # pylint: disable=invalid-name def getInstance(self): """ Return the previously created instance of RemoteExec. If no instance exists, create one and then return that. """ if not RemoteFactory.instance: RemoteFactory.instance = RemoteExec(RemoteFactory.command, RemoteFactory.cp_command, False) return RemoteFactory.instance - - def enable_qarsh(self): - """Enable the QA remote shell.""" - # http://nstraz.wordpress.com/2008/12/03/introducing-qarsh/ - print("Using QARSH for connections to cluster nodes") - - RemoteFactory.command = "qarsh -t 300 -l root" - RemoteFactory.cp_command = "qacp -q"