diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py index 08356b88d6..21a0ac1af7 100644 --- a/python/pacemaker/_cts/audits.py +++ b/python/pacemaker/_cts/audits.py @@ -1,1023 +1,1023 @@ """Auditing classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time import uuid from pacemaker.buildoptions import BuildOptions from pacemaker._cts.input import should_continue from pacemaker._cts.watcher import LogKind, LogWatcher class ClusterAudit: """ The base class for various kinds of auditors. Specific audit implementations should be built on top of this one. Audits can do all kinds of checks on the system. The basic interface for callers is the `__call__` method, which returns True if the audit passes and False if it fails. """ def __init__(self, cm): """ Create a new ClusterAudit instance. Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.name = None def __call__(self): """Perform the audit action.""" raise NotImplementedError def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. This method must be implemented by all subclasses. """ raise NotImplementedError def log(self, args): """Log a message.""" self._cm.log("audit: %s" % args) def debug(self, args): """Log a debug message.""" self._cm.debug("audit: %s" % args) class LogAudit(ClusterAudit): """ Audit each cluster node to verify that some logging system is usable. This is done by logging a unique test message and then verifying that we can read back that test message using logging tools. """ def __init__(self, cm): """ Create a new LogAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "LogAudit" def _restart_cluster_logging(self, nodes=None): """Restart logging on the given nodes, or all if none are given.""" if not nodes: nodes = self._cm.env["nodes"] self._cm.debug("Restarting logging on: %r" % nodes) for node in nodes: if self._cm.env["have_systemd"]: (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket") if rc != 0: self._cm.log("ERROR: Cannot stop 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service") if rc != 0: self._cm.log("ERROR: Cannot start 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"]) if rc != 0: self._cm.log("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node)) def _create_watcher(self, patterns, kind): """Create a new LogWatcher instance for the given patterns.""" watch = LogWatcher(self._cm.env["LogFileName"], patterns, self._cm.env["nodes"], kind, "LogAudit", 5, silent=True) watch.set_watch() return watch def _test_logging(self): """Perform the log audit.""" patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self._cm.env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) - watch_pref = self._cm.env["LogWatcher"] + watch_pref = self._cm.env["log_kind"] if watch_pref == LogKind.ANY: kinds = [LogKind.LOCAL_FILE] if self._cm.env["have_systemd"]: kinds.append(LogKind.JOURNAL) kinds.append(LogKind.REMOTE_FILE) for k in kinds: watch[k] = self._create_watcher(patterns, k) self._cm.log("Logging test message with identifier %s" % suffix) else: watch[watch_pref] = self._create_watcher(patterns, watch_pref) for node in self._cm.env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix) (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0) if rc != 0: self._cm.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in list(watch.keys()): w = watch[k] if watch_pref == LogKind.ANY: self._cm.log("Checking for test message in %s logs" % k) w.look_for_all(silent=True) if w.unmatched: for regex in w.unmatched: self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind)) else: if watch_pref == LogKind.ANY: self._cm.log("Found test message in %s logs" % k) - self._cm.env["LogWatcher"] = k + self._cm.env["log_kind"] = k return 1 return False def __call__(self): """Perform the audit action.""" max_attempts = 3 attempt = 0 self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) while attempt <= max_attempts and not self._test_logging(): attempt += 1 self._restart_cluster_logging() time.sleep(60 * attempt) if attempt > max_attempts: self._cm.log("ERROR: Cluster logging unrecoverable.") return False return True def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" if self._cm.env["LogAuditDisabled"]: return False return True class DiskAudit(ClusterAudit): """ Audit disk usage on cluster nodes. Verify that there is enough free space left on whichever mounted file system holds the logs. Warn on: less than 100 MB or 10% of free space Error on: less than 10 MB or 5% of free space """ def __init__(self, cm): """ Create a new DiskAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "DiskspaceAudit" def __call__(self): """Perform the audit action.""" result = True # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) for node in self._cm.env["nodes"]: (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1) if not dfout: self._cm.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) continue dfout = dfout[0].strip() try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = False if not should_continue(self._cm.env): raise ValueError("Disk full on %s" % node) elif remaining_mb < 100 or used_percent > 90: self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" return True class FileAudit(ClusterAudit): """ Audit the filesystem looking for various failure conditions. Check for: * The presence of core dumps from corosync or Pacemaker daemons * Stale IPC files """ def __init__(self, cm): """ Create a new FileAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.known = [] self.name = "FileAudit" def __call__(self): """Perform the audit action.""" result = True self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) for node in self._cm.env["nodes"]: (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Corosync core file on %s: %s" % (node, line)) if self._cm.expected_status.get(node) == "down": clean = False (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) for line in lsout: result = False clean = True self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) for line in lsout: self._cm.debug("ps[%s]: %s" % (node, line)) self._cm.rsh(node, "rm -rf /dev/shm/qb-*") else: self._cm.debug("Skipping %s" % node) return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" return True class AuditResource: """A base class for storing information about a cluster resource.""" def __init__(self, cm, line): """ Create a new AuditResource instance. Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single resource """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None @property def unique(self): """Return True if this resource is unique.""" return self.flags & 0x20 @property def orphan(self): """Return True if this resource is an orphan.""" return self.flags & 0x01 @property def managed(self): """Return True if this resource is managed by the cluster.""" return self.flags & 0x02 class AuditConstraint: """A base class for storing information about a cluster constraint.""" def __init__(self, cm, line): """ Create a new AuditConstraint instance. Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single constraint """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): """ Audit primitive resources to verify a variety of conditions. Check that: * Resources are active and managed only when expected * Resources are active on the expected cluster node * Resources are not orphaned """ def __init__(self, cm): """ Create a new PrimitiveAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PrimitiveAudit" self._active_nodes = [] self._constraints = [] self._inactive_nodes = [] self._resources = [] self._target = None def _audit_resource(self, resource, quorum): """Perform the audit of a single resource.""" rc = True active = self._cm.resource_location(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %r" % (resource.id, active)) elif resource.needs_quorum == 1: self._cm.log("Resource %s active without quorum: %r" % (resource.id, active)) rc = False elif not resource.managed: self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active)) elif not resource.unique: # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %r" % (resource.id, active)) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active)) rc = False elif resource.orphan: self.debug("Resource %s is an inactive orphan" % resource.id) elif not self._inactive_nodes: self._cm.log("WARN: Resource %s not served anywhere" % resource.id) rc = False elif self._cm.env["warn-inactive"]: if quorum or not resource.needs_quorum: self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) else: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) return rc def _setup(self): """ Verify cluster nodes are active. Collect resource and colocation information used for performing the audit. """ for node in self._cm.env["nodes"]: if self._cm.expected_status[node] == "up": self._active_nodes.append(node) else: self._inactive_nodes.append(node) for node in self._cm.env["nodes"]: if self._target is None and self._cm.expected_status[node] == "up": self._target = node if not self._target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name) return False (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): self._resources.append(AuditResource(self._cm, line)) elif re.search("^Constraint", line): self._constraints.append(AuditConstraint(self._cm, line)) else: self._cm.log("Unknown entry: %s" % line) return True def __call__(self): """Perform the audit action.""" result = True if not self._setup(): return result quorum = self._cm.has_quorum(None) for resource in self._resources: if resource.type == "primitive" and not self._audit_resource(resource, quorum): result = False return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. # if self._cm["Name"] == "crm-corosync": # return True return False class GroupAudit(PrimitiveAudit): """ Audit group resources. Check that: * Each of its child primitive resources is active on the expected cluster node """ def __init__(self, cm): """ Create a new GroupAudit instance. Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "GroupAudit" def __call__(self): result = True if not self._setup(): return result for group in self._resources: if group.type != "group": continue first_match = True group_location = None for child in self._resources: if child.parent != group.id: continue nodes = self._cm.resource_location(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = False if len(nodes) > 1: result = False self._cm.log("Child %s of %s is active more than once: %r" % (child.id, group.id, nodes)) elif not nodes: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: result = False self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return result class CloneAudit(PrimitiveAudit): """ Audit clone resources. NOTE: Currently, this class does not perform any actual audit functions. """ def __init__(self, cm): """ Create a new CloneAudit instance. Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "CloneAudit" def __call__(self): result = True if not self._setup(): return result for clone in self._resources: if clone.type != "clone": continue for child in self._resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return result class ColocationAudit(PrimitiveAudit): """ Audit cluster resources. Check that: * Resources are colocated with the expected resource """ def __init__(self, cm): """ Create a new ColocationAudit instance. Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "ColocationAudit" def _crm_location(self, resource): """Return a list of cluster nodes where a given resource is running.""" (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): result = True if not self._setup(): return result for coloc in self._constraints: if coloc.type != "rsc_colocation": continue source = self._crm_location(coloc.rsc) target = self._crm_location(coloc.target) if not source: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if node not in target: result = False self._cm.log("Colocation audit (%s): %s running on %s (not in %r)" % (coloc.id, coloc.rsc, node, target)) else: self.debug("Colocation audit (%s): %s running on %s (in %r)" % (coloc.id, coloc.rsc, node, target)) return result class ControllerStateAudit(ClusterAudit): """Verify active and inactive resources.""" def __init__(self, cm): """ Create a new ControllerStateAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "ControllerStateAudit" def __call__(self): result = True up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self._cm.env["nodes"]: should_be = self._cm.expected_status[node] rc = self._cm.test_node_cm(node) if rc > 0: if should_be == "down": down_are_up += 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down += 1 if len(unstable_list) > 0: result = False self._cm.log("Cluster is not stable: %d (of %d): %r" % (len(unstable_list), self._cm.upcount(), unstable_list)) if up_are_down > 0: result = False self._cm.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self._cm.env["nodes"]))) if down_are_up > 0: result = False self._cm.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self._cm.env["nodes"]))) return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. # if self._cm["Name"] == "crm-corosync": # return True return False class CIBAudit(ClusterAudit): """Audit the CIB by verifying that it is identical across cluster nodes.""" def __init__(self, cm): """ Create a new CIBAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "CibAudit" def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: self.debug("\tNo partitions to audit") return result for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) if self._audit_cib_contents(partition) == 0: result = False return result def _audit_cib_contents(self, hostlist): """Perform the CIB audit on the given hosts.""" passed = True node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self._store_remote_cib(node, node0) if node_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node) passed = False elif node0 is None: node0 = node node0_xml = node_xml elif node0_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node0) passed = False else: (rc, result) = self._cm.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) if rc != 0: self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = False for line in result: if not re.search("", line): passed = False self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) return passed def _store_remote_cib(self, node, target): """ Store a copy of the given node's CIB on the given target node. If no target is given, store the CIB on the given node. """ filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1) if rc != 0: self._cm.log("Could not retrieve configuration") return None self._cm.rsh("localhost", "rm -f %s" % filename) for line in lines: self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self._cm.log("Could not store configuration") return None return filename def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. # if self._cm["Name"] == "crm-corosync": # return True return False class PartitionAudit(ClusterAudit): """ Audit each partition in a cluster to verify a variety of conditions. Check that: * The number of partitions and the nodes in each is as expected * Each node is active when it should be active and inactive when it should be inactive * The status and epoch of each node is as expected * A partition has quorum * A partition has a DC when expected """ def __init__(self, cm): """ Create a new PartitionAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PartitionAudit" self._node_epoch = {} self._node_state = {} self._node_quorum = {} def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: return result self._cm.cluster_stable(double_check=True) if len(ccm_partitions) != self._cm.partitions_expected: self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) result = False for partition in ccm_partitions: self._cm.log("\t %s" % partition) for partition in ccm_partitions: if self._audit_partition(partition) == 0: result = False return result def _trim_string(self, avalue): """Remove the last character from a multi-character string.""" if not avalue: return None if len(avalue) > 1: return avalue[:-1] return avalue def _trim2int(self, avalue): """Remove the last character from a multi-character string and convert the result to an int.""" trimmed = self._trim_string(avalue) if trimmed: return int(trimmed) return None def _audit_partition(self, partition): """Perform the audit of a single partition.""" passed = True dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % partition) for node in node_list: if self._cm.expected_status[node] != "up": self._cm.log("Warn: Node %s appeared out of nowhere" % node) self._cm.expected_status[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1) self._node_state[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1) self._node_epoch[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1) self._node_quorum[node] = out[0].strip() self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node])) self._node_state[node] = self._trim_string(self._node_state[node]) self._node_epoch[node] = self._trim2int(self._node_epoch[node]) self._node_quorum[node] = self._trim_string(self._node_quorum[node]) if not self._node_epoch[node]: self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node) self._cm.expected_status[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch: lowest_epoch = self._node_epoch[node] if not lowest_epoch: self._cm.log("Lowest epoch not determined in %s" % partition) passed = False for node in node_list: if self._cm.expected_status[node] != "up": continue if self._cm.is_node_dc(node, self._node_state[node]): dc_found.append(node) if self._node_epoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self._node_epoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self._cm.log("DC %s is not the oldest node (%d vs. %d)" % (node, self._node_epoch[node], lowest_epoch)) passed = False if not dc_found: self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self._cm.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = False if not passed: for node in node_list: if self._cm.expected_status[node] == "up": self._cm.log("epoch %s : %s" % (self._node_epoch[node], self._node_state[node])) return passed def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. # if self._cm["Name"] == "crm-corosync": # return True return False # pylint: disable=invalid-name def audit_list(cm): """Return a list of instances of applicable audits that can be performed.""" result = [] for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit, PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit, ColocationAudit, CIBAudit]: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py index 763f1af712..0e9b1cfdb0 100644 --- a/python/pacemaker/_cts/clustermanager.py +++ b/python/pacemaker/_cts/clustermanager.py @@ -1,896 +1,898 @@ """ClusterManager class for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["ClusterManager"] __copyright__ = """Copyright 2000-2024 the Pacemaker project contributors. Certain portions by Huang Zhen are copyright 2004 International Business Machines. The version control history for this file may have further details.""" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import time from collections import UserDict from pacemaker.buildoptions import BuildOptions from pacemaker._cts.CTS import NodeStatus from pacemaker._cts.audits import AuditResource from pacemaker._cts.cib import ConfigFactory from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogWatcher # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). It's possible we could fix this with type annotations, # but those were introduced with python 3.5 and we only support python 3.4. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable # ClusterManager has a lot of methods. # pylint: disable=too-many-public-methods class ClusterManager(UserDict): """ An abstract base class for managing the cluster. This class implements high-level operations on the cluster and/or its cluster managers. Actual cluster-specific management classes should be subclassed from this one. Among other things, this class tracks the state every node is expected to be in. """ def _final_conditions(self): """Check all keys to make sure they have a non-None value.""" for (key, val) in self._data.items(): if val is None: raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key) def __init__(self): """ Create a new ClusterManager instance. This class can be treated kind of like a dictionary due to the process of certain dict functions like __getitem__ and __setitem__. This is because it contains a lot of name/value pairs. However, it is not actually a dictionary so do not rely on standard dictionary behavior. """ # Eventually, ClusterManager should not be a UserDict subclass. Until # that point... # pylint: disable=super-init-not-called self.__instance_errors_to_ignore = [] self._cib_installed = False self._data = {} self._logger = LogFactory() self.env = EnvFactory().getInstance() self.expected_status = {} self.name = self.env["Name"] # pylint: disable=invalid-name self.ns = NodeStatus(self.env) self.our_node = os.uname()[1].lower() self.partitions_expected = 1 self.rsh = RemoteFactory().getInstance() self.templates = PatternSelector(self.env["Name"]) self._final_conditions() self._cib_factory = ConfigFactory(self) self._cib = self._cib_factory.create_config(self.env["Schema"]) self._cib_sync = {} def __getitem__(self, key): """ Return the given key, checking for it in several places. If key is "Name", return the name of the cluster manager. If the key was previously added to the dictionary via __setitem__, return that. Otherwise, return the template pattern for the key. This method should not be used and may be removed in the future. """ if key == "Name": return self.name print("FIXME: Getting %s from %r" % (key, self)) if key in self._data: return self._data[key] return self.templates.get_patterns(key) def __setitem__(self, key, value): """ Set the given key to the given value, overriding any previous value. This method should not be used and may be removed in the future. """ print("FIXME: Setting %s=%s on %r" % (key, value, self)) self._data[key] = value def clear_instance_errors_to_ignore(self): """Reset instance-specific errors to ignore on each iteration.""" self.__instance_errors_to_ignore = [] @property def instance_errors_to_ignore(self): """Return a list of known errors that should be ignored for a specific test instance.""" return self.__instance_errors_to_ignore @property def errors_to_ignore(self): """Return a list of known error messages that should be ignored.""" return self.templates.get_patterns("BadNewsIgnore") def log(self, args): """Log a message.""" self._logger.log(args) def debug(self, args): """Log a debug message.""" self._logger.debug(args) def upcount(self): """Return how many nodes are up.""" count = 0 for node in self.env["nodes"]: if self.expected_status[node] == "up": count += 1 return count def install_support(self, command="install"): """ Install or uninstall the CTS support files. This includes various init scripts and data, daemons, fencing agents, etc. """ for node in self.env["nodes"]: self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command)) def prepare_fencing_watcher(self): """Return a LogWatcher object that watches for fencing log messages.""" # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced if self.has_quorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonith_pats = [] for peer in self.env["nodes"]: if self.expected_status[peer] == "up": continue stonith_pats.extend([ self.templates["Pat:Fencing_ok"] % peer, self.templates["Pat:Fencing_start"] % peer, ]) stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"], - self.env["LogWatcher"], "StartupFencing", 0) + self.env["log_kind"], "StartupFencing", 0) stonith.set_watch() return stonith def fencing_cleanup(self, node, stonith): """Wait for a previously fenced node to return to the cluster.""" peer_list = [] peer_state = {} self.debug("Looking for nodes that were fenced as a result of %s starting" % node) # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.has_quorum(None) if not q and len(self.env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug("Quorum: %s Len: %d" % (q, len(self.env["nodes"]))) return peer_list for n in self.env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug("looking for: %r" % stonith.regexes) shot = stonith.look(0) while shot: self.debug("Found: %r" % shot) del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self._logger.log("ERROR: Unknown stonith match: %r" % shot) elif peer not in peer_list: self.debug("Found peer: %s" % peer) peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) if self.env["at-boot"]: self.expected_status[peer] = "up" else: self.expected_status[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while stonith.regexes and shot: self.debug("Found: %r" % shot) del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too self.ns.wait_for_node(peer, self.env["DeadTime"]) # Poll until it comes up if self.env["at-boot"]: if not self.stat_cm(peer): time.sleep(self.env["StartTime"]) if not self.stat_cm(peer): self._logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) return None return peer_list def start_cm(self, node, verbose=False): """Start up the cluster manager on a given node.""" if verbose: self._logger.log("Starting %s on node %s" % (self.templates["Name"], node)) else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) if node not in self.expected_status: self.expected_status[node] = "down" if self.expected_status[node] != "down": return True # Technically we should always be able to notice ourselves starting patterns = [ self.templates["Pat:Local_started"] % node, ] if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) - watch = LogWatcher(self.env["LogFileName"], patterns, self.env["nodes"], self.env["LogWatcher"], + watch = LogWatcher(self.env["LogFileName"], patterns, + self.env["nodes"], self.env["log_kind"], "StartaCM", self.env["StartTime"] + 10) self.install_config(node) self.expected_status[node] = "any" if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): self._logger.log("%s was already started" % node) return True stonith = self.prepare_fencing_watcher() watch.set_watch() (rc, _) = self.rsh(node, self.templates["StartCmd"]) if rc != 0: self._logger.log("Warn: Start command failed on node %s" % node) self.fencing_cleanup(node, stonith) return False self.expected_status[node] = "up" watch_result = watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self._logger.log("Warn: Startup pattern not found: %s" % regex) if watch_result and self.cluster_stable(self.env["DeadTime"]): self.fencing_cleanup(node, stonith) return True if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): self.fencing_cleanup(node, stonith) return True self._logger.log("Warn: Start failed for node %s" % node) return False def start_cm_async(self, node, verbose=False): """Start up the cluster manager on a given node without blocking.""" if verbose: self._logger.log("Starting %s on node %s" % (self["Name"], node)) else: self.debug("Starting %s on node %s" % (self["Name"], node)) self.install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=False) self.expected_status[node] = "up" def stop_cm(self, node, verbose=False, force=False): """Stop the cluster manager on a given node.""" if verbose: self._logger.log("Stopping %s on node %s" % (self["Name"], node)) else: self.debug("Stopping %s on node %s" % (self["Name"], node)) if self.expected_status[node] != "up" and not force: return True (rc, _) = self.rsh(node, self.templates["StopCmd"]) if rc == 0: # Make sure we can continue even if corosync leaks self.expected_status[node] = "down" self.cluster_stable(self.env["DeadTime"]) return True self._logger.log("ERROR: Could not stop %s on node %s" % (self["Name"], node)) return False def stop_cm_async(self, node): """Stop the cluster manager on a given node without blocking.""" self.debug("Stopping %s on node %s" % (self["Name"], node)) self.rsh(node, self.templates["StopCmd"], synchronous=False) self.expected_status[node] = "down" def startall(self, nodelist=None, verbose=False, quick=False): """Start the cluster manager on every node in the cluster, or on every node in nodelist.""" if not nodelist: nodelist = self.env["nodes"] for node in nodelist: if self.expected_status[node] == "down": self.ns.wait_for_all_nodes(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... return self.start_cm(nodelist[0], verbose=verbose) # Approximation of SimulStartList for --boot watchpats = [ self.templates["Pat:DC_IDLE"], ] for node in nodelist: watchpats.extend([ self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node, self.templates["Pat:Local_started"] % node, self.templates["Pat:They_up"] % (nodelist[0], node), ]) # Start all the nodes - at about the same time... watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"], - self.env["LogWatcher"], "fast-start", self.env["DeadTime"] + 10) + self.env["log_kind"], "fast-start", + self.env["DeadTime"] + 10) watch.set_watch() if not self.start_cm(nodelist[0], verbose=verbose): return False for node in nodelist: self.start_cm_async(node, verbose=verbose) watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self._logger.log("Warn: Startup pattern not found: %s" % regex) if not self.cluster_stable(): self._logger.log("Cluster did not stabilize") return False return True def stopall(self, nodelist=None, verbose=False, force=False): """Stop the cluster manager on every node in the cluster, or on every node in nodelist.""" ret = True if not nodelist: nodelist = self.env["nodes"] for node in self.env["nodes"]: if self.expected_status[node] == "up" or force: if not self.stop_cm(node, verbose=verbose, force=force): ret = False return ret def statall(self, nodelist=None): """Return the status of the cluster manager on every node in the cluster, or on every node in nodelist.""" result = {} if not nodelist: nodelist = self.env["nodes"] for node in nodelist: if self.stat_cm(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): """Break communication between the target node and all other nodes in the cluster, or nodes.""" if not nodes: nodes = self.env["nodes"] for node in nodes: if node == target: continue (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node) if rc != 0: self._logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) return False self.debug("Communication cut between %s and %s" % (target, node)) return True def unisolate_node(self, target, nodes=None): """Re-establish communication between the target node and all other nodes in the cluster, or nodes.""" if not nodes: nodes = self.env["nodes"] for node in nodes: if node == target: continue # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False) self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False) self.debug("Communication restored between %s and %s" % (target, node)) def oprofile_start(self, node=None): """Start profiling on the given node, or all nodes in the cluster.""" if not node: for n in self.env["oprofile"]: self.oprofile_start(n) elif node in self.env["oprofile"]: self.debug("Enabling oprofile on %s" % node) self.rsh(node, "opcontrol --init") self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") self.rsh(node, "opcontrol --start") self.rsh(node, "opcontrol --reset") def oprofile_save(self, test, node=None): """Save profiling data and restart profiling on the given node, or all nodes in the cluster.""" if not node: for n in self.env["oprofile"]: self.oprofile_save(test, n) elif node in self.env["oprofile"]: self.rsh(node, "opcontrol --dump") self.rsh(node, "opcontrol --save=cts.%d" % test) # Read back with: opreport -l session:cts.0 image:/c* self.oprofile_stop(node) self.oprofile_start(node) def oprofile_stop(self, node=None): """ Start profiling on the given node, or all nodes in the cluster. This does not save profiling data, so call oprofile_save first if needed. """ if not node: for n in self.env["oprofile"]: self.oprofile_stop(n) elif node in self.env["oprofile"]: self.debug("Stopping oprofile on %s" % node) self.rsh(node, "opcontrol --reset") self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") def install_config(self, node): """Remove and re-install the CIB on the first node in the cluster.""" if not self.ns.wait_for_node(node): self.log("Node %s is not up." % node) return if node in self._cib_sync or not self.env["ClobberCIB"]: return self._cib_sync[node] = True self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR) # Only install the CIB on the first node, all the other ones will pick it up from there if self._cib_installed: return self._cib_installed = True if self.env["CIBfilename"] is None: self.log("Installing Generated CIB on node %s" % node) self._cib.install(node) else: self.log("Installing CIB (%s) on node %s" % (self.env["CIBfilename"], node)) rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) if rc != 0: raise ValueError("Can not scp file to %s %d" % (node, rc)) self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR)) def prepare(self): """ Finish initialization. Clear out the expected status and record the current status of every node in the cluster. """ self.partitions_expected = 1 for node in self.env["nodes"]: self.expected_status[node] = "" if self.env["experimental-tests"]: self.unisolate_node(node) self.stat_cm(node) def test_node_cm(self, node): """ Check the status of a given node. Returns 0 if the node is down, 1 if the node is up but unstable, and 2 if the node is up and stable. """ watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)", self.templates["Pat:NonDC_started"] % node, self.templates["Pat:DC_started"] % node, ] idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node], - self.env["LogWatcher"], "ClusterIdle") + self.env["log_kind"], "ClusterIdle") idle_watch.set_watch() (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if not out: out = "" else: out = out[0].strip() self.debug("Node %s status: '%s'" % (node, out)) if out.find('ok') < 0: if self.expected_status[node] == "up": self.log("Node status for %s is %s but we think it should be %s" % (node, "down", self.expected_status[node])) self.expected_status[node] = "down" return 0 if self.expected_status[node] == "down": self.log("Node status for %s is %s but we think it should be %s: %s" % (node, "up", self.expected_status[node], out)) self.expected_status[node] = "up" # check the output first - because syslog-ng loses messages if out.find('S_NOT_DC') != -1: # Up and stable return 2 if out.find('S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" % (node, out)) return 1 # Up and stable return 2 def stat_cm(self, node): """Report the status of the cluster manager on a given node.""" return self.test_node_cm(node) > 0 # Being up and being stable is not the same question... def node_stable(self, node): """Return whether or not the given node is stable.""" if self.test_node_cm(node) == 2: return True self.log("Warn: Node %s not stable" % node) return False def partition_stable(self, nodes, timeout=None): """Return whether or not all nodes in the given partition are stable.""" watchpats = [ "Current ping state: S_IDLE", self.templates["Pat:DC_IDLE"], ] self.debug("Waiting for cluster stability...") if timeout is None: timeout = self.env["DeadTime"] if len(nodes) < 3: self.debug("Cluster is inactive") return True idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(), - self.env["LogWatcher"], "ClusterStable", timeout) + self.env["log_kind"], "ClusterStable", timeout) idle_watch.set_watch() for node in nodes.split(): # have each node dump its current state self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) ret = idle_watch.look() while ret: self.debug(ret) for node in nodes.split(): if re.search(node, ret): return True ret = idle_watch.look() self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout)) return False def cluster_stable(self, timeout=None, double_check=False): """Return whether or not all nodes in the cluster are stable.""" partitions = self.find_partitions() for partition in partitions: if not self.partition_stable(partition, timeout): return False if not double_check: return True # Make sure we are really stable and that all resources, # including those that depend on transient node attributes, # are started if they were going to be time.sleep(5) for partition in partitions: if not self.partition_stable(partition, timeout): return False return True def is_node_dc(self, node, status_line=None): """ Return whether or not the given node is the cluster DC. Check the given status_line, or query the cluster if None. """ if not status_line: (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if out: status_line = out[0].strip() if not status_line: return False if status_line.find('S_IDLE') != -1: return True if status_line.find('S_INTEGRATION') != -1: return True if status_line.find('S_FINALIZE_JOIN') != -1: return True if status_line.find('S_POLICY_ENGINE') != -1: return True if status_line.find('S_TRANSITION_ENGINE') != -1: return True return False def active_resources(self, node): """Return a list of primitive resources active on the given node.""" (_, output) = self.rsh(node, "crm_resource -c", verbose=1) resources = [] for line in output: if not re.search("^Resource", line): continue tmp = AuditResource(self, line) if tmp.type == "primitive" and tmp.host == node: resources.append(tmp.id) return resources def resource_location(self, rid): """Return a list of nodes on which the given resource is running.""" resource_nodes = [] for node in self.env["nodes"]: if self.expected_status[node] != "up": continue cmd = self.templates["RscRunning"] % rid (rc, lines) = self.rsh(node, cmd) if rc == 127: self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd) for line in lines: self.log("Output: %s " % line) elif rc == 0: resource_nodes.append(node) return resource_nodes def find_partitions(self): """ Return a list of all partitions in the cluster. Each element of the list is itself a list of all active nodes in that partition. """ ccm_partitions = [] for node in self.env["nodes"]: if self.expected_status[node] != "up": self.debug("Node %s is down... skipping" % node) continue (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) if not out: self.log("no partition details for %s" % node) continue partition = out[0].strip() if len(partition) <= 2: self.log("bad partition details for %s" % node) continue nodes = partition.split() nodes.sort() partition = ' '.join(nodes) found = 0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug("Adding partition from %s: %s" % (node, partition)) ccm_partitions.append(partition) else: self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) self.debug("Found partitions: %r" % ccm_partitions) return ccm_partitions def has_quorum(self, node_list): """Return whether or not the cluster has quorum.""" # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.env["nodes"] for node in node_list: if self.expected_status[node] != "up": continue (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) quorum = quorum[0].strip() if quorum.find("1") != -1: return True if quorum.find("0") != -1: return False self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum)) return False @property def components(self): """ Return a list of all patterns that should be ignored for the cluster's components. This must be provided by all subclasses. """ raise NotImplementedError def in_standby_mode(self, node): """Return whether or not the node is in Standby.""" (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) if not out: return False out = out[0].strip() self.debug("Standby result: %s" % out) return out == "on" def set_standby_mode(self, node, status): """ Set node to Standby if status is True, or Active if status is False. Return whether the node is now in the requested status. """ current_status = self.in_standby_mode(node) if current_status == status: return True if status: cmd = self.templates["StandbyCmd"] % (node, "on") else: cmd = self.templates["StandbyCmd"] % (node, "off") (rc, _) = self.rsh(node, cmd) return rc == 0 def add_dummy_rsc(self, node, rid): """Add a dummy resource with the given ID to the given node.""" rsc_xml = """ ' '""" % (rid, rid) constraint_xml = """ ' ' """ % (rid, node, node, rid) self.rsh(node, self.templates['CibAddXml'] % rsc_xml) self.rsh(node, self.templates['CibAddXml'] % constraint_xml) def remove_dummy_rsc(self, node, rid): """Remove the previously added dummy resource given by rid on the given node.""" constraint = "\"//rsc_location[@rsc='%s']\"" % rid rsc = "\"//primitive[@id='%s']\"" % rid self.rsh(node, self.templates['CibDelXpath'] % constraint) self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index 4ce1359417..82d5912108 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,638 +1,638 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory"] __copyright__ = "Copyright 2014-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse import os import random import socket import sys import time from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). It's possible we could fix this with type annotations, # but those were introduced with python 3.5 and we only support python 3.4. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} self._nodes = [] # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["IPagent"] = "IPaddr2" self["DoFencing"] = True self["ClobberCIB"] = False self["CIBfilename"] = None self["CIBResource"] = False - self["LogWatcher"] = LogKind.ANY + self["log_kind"] = LogKind.ANY self["node-limit"] = 0 self["scenario"] = "random" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._target = "localhost" self._seed_random() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def _seed_random(self, seed=None): """ Initialize the random number generator. Arguments: seed -- Use this to see the random number generator, or use the current time if None. """ if not seed: seed = int(time.time()) self["RandSeed"] = seed self.random_gen.seed(str(seed)) def dump(self): """Print the current environment.""" keys = [] for key in list(self.data.keys()): keys.append(key) keys.sort() for key in keys: s = "Environment[%s]" % key self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key]))) def keys(self): """Return a list of all environment keys stored in this instance.""" return list(self.data.keys()) def __contains__(self, key): """Return True if the given key exists in the environment.""" if key == "nodes": return True return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" if str(key) == "0": raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead") if key == "nodes": return self._nodes if key == "Name": return self._get_stack_short() return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "Stack": self._set_stack(value) elif key == "node-limit": self.data[key] = value self._filter_nodes() elif key == "nodes": self._nodes = [] for node in value: # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: n = node.strip() socket.gethostbyname_ex(n) self._nodes.append(n) except socket.herror: self._logger.log("%s not found in DNS... aborting" % node) raise self._filter_nodes() else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def get(self, key, default=None): """Return the value for key if key is in the environment, else default.""" if key == "nodes": return self._nodes return self.data.get(key, default) def _set_stack(self, name): """Normalize the given cluster stack name.""" if name in ["corosync", "cs", "mcp"]: self.data["Stack"] = "corosync 2+" else: raise ValueError("Unknown stack: %s" % name) def _get_stack_short(self): """Return the short name for the currently set cluster stack.""" if "Stack" not in self.data: return "unknown" if self.data["Stack"] == "corosync 2+": return "crm-corosync" LogFactory().log("Unknown stack: %s" % self["stack"]) raise ValueError("Unknown stack: %s" % self["stack"]) def _detect_systemd(self): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self): """Detect the syslog variant in use on the target node.""" if "syslogd" not in self.data: if self["have_systemd"]: # Systemd (_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) self["syslogd"] = lines[0].strip() else: # SYS-V (_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) self["syslogd"] = lines[0].strip() if "syslogd" not in self.data or not self["syslogd"]: # default self["syslogd"] = "rsyslog" def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl disable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s off" % service) return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl enable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s on" % service) return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service) return rc == 0 # SYS-V (rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service) return rc == 0 def _detect_at_boot(self): """Detect if the cluster starts at boot.""" if "at-boot" not in self.data: self["at-boot"] = self.service_is_enabled(self._target, "corosync") \ or self.service_is_enabled(self._target, "pacemaker") def _detect_ip_offset(self): """Detect the offset for IPaddr resources.""" if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) return # pylint thinks self["IPBase"] is a list, not a string, which causes it # to error out because a list doesn't have split(). # pylint: disable=no-member if int(self["IPBase"].split('.')[3]) >= 240: self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s" % (self["IPBase"], self["IPBase"].split('.')[3])) self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) def _filter_nodes(self): """ Filter the list of cluster nodes. If --limit-nodes is given, keep that many nodes from the front of the list of cluster nodes and drop the rest. """ if self["node-limit"] > 0: if len(self["nodes"]) > self["node-limit"]: # pylint thinks self["node-limit"] is a list even though we initialize # it as an int in __init__ and treat it as an int everywhere. # pylint: disable=bad-string-format-type self._logger.log("Limiting the number of nodes configured=%d (max=%d)" % (len(self["nodes"]), self["node-limit"])) while len(self["nodes"]) > self["node-limit"]: self["nodes"].pop(len(self["nodes"]) - 1) def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" self._target = random.Random().choice(self["nodes"]) exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser self._detect_systemd() self._detect_syslog() self._detect_at_boot() self._detect_ip_offset() def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0]) grp1 = parser.add_argument_group("Common options") grp1.add_argument("-g", "--dsh-group", "--group", metavar="GROUP", dest="group", help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)") grp1.add_argument("-l", "--limit-nodes", type=int, default=0, metavar="MAX", help="Only use the first MAX cluster nodes supplied with --nodes") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", metavar="NODES", help="List of cluster nodes separated by whitespace") grp1.add_argument("--stack", default="corosync", metavar="STACK", help="Which cluster stack is installed") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes") grp2.add_argument("--at-boot", "--cluster-starts-at-boot", choices=["1", "0", "yes", "no"], help="Does the cluster software start at boot time?") grp2.add_argument("--facility", "--syslog-facility", default="daemon", metavar="NAME", help="Which syslog facility to log to") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named test") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--experimental-tests", action="store_true", help="Include experimental tests") grp4.add_argument("--loop-minutes", type=int, default=60, help="") grp4.add_argument("--no-loop-tests", action="store_true", help="Don't run looping/time-based tests") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--oprofile", metavar="NODES", help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--qarsh", action="store_true", help="Use QARSH to access nodes instead of SSH") grp4.add_argument("--schema", metavar="SCHEMA", default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION, help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--set", action="append", metavar="ARG", default=[], help="Set key=value pairs (can be specified multiple times)") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--valgrind-procs", metavar="PROCS", default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd", help="Run valgrind against the given space-separated list of processes") grp4.add_argument("--valgrind-tests", action="store_true", help="Include tests using valgrind") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. They get a default from the add_argument # calls, only do one thing, and they do not have any side effects. self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["Stack"] = args.stack self["SyslogFacility"] = args.facility self["TruncateLog"] = args.truncate self["at-boot"] = args.at_boot in ["1", "yes"] self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["experimental-tests"] = args.experimental_tests self["iterations"] = args.iterations self["loop-minutes"] = args.loop_minutes self["loop-tests"] = not args.no_loop_tests self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["node-limit"] = args.limit_nodes self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["valgrind-procs"] = args.valgrind_procs self["valgrind-tests"] = args.valgrind_tests self["warn-inactive"] = args.warn_inactive # Nodes and groups are mutually exclusive, so their defaults cannot be # set in their add_argument calls. Additionally, groups does more than # just set a value. Here, set nodes first and then if a group is # specified, override the previous nodes value. if args.nodes: self["nodes"] = args.nodes.split(" ") else: self["nodes"] = [] if args.group: self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group) LogFactory().add_file(self["OutputFile"], "CTS") dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group) if os.path.isfile(dsh_file): self["nodes"] = [] with open(dsh_file, "r", encoding="utf-8") as f: for line in f: stripped = line.strip() if not stripped.startswith('#'): self["nodes"].append(stripped) else: print("Unknown DSH group: %s" % args.dsh_group) # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.cib_filename: self["CIBfilename"] = args.cib_filename else: self["CIBfilename"] = None if args.choose: self["scenario"] = "sequence" self["tests"].append(args.choose) if args.fencing: if args.fencing in ["0", "no"]: self["DoFencing"] = False else: self["DoFencing"] = True if args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % ( os.environ['OS_REGION_NAME'], os.environ['OS_TENANT_NAME'], os.environ['OS_AUTH_URL'], os.environ['OS_USERNAME'], os.environ['OS_PASSWORD'] ) elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile - self["LogWatcher"] = LogKind.REMOTE_FILE + self["log_kind"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.oprofile: self["oprofile"] = args.oprofile.split(" ") else: self["oprofile"] = [] if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True if args.qarsh: self._rsh.enable_qarsh() for kv in args.set: (name, value) = kv.split("=") self[name] = value print("Setting %s = %s" % (name, value)) class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py index 24de347fb3..81e9e40d1a 100644 --- a/python/pacemaker/_cts/scenarios.py +++ b/python/pacemaker/_cts/scenarios.py @@ -1,424 +1,424 @@ """Test scenario classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence", ] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time from pacemaker._cts.audits import ClusterAudit from pacemaker._cts.input import should_continue from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.watcher import LogWatcher class ScenarioComponent: """ The base class for all scenario components. A scenario component is one single step in a scenario. Each component is basically just a setup and teardown method. """ def __init__(self, cm, env): """ Create a new ScenarioComponent instance. Arguments: cm -- A ClusterManager instance env -- An Environment instance """ # pylint: disable=invalid-name self._cm = cm self._env = env def is_applicable(self): """ Return True if this component is applicable in the given Environment. This method must be provided by all subclasses. """ raise NotImplementedError def setup(self): """ Set up the component, returning True on success. This method must be provided by all subclasses. """ raise NotImplementedError def teardown(self): """ Tear down the given component. This method must be provided by all subclasses. """ raise NotImplementedError class Scenario: """ The base class for scenarios. A scenario is an ordered list of ScenarioComponent objects. A scenario proceeds by setting up all its components in sequence, running a list of tests and audits, and then tearing down its components in reverse. """ def __init__(self, cm, components, audits, tests): """ Create a new Scenario instance. Arguments: cm -- A ClusterManager instance components -- A list of ScenarioComponents comprising this Scenario audits -- A list of ClusterAudits that will be performed as part of this Scenario tests -- A list of CTSTests that will be run """ # pylint: disable=invalid-name self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 } self.tests = tests self._audits = audits self._bad_news = None self._cm = cm self._components = components for comp in components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of ScenarioComponent") for audit in audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be subclass of ClusterAudit") for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") def is_applicable(self): """Return True if all ScenarioComponents are applicable.""" for comp in self._components: if not comp.is_applicable(): return False return True def setup(self): """ Set up the scenario, returning True on success. If setup fails at some point, tear down those components that did successfully set up. """ self._cm.prepare() self.audit() # Also detects remote/local log config self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) self.audit() self._cm.install_support() self._bad_news = LogWatcher(self._cm.env["LogFileName"], self._cm.templates.get_patterns("BadNews"), self._cm.env["nodes"], - self._cm.env["LogWatcher"], + self._cm.env["log_kind"], "BadNews", 0) self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit j = 0 while j < len(self._components): if not self._components[j].setup(): # OOPS! We failed. Tear partial setups down. self.audit() self._cm.log("Tearing down partial setup") self.teardown(j) return False j += 1 self.audit() return True def teardown(self, n_components=None): """ Tear down the scenario in the reverse order it was set up. If n_components is not None, only tear down that many components. """ if not n_components: n_components = len(self._components) - 1 j = n_components while j >= 0: self._components[j].teardown() j -= 1 self.audit() self._cm.install_support("uninstall") def incr(self, name): """Increment the given stats key.""" if name not in self.stats: self.stats[name] = 0 self.stats[name] += 1 def run(self, iterations): """Run all tests in the scenario the given number of times.""" self._cm.oprofile_start() try: self._run_loop(iterations) self._cm.oprofile_stop() except: # noqa: E722 self._cm.oprofile_stop() raise def _run_loop(self, iterations): """Run all the tests the given number of times.""" raise NotImplementedError def run_test(self, test, testcount): """ Run the given test. testcount is the number of tests (including this one) that have been run across all iterations. """ nodechoice = self._cm.env.random_node() ret = True did_run = False self._cm.clear_instance_errors_to_ignore() choice = "(%s)" % nodechoice self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount)) starttime = test.set_timer() if not test.setup(nodechoice): self._cm.log("Setup failed") ret = False else: did_run = True ret = test(nodechoice) if not test.teardown(nodechoice): self._cm.log("Teardown failed") if not should_continue(self._cm.env): raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice)) ret = False stoptime = time.time() self._cm.oprofile_save(testcount) elapsed_time = stoptime - starttime test_time = stoptime - test.get_timer() if "min_time" not in test.stats: test.stats["elapsed_time"] = elapsed_time test.stats["min_time"] = test_time test.stats["max_time"] = test_time else: test.stats["elapsed_time"] += elapsed_time if test_time < test.stats["min_time"]: test.stats["min_time"] = test_time if test_time > test.stats["max_time"]: test.stats["max_time"] = test_time if ret: self.incr("success") test.log_timer() else: self.incr("failure") self._cm.statall() did_run = True # Force the test count to be incremented anyway so test extraction works self.audit(test.errors_to_ignore) return did_run def summarize(self): """Output scenario results.""" self._cm.log("****************") self._cm.log("Overall Results:%r" % self.stats) self._cm.log("****************") stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0, } self._cm.log("Test Summary") for test in self.tests: for key in stat_filter: stat_filter[key] = test.stats[key] name = "Test %s:" % test.name self._cm.log("{:<25} {!r}".format(name, stat_filter)) self._cm.debug("Detailed Results") for test in self.tests: name = "Test %s:" % test.name self._cm.debug("{:<25} {!r}".format(name, stat_filter)) self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def audit(self, local_ignore=None): """ Perform all scenario audits and log results. If there are too many failures, prompt the user to confirm that the scenario should continue running. """ errcount = 0 ignorelist = ["CTS:"] if local_ignore: ignorelist.extend(local_ignore) ignorelist.extend(self._cm.errors_to_ignore) ignorelist.extend(self._cm.instance_errors_to_ignore) # This makes sure everything is stabilized before starting... failed = 0 for audit in self._audits: if not audit(): self._cm.log("Audit %s FAILED." % audit.name) failed += 1 else: self._cm.debug("Audit %s passed." % audit.name) while errcount < 1000: match = None if self._bad_news: match = self._bad_news.look(0) if match: add_err = True for ignore in ignorelist: if add_err and re.search(ignore, match): add_err = False if add_err: self._cm.log("BadNews: %s" % match) self.incr("BadNews") errcount += 1 else: break else: print("Big problems") if not should_continue(self._cm.env): self._cm.log("Shutting down.") self.summarize() self.teardown() raise ValueError("Looks like we hit a BadNews jackpot!") if self._bad_news: self._bad_news.end() return failed class AllOnce(Scenario): """Every Test Once.""" def _run_loop(self, iterations): testcount = 1 for test in self.tests: self.run_test(test, testcount) testcount += 1 class RandomTests(Scenario): """Random Test Execution.""" def _run_loop(self, iterations): testcount = 1 while testcount <= iterations: test = self._cm.env.random_gen.choice(self.tests) self.run_test(test, testcount) testcount += 1 class Sequence(Scenario): """Named Tests in Sequence.""" def _run_loop(self, iterations): testcount = 1 while testcount <= iterations: for test in self.tests: self.run_test(test, testcount) testcount += 1 class Boot(Scenario): """Start the Cluster.""" def _run_loop(self, iterations): return class BootCluster(ScenarioComponent): """ Start the cluster manager on all nodes. Wait for each to come up before starting in order to account for the possibility that a given node might have been rebooted or crashed beforehand. """ def is_applicable(self): """Return whether this scenario is applicable.""" return True def setup(self): """Set up the component, returning True on success.""" self._cm.prepare() # Clear out the cobwebs ;-) self._cm.stopall(verbose=True, force=True) # Now start the Cluster Manager on all the nodes. self._cm.log("Starting Cluster Manager on all nodes.") return self._cm.startall(verbose=True, quick=True) def teardown(self): """Tear down the component.""" self._cm.log("Stopping Cluster Manager on all nodes") self._cm.stopall(verbose=True, force=False) class LeaveBooted(BootCluster): """Leave all nodes up when the scenario is complete.""" def teardown(self): """Tear down the component.""" self._cm.log("Leaving Cluster running on all nodes") diff --git a/python/pacemaker/_cts/tests/ctstest.py b/python/pacemaker/_cts/tests/ctstest.py index 4c372b14af..3ed4931fc1 100644 --- a/python/pacemaker/_cts/tests/ctstest.py +++ b/python/pacemaker/_cts/tests/ctstest.py @@ -1,246 +1,248 @@ """Base classes for CTS tests.""" __all__ = ["CTSTest"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.timer import Timer from pacemaker._cts.watcher import LogWatcher # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. class CTSTest: """ The base class for all cluster tests. This implements a basic set of properties and behaviors like setup, tear down, time keeping, and statistics tracking. It is up to specific tests to implement their own specialized behavior on top of this class. """ def __init__(self, cm): """ Create a new CTSTest instance. Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self.audits = [] self.name = None self.templates = PatternSelector(cm["Name"]) self.stats = { "auditfail": 0, "calls": 0, "failure": 0, "skipped": 0, "success": 0 } self._cm = cm self._env = EnvFactory().getInstance() self._r_o2cb = None self._r_ocfs2 = [] self._rsh = RemoteFactory().getInstance() self._logger = LogFactory() self._timers = {} self.benchmark = True # which tests to benchmark self.failed = False self.is_experimental = False self.is_loop = False self.is_unsafe = False self.is_valgrind = False self.passed = True def log(self, args): """Log a message.""" self._logger.log(args) def debug(self, args): """Log a debug message.""" self._logger.debug(args) def get_timer(self, key="test"): """Get the start time of the given timer.""" try: return self._timers[key].start_time except KeyError: return 0 def set_timer(self, key="test"): """Set the start time of the given timer to now, and return that time.""" if key not in self._timers: self._timers[key] = Timer(self._logger, self.name, key) self._timers[key].start() return self._timers[key].start_time def log_timer(self, key="test"): """Log the elapsed time of the given timer.""" if key not in self._timers: return elapsed = self._timers[key].elapsed self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed)) del self._timers[key] def incr(self, name): """Increment the given stats key.""" if name not in self.stats: self.stats[name] = 0 self.stats[name] += 1 # Reset the test passed boolean if name == "calls": self.passed = True def failure(self, reason="none"): """Increment the failure count, with an optional failure reason.""" self.passed = False self.incr("failure") self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason) return False def success(self): """Increment the success count.""" self.incr("success") return True def skipped(self): """Increment the skipped count.""" self.incr("skipped") return True def __call__(self, node): """Perform this test.""" raise NotImplementedError def audit(self): """Perform all the relevant audits (see ClusterAudit), returning whether or not they all passed.""" passed = True for audit in self.audits: if not audit(): self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name)) self.incr("auditfail") passed = False return passed def setup(self, node): """Set up this test.""" # node is used in subclasses # pylint: disable=unused-argument return self.success() def teardown(self, node): """Tear down this test.""" # node is used in subclasses # pylint: disable=unused-argument return self.success() def create_watch(self, patterns, timeout, name=None): """ Create a new LogWatcher object. This object can be used to search log files for matching patterns during this test's run. Arguments: patterns -- A list of regular expressions to match against the log timeout -- Default number of seconds to watch a log file at a time; this can be overridden by the timeout= parameter to self.look on an as-needed basis name -- A unique name to use when logging about this watch """ if not name: name = self.name - return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"], self._env["LogWatcher"], name, timeout) + return LogWatcher(self._env["LogFileName"], patterns, + self._env["nodes"], self._env["log_kind"], name, + timeout) def local_badnews(self, prefix, watch, local_ignore=None): """ Search through log files for messages. Arguments: prefix -- The string to look for at the beginning of lines, or "LocalBadNews:" if None. watch -- The LogWatcher object to use for searching. local_ignore -- A list of regexes that, if found in a line, will cause that line to be ignored. Return the number of matches found. """ errcount = 0 if not prefix: prefix = "LocalBadNews:" ignorelist = [" CTS: ", prefix] if local_ignore: ignorelist += local_ignore while errcount < 100: match = watch.look(0) if match: add_err = True for ignore in ignorelist: if add_err and re.search(ignore, match): add_err = False if add_err: self._logger.log("%s %s" % (prefix, match)) errcount += 1 else: break else: self._logger.log("Too many errors!") watch.end() return errcount def is_applicable(self): """ Return True if this test is applicable in the current test configuration. This method must be implemented by all subclasses. """ if self.is_loop and not self._env["loop-tests"]: return False if self.is_unsafe and not self._env["unsafe-tests"]: return False if self.is_valgrind and not self._env["valgrind-tests"]: return False if self.is_experimental and not self._env["experimental-tests"]: return False if self._env["benchmark"] and not self.benchmark: return False return True @property def errors_to_ignore(self): """Return a list of errors which should be ignored.""" return []