diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py index ab0a21e93d..2a4ff1125a 100644 --- a/python/pacemaker/_cts/clustermanager.py +++ b/python/pacemaker/_cts/clustermanager.py @@ -1,817 +1,817 @@ """ClusterManager class for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["ClusterManager"] __copyright__ = """Copyright 2000-2025 the Pacemaker project contributors. Certain portions by Huang Zhen are copyright 2004 International Business Machines. The version control history for this file may have further details.""" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import time from collections import UserDict from pacemaker.buildoptions import BuildOptions from pacemaker.exitstatus import ExitStatus from pacemaker._cts.CTS import NodeStatus from pacemaker._cts.audits import AuditResource from pacemaker._cts.cib import ConfigFactory from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogWatcher # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). # @TODO See if type annotations fix this. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable # ClusterManager has a lot of methods. # pylint: disable=too-many-public-methods class ClusterManager(UserDict): """ An abstract base class for managing the cluster. This class implements high-level operations on the cluster and/or its cluster managers. Actual cluster-specific management classes should be subclassed from this one. Among other things, this class tracks the state every node is expected to be in. """ def __init__(self): """Create a new ClusterManager instance.""" # Eventually, ClusterManager should not be a UserDict subclass. Until # that point... # pylint: disable=super-init-not-called self.__instance_errors_to_ignore = [] self._cib_installed = False self._logger = LogFactory() self.env = EnvFactory().getInstance() self.expected_status = {} self.name = self.env["Name"] # pylint: disable=invalid-name self.ns = NodeStatus(self.env) self.our_node = os.uname()[1].lower() self.partitions_expected = 1 self.rsh = RemoteFactory().getInstance() self.templates = PatternSelector(self.name) self._cib_factory = ConfigFactory(self) self._cib = self._cib_factory.create_config(self.env["Schema"]) self._cib_sync = {} def clear_instance_errors_to_ignore(self): """Reset instance-specific errors to ignore on each iteration.""" self.__instance_errors_to_ignore = [] @property def instance_errors_to_ignore(self): """Return a list of known errors that should be ignored for a specific test instance.""" return self.__instance_errors_to_ignore @property def errors_to_ignore(self): """Return a list of known error messages that should be ignored.""" return self.templates.get_patterns("BadNewsIgnore") def log(self, args): """Log a message.""" self._logger.log(args) def debug(self, args): """Log a debug message.""" self._logger.debug(args) def upcount(self): """Return how many nodes are up.""" count = 0 for node in self.env["nodes"]: if self.expected_status[node] == "up": count += 1 return count def install_support(self, command="install"): """ Install or uninstall the CTS support files. This includes various init scripts and data, daemons, fencing agents, etc. """ for node in self.env["nodes"]: self.rsh(node, f"{BuildOptions.DAEMON_DIR}/cts-support {command}") def prepare_fencing_watcher(self): """Return a LogWatcher object that watches for fencing log messages.""" # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced if self.has_quorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonith_pats = [] for peer in self.env["nodes"]: if self.expected_status[peer] == "up": continue stonith_pats.extend([ self.templates["Pat:Fencing_ok"] % peer, self.templates["Pat:Fencing_start"] % peer, ]) stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"], self.env["log_kind"], "StartupFencing", 0) stonith.set_watch() return stonith def fencing_cleanup(self, node, stonith): """Wait for a previously fenced node to return to the cluster.""" peer_list = [] peer_state = {} self.debug(f"Looking for nodes that were fenced as a result of {node} starting") # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.has_quorum(None) if not q and len(self.env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug(f"Quorum: {q} Len: {len(self.env['nodes'])}") return peer_list for n in self.env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug(f"looking for: {stonith.regexes!r}") shot = stonith.look(0) while shot: self.debug(f"Found: {shot!r}") del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self._logger.log(f"ERROR: Unknown stonith match: {shot!r}") elif peer not in peer_list: self.debug(f"Found peer: {peer}") peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(f" Peer {peer} was fenced as a result of {node} starting: {peer_state[peer]}") if self.env["at-boot"]: self.expected_status[peer] = "up" else: self.expected_status[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while stonith.regexes and shot: self.debug(f"Found: {shot!r}") del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too - self.ns.wait_for_node(peer, self.env["DeadTime"]) + self.ns.wait_for_node(peer, self.env["dead_time"]) # Poll until it comes up if self.env["at-boot"]: if not self.stat_cm(peer): time.sleep(self.env["StartTime"]) if not self.stat_cm(peer): self._logger.log(f"ERROR: Peer {peer} failed to restart after being fenced") return None return peer_list def _install_config(self, node): """Remove and re-install the CIB on the first node in the cluster.""" if not self.ns.wait_for_node(node): self.log(f"Node {node} is not up.") return if node in self._cib_sync or not self.env["ClobberCIB"]: return self._cib_sync[node] = True self.rsh(node, f"rm -f {BuildOptions.CIB_DIR}/cib*") # Only install the CIB on the first node, all the other ones will pick it up from there if self._cib_installed: return self._cib_installed = True if self.env["CIBfilename"]: self.log(f"Installing CIB ({self.env['CIBfilename']}) on node {node}") rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) if rc != 0: raise ValueError(f"Can not scp file to {node} {rc}") else: self.log(f"Installing Generated CIB on node {node}") self._cib.install(node) self.rsh(node, f"chown {BuildOptions.DAEMON_USER} {BuildOptions.CIB_DIR}/cib.xml") def start_cm(self, node, verbose=False): """Start up the cluster manager on a given node.""" log_fn = self._logger.log if verbose else self.debug log_fn(f"Starting {self.name} on node {node}") if node not in self.expected_status: self.expected_status[node] = "down" if self.expected_status[node] != "down": return True # Technically we should always be able to notice ourselves starting patterns = [ self.templates["Pat:Local_started"] % node, ] if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) watch = LogWatcher(self.env["LogFileName"], patterns, self.env["nodes"], self.env["log_kind"], "StartaCM", self.env["StartTime"] + 10) self._install_config(node) self.expected_status[node] = "any" - if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): + if self.stat_cm(node) and self.cluster_stable(self.env["dead_time"]): self._logger.log(f"{node} was already started") return True stonith = self.prepare_fencing_watcher() watch.set_watch() (rc, _) = self.rsh(node, self.templates["StartCmd"]) if rc != 0: self._logger.log(f"Warn: Start command failed on node {node}") self.fencing_cleanup(node, stonith) return False self.expected_status[node] = "up" watch_result = watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self._logger.log(f"Warn: Startup pattern not found: {regex}") - if watch_result and self.cluster_stable(self.env["DeadTime"]): + if watch_result and self.cluster_stable(self.env["dead_time"]): self.fencing_cleanup(node, stonith) return True - if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): + if self.stat_cm(node) and self.cluster_stable(self.env["dead_time"]): self.fencing_cleanup(node, stonith) return True self._logger.log(f"Warn: Start failed for node {node}") return False def start_cm_async(self, node, verbose=False): """Start up the cluster manager on a given node without blocking.""" log_fn = self._logger.log if verbose else self.debug log_fn(f"Starting {self.name} on node {node}") self._install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=False) self.expected_status[node] = "up" def stop_cm(self, node, verbose=False, force=False): """Stop the cluster manager on a given node.""" log_fn = self._logger.log if verbose else self.debug log_fn(f"Stopping {self.name} on node {node}") if self.expected_status[node] != "up" and not force: return True (rc, _) = self.rsh(node, self.templates["StopCmd"]) if rc == 0: # Make sure we can continue even if corosync leaks self.expected_status[node] = "down" - self.cluster_stable(self.env["DeadTime"]) + self.cluster_stable(self.env["dead_time"]) return True self._logger.log(f"ERROR: Could not stop {self.name} on node {node}") return False def stop_cm_async(self, node): """Stop the cluster manager on a given node without blocking.""" self.debug(f"Stopping {self.name} on node {node}") self.rsh(node, self.templates["StopCmd"], synchronous=False) self.expected_status[node] = "down" def startall(self, nodelist=None, verbose=False, quick=False): """Start the cluster manager on every node in the cluster, or on every node in nodelist.""" if not nodelist: nodelist = self.env["nodes"] for node in nodelist: if self.expected_status[node] == "down": self.ns.wait_for_all_nodes(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... return self.start_cm(nodelist[0], verbose=verbose) # Approximation of SimulStartList for --boot watchpats = [ self.templates["Pat:DC_IDLE"], ] for node in nodelist: watchpats.extend([ self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node, self.templates["Pat:Local_started"] % node, self.templates["Pat:They_up"] % (nodelist[0], node), ]) # Start all the nodes - at about the same time... watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"], self.env["log_kind"], "fast-start", - self.env["DeadTime"] + 10) + self.env["dead_time"] + 10) watch.set_watch() if not self.start_cm(nodelist[0], verbose=verbose): return False for node in nodelist: self.start_cm_async(node, verbose=verbose) watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self._logger.log(f"Warn: Startup pattern not found: {regex}") if not self.cluster_stable(): self._logger.log("Cluster did not stabilize") return False return True def stopall(self, nodelist=None, verbose=False, force=False): """Stop the cluster manager on every node in the cluster, or on every node in nodelist.""" ret = True if not nodelist: nodelist = self.env["nodes"] for node in self.env["nodes"]: if self.expected_status[node] == "up" or force: if not self.stop_cm(node, verbose=verbose, force=force): ret = False return ret def statall(self, nodelist=None): """Return the status of the cluster manager on every node in the cluster, or on every node in nodelist.""" result = {} if not nodelist: nodelist = self.env["nodes"] for node in nodelist: if self.stat_cm(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): """Break communication between the target node and all other nodes in the cluster, or nodes.""" if not nodes: nodes = self.env["nodes"] for node in nodes: if node == target: continue (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node) if rc != 0: self._logger.log(f"Could not break the communication between {target} and {node}: {rc}") return False self.debug(f"Communication cut between {target} and {node}") return True def unisolate_node(self, target, nodes=None): """Re-establish communication between the target node and all other nodes in the cluster, or nodes.""" if not nodes: nodes = self.env["nodes"] for node in nodes: if node == target: continue # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False) self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False) self.debug(f"Communication restored between {target} and {node}") def prepare(self): """ Finish initialization. Clear out the expected status and record the current status of every node in the cluster. """ self.partitions_expected = 1 for node in self.env["nodes"]: self.expected_status[node] = "" # This used to be conditional on whether SplitBrainTest was # allowed to run. SplitBrainTest is supposed to unisolate # the nodes, so this shouldn't be necessary here. However, # SplitBrainTest was flagged as "experimental" from 2009 to # 2025 and wasn't allowed to run by default. So uncomment # this if problems emerge. # # @COMPAT Delete this comment if no problems emerge after a # while. # # self.unisolate_node(node) self.stat_cm(node) def test_node_cm(self, node): """ Check the status of a given node. Returns 0 if the node is down, 1 if the node is up but unstable, and 2 if the node is up and stable. """ watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)", self.templates["Pat:NonDC_started"] % node, self.templates["Pat:DC_started"] % node, ] idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node], self.env["log_kind"], "ClusterIdle") idle_watch.set_watch() (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if not out: out = "" else: out = out[0].strip() self.debug(f"Node {node} status: '{out}'") if out.find('ok') < 0: if self.expected_status[node] == "up": self.log(f"Node status for {node} is down but we think it should be {self.expected_status[node]}") self.expected_status[node] = "down" return 0 if self.expected_status[node] == "down": self.log(f"Node status for {node} is up but we think it should be {self.expected_status[node]}: {out}") self.expected_status[node] = "up" # check the output first - because syslog-ng loses messages if out.find('S_NOT_DC') != -1: # Up and stable return 2 if out.find('S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug(f"Warn: Node {node} is unstable: {out}") return 1 # Up and stable return 2 def stat_cm(self, node): """Report the status of the cluster manager on a given node.""" return self.test_node_cm(node) > 0 # Being up and being stable is not the same question... def node_stable(self, node): """Return whether or not the given node is stable.""" if self.test_node_cm(node) == 2: return True self.log(f"Warn: Node {node} not stable") return False def _partition_stable(self, nodes, timeout=None): """Return whether or not all nodes in the given partition are stable.""" watchpats = [ "Current ping state: S_IDLE", self.templates["Pat:DC_IDLE"], ] self.debug("Waiting for cluster stability...") if timeout is None: - timeout = self.env["DeadTime"] + timeout = self.env["dead_time"] if len(nodes) < 3: self.debug("Cluster is inactive") return True idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(), self.env["log_kind"], "ClusterStable", timeout) idle_watch.set_watch() for node in nodes.split(): # have each node dump its current state self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) ret = idle_watch.look() while ret: self.debug(ret) for node in nodes.split(): if re.search(node, ret): return True ret = idle_watch.look() self.debug(f"Warn: Partition {nodes!r} not IDLE after {timeout}s") return False def cluster_stable(self, timeout=None, double_check=False): """Return whether or not all nodes in the cluster are stable.""" partitions = self.find_partitions() for partition in partitions: if not self._partition_stable(partition, timeout): return False if not double_check: return True # Make sure we are really stable and that all resources, # including those that depend on transient node attributes, # are started if they were going to be time.sleep(5) for partition in partitions: if not self._partition_stable(partition, timeout): return False return True def is_node_dc(self, node, status_line=None): """ Return whether or not the given node is the cluster DC. Check the given status_line, or query the cluster if None. """ if not status_line: (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if out: status_line = out[0].strip() if not status_line: return False if status_line.find('S_IDLE') != -1: return True if status_line.find('S_INTEGRATION') != -1: return True if status_line.find('S_FINALIZE_JOIN') != -1: return True if status_line.find('S_POLICY_ENGINE') != -1: return True if status_line.find('S_TRANSITION_ENGINE') != -1: return True return False def active_resources(self, node): """Return a list of primitive resources active on the given node.""" (_, output) = self.rsh(node, "crm_resource -c", verbose=1) resources = [] for line in output: if not re.search("^Resource", line): continue tmp = AuditResource(self, line) if tmp.type == "primitive" and tmp.host == node: resources.append(tmp.id) return resources def resource_location(self, rid): """Return a list of nodes on which the given resource is running.""" resource_nodes = [] for node in self.env["nodes"]: if self.expected_status[node] != "up": continue cmd = self.templates["RscRunning"] % rid (rc, lines) = self.rsh(node, cmd) if rc == 127: self.log(f"Command '{cmd}' failed. Binary or pacemaker-cts package not installed?") for line in lines: self.log(f"Output: {line} ") elif rc == 0: resource_nodes.append(node) return resource_nodes def find_partitions(self): """ Return a list of all partitions in the cluster. Each element of the list is itself a list of all active nodes in that partition. """ ccm_partitions = [] for node in self.env["nodes"]: if self.expected_status[node] != "up": self.debug(f"Node {node} is down... skipping") continue (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) if not out: self.log(f"no partition details for {node}") continue partition = out[0].strip() if len(partition) <= 2: self.log(f"bad partition details for {node}") continue nodes = partition.split() nodes.sort() partition = ' '.join(nodes) found = 0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug(f"Adding partition from {node}: {partition}") ccm_partitions.append(partition) else: self.debug(f"Partition '{partition}' from {node} is consistent with existing entries") self.debug(f"Found partitions: {ccm_partitions!r}") return ccm_partitions def has_quorum(self, node_list): """Return whether or not the cluster has quorum.""" # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.env["nodes"] for node in node_list: if self.expected_status[node] != "up": continue (rc, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) if rc != ExitStatus.OK: self.debug(f"WARN: Quorum check on {node} returned error ({rc})") continue quorum = quorum[0].strip() if quorum.find("1") != -1: return True if quorum.find("0") != -1: return False self.debug(f"WARN: Unexpected quorum test result from {node}:{quorum}") return False @property def components(self): """ Return a list of all patterns that should be ignored for the cluster's components. This must be provided by all subclasses. """ raise NotImplementedError def in_standby_mode(self, node): """Return whether or not the node is in Standby.""" (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) if not out: return False out = out[0].strip() self.debug(f"Standby result: {out}") return out == "on" def set_standby_mode(self, node, status): """ Set node to Standby if status is True, or Active if status is False. Return whether the node is now in the requested status. """ current_status = self.in_standby_mode(node) if current_status == status: return True if status: cmd = self.templates["StandbyCmd"] % (node, "on") else: cmd = self.templates["StandbyCmd"] % (node, "off") (rc, _) = self.rsh(node, cmd) return rc == 0 def add_dummy_rsc(self, node, rid): """Add a dummy resource with the given ID to the given node.""" rsc_xml = f""" ' '""" constraint_xml = f""" ' '""" self.rsh(node, self.templates['CibAddXml'] % rsc_xml) self.rsh(node, self.templates['CibAddXml'] % constraint_xml) def remove_dummy_rsc(self, node, rid): """Remove the previously added dummy resource given by rid on the given node.""" constraint = f"\"//rsc_location[@rsc='{rid}']\"" rsc = f"\"//primitive[@id='{rid}']\"" self.rsh(node, self.templates['CibDelXpath'] % constraint) self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index e4137ddd7b..c14020547c 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,446 +1,446 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory", "set_cts_path"] __copyright__ = "Copyright 2014-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse from contextlib import suppress from glob import glob import os import random import shlex import socket import sys from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). # @TODO See if type annotations fix this. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. - self["DeadTime"] = 300 + self["dead_time"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["CIBResource"] = False self["log_kind"] = None self["scenario"] = "random" self["syslog_facility"] = "daemon" # Hard-coded since there is only one supported cluster manager/stack self["Name"] = "crm-corosync" self["Stack"] = "corosync 2+" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def dump(self): """Print the current environment.""" for key in sorted(self.data.keys()): self._logger.debug(f"{f'Environment[{key}]':35}: {str(self[key])}") def __contains__(self, key): """Return True if the given key exists in the environment.""" return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "nodes": self.data["nodes"] = [] for node in value: node = node.strip() # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: # @TODO This only handles IPv4, use getaddrinfo() instead # (here and in _discover()) socket.gethostbyname_ex(node) self.data["nodes"].append(node) except socket.herror: self._logger.log(f"{node} not found in DNS... aborting") raise else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def _detect_systemd(self, node): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(node, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self, node): """Detect the syslog variant in use on the target node (if any).""" if "syslogd" in self.data: return if self["have_systemd"]: # Systemd (_, lines) = self._rsh(node, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) else: # SYS-V (_, lines) = self._rsh(node, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) with suppress(IndexError): self["syslogd"] = lines[0].strip() def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl disable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} off") return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl enable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} on") return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, f"systemctl is-enabled {service} | grep enabled") return rc == 0 # SYS-V (rc, _) = self._rsh(node, f"chkconfig --list | grep -e {service}.*on") return rc == 0 def _detect_at_boot(self, node): """Detect if the cluster starts at boot.""" self["at-boot"] = any(self.service_is_enabled(node, service) for service in ("pacemaker", "corosync")) def _detect_ip_offset(self, node): """Detect the offset for IPaddr resources.""" if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(node, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(node, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") return last_part = self["IPBase"].split('.')[3] if int(last_part) >= 240: self._logger.log(f"Could not determine an offset for IPaddr resources. Upper bound is too high: {self['IPBase']} {last_part}") self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser node = self["nodes"][0] self._detect_systemd(node) self._detect_syslog(node) self._detect_at_boot(node) self._detect_ip_offset(node) def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser() grp1 = parser.add_argument_group("Common options") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", default="", metavar="NODES", help="List of cluster nodes separated by whitespace") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named tests, separated by whitespace") grp3.add_argument("--disable-fencing", action="store_false", dest="fencing_enabled", help="Whether to disable fencing") grp3.add_argument("--fencing-agent", metavar="AGENT", default="external/ssh", help="Agent to use for a fencing resource") grp3.add_argument("--fencing-params", metavar="PARAMS", help="Parameters for the fencing resource (as NAME=VALUE), separated by whitespace") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--schema", metavar="SCHEMA", default=f"pacemaker-{BuildOptions.CIB_SCHEMA_VERSION}", help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. Most get a default from the add_argument # calls, they only do one thing, and they do not have any side effects. self["CIBfilename"] = args.cib_filename if args.cib_filename else None self["ClobberCIB"] = args.clobber_cib self["fencing_agent"] = args.fencing_agent self["fencing_enabled"] = args.fencing_enabled self["fencing_params"] = shlex.split(args.fencing_params) self["ListTests"] = args.list_tests self["Schema"] = args.schema self["TruncateLog"] = args.truncate self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["iterations"] = args.iterations self["nodes"] = shlex.split(args.nodes) self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["unsafe-tests"] = not args.no_unsafe_tests # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.choose: self["scenario"] = "sequence" self["tests"].extend(shlex.split(args.choose)) self["iterations"] = len(self["tests"]) if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile == "journal": self["LogAuditDisabled"] = True self["log_kind"] = LogKind.JOURNAL elif args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["log_kind"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True self.random_gen.seed(args.seed) class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance def set_cts_path(extra=None): """Set the PATH environment variable appropriately for the tests.""" new_path = os.environ['PATH'] # Add any search paths given on the command line if extra is not None: for p in extra: new_path = f"{p}:{new_path}" cwd = os.getcwd() if os.path.exists(f"{cwd}/cts/cts-attrd.in"): # pylint: disable=protected-access print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR}") for d in glob(f"{BuildOptions._BUILD_DIR}/daemons/*/"): new_path = f"{d}:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}" print(f"Using local schemas from: {cwd}/xml") os.environ["PCMK_schema_directory"] = f"{cwd}/xml" else: print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {cwd})") new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}" os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR print(f'Using PATH="{new_path}"') os.environ['PATH'] = new_path diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py index c39782ab45..2bceac8452 100644 --- a/python/pacemaker/_cts/tests/componentfail.py +++ b/python/pacemaker/_cts/tests/componentfail.py @@ -1,161 +1,161 @@ """Kill a pacemaker daemon and test how the cluster recovers.""" __all__ = ["ComponentFail"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re from pacemaker._cts.audits import AuditResource from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object # @TODO Separate this into a separate test for each component, so the patterns # can be made specific to each component, investigating failures is a little # easier, and specific testing can be done for each component (for example, # set attributes before and after killing pacemaker-attrd and check values). class ComponentFail(CTSTest): """Kill a random pacemaker daemon and wait for the cluster to recover.""" def __init__(self, cm): """ Create a new ComponentFail instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.is_unsafe = True self.name = "ComponentFail" self._complist = cm.components self._okerrpatterns = [] self._patterns = [] self._startall = SimulStartLite(cm) def __call__(self, node): """Perform this test.""" self.incr("calls") self._patterns = [] self._okerrpatterns = [] # start all nodes ret = self._startall(None) if not ret: return self.failure("Setup failed") if not self._cm.cluster_stable(self._env["StableTime"]): return self.failure("Setup failed - unstable") # select a component to kill chosen = self._env.random_gen.choice(self._complist) node_is_dc = self._cm.is_node_dc(node, None) self.debug(f"...component {chosen.name} (dc={node_is_dc})") self.incr(chosen.name) if chosen.name != "corosync": self._patterns.extend([ self._cm.templates["Pat:ChildKilled"] % (node, chosen.name), self._cm.templates["Pat:ChildRespawn"] % (node, chosen.name), ]) self._patterns.extend(chosen.pats) # @TODO this should be a flag in the Component if chosen.name in ["corosync", "pacemaker-based", "pacemaker-fenced"]: # Ignore actions for fence devices if fencer will respawn # (their registration will be lost, and probes will fail) self._okerrpatterns = [ self._cm.templates["Pat:Fencing_active"], ] (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self._cm, line) if r.rclass == "stonith": self._okerrpatterns.extend([ self._cm.templates["Pat:Fencing_recover"] % r.id, self._cm.templates["Pat:Fencing_probe"] % r.id, ]) # supply a copy so self.patterns doesn't end up empty tmp_pats = self._patterns.copy() self._patterns.extend(chosen.badnews_ignore) # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status stonith_pats = [ self._cm.templates["Pat:Fencing_ok"] % node ] stonith = self.create_watch(stonith_pats, 0) stonith.set_watch() # set the watch for stable watch = self.create_watch( - tmp_pats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"]) + tmp_pats, self._env["dead_time"] + self._env["StableTime"] + self._env["StartTime"]) watch.set_watch() # kill the component chosen.kill(node) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for any fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") self._cm.cluster_stable(self._env["StartTime"]) self.debug(f"Checking if {node} was shot") shot = stonith.look(60) if shot: self.debug(f"Found: {shot!r}") self._okerrpatterns.append(self._cm.templates["Pat:Fencing_start"] % node) if not self._env["at-boot"]: self._cm.expected_status[node] = "down" # If fencing occurred, chances are many (if not all) the expected logs # will not be sent - or will be lost when the node reboots return self.success() # check for logs indicating a graceful recovery matched = watch.look_for_all(allow_multiple_matches=True) if watch.unmatched: self._logger.log(f"Patterns not found: {watch.unmatched!r}") self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["StartTime"]) if not matched: return self.failure(f"Didn't find all expected {chosen.name} patterns") if not is_stable: return self.failure(f"Cluster did not become stable after killing {chosen.name}") return self.success() @property def errors_to_ignore(self): """Return a list of errors which should be ignored.""" # Note that okerrpatterns refers to the last time we ran this test # The good news is that this works fine for us... self._okerrpatterns.extend(self._patterns) return self._okerrpatterns diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py index 74d85230cf..f2ede2a5db 100644 --- a/python/pacemaker/_cts/tests/nearquorumpointtest.py +++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py @@ -1,121 +1,121 @@ """Randomly start and stop nodes to bring the cluster close to the quorum point.""" __all__ = ["NearQuorumPointTest"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class NearQuorumPointTest(CTSTest): """Randomly start and stop nodes to bring the cluster close to the quorum point.""" def __init__(self, cm): """ Create a new NearQuorumPointTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "NearQuorumPoint" def __call__(self, dummy): """Perform this test.""" self.incr("calls") startset = [] stopset = [] stonith = self._cm.prepare_fencing_watcher() # decide what to do with each node for node in self._env["nodes"]: action = self._env.random_gen.choice(["start", "stop"]) if action == "start": startset.append(node) elif action == "stop": stopset.append(node) self.debug(f"start nodes:{startset!r}") self.debug(f"stop nodes:{stopset!r}") # add search patterns watchpats = [] for node in stopset: if self._cm.expected_status[node] == "up": watchpats.append(self._cm.templates["Pat:We_stopped"] % node) for node in startset: if self._cm.expected_status[node] == "down": watchpats.append(self._cm.templates["Pat:Local_started"] % node) else: for stopping in stopset: if self._cm.expected_status[stopping] == "up": watchpats.append(self._cm.templates["Pat:They_stopped"] % (node, stopping)) if not watchpats: return self.skipped() if startset: watchpats.append(self._cm.templates["Pat:DC_IDLE"]) - watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) + watch = self.create_watch(watchpats, self._env["dead_time"] + 10) watch.set_watch() # begin actions for node in stopset: if self._cm.expected_status[node] == "up": self._cm.stop_cm_async(node) for node in startset: if self._cm.expected_status[node] == "down": self._cm.start_cm_async(node) # get the result if watch.look_for_all(): self._cm.cluster_stable() self._cm.fencing_cleanup("NearQuorumPoint", stonith) return self.success() self._logger.log(f"Warn: Patterns not found: {watch.unmatched!r}") # get the "bad" nodes upnodes = [] for node in stopset: if self._cm.stat_cm(node): upnodes.append(node) downnodes = [] for node in startset: if not self._cm.stat_cm(node): downnodes.append(node) self._cm.fencing_cleanup("NearQuorumPoint", stonith) if not upnodes and not downnodes: self._cm.cluster_stable() # Make sure they're completely down with no residule for node in stopset: self._rsh(node, self._cm.templates["StopCmd"]) return self.success() if upnodes: self._logger.log(f"Warn: Unstoppable nodes: {upnodes!r}") if downnodes: self._logger.log(f"Warn: Unstartable nodes: {downnodes!r}") return self.failure() diff --git a/python/pacemaker/_cts/tests/partialstart.py b/python/pacemaker/_cts/tests/partialstart.py index 9455a7b486..8ef649ab02 100644 --- a/python/pacemaker/_cts/tests/partialstart.py +++ b/python/pacemaker/_cts/tests/partialstart.py @@ -1,72 +1,72 @@ """Start a node and then tell it to stop before it is fully running.""" __all__ = ["PartialStart"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.simulstoplite import SimulStopLite from pacemaker._cts.tests.stoptest import StopTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class PartialStart(CTSTest): """Interrupt a node before it's finished starting up.""" def __init__(self, cm): """ Create a new PartialStart instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "PartialStart" self._startall = SimulStartLite(cm) self._stop = StopTest(cm) self._stopall = SimulStopLite(cm) def __call__(self, node): """Perform this test.""" self.incr("calls") ret = self._stopall(None) if not ret: return self.failure("Setup failed") watchpats = [ "pacemaker-controld.*Connecting to .* cluster layer" ] - watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) + watch = self.create_watch(watchpats, self._env["dead_time"] + 10) watch.set_watch() self._cm.start_cm_async(node) ret = watch.look_for_all() if not ret: self._logger.log(f"Patterns not found: {watch.unmatched!r}") return self.failure(f"Setup of {node} failed") ret = self._stop(node) if not ret: return self.failure(f"{node} did not stop in time") return self.success() @property def errors_to_ignore(self): """Return a list of errors which should be ignored.""" # We might do some fencing in the 2-node case if we make it up far enough return [ r"Executing reboot fencing operation", r"Requesting fencing \([^)]+\) targeting node " ] diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py index 3c14050601..fb9bdef26e 100644 --- a/python/pacemaker/_cts/tests/simulstartlite.py +++ b/python/pacemaker/_cts/tests/simulstartlite.py @@ -1,128 +1,128 @@ """Simultaneously start stopped nodes.""" __all__ = ["SimulStartLite"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStartLite(CTSTest): """ A pseudo-test that sets up conditions before running some other test. This class starts any stopped nodes more or less simultaneously. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStartLite instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "SimulStartLite" def __call__(self, dummy): """Return whether starting all stopped nodes more or less simultaneously succeeds.""" self.incr("calls") self.debug(f"Setup: {self.name}") # We ignore the "node" parameter... node_list = [] for node in self._env["nodes"]: if self._cm.expected_status[node] == "down": self.incr("WasStopped") node_list.append(node) self.set_timer() while len(node_list) > 0: # Repeat until all nodes come up uppat = self._cm.templates["Pat:NonDC_started"] if self._cm.upcount() == 0: uppat = self._cm.templates["Pat:Local_started"] watchpats = [ self._cm.templates["Pat:DC_IDLE"] ] for node in node_list: watchpats.extend([uppat % node, self._cm.templates["Pat:InfraUp"] % node, self._cm.templates["Pat:PacemakerUp"] % node]) # Start all the nodes - at about the same time... - watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) + watch = self.create_watch(watchpats, self._env["dead_time"] + 10) watch.set_watch() stonith = self._cm.prepare_fencing_watcher() for node in node_list: self._cm.start_cm_async(node) watch.look_for_all() node_list = self._cm.fencing_cleanup(self.name, stonith) if node_list is None: return self.failure("Cluster did not stabilize") # Remove node_list messages from watch.unmatched for node in node_list: self._logger.debug(f"Dealing with stonith operations for {node_list}") if watch.unmatched: try: watch.unmatched.remove(uppat % node) except ValueError: self.debug(f"Already matched: {uppat % node}") try: watch.unmatched.remove(self._cm.templates["Pat:InfraUp"] % node) except ValueError: self.debug(f"Already matched: {self._cm.templates['Pat:InfraUp'] % node}") try: watch.unmatched.remove(self._cm.templates["Pat:PacemakerUp"] % node) except ValueError: self.debug(f"Already matched: {self._cm.templates['Pat:PacemakerUp'] % node}") if watch.unmatched: for regex in watch.unmatched: self._logger.log(f"Warn: Startup pattern not found: {regex}") if not self._cm.cluster_stable(): return self.failure("Cluster did not stabilize") did_fail = False unstable = [] for node in self._env["nodes"]: if not self._cm.stat_cm(node): did_fail = True unstable.append(node) if did_fail: return self.failure(f"Unstarted nodes exist: {unstable}") unstable = [] for node in self._env["nodes"]: if not self._cm.node_stable(node): did_fail = True unstable.append(node) if did_fail: return self.failure(f"Unstable cluster nodes exist: {unstable}") return self.success() def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" return False diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py index b3a2f87ffe..232fc58203 100644 --- a/python/pacemaker/_cts/tests/simulstoplite.py +++ b/python/pacemaker/_cts/tests/simulstoplite.py @@ -1,86 +1,86 @@ """Simultaneously stop running nodes.""" __all__ = ["SimulStopLite"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStopLite(CTSTest): """ A pseudo-test that sets up conditions before running some other test. This class stops any running nodes more or less simultaneously. It can be used both to set up a test or to clean up a test. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStopLite instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "SimulStopLite" def __call__(self, dummy): """Return whether stopping all running nodes more or less simultaneously succeeds.""" self.incr("calls") self.debug(f"Setup: {self.name}") # We ignore the "node" parameter... watchpats = [] for node in self._env["nodes"]: if self._cm.expected_status[node] == "up": self.incr("WasStarted") watchpats.append(self._cm.templates["Pat:We_stopped"] % node) if len(watchpats) == 0: return self.success() # Stop all the nodes - at about the same time... - watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) + watch = self.create_watch(watchpats, self._env["dead_time"] + 10) watch.set_watch() self.set_timer() for node in self._env["nodes"]: if self._cm.expected_status[node] == "up": self._cm.stop_cm_async(node) if watch.look_for_all(): # Make sure they're completely down with no residule for node in self._env["nodes"]: self._rsh(node, self._cm.templates["StopCmd"]) return self.success() did_fail = False up_nodes = [] for node in self._env["nodes"]: if self._cm.stat_cm(node): did_fail = True up_nodes.append(node) if did_fail: return self.failure(f"Active nodes exist: {up_nodes}") self._logger.log(f"Warn: All nodes stopped but CTS didn't detect: {watch.unmatched}") return self.failure(f"Missing log message: {watch.unmatched}") def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" return False diff --git a/python/pacemaker/_cts/tests/standbytest.py b/python/pacemaker/_cts/tests/standbytest.py index 7992dd0bc4..e70e5febcc 100644 --- a/python/pacemaker/_cts/tests/standbytest.py +++ b/python/pacemaker/_cts/tests/standbytest.py @@ -1,106 +1,106 @@ """Put a node into standby mode and check that resources migrate.""" __all__ = ["StandbyTest"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.starttest import StartTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StandbyTest(CTSTest): """Put a node into standby and check that resources migrate away from it.""" def __init__(self, cm): """ Create a new StandbyTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Standby" self._start = StartTest(cm) self._startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources, none resource should be running on the node # set the node to active mode # check resources, resources should have been migrated back (SHOULD THEY?) def __call__(self, node): """Perform this test.""" self.incr("calls") ret = self._startall(None) if not ret: return self.failure("Start all nodes failed") self.debug(f"Make sure node {node} is active") if self._cm.in_standby_mode(node): if not self._cm.set_standby_mode(node, False): return self.failure(f"can't set node {node} to active mode") self._cm.cluster_stable() if self._cm.in_standby_mode(node): return self.failure(f"standby status of {node} is [on] but we expect [off]") watchpats = [ r"State transition .* -> S_POLICY_ENGINE", ] - watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) + watch = self.create_watch(watchpats, self._env["dead_time"] + 10) watch.set_watch() self.debug(f"Setting node {node} to standby mode") if not self._cm.set_standby_mode(node, True): return self.failure(f"can't set node {node} to standby mode") self.set_timer("on") ret = watch.look_for_all() if not ret: self._logger.log(f"Patterns not found: {watch.unmatched!r}") self._cm.set_standby_mode(node, False) return self.failure(f"cluster didn't react to standby change on {node}") self._cm.cluster_stable() if not self._cm.in_standby_mode(node): return self.failure(f"standby status of {node} is [off] but we expect [on]") self.log_timer("on") self.debug("Checking resources") rscs_on_node = self._cm.active_resources(node) if rscs_on_node: rc = self.failure(f"{node} set to standby, {rscs_on_node!r} is still running on it") self.debug(f"Setting node {node} to active mode") self._cm.set_standby_mode(node, False) return rc self.debug(f"Setting node {node} to active mode") if not self._cm.set_standby_mode(node, False): return self.failure(f"can't set node {node} to active mode") self.set_timer("off") self._cm.cluster_stable() if self._cm.in_standby_mode(node): return self.failure(f"standby status of {node} is [on] but we expect [off]") self.log_timer("off") return self.success() diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py index 4ab0d73fc8..b294349df5 100644 --- a/python/pacemaker/_cts/tests/stonithdtest.py +++ b/python/pacemaker/_cts/tests/stonithdtest.py @@ -1,133 +1,134 @@ """Fence a running node and wait for it to restart.""" __all__ = ["StonithdTest"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker.exitstatus import ExitStatus from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StonithdTest(CTSTest): """Fence a running node and wait for it to restart.""" def __init__(self, cm): """ Create a new StonithdTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Stonithd" self._startall = SimulStartLite(cm) def __call__(self, node): """Perform this test.""" self.incr("calls") if len(self._env["nodes"]) < 2: return self.skipped() ret = self._startall(None) if not ret: return self.failure("Setup failed") watchpats = [ self._cm.templates["Pat:Fencing_ok"] % node, self._cm.templates["Pat:NodeFenced"] % node, ] if not self._env["at-boot"]: self.debug(f"Expecting {node} to stay down") self._cm.expected_status[node] = "down" else: self.debug(f"Expecting {node} to come up again {self._env['at-boot']}") watchpats.extend([ f"{node}.* S_STARTING -> S_PENDING", f"{node}.* S_PENDING -> S_NOT_DC", ]) - watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"]) + watch = self.create_watch(watchpats, + 30 + self._env["dead_time"] + self._env["StableTime"] + self._env["StartTime"]) watch.set_watch() origin = self._env.random_gen.choice(self._env["nodes"]) (rc, _) = self._rsh(origin, f"stonith_admin --reboot {node} -VVVVVV") if rc == ExitStatus.TIMEOUT: # Look for the patterns, usually this means the required # device was running on the node to be fenced - or that # the required devices were in the process of being loaded # and/or moved # # Effectively the node committed suicide so there will be # no confirmation, but pacemaker should be watching and # fence the node again self._logger.log(f"Fencing command on {origin} to fence {node} timed out") elif origin != node and rc != 0: self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self._logger.log(f"Fencing command on {origin} failed to fence {node} (rc={rc})") elif origin == node and rc != 255: # 255 == broken pipe, ie. the node was fenced as expected self._logger.log(f"Locally originated fencing returned {rc}") with Timer(self._logger, self.name, "fence"): matched = watch.look_for_all() self.set_timer("reform") if watch.unmatched: self._logger.log(f"Patterns not found: {watch.unmatched!r}") self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["StartTime"]) if not matched: return self.failure("Didn't find all expected patterns") if not is_stable: return self.failure("Cluster did not become stable") self.log_timer("reform") return self.success() @property def errors_to_ignore(self): """Return a list of errors which should be ignored.""" return [ self._cm.templates["Pat:Fencing_start"] % ".*", self._cm.templates["Pat:Fencing_ok"] % ".*", self._cm.templates["Pat:Fencing_active"], r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ] def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" return self._env["fencing_enabled"] and CTSTest.is_applicable(self) diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py index ddac1cc2b3..0588f4897b 100644 --- a/python/pacemaker/_cts/tests/stoptest.py +++ b/python/pacemaker/_cts/tests/stoptest.py @@ -1,97 +1,97 @@ """Stop the cluster manager on a given node.""" __all__ = ["StopTest"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StopTest(CTSTest): """ A pseudo-test that sets up conditions before running some other test. This class stops the cluster manager on a given node. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new StopTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Stop" def __call__(self, node): """Stop the given node, returning whether this succeeded or not.""" self.incr("calls") if self._cm.expected_status[node] != "up": return self.skipped() # Technically we should always be able to notice ourselves stopping patterns = [ self._cm.templates["Pat:We_stopped"] % node, ] # Any active node needs to notice this one left # (note that this won't work if we have multiple partitions) for other in self._env["nodes"]: if self._cm.expected_status[other] == "up" and other != node: patterns.append(self._cm.templates["Pat:They_stopped"] % (other, node)) - watch = self.create_watch(patterns, self._env["DeadTime"]) + watch = self.create_watch(patterns, self._env["dead_time"]) watch.set_watch() if node == self._cm.our_node: self.incr("us") else: if self._cm.upcount() <= 1: self.incr("all") else: self.incr("them") self._cm.stop_cm(node) watch.look_for_all() failreason = None unmatched_str = "||" if watch.unmatched: (_, output) = self._rsh(node, "/bin/ps axf", verbose=1) for line in output: self.debug(line) (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1) for line in output: self.debug(line) for regex in watch.unmatched: self._logger.log(f"ERROR: Shutdown pattern not found: {regex}") unmatched_str += f"{regex}||" failreason = "Missing shutdown pattern" - self._cm.cluster_stable(self._env["DeadTime"]) + self._cm.cluster_stable(self._env["dead_time"]) if not watch.unmatched or self._cm.upcount() == 0: return self.success() if len(watch.unmatched) >= self._cm.upcount(): return self.failure(f"no match against ({unmatched_str})") if failreason is None: return self.success() return self.failure(failreason)