diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py index 26187db71e..c5521aaded 100644 --- a/python/pacemaker/_cts/audits.py +++ b/python/pacemaker/_cts/audits.py @@ -1,1029 +1,1029 @@ """ Auditing classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time import uuid from pacemaker.buildoptions import BuildOptions from pacemaker._cts.input import should_continue from pacemaker._cts.watcher import LogKind, LogWatcher class ClusterAudit: """ The base class for various kinds of auditors. Specific audit implementations should be built on top of this one. Audits can do all kinds of checks on the system. The basic interface for callers is the `__call__` method, which returns True if the audit passes and False if it fails. """ def __init__(self, cm): """ Create a new ClusterAudit instance Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.name = None def __call__(self): raise NotImplementedError def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. This method must be implemented by all subclasses. """ raise NotImplementedError def log(self, args): """ Log a message """ self._cm.log("audit: %s" % args) def debug(self, args): """ Log a debug message """ self._cm.debug("audit: %s" % args) class LogAudit(ClusterAudit): """ Audit each cluster node to verify that some logging system is usable. This is done by logging a unique test message and then verifying that we can read back that test message using logging tools. """ def __init__(self, cm): """ Create a new LogAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "LogAudit" def _restart_cluster_logging(self, nodes=None): """ Restart logging on the given nodes, or all if none are given """ if not nodes: nodes = self._cm.Env["nodes"] self._cm.debug("Restarting logging on: %r" % nodes) for node in nodes: if self._cm.Env["have_systemd"]: (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket") if rc != 0: self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service") if rc != 0: self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"]) if rc != 0: self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node)) def _create_watcher(self, patterns, kind): """ Create a new LogWatcher instance for the given patterns """ watch = LogWatcher(self._cm.Env["LogFileName"], patterns, self._cm.Env["nodes"], kind, "LogAudit", 5, silent=True) watch.set_watch() return watch def _test_logging(self): """ Perform the log audit """ patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self._cm.Env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) watch_pref = self._cm.Env["LogWatcher"] if watch_pref == LogKind.ANY: kinds = [ LogKind.FILE ] if self._cm.Env["have_systemd"]: kinds += [ LogKind.JOURNAL ] kinds += [ LogKind.REMOTE_FILE ] for k in kinds: watch[k] = self._create_watcher(patterns, k) self._cm.log("Logging test message with identifier %s" % suffix) else: watch[watch_pref] = self._create_watcher(patterns, watch_pref) for node in self._cm.Env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix) (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0) if rc != 0: self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in list(watch.keys()): w = watch[k] if watch_pref == LogKind.ANY: self._cm.log("Checking for test message in %s logs" % k) w.look_for_all(silent=True) if w.unmatched: for regex in w.unmatched: self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind)) else: if watch_pref == LogKind.ANY: self._cm.log("Found test message in %s logs" % k) self._cm.Env["LogWatcher"] = k return 1 return False def __call__(self): max_attempts = 3 attempt = 0 self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) while attempt <= max_attempts and not self._test_logging(): attempt += 1 self._restart_cluster_logging() time.sleep(60*attempt) if attempt > max_attempts: self._cm.log("ERROR: Cluster logging unrecoverable.") return False return True def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ if self._cm.Env["LogAuditDisabled"]: return False return True class DiskAudit(ClusterAudit): """ Audit disk usage on cluster nodes to verify that there is enough free space left on whichever mounted file system holds the logs. Warn on: less than 100 MB or 10% of free space Error on: less than 10 MB or 5% of free space """ def __init__(self, cm): """ Create a new DiskAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "DiskspaceAudit" def __call__(self): result = True # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) for node in self._cm.Env["nodes"]: (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1) if not dfout: self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) continue dfout = dfout[0].strip() try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = False if not should_continue(self._cm.Env): raise ValueError("Disk full on %s" % node) elif remaining_mb < 100 or used_percent > 90: self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ return True class FileAudit(ClusterAudit): """ Audit the filesystem looking for various failure conditions: * The presence of core dumps from corosync or Pacemaker daemons * Stale IPC files """ def __init__(self, cm): """ Create a new FileAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.known = [] self.name = "FileAudit" def __call__(self): result = True self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) for node in self._cm.Env["nodes"]: (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Corosync core file on %s: %s" % (node, line)) - if self._cm.ShouldBeStatus.get(node) == "down": + if self._cm.expected_status.get(node) == "down": clean = False (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) for line in lsout: result = False clean = True self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) for line in lsout: self._cm.debug("ps[%s]: %s" % (node, line)) self._cm.rsh(node, "rm -rf /dev/shm/qb-*") else: self._cm.debug("Skipping %s" % node) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ return True class AuditResource: """ A base class for storing information about a cluster resource """ def __init__(self, cm, line): """ Create a new AuditResource instance Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single resource """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None @property def unique(self): """ Is this resource unique? """ return self.flags & 0x20 @property def orphan(self): """ Is this resource an orphan? """ return self.flags & 0x01 @property def managed(self): """ Is this resource managed by the cluster? """ return self.flags & 0x02 class AuditConstraint: """ A base class for storing information about a cluster constraint """ def __init__(self, cm, line): """ Create a new AuditConstraint instance Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single constraint """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): """ Audit primitive resources to verify a variety of conditions, including that they are active and managed only when expected; they are active on the expected clusted node; and that they are not orphaned. """ def __init__(self, cm): """ Create a new PrimitiveAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PrimitiveAudit" self._active_nodes = [] self._constraints = [] self._inactive_nodes = [] self._resources = [] self._target = None def _audit_resource(self, resource, quorum): """ Perform the audit of a single resource """ rc = True active = self._cm.resource_location(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %r" % (resource.id, active)) elif resource.needs_quorum == 1: self._cm.log("Resource %s active without quorum: %r" % (resource.id, active)) rc = False elif not resource.managed: self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active)) elif not resource.unique: # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %r" % (resource.id, active)) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active)) rc = False elif resource.orphan: self.debug("Resource %s is an inactive orphan" % resource.id) elif not self._inactive_nodes: self._cm.log("WARN: Resource %s not served anywhere" % resource.id) rc = False elif self._cm.Env["warn-inactive"]: if quorum or not resource.needs_quorum: self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) else: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) return rc def _setup(self): """ Verify cluster nodes are active, and collect resource and colocation information used for performing the audit. """ for node in self._cm.Env["nodes"]: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self._active_nodes.append(node) else: self._inactive_nodes.append(node) for node in self._cm.Env["nodes"]: - if self._target is None and self._cm.ShouldBeStatus[node] == "up": + if self._target is None and self._cm.expected_status[node] == "up": self._target = node if not self._target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name) return False (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): self._resources.append(AuditResource(self._cm, line)) elif re.search("^Constraint", line): self._constraints.append(AuditConstraint(self._cm, line)) else: self._cm.log("Unknown entry: %s" % line) return True def __call__(self): result = True if not self._setup(): return result quorum = self._cm.has_quorum(None) for resource in self._resources: if resource.type == "primitive" and not self._audit_resource(resource, quorum): result = False return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class GroupAudit(PrimitiveAudit): """ Audit group resources to verify that each of its child primitive resources is active on the expected cluster node. """ def __init__(self, cm): """ Create a new GroupAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "GroupAudit" def __call__(self): result = True if not self._setup(): return result for group in self._resources: if group.type != "group": continue first_match = True group_location = None for child in self._resources: if child.parent != group.id: continue nodes = self._cm.resource_location(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = False if len(nodes) > 1: result = False self._cm.log("Child %s of %s is active more than once: %r" % (child.id, group.id, nodes)) elif not nodes: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: result = False self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return result class CloneAudit(PrimitiveAudit): """ Audit clone resources. NOTE: Currently, this class does not perform any actual audit functions. """ def __init__(self, cm): """ Create a new CloneAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "CloneAudit" def __call__(self): result = True if not self._setup(): return result for clone in self._resources: if clone.type != "clone": continue for child in self._resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return result class ColocationAudit(PrimitiveAudit): """ Audit cluster resources to verify that those that should be colocated with each other actually are. """ def __init__(self, cm): """ Create a new ColocationAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "ColocationAudit" def _crm_location(self, resource): """ Return a list of cluster nodes where a given resource is running """ (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): result = True if not self._setup(): return result for coloc in self._constraints: if coloc.type != "rsc_colocation": continue source = self._crm_location(coloc.rsc) target = self._crm_location(coloc.target) if not source: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: result = False self._cm.log("Colocation audit (%s): %s running on %s (not in %r)" % (coloc.id, coloc.rsc, node, target)) else: self.debug("Colocation audit (%s): %s running on %s (in %r)" % (coloc.id, coloc.rsc, node, target)) return result class ControllerStateAudit(ClusterAudit): """ Audit cluster nodes to verify that those we expect to be active are active, and those that are expected to be inactive are inactive. """ def __init__(self, cm): """ Create a new ControllerStateAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "ControllerStateAudit" def __call__(self): result = True up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self._cm.Env["nodes"]: - should_be = self._cm.ShouldBeStatus[node] + should_be = self._cm.expected_status[node] rc = self._cm.test_node_cm(node) if rc > 0: if should_be == "down": down_are_up += 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down += 1 if len(unstable_list) > 0: result = False self._cm.log("Cluster is not stable: %d (of %d): %r" % (len(unstable_list), self._cm.upcount(), unstable_list)) if up_are_down > 0: result = False self._cm.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self._cm.Env["nodes"]))) if down_are_up > 0: result = False self._cm.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self._cm.Env["nodes"]))) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class CIBAudit(ClusterAudit): """ Audit the CIB by verifying that it is identical across cluster nodes """ def __init__(self, cm): """ Create a new CIBAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "CibAudit" def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: self.debug("\tNo partitions to audit") return result for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) if self._audit_cib_contents(partition) == 0: result = False return result def _audit_cib_contents(self, hostlist): """ Perform the CIB audit on the given hosts """ passed = True node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self._store_remote_cib(node, node0) if node_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node) passed = False elif node0 is None: node0 = node node0_xml = node_xml elif node0_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node0) passed = False else: (rc, result) = self._cm.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) if rc != 0: self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = False for line in result: if not re.search("", line): passed = False self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) return passed def _store_remote_cib(self, node, target): """ Store a copy of the given node's CIB on the given target node. If no target is given, store the CIB on the given node. """ filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1) if rc != 0: self._cm.log("Could not retrieve configuration") return None self._cm.rsh("localhost", "rm -f %s" % filename) for line in lines: self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self._cm.log("Could not store configuration") return None return filename def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class PartitionAudit(ClusterAudit): """ Audit each partition in a cluster to verify a variety of conditions: * The number of partitions and the nodes in each is as expected * Each node is active when it should be active and inactive when it should be inactive * The status and epoch of each node is as expected * A partition has quorum * A partition has a DC when expected """ def __init__(self, cm): """ Create a new PartitionAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PartitionAudit" self._node_epoch = {} self._node_state = {} self._node_quorum = {} def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: return result self._cm.cluster_stable(double_check=True) if len(ccm_partitions) != self._cm.partitions_expected: self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) result = False for partition in ccm_partitions: self._cm.log("\t %s" % partition) for partition in ccm_partitions: if self._audit_partition(partition) == 0: result = False return result def _trim_string(self, avalue): """ Remove the last character from a multi-character string """ if not avalue: return None if len(avalue) > 1: return avalue[:-1] return avalue def _trim2int(self, avalue): """ Remove the last character from a multi-character string and convert the result to an int. """ trimmed = self._trim_string(avalue) if trimmed: return int(trimmed) return None def _audit_partition(self, partition): """ Perform the audit of a single partition """ passed = True dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % partition) for node in node_list: - if self._cm.ShouldBeStatus[node] != "up": + if self._cm.expected_status[node] != "up": self._cm.log("Warn: Node %s appeared out of nowhere" % node) - self._cm.ShouldBeStatus[node] = "up" + self._cm.expected_status[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1) self._node_state[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1) self._node_epoch[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1) self._node_quorum[node] = out[0].strip() self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node])) self._node_state[node] = self._trim_string(self._node_state[node]) self._node_epoch[node] = self._trim2int(self._node_epoch[node]) self._node_quorum[node] = self._trim_string(self._node_quorum[node]) if not self._node_epoch[node]: self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node) - self._cm.ShouldBeStatus[node] = "down" + self._cm.expected_status[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch: lowest_epoch = self._node_epoch[node] if not lowest_epoch: self._cm.log("Lowest epoch not determined in %s" % partition) passed = False for node in node_list: - if self._cm.ShouldBeStatus[node] != "up": + if self._cm.expected_status[node] != "up": continue if self._cm.is_node_dc(node, self._node_state[node]): dc_found.append(node) if self._node_epoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self._node_epoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self._cm.log("DC %s is not the oldest node (%d vs. %d)" % (node, self._node_epoch[node], lowest_epoch)) passed = False if not dc_found: self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self._cm.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = False if not passed: for node in node_list: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self._cm.log("epoch %s : %s" % (self._node_epoch[node], self._node_state[node])) return passed def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False # pylint: disable=invalid-name def audit_list(cm): """ Return a list of instances of applicable audits that can be performed for the given ClusterManager. """ result = [] for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit, PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit, ColocationAudit, CIBAudit]: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py index a1fa1df5b2..b7ed1eb666 100644 --- a/python/pacemaker/_cts/clustermanager.py +++ b/python/pacemaker/_cts/clustermanager.py @@ -1,806 +1,806 @@ """ ClusterManager class for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["ClusterManager"] __copyright__ = """Copyright 2000-2023 the Pacemaker project contributors. Certain portions by Huang Zhen are copyright 2004 International Business Machines. The version control history for this file may have further details.""" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import time from collections import UserDict from pacemaker.buildoptions import BuildOptions from pacemaker._cts.CTS import NodeStatus from pacemaker._cts.audits import AuditResource from pacemaker._cts.cib import ConfigFactory from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogWatcher # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). It's possible we could fix this with type annotations, # but those were introduced with python 3.5 and we only support python 3.4. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable # ClusterManager has a lot of methods. # pylint: disable=too-many-public-methods class ClusterManager(UserDict): '''The Cluster Manager class. This is an subclass of the Python dictionary class. (this is because it contains lots of {name,value} pairs, not because it's behavior is that terribly similar to a dictionary in other ways.) This is an abstract class which class implements high-level operations on the cluster and/or its cluster managers. Actual cluster managers classes are subclassed from this type. One of the things we do is track the state we think every node should be in. ''' def _final_conditions(self): for key in list(self.keys()): if self[key] is None: raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key) def __init__(self): # Eventually, ClusterManager should not be a UserDict subclass. Until # that point... # pylint: disable=super-init-not-called self.Env = EnvFactory().getInstance() self.templates = PatternSelector(self.Env["Name"]) self.logger = LogFactory() self.data = {} self.name = self.Env["Name"] self.rsh = RemoteFactory().getInstance() - self.ShouldBeStatus={} + self.expected_status = {} # pylint: disable=invalid-name self.ns = NodeStatus(self.Env) self.OurNode = os.uname()[1].lower() self.__instance_errors_to_ignore = [] self.cib_installed = False self._final_conditions() self.CIBsync = {} self.CibFactory = ConfigFactory(self) self.cib = self.CibFactory.create_config(self.Env["Schema"]) def __getitem__(self, key): if key == "Name": return self.name print("FIXME: Getting %s from %r" % (key, self)) if key in self.data: return self.data[key] return self.templates.get_patterns(key) def __setitem__(self, key, value): print("FIXME: Setting %s=%s on %r" % (key, value, self)) self.data[key] = value def key_for_node(self, node): return node def clear_instance_errors_to_ignore(self): """ Reset instance-specific errors to ignore on each iteration """ self.__instance_errors_to_ignore = [] @property def instance_errors_to_ignore(self): """ Return a list of known errors that should be ignored for a specific test instance """ return self.__instance_errors_to_ignore @property def errors_to_ignore(self): """ Return a list of known error messages that should be ignored """ return self.templates.get_patterns("BadNewsIgnore") def log(self, args): self.logger.log(args) def debug(self, args): self.logger.debug(args) def upcount(self): '''How many nodes are up?''' count = 0 for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up": + if self.expected_status[node] == "up": count = count + 1 return count def install_support(self, command="install"): for node in self.Env["nodes"]: self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command)) def prepare_fencing_watcher(self): # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced if self.has_quorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonithPats = [] for peer in self.Env["nodes"]: - if self.ShouldBeStatus[peer] == "up": + if self.expected_status[peer] == "up": continue stonithPats.extend([ self.templates["Pat:Fencing_ok"] % peer, self.templates["Pat:Fencing_start"] % peer ]) stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0) stonith.set_watch() return stonith def fencing_cleanup(self, node, stonith): peer_list = [] peer_state = {} self.debug("Looking for nodes that were fenced as a result of %s starting" % node) # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.has_quorum(None) if not q and len(self.Env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug("Quorum: %s Len: %d" % (q, len(self.Env["nodes"]))) return peer_list for n in self.Env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug("looking for: %r" % stonith.regexes) shot = stonith.look(0) while shot: self.debug("Found: %r" % shot) del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.Env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self.logger.log("ERROR: Unknown stonith match: %r" % shot) elif not peer in peer_list: self.debug("Found peer: %s" % peer) peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) if self.Env["at-boot"]: - self.ShouldBeStatus[peer] = "up" + self.expected_status[peer] = "up" else: - self.ShouldBeStatus[peer] = "down" + self.expected_status[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while stonith.regexes and shot: self.debug("Found: %r" % shot) del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too self.ns.wait_for_node(peer, self.Env["DeadTime"]) # Poll until it comes up if self.Env["at-boot"]: if not self.stat_cm(peer): time.sleep(self.Env["StartTime"]) if not self.stat_cm(peer): self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) return None return peer_list def start_cm(self, node, verbose=False): """ Start up the cluster manager on a given node """ if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node)) else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) - if not node in self.ShouldBeStatus: - self.ShouldBeStatus[node] = "down" + if not node in self.expected_status: + self.expected_status[node] = "down" - if self.ShouldBeStatus[node] != "down": + if self.expected_status[node] != "down": return True # Technically we should always be able to notice ourselves starting patterns = [ self.templates["Pat:Local_started"] % node ] if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) watch = LogWatcher( self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10) self.install_config(node) - self.ShouldBeStatus[node] = "any" + self.expected_status[node] = "any" if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]): self.logger.log ("%s was already started" % node) return True stonith = self.prepare_fencing_watcher() watch.set_watch() (rc, _) = self.rsh(node, self.templates["StartCmd"]) if rc != 0: self.logger.log ("Warn: Start command failed on node %s" % node) self.fencing_cleanup(node, stonith) return False - self.ShouldBeStatus[node] = "up" + self.expected_status[node] = "up" watch_result = watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % regex) if watch_result and self.cluster_stable(self.Env["DeadTime"]): self.fencing_cleanup(node, stonith) return True if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]): self.fencing_cleanup(node, stonith) return True self.logger.log ("Warn: Start failed for node %s" % node) return False def start_cm_async(self, node, verbose=False): """ Start up the cluster manager on a given node without blocking """ if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node)) else: self.debug("Starting %s on node %s" % (self["Name"], node)) self.install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=False) - self.ShouldBeStatus[node] = "up" + self.expected_status[node] = "up" def stop_cm(self, node, verbose=False, force=False): """ Stop the cluster manager on a given node """ if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node)) else: self.debug("Stopping %s on node %s" % (self["Name"], node)) - if self.ShouldBeStatus[node] != "up" and not force: + if self.expected_status[node] != "up" and not force: return True (rc, _) = self.rsh(node, self.templates["StopCmd"]) if rc == 0: # Make sure we can continue even if corosync leaks - self.ShouldBeStatus[node] = "down" + self.expected_status[node] = "down" self.cluster_stable(self.Env["DeadTime"]) return True self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) return False def stop_cm_async(self, node): """ Stop the cluster manager on a given node without blocking """ self.debug("Stopping %s on node %s" % (self["Name"], node)) self.rsh(node, self.templates["StopCmd"], synchronous=False) - self.ShouldBeStatus[node] = "down" + self.expected_status[node] = "down" def startall(self, nodelist=None, verbose=False, quick=False): """ Start the cluster manager on every node in the cluster, or on every node in nodelist if not None """ if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: - if self.ShouldBeStatus[node] == "down": + if self.expected_status[node] == "down": self.ns.wait_for_all_nodes(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... return self.start_cm(nodelist[0], verbose=verbose) # Approximation of SimulStartList for --boot watchpats = [ self.templates["Pat:DC_IDLE"] ] for node in nodelist: watchpats.extend([ self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node, self.templates["Pat:Local_started"] % node, self.templates["Pat:They_up"] % (nodelist[0], node) ]) # Start all the nodes - at about the same time... watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10) watch.set_watch() if not self.start_cm(nodelist[0], verbose=verbose): return False for node in nodelist: self.start_cm_async(node, verbose=verbose) watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % regex) if not self.cluster_stable(): self.logger.log("Cluster did not stabilize") return False return True def stopall(self, nodelist=None, verbose=False, force=False): """ Stop the cluster manager on every node in the cluster, or on every node in nodelist if not None """ ret = True if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up" or force: + if self.expected_status[node] == "up" or force: if not self.stop_cm(node, verbose=verbose, force=force): ret = False return ret def statall(self, nodelist=None): '''Return the status of the cluster managers in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' result = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.stat_cm(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): '''isolate the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node == target: continue (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node)) if rc != 0: self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) return False self.debug("Communication cut between %s and %s" % (target, node)) return True def unisolate_node(self, target, nodes=None): '''fix the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node == target: continue # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False) self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False) self.debug("Communication restored between %s and %s" % (target, node)) def oprofile_start(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofile_start(n) elif node in self.Env["oprofile"]: self.debug("Enabling oprofile on %s" % node) self.rsh(node, "opcontrol --init") self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") self.rsh(node, "opcontrol --start") self.rsh(node, "opcontrol --reset") def oprofile_save(self, test, node=None): if not node: for n in self.Env["oprofile"]: self.oprofile_save(test, n) elif node in self.Env["oprofile"]: self.rsh(node, "opcontrol --dump") self.rsh(node, "opcontrol --save=cts.%d" % test) # Read back with: opreport -l session:cts.0 image:/c* self.oprofile_stop(node) self.oprofile_start(node) def oprofile_stop(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofile_stop(n) elif node in self.Env["oprofile"]: self.debug("Stopping oprofile on %s" % node) self.rsh(node, "opcontrol --reset") self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") def install_config(self, node): if not self.ns.wait_for_node(node): self.log("Node %s is not up." % node) return if node in self.CIBsync or not self.Env["ClobberCIB"]: return self.CIBsync[node] = True self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR) # Only install the CIB on the first node, all the other ones will pick it up from there if self.cib_installed: return self.cib_installed = True if self.Env["CIBfilename"] is None: self.log("Installing Generated CIB on node %s" % node) self.cib.install(node) else: self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node)) rc = self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) if rc != 0: raise ValueError("Can not scp file to %s %d" % (node, rc)) self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR)) def prepare(self): '''Finish the Initialization process. Prepare to test...''' self.partitions_expected = 1 for node in self.Env["nodes"]: - self.ShouldBeStatus[node] = "" + self.expected_status[node] = "" if self.Env["experimental-tests"]: self.unisolate_node(node) self.stat_cm(node) def test_node_cm(self, node): '''Report the status of the cluster manager on a given node''' watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)", self.templates["Pat:NonDC_started"] % node, self.templates["Pat:DC_started"] % node ] idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle") idle_watch.set_watch() (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if not out: out = "" else: out = out[0].strip() self.debug("Node %s status: '%s'" % (node, out)) if out.find('ok') < 0: - if self.ShouldBeStatus[node] == "up": + if self.expected_status[node] == "up": self.log( "Node status for %s is %s but we think it should be %s" - % (node, "down", self.ShouldBeStatus[node])) - self.ShouldBeStatus[node] = "down" + % (node, "down", self.expected_status[node])) + self.expected_status[node] = "down" return 0 - if self.ShouldBeStatus[node] == "down": + if self.expected_status[node] == "down": self.log( "Node status for %s is %s but we think it should be %s: %s" - % (node, "up", self.ShouldBeStatus[node], out)) + % (node, "up", self.expected_status[node], out)) - self.ShouldBeStatus[node] = "up" + self.expected_status[node] = "up" # check the output first - because syslog-ng loses messages if out.find('S_NOT_DC') != -1: # Up and stable return 2 if out.find('S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" % (node, out)) return 1 # Up and stable return 2 def stat_cm(self, node): """ Report the status of the cluster manager on a given node """ return self.test_node_cm(node) > 0 # Being up and being stable is not the same question... def node_stable(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_cm(node) == 2: return True self.log("Warn: Node %s not stable" % node) return False def partition_stable(self, nodes, timeout=None): watchpats = [ "Current ping state: S_IDLE", self.templates["Pat:DC_IDLE"] ] self.debug("Waiting for cluster stability...") if timeout is None: timeout = self.Env["DeadTime"] if len(nodes) < 3: self.debug("Cluster is inactive") return True idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout) idle_watch.set_watch() for node in nodes.split(): # have each node dump its current state self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) ret = idle_watch.look() while ret: self.debug(ret) for node in nodes.split(): if re.search(node, ret): return True ret = idle_watch.look() self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout)) return False def cluster_stable(self, timeout=None, double_check=False): partitions = self.find_partitions() for partition in partitions: if not self.partition_stable(partition, timeout): return False if not double_check: return True # Make sure we are really stable and that all resources, # including those that depend on transient node attributes, # are started if they were going to be time.sleep(5) for partition in partitions: if not self.partition_stable(partition, timeout): return False return True def is_node_dc(self, node, status_line=None): if not status_line: (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if out: status_line = out[0].strip() if not status_line: return False if status_line.find('S_IDLE') != -1: return True if status_line.find('S_INTEGRATION') != -1: return True if status_line.find('S_FINALIZE_JOIN') != -1: return True if status_line.find('S_POLICY_ENGINE') != -1: return True if status_line.find('S_TRANSITION_ENGINE') != -1: return True return False def active_resources(self, node): (_, output) = self.rsh(node, "crm_resource -c", verbose=1) resources = [] for line in output: if not re.search("^Resource", line): continue tmp = AuditResource(self, line) if tmp.type == "primitive" and tmp.host == node: resources.append(tmp.id) return resources def resource_location(self, rid): ResourceNodes = [] for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] != "up": + if self.expected_status[node] != "up": continue cmd = self.templates["RscRunning"] % rid (rc, lines) = self.rsh(node, cmd) if rc == 127: self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd) for line in lines: self.log("Output: %s " % line) elif rc == 0: ResourceNodes.append(node) return ResourceNodes def find_partitions(self): ccm_partitions = [] for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] != "up": + if self.expected_status[node] != "up": self.debug("Node %s is down... skipping" % node) continue (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) if not out: self.log("no partition details for %s" % node) continue partition = out[0].strip() if len(partition) <= 2: self.log("bad partition details for %s" % node) continue nodes = partition.split() nodes.sort() partition = ' '.join(nodes) found = 0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug("Adding partition from %s: %s" % (node, partition)) ccm_partitions.append(partition) else: self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) self.debug("Found partitions: %r" % ccm_partitions) return ccm_partitions def has_quorum(self, node_list): # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.Env["nodes"] for node in node_list: - if self.ShouldBeStatus[node] != "up": + if self.expected_status[node] != "up": continue (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) quorum = quorum[0].strip() if quorum.find("1") != -1: return True if quorum.find("0") != -1: return False self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum)) return False @property def components(self): raise NotImplementedError def standby_status(self, node): (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) if not out: return "off" out = out[0].strip() self.debug("Standby result: %s" % out) return out # status == "on" : Enter Standby mode # status == "off": Enter Active mode def set_standby_mode(self, node, status): current_status = self.standby_status(node) if current_status == status: return True cmd = self.templates["StandbyCmd"] % (node, status) (rc, _) = self.rsh(node, cmd) return rc == 0 def add_dummy_rsc(self, node, rid): rsc_xml = """ ' '""" % (rid, rid) constraint_xml = """ ' ' """ % (rid, node, node, rid) self.rsh(node, self.templates['CibAddXml'] % rsc_xml) self.rsh(node, self.templates['CibAddXml'] % constraint_xml) def remove_dummy_rsc(self, node, rid): constraint = "\"//rsc_location[@rsc='%s']\"" % rid rsc = "\"//primitive[@id='%s']\"" % rid self.rsh(node, self.templates['CibDelXpath'] % constraint) self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py index aa323b3b9b..fba883dc7a 100644 --- a/python/pacemaker/_cts/tests/componentfail.py +++ b/python/pacemaker/_cts/tests/componentfail.py @@ -1,159 +1,159 @@ """ Kill a pacemaker daemon and test how the cluster recovers """ __all__ = ["ComponentFail"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re from pacemaker._cts.audits import AuditResource from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class ComponentFail(CTSTest): """ A concrete test that kills a random pacemaker daemon and waits for the cluster to recover """ def __init__(self, cm): """ Create a new ComponentFail instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.is_unsafe = True self.name = "ComponentFail" self._complist = cm.components self._okerrpatterns = [] self._patterns = [] self._startall = SimulStartLite(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") self._patterns = [] self._okerrpatterns = [] # start all nodes ret = self._startall(None) if not ret: return self.failure("Setup failed") if not self._cm.cluster_stable(self._env["StableTime"]): return self.failure("Setup failed - unstable") node_is_dc = self._cm.is_node_dc(node, None) # select a component to kill chosen = self._env.random_gen.choice(self._complist) while chosen.dc_only and not node_is_dc: chosen = self._env.random_gen.choice(self._complist) self.debug("...component %s (dc=%s)" % (chosen.name, node_is_dc)) self.incr(chosen.name) if chosen.name != "corosync": self._patterns.extend([ self.templates["Pat:ChildKilled"] % (node, chosen.name), self.templates["Pat:ChildRespawn"] % (node, chosen.name) ]) self._patterns.extend(chosen.pats) if node_is_dc: self._patterns.extend(chosen.dc_pats) # @TODO this should be a flag in the Component if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]: # Ignore actions for fence devices if fencer will respawn # (their registration will be lost, and probes will fail) self._okerrpatterns = [ self.templates["Pat:Fencing_active"] ] (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self._cm, line) if r.rclass == "stonith": self._okerrpatterns.extend([ self.templates["Pat:Fencing_recover"] % r.id, self.templates["Pat:Fencing_probe"] % r.id ]) # supply a copy so self.patterns doesn't end up empty tmp_pats = self._patterns.copy() self._patterns.extend(chosen.badnews_ignore) # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status stonith_pats = [ self.templates["Pat:Fencing_ok"] % node ] stonith = self.create_watch(stonith_pats, 0) stonith.set_watch() # set the watch for stable watch = self.create_watch( tmp_pats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"]) watch.set_watch() # kill the component chosen.kill(node) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for any fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") self._cm.cluster_stable(self._env["StartTime"]) self.debug("Checking if %s was shot" % node) shot = stonith.look(60) if shot: self.debug("Found: %r" % shot) self._okerrpatterns.append(self.templates["Pat:Fencing_start"] % node) if not self._env["at-boot"]: - self._cm.ShouldBeStatus[node] = "down" + self._cm.expected_status[node] = "down" # If fencing occurred, chances are many (if not all) the expected logs # will not be sent - or will be lost when the node reboots return self.success() # check for logs indicating a graceful recovery matched = watch.look_for_all(allow_multiple_matches=True) if watch.unmatched: self._logger.log("Patterns not found: %r" % watch.unmatched) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["StartTime"]) if not matched: return self.failure("Didn't find all expected %s patterns" % chosen.name) if not is_stable: return self.failure("Cluster did not become stable after killing %s" % chosen.name) return self.success() @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ # Note that okerrpatterns refers to the last time we ran this test # The good news is that this works fine for us... self._okerrpatterns.extend(self._patterns) return self._okerrpatterns diff --git a/python/pacemaker/_cts/tests/fliptest.py b/python/pacemaker/_cts/tests/fliptest.py index d3f1571419..5e779367e4 100644 --- a/python/pacemaker/_cts/tests/fliptest.py +++ b/python/pacemaker/_cts/tests/fliptest.py @@ -1,61 +1,61 @@ """ Stop running nodes, and start stopped nodes """ __all__ = ["FlipTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import time from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.tests.stoptest import StopTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class FlipTest(CTSTest): """ A concrete test that stops running nodes and starts stopped nodes """ def __init__(self, cm): """ Create a new FlipTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Flip" self._start = StartTest(cm) self._stop = StopTest(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self.incr("stopped") ret = self._stop(node) kind = "up->down" # Give the cluster time to recognize it's gone... time.sleep(self._env["StableTime"]) - elif self._cm.ShouldBeStatus[node] == "down": + elif self._cm.expected_status[node] == "down": self.incr("started") ret = self._start(node) kind = "down->up" else: return self.skipped() self.incr(kind) if ret: return self.success() return self.failure("%s failure" % kind) diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py index 94b1abdcbf..abbd82599e 100644 --- a/python/pacemaker/_cts/tests/nearquorumpointtest.py +++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py @@ -1,125 +1,125 @@ """ Randomly start and stop nodes to bring the cluster close to the quorum point """ __all__ = ["NearQuorumPointTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class NearQuorumPointTest(CTSTest): """ A concrete test that randomly starts and stops nodes to bring the cluster close to the quorum point """ def __init__(self, cm): """ Create a new NearQuorumPointTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "NearQuorumPoint" def __call__(self, dummy): """ Perform this test """ self.incr("calls") startset = [] stopset = [] stonith = self._cm.prepare_fencing_watcher() #decide what to do with each node for node in self._env["nodes"]: action = self._env.random_gen.choice(["start", "stop"]) if action == "start" : startset.append(node) elif action == "stop" : stopset.append(node) self.debug("start nodes:%r" % startset) self.debug("stop nodes:%r" % stopset) #add search patterns watchpats = [ ] for node in stopset: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": watchpats.append(self.templates["Pat:We_stopped"] % node) for node in startset: - if self._cm.ShouldBeStatus[node] == "down": + if self._cm.expected_status[node] == "down": watchpats.append(self.templates["Pat:Local_started"] % node) else: for stopping in stopset: - if self._cm.ShouldBeStatus[stopping] == "up": + if self._cm.expected_status[stopping] == "up": watchpats.append(self.templates["Pat:They_stopped"] % (node, self._cm.key_for_node(stopping))) if not watchpats: return self.skipped() if startset: watchpats.append(self.templates["Pat:DC_IDLE"]) watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) watch.set_watch() #begin actions for node in stopset: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self._cm.stop_cm_async(node) for node in startset: - if self._cm.ShouldBeStatus[node] == "down": + if self._cm.expected_status[node] == "down": self._cm.start_cm_async(node) #get the result if watch.look_for_all(): self._cm.cluster_stable() self._cm.fencing_cleanup("NearQuorumPoint", stonith) return self.success() self._logger.log("Warn: Patterns not found: %r" % watch.unmatched) #get the "bad" nodes upnodes = [] for node in stopset: if self._cm.stat_cm(node): upnodes.append(node) downnodes = [] for node in startset: if not self._cm.stat_cm(node): downnodes.append(node) self._cm.fencing_cleanup("NearQuorumPoint", stonith) if not upnodes and not downnodes: self._cm.cluster_stable() # Make sure they're completely down with no residule for node in stopset: self._rsh(node, self.templates["StopCmd"]) return self.success() if upnodes: self._logger.log("Warn: Unstoppable nodes: %r" % upnodes) if downnodes: self._logger.log("Warn: Unstartable nodes: %r" % downnodes) return self.failure() diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py index 05f07c9014..16cf67472c 100644 --- a/python/pacemaker/_cts/tests/simulstartlite.py +++ b/python/pacemaker/_cts/tests/simulstartlite.py @@ -1,131 +1,131 @@ """ Simultaneously start stopped nodes """ __all__ = ["SimulStartLite"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStartLite(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class starts any stopped nodes more or less simultaneously. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStartLite instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self,cm) self.name = "SimulStartLite" def __call__(self, dummy): """ Start all stopped nodes more or less simultaneously, returning whether this succeeded or not. """ self.incr("calls") self.debug("Setup: %s" % self.name) # We ignore the "node" parameter... node_list = [] for node in self._env["nodes"]: - if self._cm.ShouldBeStatus[node] == "down": + if self._cm.expected_status[node] == "down": self.incr("WasStopped") node_list.append(node) self.set_timer() while len(node_list) > 0: # Repeat until all nodes come up uppat = self.templates["Pat:NonDC_started"] if self._cm.upcount() == 0: uppat = self.templates["Pat:Local_started"] watchpats = [ self.templates["Pat:DC_IDLE"] ] for node in node_list: watchpats.extend([uppat % node, self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node]) # Start all the nodes - at about the same time... watch = self.create_watch(watchpats, self._env["DeadTime"]+10) watch.set_watch() stonith = self._cm.prepare_fencing_watcher() for node in node_list: self._cm.start_cm_async(node) watch.look_for_all() node_list = self._cm.fencing_cleanup(self.name, stonith) if node_list is None: return self.failure("Cluster did not stabilize") # Remove node_list messages from watch.unmatched for node in node_list: self._logger.debug("Dealing with stonith operations for %s" % node_list) if watch.unmatched: try: watch.unmatched.remove(uppat % node) except ValueError: self.debug("Already matched: %s" % (uppat % node)) try: watch.unmatched.remove(self.templates["Pat:InfraUp"] % node) except ValueError: self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node)) try: watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node) except ValueError: self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node)) if watch.unmatched: for regex in watch.unmatched: self._logger.log ("Warn: Startup pattern not found: %s" % regex) if not self._cm.cluster_stable(): return self.failure("Cluster did not stabilize") did_fail = False unstable = [] for node in self._env["nodes"]: if not self._cm.stat_cm(node): did_fail = True unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: %s" % unstable) unstable = [] for node in self._env["nodes"]: if not self._cm.node_stable(node): did_fail = True unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: %s" % unstable) return self.success() def is_applicable(self): """ SimulStartLite is a setup test and never applicable """ return False diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py index 2f51ac655a..9dbc1f2d2d 100644 --- a/python/pacemaker/_cts/tests/simulstoplite.py +++ b/python/pacemaker/_cts/tests/simulstoplite.py @@ -1,91 +1,91 @@ """ Simultaneously stop running nodes """ __all__ = ["SimulStopLite"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStopLite(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class stops any running nodes more or less simultaneously. It can be used both to set up a test or to clean up a test. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStopLite instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self,cm) self.name = "SimulStopLite" def __call__(self, dummy): """ Stop all running nodes more or less simultaneously, returning whether this succeeded or not. """ self.incr("calls") self.debug("Setup: %s" % self.name) # We ignore the "node" parameter... watchpats = [] for node in self._env["nodes"]: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self.incr("WasStarted") watchpats.append(self.templates["Pat:We_stopped"] % node) if len(watchpats) == 0: return self.success() # Stop all the nodes - at about the same time... watch = self.create_watch(watchpats, self._env["DeadTime"]+10) watch.set_watch() self.set_timer() for node in self._env["nodes"]: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self._cm.stop_cm_async(node) if watch.look_for_all(): # Make sure they're completely down with no residule for node in self._env["nodes"]: self._rsh(node, self.templates["StopCmd"]) return self.success() did_fail = False up_nodes = [] for node in self._env["nodes"]: if self._cm.stat_cm(node): did_fail = True up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: %s" % up_nodes) self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched) return self.failure("Missing log message: %s " % watch.unmatched) def is_applicable(self): """ SimulStopLite is a setup test and never applicable """ return False diff --git a/python/pacemaker/_cts/tests/starttest.py b/python/pacemaker/_cts/tests/starttest.py index 3b0b18f7a3..53a347a392 100644 --- a/python/pacemaker/_cts/tests/starttest.py +++ b/python/pacemaker/_cts/tests/starttest.py @@ -1,54 +1,54 @@ """ Start the cluster manager on a given node """ __all__ = ["StartTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StartTest(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class starts the cluster manager on a given node. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new StartTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self,cm) self.name = "Start" def __call__(self, node): """ Start the given node, returning whether this succeeded or not """ self.incr("calls") if self._cm.upcount() == 0: self.incr("us") else: self.incr("them") - if self._cm.ShouldBeStatus[node] != "down": + if self._cm.expected_status[node] != "down": return self.skipped() if self._cm.start_cm(node): return self.success() return self.failure("Startup %s on node %s failed" % (self._env["Name"], node)) diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py index da3848ad0c..e2fa341227 100644 --- a/python/pacemaker/_cts/tests/stonithdtest.py +++ b/python/pacemaker/_cts/tests/stonithdtest.py @@ -1,139 +1,139 @@ """ Fence a running node and wait for it to restart """ __all__ = ["StonithdTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker.exitstatus import ExitStatus from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StonithdTest(CTSTest): """ A concrete test that fences a running node and waits for it to restart """ def __init__(self, cm): """ Create a new StonithdTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Stonithd" self._startall = SimulStartLite(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") if len(self._env["nodes"]) < 2: return self.skipped() ret = self._startall(None) if not ret: return self.failure("Setup failed") watchpats = [ self.templates["Pat:Fencing_ok"] % node, self.templates["Pat:NodeFenced"] % node ] if not self._env["at-boot"]: self.debug("Expecting %s to stay down" % node) - self._cm.ShouldBeStatus[node] = "down" + self._cm.expected_status[node] = "down" else: self.debug("Expecting %s to come up again %d" % (node, self._env["at-boot"])) watchpats.extend([ "%s.* S_STARTING -> S_PENDING" % node, "%s.* S_PENDING -> S_NOT_DC" % node ]) watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"]) watch.set_watch() origin = self._env.random_gen.choice(self._env["nodes"]) (rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node) if rc == ExitStatus.TIMEOUT: # Look for the patterns, usually this means the required # device was running on the node to be fenced - or that # the required devices were in the process of being loaded # and/or moved # # Effectively the node committed suicide so there will be # no confirmation, but pacemaker should be watching and # fence the node again self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node)) elif origin != node and rc != 0: self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc)) elif origin == node and rc != 255: # 255 == broken pipe, ie. the node was fenced as expected self._logger.log("Locally originated fencing returned %d" % rc) with Timer(self._logger, self.name, "fence"): matched = watch.look_for_all() self.set_timer("reform") if watch.unmatched: self._logger.log("Patterns not found: %r" % watch.unmatched) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["StartTime"]) if not matched: return self.failure("Didn't find all expected patterns") if not is_stable: return self.failure("Cluster did not become stable") self.log_timer("reform") return self.success() @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ return [ self.templates["Pat:Fencing_start"] % ".*", self.templates["Pat:Fencing_ok"] % ".*", self.templates["Pat:Fencing_active"], r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ] def is_applicable(self): """ Return True if this test is applicable in the current test configuration. """ if not CTSTest.is_applicable(self): return False # pylint gets confused because of EnvFactory here. # pylint: disable=unsupported-membership-test if "DoFencing" in self._env: return self._env["DoFencing"] return True diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py index 237e928c20..7c864e67cf 100644 --- a/python/pacemaker/_cts/tests/stoptest.py +++ b/python/pacemaker/_cts/tests/stoptest.py @@ -1,97 +1,97 @@ """ Stop the cluster manager on a given node """ __all__ = ["StopTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StopTest(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class stops the cluster manager on a given node. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new StopTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Stop" def __call__(self, node): """ Stop the given node, returning whether this succeeded or not """ self.incr("calls") - if self._cm.ShouldBeStatus[node] != "up": + if self._cm.expected_status[node] != "up": return self.skipped() # Technically we should always be able to notice ourselves stopping patterns = [ self.templates["Pat:We_stopped"] % node ] # Any active node needs to notice this one left # (note that this won't work if we have multiple partitions) for other in self._env["nodes"]: - if self._cm.ShouldBeStatus[other] == "up" and other != node: + if self._cm.expected_status[other] == "up" and other != node: patterns.append(self.templates["Pat:They_stopped"] %(other, self._cm.key_for_node(node))) watch = self.create_watch(patterns, self._env["DeadTime"]) watch.set_watch() if node == self._cm.OurNode: self.incr("us") else: if self._cm.upcount() <= 1: self.incr("all") else: self.incr("them") self._cm.stop_cm(node) watch.look_for_all() failreason = None unmatched_str = "||" if watch.unmatched: (_, output) = self._rsh(node, "/bin/ps axf", verbose=1) for line in output: self.debug(line) (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1) for line in output: self.debug(line) for regex in watch.unmatched: self._logger.log ("ERROR: Shutdown pattern not found: %s" % regex) unmatched_str += "%s||" % regex failreason = "Missing shutdown pattern" self._cm.cluster_stable(self._env["DeadTime"]) if not watch.unmatched or self._cm.upcount() == 0: return self.success() if len(watch.unmatched) >= self._cm.upcount(): return self.failure("no match against (%s)" % unmatched_str) if failreason is None: return self.success() return self.failure(failreason)