diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py index 121ce778e2..73d28307f3 100644 --- a/python/pacemaker/_cts/audits.py +++ b/python/pacemaker/_cts/audits.py @@ -1,1022 +1,1022 @@ """Auditing classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time import uuid from pacemaker.buildoptions import BuildOptions from pacemaker._cts.input import should_continue from pacemaker._cts.watcher import LogKind, LogWatcher class ClusterAudit: """ The base class for various kinds of auditors. Specific audit implementations should be built on top of this one. Audits can do all kinds of checks on the system. The basic interface for callers is the `__call__` method, which returns True if the audit passes and False if it fails. """ def __init__(self, cm): """ Create a new ClusterAudit instance. Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.name = None def __call__(self): """Perform the audit action.""" raise NotImplementedError def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. This method must be implemented by all subclasses. """ raise NotImplementedError def log(self, args): """Log a message.""" self._cm.log("audit: %s" % args) def debug(self, args): """Log a debug message.""" self._cm.debug("audit: %s" % args) class LogAudit(ClusterAudit): """ Audit each cluster node to verify that some logging system is usable. This is done by logging a unique test message and then verifying that we can read back that test message using logging tools. """ def __init__(self, cm): """ Create a new LogAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "LogAudit" def _restart_cluster_logging(self, nodes=None): """Restart logging on the given nodes, or all if none are given.""" if not nodes: nodes = self._cm.env["nodes"] self._cm.debug("Restarting logging on: %r" % nodes) for node in nodes: if self._cm.env["have_systemd"]: (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket") if rc != 0: self._cm.log("ERROR: Cannot stop 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service") if rc != 0: self._cm.log("ERROR: Cannot start 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"]) if rc != 0: self._cm.log("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node)) def _create_watcher(self, patterns, kind): """Create a new LogWatcher instance for the given patterns.""" watch = LogWatcher(self._cm.env["LogFileName"], patterns, self._cm.env["nodes"], kind, "LogAudit", 5, silent=True) watch.set_watch() return watch def _test_logging(self): """Perform the log audit.""" patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self._cm.env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) watch_pref = self._cm.env["LogWatcher"] if watch_pref == LogKind.ANY: kinds = [LogKind.FILE] if self._cm.env["have_systemd"]: kinds += [LogKind.JOURNAL] kinds += [LogKind.REMOTE_FILE] for k in kinds: watch[k] = self._create_watcher(patterns, k) self._cm.log("Logging test message with identifier %s" % suffix) else: watch[watch_pref] = self._create_watcher(patterns, watch_pref) for node in self._cm.env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix) (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0) if rc != 0: self._cm.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in list(watch.keys()): w = watch[k] if watch_pref == LogKind.ANY: self._cm.log("Checking for test message in %s logs" % k) w.look_for_all(silent=True) if w.unmatched: for regex in w.unmatched: self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind)) else: if watch_pref == LogKind.ANY: self._cm.log("Found test message in %s logs" % k) self._cm.env["LogWatcher"] = k return 1 return False def __call__(self): """Perform the audit action.""" max_attempts = 3 attempt = 0 self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) while attempt <= max_attempts and not self._test_logging(): attempt += 1 self._restart_cluster_logging() - time.sleep(60*attempt) + time.sleep(60 * attempt) if attempt > max_attempts: self._cm.log("ERROR: Cluster logging unrecoverable.") return False return True def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" if self._cm.env["LogAuditDisabled"]: return False return True class DiskAudit(ClusterAudit): """ Audit disk usage on cluster nodes. Verify that there is enough free space left on whichever mounted file system holds the logs. Warn on: less than 100 MB or 10% of free space Error on: less than 10 MB or 5% of free space """ def __init__(self, cm): """ Create a new DiskAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "DiskspaceAudit" def __call__(self): """Perform the audit action.""" result = True # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) for node in self._cm.env["nodes"]: (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1) if not dfout: self._cm.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) continue dfout = dfout[0].strip() try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = False if not should_continue(self._cm.env): raise ValueError("Disk full on %s" % node) elif remaining_mb < 100 or used_percent > 90: self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" return True class FileAudit(ClusterAudit): """ Audit the filesystem looking for various failure conditions. Check for: * The presence of core dumps from corosync or Pacemaker daemons * Stale IPC files """ def __init__(self, cm): """ Create a new FileAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.known = [] self.name = "FileAudit" def __call__(self): """Perform the audit action.""" result = True self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) for node in self._cm.env["nodes"]: (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Corosync core file on %s: %s" % (node, line)) if self._cm.expected_status.get(node) == "down": clean = False (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) for line in lsout: result = False clean = True self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) for line in lsout: self._cm.debug("ps[%s]: %s" % (node, line)) self._cm.rsh(node, "rm -rf /dev/shm/qb-*") else: self._cm.debug("Skipping %s" % node) return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" return True class AuditResource: """A base class for storing information about a cluster resource.""" def __init__(self, cm, line): """ Create a new AuditResource instance. Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single resource """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None @property def unique(self): """Return True if this resource is unique.""" return self.flags & 0x20 @property def orphan(self): """Return True if this resource is an orphan.""" return self.flags & 0x01 @property def managed(self): """Return True if this resource is managed by the cluster.""" return self.flags & 0x02 class AuditConstraint: """A base class for storing information about a cluster constraint.""" def __init__(self, cm, line): """ Create a new AuditConstraint instance. Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single constraint """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): """ Audit primitive resources to verify a variety of conditions. Check that: * Resources are active and managed only when expected * Resources are active on the expected cluster node * Resources are not orphaned """ def __init__(self, cm): """ Create a new PrimitiveAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PrimitiveAudit" self._active_nodes = [] self._constraints = [] self._inactive_nodes = [] self._resources = [] self._target = None def _audit_resource(self, resource, quorum): """Perform the audit of a single resource.""" rc = True active = self._cm.resource_location(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %r" % (resource.id, active)) elif resource.needs_quorum == 1: self._cm.log("Resource %s active without quorum: %r" % (resource.id, active)) rc = False elif not resource.managed: self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active)) elif not resource.unique: # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %r" % (resource.id, active)) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active)) rc = False elif resource.orphan: self.debug("Resource %s is an inactive orphan" % resource.id) elif not self._inactive_nodes: self._cm.log("WARN: Resource %s not served anywhere" % resource.id) rc = False elif self._cm.env["warn-inactive"]: if quorum or not resource.needs_quorum: self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) else: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) return rc def _setup(self): """ Verify cluster nodes are active. Collect resource and colocation information used for performing the audit. """ for node in self._cm.env["nodes"]: if self._cm.expected_status[node] == "up": self._active_nodes.append(node) else: self._inactive_nodes.append(node) for node in self._cm.env["nodes"]: if self._target is None and self._cm.expected_status[node] == "up": self._target = node if not self._target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name) return False (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): self._resources.append(AuditResource(self._cm, line)) elif re.search("^Constraint", line): self._constraints.append(AuditConstraint(self._cm, line)) else: self._cm.log("Unknown entry: %s" % line) return True def __call__(self): """Perform the audit action.""" result = True if not self._setup(): return result quorum = self._cm.has_quorum(None) for resource in self._resources: if resource.type == "primitive" and not self._audit_resource(resource, quorum): result = False return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class GroupAudit(PrimitiveAudit): """ Audit group resources. Check that: * Each of its child primitive resources is active on the expected cluster node """ def __init__(self, cm): """ Create a new GroupAudit instance. Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "GroupAudit" def __call__(self): result = True if not self._setup(): return result for group in self._resources: if group.type != "group": continue first_match = True group_location = None for child in self._resources: if child.parent != group.id: continue nodes = self._cm.resource_location(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = False if len(nodes) > 1: result = False self._cm.log("Child %s of %s is active more than once: %r" % (child.id, group.id, nodes)) elif not nodes: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: result = False self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return result class CloneAudit(PrimitiveAudit): """ Audit clone resources. NOTE: Currently, this class does not perform any actual audit functions. """ def __init__(self, cm): """ Create a new CloneAudit instance. Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "CloneAudit" def __call__(self): result = True if not self._setup(): return result for clone in self._resources: if clone.type != "clone": continue for child in self._resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return result class ColocationAudit(PrimitiveAudit): """ Audit cluster resources. Check that: * Resources are colocated with the expected resource """ def __init__(self, cm): """ Create a new ColocationAudit instance. Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "ColocationAudit" def _crm_location(self, resource): """Return a list of cluster nodes where a given resource is running.""" (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): result = True if not self._setup(): return result for coloc in self._constraints: if coloc.type != "rsc_colocation": continue source = self._crm_location(coloc.rsc) target = self._crm_location(coloc.target) if not source: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: result = False self._cm.log("Colocation audit (%s): %s running on %s (not in %r)" % (coloc.id, coloc.rsc, node, target)) else: self.debug("Colocation audit (%s): %s running on %s (in %r)" % (coloc.id, coloc.rsc, node, target)) return result class ControllerStateAudit(ClusterAudit): """Verify active and inactive resources.""" def __init__(self, cm): """ Create a new ControllerStateAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "ControllerStateAudit" def __call__(self): result = True up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self._cm.env["nodes"]: should_be = self._cm.expected_status[node] rc = self._cm.test_node_cm(node) if rc > 0: if should_be == "down": down_are_up += 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down += 1 if len(unstable_list) > 0: result = False self._cm.log("Cluster is not stable: %d (of %d): %r" % (len(unstable_list), self._cm.upcount(), unstable_list)) if up_are_down > 0: result = False self._cm.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self._cm.env["nodes"]))) if down_are_up > 0: result = False self._cm.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self._cm.env["nodes"]))) return result def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class CIBAudit(ClusterAudit): """Audit the CIB by verifying that it is identical across cluster nodes.""" def __init__(self, cm): """ Create a new CIBAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "CibAudit" def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: self.debug("\tNo partitions to audit") return result for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) if self._audit_cib_contents(partition) == 0: result = False return result def _audit_cib_contents(self, hostlist): """Perform the CIB audit on the given hosts.""" passed = True node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self._store_remote_cib(node, node0) if node_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node) passed = False elif node0 is None: node0 = node node0_xml = node_xml elif node0_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node0) passed = False else: (rc, result) = self._cm.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) if rc != 0: self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = False for line in result: if not re.search("", line): passed = False self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) return passed def _store_remote_cib(self, node, target): """ Store a copy of the given node's CIB on the given target node. If no target is given, store the CIB on the given node. """ filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1) if rc != 0: self._cm.log("Could not retrieve configuration") return None self._cm.rsh("localhost", "rm -f %s" % filename) for line in lines: self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self._cm.log("Could not store configuration") return None return filename def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class PartitionAudit(ClusterAudit): """ Audit each partition in a cluster to verify a variety of conditions. Check that: * The number of partitions and the nodes in each is as expected * Each node is active when it should be active and inactive when it should be inactive * The status and epoch of each node is as expected * A partition has quorum * A partition has a DC when expected """ def __init__(self, cm): """ Create a new PartitionAudit instance. Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PartitionAudit" self._node_epoch = {} self._node_state = {} self._node_quorum = {} def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: return result self._cm.cluster_stable(double_check=True) if len(ccm_partitions) != self._cm.partitions_expected: self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) result = False for partition in ccm_partitions: self._cm.log("\t %s" % partition) for partition in ccm_partitions: if self._audit_partition(partition) == 0: result = False return result def _trim_string(self, avalue): """Remove the last character from a multi-character string.""" if not avalue: return None if len(avalue) > 1: return avalue[:-1] return avalue def _trim2int(self, avalue): """Remove the last character from a multi-character string and convert the result to an int.""" trimmed = self._trim_string(avalue) if trimmed: return int(trimmed) return None def _audit_partition(self, partition): """Perform the audit of a single partition.""" passed = True dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % partition) for node in node_list: if self._cm.expected_status[node] != "up": self._cm.log("Warn: Node %s appeared out of nowhere" % node) self._cm.expected_status[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1) self._node_state[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1) self._node_epoch[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1) self._node_quorum[node] = out[0].strip() self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node])) self._node_state[node] = self._trim_string(self._node_state[node]) self._node_epoch[node] = self._trim2int(self._node_epoch[node]) self._node_quorum[node] = self._trim_string(self._node_quorum[node]) if not self._node_epoch[node]: self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node) self._cm.expected_status[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch: lowest_epoch = self._node_epoch[node] if not lowest_epoch: self._cm.log("Lowest epoch not determined in %s" % partition) passed = False for node in node_list: if self._cm.expected_status[node] != "up": continue if self._cm.is_node_dc(node, self._node_state[node]): dc_found.append(node) if self._node_epoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self._node_epoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self._cm.log("DC %s is not the oldest node (%d vs. %d)" % (node, self._node_epoch[node], lowest_epoch)) passed = False if not dc_found: self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self._cm.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = False if not passed: for node in node_list: if self._cm.expected_status[node] == "up": self._cm.log("epoch %s : %s" % (self._node_epoch[node], self._node_state[node])) return passed def is_applicable(self): """Return True if this audit is applicable in the current test configuration.""" # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False # pylint: disable=invalid-name def audit_list(cm): """Return a list of instances of applicable audits that can be performed.""" result = [] for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit, PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit, ColocationAudit, CIBAudit]: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/python/pacemaker/_cts/cib.py b/python/pacemaker/_cts/cib.py index 8afe906bc5..bb3307726a 100644 --- a/python/pacemaker/_cts/cib.py +++ b/python/pacemaker/_cts/cib.py @@ -1,410 +1,410 @@ """CIB generator for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["ConfigFactory"] __copyright__ = "Copyright 2008-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import warnings import tempfile from pacemaker.buildoptions import BuildOptions from pacemaker._cts.cibxml import Alerts, Clone, Expression, FencingTopology, Group, Nodes, OpDefaults, Option, Resource, Rule from pacemaker._cts.network import next_ip class CIB: """A class for generating, representing, and installing a CIB file onto cluster nodes.""" def __init__(self, cm, version, factory, tmpfile=None): """ Create a new CIB instance. Arguments: cm -- A ClusterManager instance version -- The schema syntax version factory -- A ConfigFactory instance tmpfile -- Where to store the CIB, or None to use a new tempfile """ # pylint: disable=invalid-name self._cib = None self._cm = cm self._counter = 1 self._factory = factory self._num_nodes = 0 self.version = version if not tmpfile: warnings.filterwarnings("ignore") # pylint: disable=consider-using-with f = tempfile.NamedTemporaryFile(delete=True) f.close() tmpfile = f.name warnings.resetwarnings() self._factory.tmpfile = tmpfile def _show(self): """Query a cluster node for its generated CIB; log and return the result.""" output = "" (_, result) = self._factory.rsh(self._factory.target, "HOME=/root CIB_file=%s cibadmin -Ql" % self._factory.tmpfile, verbose=1) for line in result: output += line self._factory.debug("Generated Config: %s" % line) return output def new_ip(self, name=None): """Generate an IP resource for the next available IP address, optionally specifying the resource's name.""" if self._cm.env["IPagent"] == "IPaddr2": ip = next_ip(self._cm.env["IPBase"]) if not name: if ":" in ip: (_, _, suffix) = ip.rpartition(":") name = "r%s" % suffix else: name = "r%s" % ip r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf") r["ip"] = ip if ":" in ip: r["cidr_netmask"] = "64" r["nic"] = "eth0" else: r["cidr_netmask"] = "32" else: if not name: name = "r%s%d" % (self._cm.env["IPagent"], self._counter) self._counter += 1 r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf") r.add_op("monitor", "5s") return r def get_node_id(self, node_name): """Check the cluster configuration for the node ID for the given node_name.""" # We can't account for every possible configuration, # so we only return a node ID if: # * The node is specified in /etc/corosync/corosync.conf # with "ring0_addr:" equal to node_name and "nodeid:" # explicitly specified. # In all other cases, we return 0. node_id = 0 # awkward command: use } as record separator # so each corosync.conf "object" is one record; # match the "node {" record that has "ring0_addr: node_name"; # then print the substring of that record after "nodeid:" awk = r"""awk -v RS="}" """ \ r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/""" \ r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s""" \ % (node_name, BuildOptions.COROSYNC_CONFIG_FILE) (rc, output) = self._factory.rsh(self._factory.target, awk, verbose=1) if rc == 0 and len(output) == 1: try: node_id = int(output[0]) except ValueError: node_id = 0 return node_id def install(self, target): """Generate a CIB file and install it to the given cluster node.""" old = self._factory.tmpfile # Force a rebuild self._cib = None self._factory.tmpfile = "%s/cib.xml" % BuildOptions.CIB_DIR self.contents(target) self._factory.rsh(self._factory.target, "chown %s %s" % (BuildOptions.DAEMON_USER, self._factory.tmpfile)) self._factory.tmpfile = old def contents(self, target): """Generate a complete CIB file.""" # fencing resource if self._cib: return self._cib if target: self._factory.target = target self._factory.rsh(self._factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self._factory.tmpfile)) self._num_nodes = len(self._cm.env["nodes"]) no_quorum = "stop" if self._num_nodes < 3: no_quorum = "ignore" self._factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self._num_nodes) # We don't need a nodes section unless we add attributes stn = None # Fencing resource # Define first so that the shell doesn't reject every update if self._cm.env["DoFencing"]: # Define the "real" fencing device st = Resource(self._factory, "Fencing", self._cm.env["stonith-type"], "stonith") # Set a threshold for unreliable stonith devices such as the vmware one st.add_meta("migration-threshold", "5") st.add_op("monitor", "120s", timeout="120s") st.add_op("stop", "0", timeout="60s") st.add_op("start", "0", timeout="60s") # For remote node tests, a cluster node is stopped and brought back up # as a remote node with the name "remote-OLDNAME". To allow fencing # devices to fence these nodes, create a list of all possible node names. - all_node_names = [prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-')] + all_node_names = [prefix + n for n in self._cm.env["nodes"] for prefix in ('', 'remote-')] # Add all parameters specified by user entries = self._cm.env["stonith-params"].split(',') for entry in entries: try: (name, value) = entry.split('=', 1) except ValueError: print("Warning: skipping invalid fencing parameter: %s" % entry) continue # Allow user to specify "all" as the node list, and expand it here if name in ["hostlist", "pcmk_host_list"] and value == "all": value = ' '.join(all_node_names) st[name] = value st.commit() # Test advanced fencing logic stf_nodes = [] stt_nodes = [] attr_nodes = {} # Create the levels stl = FencingTopology(self._factory) for node in self._cm.env["nodes"]: # Remote node tests will rename the node remote_node = "remote-%s" % node # Randomly assign node to a fencing method ftype = self._cm.env.random_gen.choice(["levels-and", "levels-or ", "broadcast "]) # For levels-and, randomly choose targeting by node name or attribute by = "" if ftype == "levels-and": node_id = self.get_node_id(node) if node_id == 0 or self._cm.env.random_gen.choice([True, False]): by = " (by name)" else: attr_nodes[node] = node_id by = " (by attribute)" self._cm.log(" - Using %s fencing for node: %s%s" % (ftype, node, by)) if ftype == "levels-and": # If targeting by name, add a topology level for this node if node not in attr_nodes: stl.level(1, node, "FencingPass,Fencing") # Always target remote nodes by name, otherwise we would need to add # an attribute to the remote node only during remote tests (we don't # want nonexistent remote nodes showing up in the non-remote tests). # That complexity is not worth the effort. stl.level(1, remote_node, "FencingPass,Fencing") # Add the node (and its remote equivalent) to the list of levels-and nodes. stt_nodes.extend([node, remote_node]) elif ftype == "levels-or ": for n in [node, remote_node]: stl.level(1, n, "FencingFail") stl.level(2, n, "Fencing") stf_nodes.extend([node, remote_node]) # If any levels-and nodes were targeted by attribute, # create the attributes and a level for the attribute. if attr_nodes: stn = Nodes(self._factory) for (node_name, node_id) in attr_nodes.items(): stn.add_node(node_name, node_id, {"cts-fencing": "levels-and"}) stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and") # Create a Dummy agent that always passes for levels-and if stt_nodes: stt = Resource(self._factory, "FencingPass", "fence_dummy", "stonith") stt["pcmk_host_list"] = " ".join(stt_nodes) # Wait this many seconds before doing anything, handy for letting disks get flushed too stt["random_sleep_range"] = "30" stt["mode"] = "pass" stt.commit() # Create a Dummy agent that always fails for levels-or if stf_nodes: stf = Resource(self._factory, "FencingFail", "fence_dummy", "stonith") stf["pcmk_host_list"] = " ".join(stf_nodes) # Wait this many seconds before doing anything, handy for letting disks get flushed too stf["random_sleep_range"] = "30" stf["mode"] = "fail" stf.commit() # Now commit the levels themselves stl.commit() o = Option(self._factory) o["stonith-enabled"] = self._cm.env["DoFencing"] o["start-failure-is-fatal"] = "false" o["pe-input-series-max"] = "5000" o["shutdown-escalation"] = "5min" o["batch-limit"] = "10" o["dc-deadtime"] = "5s" o["no-quorum-policy"] = no_quorum o.commit() o = OpDefaults(self._factory) o["timeout"] = "90s" o.commit() # Commit the nodes section if we defined one if stn is not None: stn.commit() # Add an alerts section if possible if self._factory.rsh.exists_on_all(self._cm.env["notification-agent"], self._cm.env["nodes"]): alerts = Alerts(self._factory) alerts.add_alert(self._cm.env["notification-agent"], self._cm.env["notification-recipient"]) alerts.commit() # Add resources? if self._cm.env["CIBResource"]: self.add_resources() # generate cib self._cib = self._show() if self._factory.tmpfile != "%s/cib.xml" % BuildOptions.CIB_DIR: self._factory.rsh(self._factory.target, "rm -f %s" % self._factory.tmpfile) return self._cib def add_resources(self): """Add various resources and their constraints to the CIB.""" # Per-node resources for node in self._cm.env["nodes"]: name = "rsc_%s" % node r = self.new_ip(name) r.prefer(node, "100") r.commit() # Migrator # Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach m = Resource(self._factory, "migrator", "Dummy", "ocf", "pacemaker") m["passwd"] = "whatever" m.add_meta("resource-stickiness", "1") m.add_meta("allow-migrate", "1") m.add_op("monitor", "P10S") m.commit() # Ping the test exerciser p = Resource(self._factory, "ping-1", "ping", "ocf", "pacemaker") p.add_op("monitor", "60s") p["host_list"] = self._cm.env["cts-exerciser"] p["name"] = "connected" p["debug"] = "true" c = Clone(self._factory, "Connectivity", p) c["globally-unique"] = "false" c.commit() # promotable clone resource s = Resource(self._factory, "stateful-1", "Stateful", "ocf", "pacemaker") s.add_op("monitor", "15s", timeout="60s") s.add_op("monitor", "16s", timeout="60s", role="Promoted") ms = Clone(self._factory, "promotable-1", s) ms["promotable"] = "true" ms["clone-max"] = self._num_nodes ms["clone-node-max"] = 1 ms["promoted-max"] = 1 ms["promoted-node-max"] = 1 # Require connectivity to run the promotable clone r = Rule(self._factory, "connected", "-INFINITY", op="or") r.add_child(Expression(self._factory, "m1-connected-1", "connected", "lt", "1")) r.add_child(Expression(self._factory, "m1-connected-2", "connected", "not_defined", None)) ms.prefer("connected", rule=r) ms.commit() # Group Resource g = Group(self._factory, "group-1") g.add_child(self.new_ip()) if self._cm.env["have_systemd"]: sysd = Resource(self._factory, "petulant", "pacemaker-cts-dummyd@10", "service") sysd.add_op("monitor", "P10S") g.add_child(sysd) else: g.add_child(self.new_ip()) g.add_child(self.new_ip()) # Make group depend on the promotable clone g.after("promotable-1", first="promote", then="start") g.colocate("promotable-1", "INFINITY", withrole="Promoted") g.commit() # LSB resource lsb = Resource(self._factory, "lsb-dummy", "LSBDummy", "lsb") lsb.add_op("monitor", "5s") # LSB with group lsb.after("group-1") lsb.colocate("group-1") lsb.commit() class ConfigFactory: """Singleton to generate a CIB file for the environment's schema version.""" def __init__(self, cm): """ Create a new ConfigFactory instance. Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.rsh = self._cm.rsh if not self._cm.env["ListTests"]: self.target = self._cm.env["nodes"][0] self.tmpfile = None def log(self, args): """Log a message.""" self._cm.log("cib: %s" % args) def debug(self, args): """Log a debug message.""" self._cm.debug("cib: %s" % args) def create_config(self, name="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION): """Return a CIB object for the given schema version.""" return CIB(self._cm, name, self) diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index d983b9bbc6..9d19f79cc8 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,638 +1,638 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory"] __copyright__ = "Copyright 2014-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse import os import random import socket import sys import time from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). It's possible we could fix this with type annotations, # but those were introduced with python 3.5 and we only support python 3.4. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} self._nodes = [] # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["IPagent"] = "IPaddr2" self["DoFencing"] = True self["ClobberCIB"] = False self["CIBfilename"] = None self["CIBResource"] = False self["LogWatcher"] = LogKind.ANY self["node-limit"] = 0 self["scenario"] = "random" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._target = "localhost" self._seed_random() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def _seed_random(self, seed=None): """ Initialize the random number generator. Arguments: seed -- Use this to see the random number generator, or use the current time if None. """ if not seed: seed = int(time.time()) self["RandSeed"] = seed self.random_gen.seed(str(seed)) def dump(self): """Print the current environment.""" keys = [] for key in list(self.data.keys()): keys.append(key) keys.sort() for key in keys: s = "Environment[%s]" % key self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key]))) def keys(self): """Return a list of all environment keys stored in this instance.""" return list(self.data.keys()) def __contains__(self, key): """Return True if the given key exists in the environment.""" if key == "nodes": return True return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" if str(key) == "0": raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead") if key == "nodes": return self._nodes if key == "Name": return self._get_stack_short() return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "Stack": self._set_stack(value) elif key == "node-limit": self.data[key] = value self._filter_nodes() elif key == "nodes": self._nodes = [] for node in value: # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: n = node.strip() socket.gethostbyname_ex(n) self._nodes.append(n) except: self._logger.log("%s not found in DNS... aborting" % node) raise self._filter_nodes() else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def get(self, key, default=None): """Return the value for key if key is in the environment, else default.""" if key == "nodes": return self._nodes return self.data.get(key, default) def _set_stack(self, name): """Normalize the given cluster stack name.""" if name in ["corosync", "cs", "mcp"]: self.data["Stack"] = "corosync 2+" else: raise ValueError("Unknown stack: %s" % name) def _get_stack_short(self): """Return the short name for the currently set cluster stack.""" if "Stack" not in self.data: return "unknown" if self.data["Stack"] == "corosync 2+": return "crm-corosync" LogFactory().log("Unknown stack: %s" % self["stack"]) raise ValueError("Unknown stack: %s" % self["stack"]) def _detect_systemd(self): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self): """Detect the syslog variant in use on the target node.""" if "syslogd" not in self.data: if self["have_systemd"]: # Systemd (_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) self["syslogd"] = lines[0].strip() else: # SYS-V (_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) self["syslogd"] = lines[0].strip() if "syslogd" not in self.data or not self["syslogd"]: # default self["syslogd"] = "rsyslog" def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl disable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s off" % service) return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl enable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s on" % service) return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service) return rc == 0 # SYS-V (rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service) return rc == 0 def _detect_at_boot(self): """Detect if the cluster starts at boot.""" if "at-boot" not in self.data: self["at-boot"] = self.service_is_enabled(self._target, "corosync") \ or self.service_is_enabled(self._target, "pacemaker") def _detect_ip_offset(self): """Detect the offset for IPaddr resources.""" if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) return # pylint thinks self["IPBase"] is a list, not a string, which causes it # to error out because a list doesn't have split(). # pylint: disable=no-member if int(self["IPBase"].split('.')[3]) >= 240: self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s" % (self["IPBase"], self["IPBase"].split('.')[3])) self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) def _filter_nodes(self): """ Filter the list of cluster nodes. If --limit-nodes is given, keep that many nodes from the front of the list of cluster nodes and drop the rest. """ if self["node-limit"] > 0: if len(self["nodes"]) > self["node-limit"]: # pylint thinks self["node-limit"] is a list even though we initialize # it as an int in __init__ and treat it as an int everywhere. # pylint: disable=bad-string-format-type self._logger.log("Limiting the number of nodes configured=%d (max=%d)" % (len(self["nodes"]), self["node-limit"])) while len(self["nodes"]) > self["node-limit"]: - self["nodes"].pop(len(self["nodes"])-1) + self["nodes"].pop(len(self["nodes"]) - 1) def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" self._target = random.Random().choice(self["nodes"]) exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser self._detect_systemd() self._detect_syslog() self._detect_at_boot() self._detect_ip_offset() def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0]) grp1 = parser.add_argument_group("Common options") grp1.add_argument("-g", "--dsh-group", "--group", metavar="GROUP", dest="group", help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)") grp1.add_argument("-l", "--limit-nodes", type=int, default=0, metavar="MAX", help="Only use the first MAX cluster nodes supplied with --nodes") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", metavar="NODES", help="List of cluster nodes separated by whitespace") grp1.add_argument("--stack", default="corosync", metavar="STACK", help="Which cluster stack is installed") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes") grp2.add_argument("--at-boot", "--cluster-starts-at-boot", choices=["1", "0", "yes", "no"], help="Does the cluster software start at boot time?") grp2.add_argument("--facility", "--syslog-facility", default="daemon", metavar="NAME", help="Which syslog facility to log to") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named test") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--experimental-tests", action="store_true", help="Include experimental tests") grp4.add_argument("--loop-minutes", type=int, default=60, help="") grp4.add_argument("--no-loop-tests", action="store_true", help="Don't run looping/time-based tests") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--oprofile", metavar="NODES", help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--qarsh", action="store_true", help="Use QARSH to access nodes instead of SSH") grp4.add_argument("--schema", metavar="SCHEMA", default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION, help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--set", action="append", metavar="ARG", default=[], help="Set key=value pairs (can be specified multiple times)") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--valgrind-procs", metavar="PROCS", default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd", help="Run valgrind against the given space-separated list of processes") grp4.add_argument("--valgrind-tests", action="store_true", help="Include tests using valgrind") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. They get a default from the add_argument # calls, only do one thing, and they do not have any side effects. self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["Stack"] = args.stack self["SyslogFacility"] = args.facility self["TruncateLog"] = args.truncate self["at-boot"] = args.at_boot in ["1", "yes"] self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["experimental-tests"] = args.experimental_tests self["iterations"] = args.iterations self["loop-minutes"] = args.loop_minutes self["loop-tests"] = not args.no_loop_tests self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["node-limit"] = args.limit_nodes self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["valgrind-procs"] = args.valgrind_procs self["valgrind-tests"] = args.valgrind_tests self["warn-inactive"] = args.warn_inactive # Nodes and groups are mutually exclusive, so their defaults cannot be # set in their add_argument calls. Additionally, groups does more than # just set a value. Here, set nodes first and then if a group is # specified, override the previous nodes value. if args.nodes: self["nodes"] = args.nodes.split(" ") else: self["nodes"] = [] if args.group: self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group) LogFactory().add_file(self["OutputFile"], "CTS") dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group) if os.path.isfile(dsh_file): self["nodes"] = [] with open(dsh_file, "r", encoding="utf-8") as f: for line in f: l = line.strip() if not l.startswith('#'): self["nodes"].append(l) else: print("Unknown DSH group: %s" % args.dsh_group) # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.cib_filename: self["CIBfilename"] = args.cib_filename else: self["CIBfilename"] = None if args.choose: self["scenario"] = "sequence" self["tests"].append(args.choose) if args.fencing: if args.fencing in ["0", "no"]: self["DoFencing"] = False else: self["DoFencing"] = True if args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % ( os.environ['OS_REGION_NAME'], os.environ['OS_TENANT_NAME'], os.environ['OS_AUTH_URL'], os.environ['OS_USERNAME'], os.environ['OS_PASSWORD'] ) elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["LogWatcher"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.oprofile: self["oprofile"] = args.oprofile.split(" ") else: self["oprofile"] = [] if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True if args.qarsh: self._rsh.enable_qarsh() for kv in args.set: (name, value) = kv.split("=") self[name] = value print("Setting %s = %s" % (name, value)) class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py index 0da004ced1..d60d2492f1 100644 --- a/python/pacemaker/_cts/scenarios.py +++ b/python/pacemaker/_cts/scenarios.py @@ -1,424 +1,424 @@ """Test scenario classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence", ] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time from pacemaker._cts.audits import ClusterAudit from pacemaker._cts.input import should_continue from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.watcher import LogWatcher class ScenarioComponent: """ The base class for all scenario components. A scenario component is one single step in a scenario. Each component is basically just a setup and teardown method. """ def __init__(self, cm, env): """ Create a new ScenarioComponent instance. Arguments: cm -- A ClusterManager instance env -- An Environment instance """ # pylint: disable=invalid-name self._cm = cm self._env = env def is_applicable(self): """ Return True if this component is applicable in the given Environment. This method must be provided by all subclasses. """ raise NotImplementedError def setup(self): """ Set up the component, returning True on success. This method must be provided by all subclasses. """ raise NotImplementedError def teardown(self): """ Tear down the given component. This method must be provided by all subclasses. """ raise NotImplementedError class Scenario: """ The base class for scenarios. A scenario is an ordered list of ScenarioComponent objects. A scenario proceeds by setting up all its components in sequence, running a list of tests and audits, and then tearing down its components in reverse. """ def __init__(self, cm, components, audits, tests): """ Create a new Scenario instance. Arguments: cm -- A ClusterManager instance components -- A list of ScenarioComponents comprising this Scenario audits -- A list of ClusterAudits that will be performed as part of this Scenario tests -- A list of CTSTests that will be run """ # pylint: disable=invalid-name self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 } self.tests = tests self._audits = audits self._bad_news = None self._cm = cm self._components = components for comp in components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of ScenarioComponent") for audit in audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be subclass of ClusterAudit") for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") def is_applicable(self): """Return True if all ScenarioComponents are applicable.""" for comp in self._components: if not comp.is_applicable(): return False return True def setup(self): """ Set up the scenario, returning True on success. If setup fails at some point, tear down those components that did successfully set up. """ self._cm.prepare() self.audit() # Also detects remote/local log config self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) self.audit() self._cm.install_support() self._bad_news = LogWatcher(self._cm.env["LogFileName"], self._cm.templates.get_patterns("BadNews"), self._cm.env["nodes"], self._cm.env["LogWatcher"], "BadNews", 0) self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit j = 0 while j < len(self._components): if not self._components[j].setup(): # OOPS! We failed. Tear partial setups down. self.audit() self._cm.log("Tearing down partial setup") self.teardown(j) return False j += 1 self.audit() return True def teardown(self, n_components=None): """ Tear down the scenario in the reverse order it was set up. If n_components is not None, only tear down that many components. """ if not n_components: - n_components = len(self._components)-1 + n_components = len(self._components) - 1 j = n_components while j >= 0: self._components[j].teardown() j -= 1 self.audit() self._cm.install_support("uninstall") def incr(self, name): """Increment the given stats key.""" if not name in self.stats: self.stats[name] = 0 self.stats[name] += 1 def run(self, iterations): """Run all tests in the scenario the given number of times.""" self._cm.oprofile_start() try: self._run_loop(iterations) self._cm.oprofile_stop() except: self._cm.oprofile_stop() raise def _run_loop(self, iterations): """Run all the tests the given number of times.""" raise NotImplementedError def run_test(self, test, testcount): """ Run the given test. testcount is the number of tests (including this one) that have been run across all iterations. """ nodechoice = self._cm.env.random_node() ret = True did_run = False self._cm.clear_instance_errors_to_ignore() choice = "(%s)" % nodechoice self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount)) starttime = test.set_timer() if not test.setup(nodechoice): self._cm.log("Setup failed") ret = False else: did_run = True ret = test(nodechoice) if not test.teardown(nodechoice): self._cm.log("Teardown failed") if not should_continue(self._cm.env): raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice)) ret = False stoptime = time.time() self._cm.oprofile_save(testcount) elapsed_time = stoptime - starttime test_time = stoptime - test.get_timer() if "min_time" not in test.stats: test.stats["elapsed_time"] = elapsed_time test.stats["min_time"] = test_time test.stats["max_time"] = test_time else: test.stats["elapsed_time"] += elapsed_time if test_time < test.stats["min_time"]: test.stats["min_time"] = test_time if test_time > test.stats["max_time"]: test.stats["max_time"] = test_time if ret: self.incr("success") test.log_timer() else: self.incr("failure") self._cm.statall() did_run = True # Force the test count to be incremented anyway so test extraction works self.audit(test.errors_to_ignore) return did_run def summarize(self): """Output scenario results.""" self._cm.log("****************") self._cm.log("Overall Results:%r" % self.stats) self._cm.log("****************") stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0, } self._cm.log("Test Summary") for test in self.tests: for key in stat_filter: stat_filter[key] = test.stats[key] name = "Test %s:" % test.name self._cm.log("{:<25} {!r}".format(name, stat_filter)) self._cm.debug("Detailed Results") for test in self.tests: name = "Test %s:" % test.name self._cm.debug("{:<25} {!r}".format(name, stat_filter)) self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def audit(self, local_ignore=None): """ Perform all scenario audits and log results. If there are too many failures, prompt the user to confirm that the scenario should continue running. """ errcount = 0 ignorelist = ["CTS:"] if local_ignore: ignorelist.extend(local_ignore) ignorelist.extend(self._cm.errors_to_ignore) ignorelist.extend(self._cm.instance_errors_to_ignore) # This makes sure everything is stabilized before starting... failed = 0 for audit in self._audits: if not audit(): self._cm.log("Audit %s FAILED." % audit.name) failed += 1 else: self._cm.debug("Audit %s passed." % audit.name) while errcount < 1000: match = None if self._bad_news: match = self._bad_news.look(0) if match: add_err = True for ignore in ignorelist: if add_err and re.search(ignore, match): add_err = False if add_err: self._cm.log("BadNews: %s" % match) self.incr("BadNews") errcount += 1 else: break else: print("Big problems") if not should_continue(self._cm.env): self._cm.log("Shutting down.") self.summarize() self.teardown() raise ValueError("Looks like we hit a BadNews jackpot!") if self._bad_news: self._bad_news.end() return failed class AllOnce(Scenario): """Every Test Once.""" def _run_loop(self, iterations): testcount = 1 for test in self.tests: self.run_test(test, testcount) testcount += 1 class RandomTests(Scenario): """Random Test Execution.""" def _run_loop(self, iterations): testcount = 1 while testcount <= iterations: test = self._cm.env.random_gen.choice(self.tests) self.run_test(test, testcount) testcount += 1 class Sequence(Scenario): """Named Tests in Sequence.""" def _run_loop(self, iterations): testcount = 1 while testcount <= iterations: for test in self.tests: self.run_test(test, testcount) testcount += 1 class Boot(Scenario): """Start the Cluster.""" def _run_loop(self, iterations): return class BootCluster(ScenarioComponent): """ Start the cluster manager on all nodes. Wait for each to come up before starting in order to account for the possibility that a given node might have been rebooted or crashed beforehand. """ def is_applicable(self): """Return whether this scenario is applicable.""" return True def setup(self): """Set up the component, returning True on success.""" self._cm.prepare() # Clear out the cobwebs ;-) self._cm.stopall(verbose=True, force=True) # Now start the Cluster Manager on all the nodes. self._cm.log("Starting Cluster Manager on all nodes.") return self._cm.startall(verbose=True, quick=True) def teardown(self): """Tear down the component.""" self._cm.log("Stopping Cluster Manager on all nodes") self._cm.stopall(verbose=True, force=False) class LeaveBooted(BootCluster): """Leave all nodes up when the scenario is complete.""" def teardown(self): """Tear down the component.""" self._cm.log("Leaving Cluster running on all nodes") diff --git a/python/pacemaker/_cts/test.py b/python/pacemaker/_cts/test.py index 4f94f50edd..0498060b11 100644 --- a/python/pacemaker/_cts/test.py +++ b/python/pacemaker/_cts/test.py @@ -1,594 +1,594 @@ """ A module providing base classes. These classes are used for defining regression tests and groups of regression tests. Everything exported here should be considered an abstract class that needs to be subclassed in order to do anything useful. Various functions will raise NotImplementedError if not overridden by a subclass. """ __copyright__ = "Copyright 2009-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+)" __all__ = ["Test", "Tests"] import io import os import re import shlex import signal import subprocess import sys import time from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError from pacemaker._cts.process import pipe_communicate from pacemaker.buildoptions import BuildOptions from pacemaker.exitstatus import ExitStatus def find_validator(rng_file): """ Return the command line used to validate XML output. If no validator is found, return None. """ if os.access("/usr/bin/xmllint", os.X_OK): if rng_file is None: return ["xmllint", "-"] return ["xmllint", "--relaxng", rng_file, "-"] return None def rng_directory(): """Return the directory containing RNG schema files.""" if "PCMK_schema_directory" in os.environ: return os.environ["PCMK_schema_directory"] if os.path.exists("%s/cts-fencing.in" % sys.path[0]): return "xml" return BuildOptions.SCHEMA_DIR class Pattern: """A class for checking log files for a given pattern.""" def __init__(self, pat, negative=False, regex=False): """ Create a new Pattern instance. Arguments: pat -- The string to search for negative -- If True, pat must not be found in any input regex -- If True, pat is a regex and not a substring """ self._pat = pat self.negative = negative self.regex = regex def __str__(self): return self._pat def match(self, line): """Return True if this pattern is found in the given line.""" if self.regex: return re.search(self._pat, line) is not None return self._pat in line class Test: """ The base class for a single regression test. A single regression test may still run multiple commands as part of its execution. """ def __init__(self, name, description, **kwargs): """ Create a new Test instance. This method must be provided by all subclasses, which must call Test.__init__ first. Arguments: description -- A user-readable description of the test, helpful in identifying what test is running or has failed. name -- The name of the test. Command line tools use this attribute to allow running only tests with the exact name, or tests whose name matches a given pattern. This should be unique among all tests. Keyword arguments: force_wait -- logdir -- The base directory under which to create a directory to store output and temporary data. timeout -- How long to wait for the test to complete. verbose -- Whether to print additional information, including verbose command output and daemon log files. """ self.description = description self.executed = False self.name = name self.force_wait = kwargs.get("force_wait", False) self.logdir = kwargs.get("logdir", "/tmp") self.timeout = kwargs.get("timeout", 2) self.verbose = kwargs.get("verbose", False) self._cmds = [] self._patterns = [] self._daemon_location = None self._daemon_output = "" self._daemon_process = None self._result_exitcode = ExitStatus.OK self._result_txt = "" ### ### PROPERTIES ### @property def exitcode(self): """ Return the final exitcode of the Test. If all commands pass, this property will be ExitStatus.OK. Otherwise, this property will be the exitcode of the first command to fail. """ return self._result_exitcode @exitcode.setter def exitcode(self, value): self._result_exitcode = value @property def logpath(self): """ Return the path to the log for whatever daemon is being tested. Note that this requires all subclasses to set self._daemon_location before accessing this property or an exception will be raised. """ return os.path.join(self.logdir, "%s.log" % self._daemon_location) ### ### PRIVATE METHODS ### def _kill_daemons(self): """Kill any running daemons in preparation for executing the test.""" raise NotImplementedError("_kill_daemons not provided by subclass") def _match_log_patterns(self): """ Check test output for expected patterns. Set self.exitcode and self._result_txt as appropriate. Not all subclass will need to do this. """ if len(self._patterns) == 0: return n_failed_matches = 0 n_negative_matches = 0 output = self._daemon_output.split("\n") for pat in self._patterns: positive_match = False for line in output: if pat.match(line): if pat.negative: n_negative_matches += 1 if self.verbose: print("This pattern should not have matched = '%s" % pat) break positive_match = True break if not pat.negative and not positive_match: n_failed_matches += 1 print("Pattern Not Matched = '%s'" % pat) if n_failed_matches > 0 or n_negative_matches > 0: msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches." self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches) self.exitcode = ExitStatus.ERROR def _new_cmd(self, cmd, args, exitcode, **kwargs): """ Add a command to be executed as part of this test. Arguments: cmd -- The program to run. args -- Commands line arguments to pass to cmd, as a string. exitcode -- The expected exit code of cmd. This can be used to run a command that is expected to fail. Keyword arguments: stdout_match -- If not None, a string that is expected to be present in the stdout of cmd. This can be a regular expression. no_wait -- Do not wait for cmd to complete. stdout_negative_match -- If not None, a string that is expected to be missing in the stdout of cmd. This can be a regualr expression. kill -- A command to be run after cmd, typically in order to kill a failed process. This should be the entire command line including arguments as a single string. validate -- If True, the output of cmd will be passed to xmllint for validation. If validation fails, XmlValidationError will be raised. check_rng -- If True and validate is True, command output will additionally be checked against the api-result.rng file. check_stderr -- If True, the stderr of cmd will be included in output. env -- If not None, variables to set in the environment """ self._cmds.append( { "args": args, "check_rng": kwargs.get("check_rng", True), "check_stderr": kwargs.get("check_stderr", True), "cmd": cmd, "expected_exitcode": exitcode, "kill": kwargs.get("kill", None), "no_wait": kwargs.get("no_wait", False), "stdout_match": kwargs.get("stdout_match", None), "stdout_negative_match": kwargs.get("stdout_negative_match", None), "validate": kwargs.get("validate", True), "env": kwargs.get("env", None), } ) def _start_daemons(self): """Start any necessary daemons in preparation for executing the test.""" raise NotImplementedError("_start_daemons not provided by subclass") ### ### PUBLIC METHODS ### def add_cmd(self, cmd, args, validate=True, check_rng=True, check_stderr=True, env=None): """Add a simple command to be executed as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, validate=validate, check_rng=check_rng, check_stderr=check_stderr, env=env) def add_cmd_and_kill(self, cmd, args, kill_proc): """Add a command and system command to be executed as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, kill=kill_proc) def add_cmd_check_stdout(self, cmd, args, match, no_match=None, env=None): """Add a simple command with expected output to be executed as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, stdout_match=match, stdout_negative_match=no_match, env=env) def add_cmd_expected_fail(self, cmd, args, exitcode=ExitStatus.ERROR): """Add a command that is expected to fail to be executed as part of this test.""" self._new_cmd(cmd, args, exitcode) def add_cmd_no_wait(self, cmd, args): """Add a simple command to be executed (without waiting) as part of this test.""" self._new_cmd(cmd, args, ExitStatus.OK, no_wait=True) def add_log_pattern(self, pattern, negative=False, regex=False): """Add a pattern that should appear in the test's logs.""" self._patterns.append(Pattern(pattern, negative=negative, regex=regex)) def _signal_dict(self): """Return a dictionary mapping signal numbers to their names.""" # FIXME: When we support python >= 3.5, this function can be replaced with: # signal.Signals(self.daemon_process.returncode).name return { getattr(signal, _signame): _signame for _signame in dir(signal) if _signame.startswith("SIG") and not _signame.startswith("SIG_") } def clean_environment(self): """Clean up the host after executing a test.""" if self._daemon_process: if self._daemon_process.poll() is None: self._daemon_process.terminate() self._daemon_process.wait() else: rc = self._daemon_process.returncode signame = self._signal_dict().get(-rc, "RET=%s" % rc) msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)." self._result_txt = msg % (self.name, self._daemon_location, signame) self.exitcode = ExitStatus.ERROR self._daemon_process = None self._daemon_output = "" # the default for utf-8 encoding would error out if e.g. memory corruption # makes fenced output any kind of 8 bit value - while still interesting # for debugging and we'd still like the regression-test to go over the # full set of test-cases with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile: for line in logfile.readlines(): self._daemon_output += line if self.verbose: print("Daemon Output Start") print(self._daemon_output) print("Daemon Output End") def print_result(self, filler): """Print the result of the last test execution.""" print("%s%s" % (filler, self._result_txt)) def run(self): """Execute this test.""" i = 1 self.start_environment() if self.verbose: print("\n--- START TEST - %s" % self.name) self._result_txt = "SUCCESS - '%s'" % (self.name) self.exitcode = ExitStatus.OK for cmd in self._cmds: try: self.run_cmd(cmd) except ExitCodeError as e: print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode'])) self.set_error(i, cmd) break except OutputNotFoundError as e: print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e)) self.set_error(i, cmd) break except OutputFoundError as e: print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e)) self.set_error(i, cmd) break except XmlValidationError as e: print("Step %d FAILED - xmllint failed: %s" % (i, e)) self.set_error(i, cmd) break if self.verbose: print("Step %d SUCCESS" % (i)) i += 1 self.clean_environment() if self.exitcode == ExitStatus.OK: self._match_log_patterns() print(self._result_txt) if self.verbose: print("--- END TEST - %s\n" % self.name) self.executed = True def run_cmd(self, args): """Execute a command as part of this test.""" cmd = shlex.split(args['args']) cmd.insert(0, args['cmd']) if self.verbose: print("\n\nRunning: %s" % " ".join(cmd)) # FIXME: Using "with" here breaks fencing merge tests. # pylint: disable=consider-using-with if args['env']: new_env = os.environ.copy() new_env.update(args['env']) test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=new_env) else: test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if args['kill']: if self.verbose: print("Also running: %s" % args['kill']) ### Typically, the kill argument is used to detect some sort of ### failure. Without yielding for a few seconds here, the process ### launched earlier that is listening for the failure may not have ### time to connect to pacemaker-execd. time.sleep(2) subprocess.Popen(shlex.split(args['kill'])) if not args['no_wait']: test.wait() else: return ExitStatus.OK output = pipe_communicate(test, check_stderr=args['check_stderr']) if self.verbose: print(output) if test.returncode != args['expected_exitcode']: raise ExitCodeError(test.returncode) if args['stdout_match'] is not None and \ re.search(args['stdout_match'], output) is None: raise OutputNotFoundError(output) if args['stdout_negative_match'] is not None and \ re.search(args['stdout_negative_match'], output) is not None: raise OutputFoundError(output) if args['validate']: if args['check_rng']: rng_file = "%s/api/api-result.rng" % rng_directory() else: rng_file = None cmd = find_validator(rng_file) if not cmd: raise XmlValidationError("Could not find validator for %s" % rng_file) if self.verbose: print("\nRunning: %s" % " ".join(cmd)) with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator: output = pipe_communicate(validator, check_stderr=True, stdin=output) if self.verbose: print(output) if validator.returncode != 0: raise XmlValidationError(output) return ExitStatus.OK def set_error(self, step, cmd): """Record failure of this test.""" msg = "FAILURE - '%s' failed at step %d. Command: %s %s" self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args']) self.exitcode = ExitStatus.ERROR def start_environment(self): """Prepare the host for executing a test.""" if os.path.exists(self.logpath): os.remove(self.logpath) self._kill_daemons() self._start_daemons() logfile = None init_time = time.time() update_time = init_time while True: # FIXME: Eventually use 'with' here, which seems complicated given # everything happens in a loop. # pylint: disable=consider-using-with time.sleep(0.1) if not self.force_wait and logfile is None \ and os.path.exists(self.logpath): logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1") if not self.force_wait and logfile is not None: for line in logfile.readlines(): if "successfully started" in line: return now = time.time() if self.timeout > 0 and (now - init_time) >= self.timeout: if not self.force_wait: print("\tDaemon %s doesn't seem to have been initialized within %fs." "\n\tConsider specifying a longer '--timeout' value." - %(self._daemon_location, self.timeout)) + % (self._daemon_location, self.timeout)) return if self.verbose and (now - update_time) >= 5: print("Waiting for %s to be initialized: %fs ..." - %(self._daemon_location, now - init_time)) + % (self._daemon_location, now - init_time)) update_time = now class Tests: """The base class for a collection of regression tests.""" def __init__(self, **kwargs): """ Create a new Tests instance. This method must be provided by all subclasses, which must call Tests.__init__ first. Keywork arguments: force_wait -- logdir -- The base directory under which to create a directory to store output and temporary data. timeout -- How long to wait for the test to complete. verbose -- Whether to print additional information, including verbose command output and daemon log files. """ self.force_wait = kwargs.get("force_wait", False) self.logdir = kwargs.get("logdir", "/tmp") self.timeout = kwargs.get("timeout", 2) self.verbose = kwargs.get("verbose", False) self._tests = [] def exit(self): """Exit (with error status code if any test failed).""" for test in self._tests: if not test.executed: continue if test.exitcode != ExitStatus.OK: sys.exit(ExitStatus.ERROR) sys.exit(ExitStatus.OK) def print_list(self): """List all registered tests.""" print("\n==== %d TESTS FOUND ====" % len(self._tests)) print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION")) print("%35s - %s" % ("--------------------", "--------------------")) for test in self._tests: print("%35s - %s" % (test.name, test.description)) print("==== END OF LIST ====\n") def print_results(self): """Print summary of results of executed tests.""" failures = 0 success = 0 print("\n\n======= FINAL RESULTS ==========") print("\n--- FAILURE RESULTS:") for test in self._tests: if not test.executed: continue if test.exitcode != ExitStatus.OK: failures += 1 test.print_result(" ") else: success += 1 if failures == 0: print(" None") print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures)) def run_single(self, name): """Run a single named test.""" for test in self._tests: if test.name == name: test.run() break def run_tests(self): """Run all tests.""" for test in self._tests: test.run() def run_tests_matching(self, pattern): """Run all tests whose name matches a pattern.""" for test in self._tests: if test.name.count(pattern) != 0: test.run() diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py index f64a276cdb..666b3a1496 100644 --- a/python/pacemaker/_cts/tests/simulstartlite.py +++ b/python/pacemaker/_cts/tests/simulstartlite.py @@ -1,128 +1,128 @@ """Simultaneously start stopped nodes.""" __all__ = ["SimulStartLite"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStartLite(CTSTest): """ A pseudo-test that sets up conditions before running some other test. This class starts any stopped nodes more or less simultaneously. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStartLite instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "SimulStartLite" def __call__(self, dummy): """Return whether starting all stopped nodes more or less simultaneously succeeds.""" self.incr("calls") self.debug("Setup: %s" % self.name) # We ignore the "node" parameter... node_list = [] for node in self._env["nodes"]: if self._cm.expected_status[node] == "down": self.incr("WasStopped") node_list.append(node) self.set_timer() while len(node_list) > 0: # Repeat until all nodes come up uppat = self.templates["Pat:NonDC_started"] if self._cm.upcount() == 0: uppat = self.templates["Pat:Local_started"] watchpats = [ self.templates["Pat:DC_IDLE"] ] for node in node_list: watchpats.extend([uppat % node, self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node]) # Start all the nodes - at about the same time... - watch = self.create_watch(watchpats, self._env["DeadTime"]+10) + watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) watch.set_watch() stonith = self._cm.prepare_fencing_watcher() for node in node_list: self._cm.start_cm_async(node) watch.look_for_all() node_list = self._cm.fencing_cleanup(self.name, stonith) if node_list is None: return self.failure("Cluster did not stabilize") # Remove node_list messages from watch.unmatched for node in node_list: self._logger.debug("Dealing with stonith operations for %s" % node_list) if watch.unmatched: try: watch.unmatched.remove(uppat % node) except ValueError: self.debug("Already matched: %s" % (uppat % node)) try: watch.unmatched.remove(self.templates["Pat:InfraUp"] % node) except ValueError: self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node)) try: watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node) except ValueError: self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node)) if watch.unmatched: for regex in watch.unmatched: self._logger.log("Warn: Startup pattern not found: %s" % regex) if not self._cm.cluster_stable(): return self.failure("Cluster did not stabilize") did_fail = False unstable = [] for node in self._env["nodes"]: if not self._cm.stat_cm(node): did_fail = True unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: %s" % unstable) unstable = [] for node in self._env["nodes"]: if not self._cm.node_stable(node): did_fail = True unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: %s" % unstable) return self.success() def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" return False diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py index 8375d55310..69f1b9c4d2 100644 --- a/python/pacemaker/_cts/tests/simulstoplite.py +++ b/python/pacemaker/_cts/tests/simulstoplite.py @@ -1,86 +1,86 @@ """Simultaneously stop running nodes.""" __all__ = ["SimulStopLite"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStopLite(CTSTest): """ A pseudo-test that sets up conditions before running some other test. This class stops any running nodes more or less simultaneously. It can be used both to set up a test or to clean up a test. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStopLite instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "SimulStopLite" def __call__(self, dummy): """Return whether stopping all running nodes more or less simultaneously succeeds.""" self.incr("calls") self.debug("Setup: %s" % self.name) # We ignore the "node" parameter... watchpats = [] for node in self._env["nodes"]: if self._cm.expected_status[node] == "up": self.incr("WasStarted") watchpats.append(self.templates["Pat:We_stopped"] % node) if len(watchpats) == 0: return self.success() # Stop all the nodes - at about the same time... - watch = self.create_watch(watchpats, self._env["DeadTime"]+10) + watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) watch.set_watch() self.set_timer() for node in self._env["nodes"]: if self._cm.expected_status[node] == "up": self._cm.stop_cm_async(node) if watch.look_for_all(): # Make sure they're completely down with no residule for node in self._env["nodes"]: self._rsh(node, self.templates["StopCmd"]) return self.success() did_fail = False up_nodes = [] for node in self._env["nodes"]: if self._cm.stat_cm(node): did_fail = True up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: %s" % up_nodes) self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched) return self.failure("Missing log message: %s " % watch.unmatched) def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" return False diff --git a/python/pacemaker/_cts/tests/standbytest.py b/python/pacemaker/_cts/tests/standbytest.py index 924bdbcaea..a3e17343a8 100644 --- a/python/pacemaker/_cts/tests/standbytest.py +++ b/python/pacemaker/_cts/tests/standbytest.py @@ -1,106 +1,106 @@ """Put a node into standby mode and check that resources migrate.""" __all__ = ["StandbyTest"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.starttest import StartTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StandbyTest(CTSTest): """Put a node into standby and check that resources migrate away from it.""" def __init__(self, cm): """ Create a new StandbyTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Standby" self._start = StartTest(cm) self._startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources, none resource should be running on the node # set the node to active mode # check resources, resources should have been migrated back (SHOULD THEY?) def __call__(self, node): """Perform this test.""" self.incr("calls") ret = self._startall(None) if not ret: return self.failure("Start all nodes failed") self.debug("Make sure node %s is active" % node) if self._cm.in_standby_mode(node): if not self._cm.set_standby_mode(node, False): return self.failure("can't set node %s to active mode" % node) self._cm.cluster_stable() if self._cm.in_standby_mode(node): return self.failure("standby status of %s is [on] but we expect [off]" % node) watchpats = [ r"State transition .* -> S_POLICY_ENGINE", ] - watch = self.create_watch(watchpats, self._env["DeadTime"]+10) + watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) watch.set_watch() self.debug("Setting node %s to standby mode" % node) if not self._cm.set_standby_mode(node, True): return self.failure("can't set node %s to standby mode" % node) self.set_timer("on") ret = watch.look_for_all() if not ret: self._logger.log("Patterns not found: %r" % watch.unmatched) self._cm.set_standby_mode(node, False) return self.failure("cluster didn't react to standby change on %s" % node) self._cm.cluster_stable() if not self._cm.in_standby_mode(node): return self.failure("standby status of %s is [off] but we expect [on]" % node) self.log_timer("on") self.debug("Checking resources") rscs_on_node = self._cm.active_resources(node) if rscs_on_node: rc = self.failure("%s set to standby, %r is still running on it" % (node, rscs_on_node)) self.debug("Setting node %s to active mode" % node) self._cm.set_standby_mode(node, False) return rc self.debug("Setting node %s to active mode" % node) if not self._cm.set_standby_mode(node, False): return self.failure("can't set node %s to active mode" % node) self.set_timer("off") self._cm.cluster_stable() if self._cm.in_standby_mode(node): return self.failure("standby status of %s is [on] but we expect [off]" % node) self.log_timer("off") return self.success() diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py index 616d37e978..48ef73ac0f 100644 --- a/python/pacemaker/_cts/tests/stoptest.py +++ b/python/pacemaker/_cts/tests/stoptest.py @@ -1,97 +1,97 @@ """Stop the cluster manager on a given node.""" __all__ = ["StopTest"] __copyright__ = "Copyright 2000-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StopTest(CTSTest): """ A pseudo-test that sets up conditions before running some other test. This class stops the cluster manager on a given node. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new StopTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Stop" def __call__(self, node): """Stop the given node, returning whether this succeeded or not.""" self.incr("calls") if self._cm.expected_status[node] != "up": return self.skipped() # Technically we should always be able to notice ourselves stopping patterns = [ self.templates["Pat:We_stopped"] % node, ] # Any active node needs to notice this one left # (note that this won't work if we have multiple partitions) for other in self._env["nodes"]: if self._cm.expected_status[other] == "up" and other != node: - patterns.append(self.templates["Pat:They_stopped"] %(other, node)) + patterns.append(self.templates["Pat:They_stopped"] % (other, node)) watch = self.create_watch(patterns, self._env["DeadTime"]) watch.set_watch() if node == self._cm.our_node: self.incr("us") else: if self._cm.upcount() <= 1: self.incr("all") else: self.incr("them") self._cm.stop_cm(node) watch.look_for_all() failreason = None unmatched_str = "||" if watch.unmatched: (_, output) = self._rsh(node, "/bin/ps axf", verbose=1) for line in output: self.debug(line) (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1) for line in output: self.debug(line) for regex in watch.unmatched: self._logger.log("ERROR: Shutdown pattern not found: %s" % regex) unmatched_str += "%s||" % regex failreason = "Missing shutdown pattern" self._cm.cluster_stable(self._env["DeadTime"]) if not watch.unmatched or self._cm.upcount() == 0: return self.success() if len(watch.unmatched) >= self._cm.upcount(): return self.failure("no match against (%s)" % unmatched_str) if failreason is None: return self.success() return self.failure(failreason)