diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py index a9adbbbc00..dafa684b24 100644 --- a/python/pacemaker/_cts/audits.py +++ b/python/pacemaker/_cts/audits.py @@ -1,1033 +1,1033 @@ """ Auditing classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time import uuid from pacemaker.buildoptions import BuildOptions from pacemaker._cts.input import should_continue from pacemaker._cts.watcher import LogKind, LogWatcher class ClusterAudit: """ The base class for various kinds of auditors. Specific audit implementations should be built on top of this one. Audits can do all kinds of checks on the system. The basic interface for callers is the `__call__` method, which returns True if the audit passes and False if it fails. """ def __init__(self, cm): """ Create a new ClusterAudit instance Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.name = None def __call__(self): raise NotImplementedError def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. This method must be implemented by all subclasses. """ raise NotImplementedError def log(self, args): """ Log a message """ self._cm.log("audit: %s" % args) def debug(self, args): """ Log a debug message """ self._cm.debug("audit: %s" % args) class LogAudit(ClusterAudit): """ Audit each cluster node to verify that some logging system is usable. This is done by logging a unique test message and then verifying that we can read back that test message using logging tools. """ def __init__(self, cm): """ Create a new LogAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "LogAudit" def _restart_cluster_logging(self, nodes=None): """ Restart logging on the given nodes, or all if none are given """ if not nodes: nodes = self._cm.Env["nodes"] self._cm.debug("Restarting logging on: %s" % repr(nodes)) for node in nodes: if self._cm.Env["have_systemd"]: (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket") if rc != 0: self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service") if rc != 0: self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"]) if rc != 0: self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node)) def _create_watcher(self, patterns, kind): """ Create a new LogWatcher instance for the given patterns """ watch = LogWatcher(self._cm.Env["LogFileName"], patterns, self._cm.Env["nodes"], kind, "LogAudit", 5, silent=True) watch.set_watch() return watch def _test_logging(self): """ Perform the log audit """ patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self._cm.Env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) watch_pref = self._cm.Env["LogWatcher"] if watch_pref == LogKind.ANY: kinds = [ LogKind.FILE ] if self._cm.Env["have_systemd"]: kinds += [ LogKind.JOURNAL ] kinds += [ LogKind.REMOTE_FILE ] for k in kinds: watch[k] = self._create_watcher(patterns, k) self._cm.log("Logging test message with identifier %s" % suffix) else: watch[watch_pref] = self._create_watcher(patterns, watch_pref) for node in self._cm.Env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix) (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0) if rc != 0: self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in list(watch.keys()): w = watch[k] if watch_pref == LogKind.ANY: self._cm.log("Checking for test message in %s logs" % k) w.look_for_all(silent=True) if w.unmatched: for regex in w.unmatched: self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind)) else: if watch_pref == LogKind.ANY: self._cm.log("Found test message in %s logs" % k) self._cm.Env["LogWatcher"] = k return 1 return False def __call__(self): max_attempts = 3 attempt = 0 self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) while attempt <= max_attempts and not self._test_logging(): attempt += 1 self._restart_cluster_logging() time.sleep(60*attempt) if attempt > max_attempts: self._cm.log("ERROR: Cluster logging unrecoverable.") return False return True def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ if self._cm.Env["DoBSC"] or self._cm.Env["LogAuditDisabled"]: return False return True class DiskAudit(ClusterAudit): """ Audit disk usage on cluster nodes to verify that there is enough free space left on whichever mounted file system holds the logs. Warn on: less than 100 MB or 10% of free space Error on: less than 10 MB or 5% of free space """ def __init__(self, cm): """ Create a new DiskAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "DiskspaceAudit" def __call__(self): result = True # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) for node in self._cm.Env["nodes"]: (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1) if not dfout: self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) continue dfout = dfout[0].strip() try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = False if not should_continue(self._cm.Env): raise ValueError("Disk full on %s" % node) elif remaining_mb < 100 or used_percent > 90: self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ return not self._cm.Env["DoBSC"] class FileAudit(ClusterAudit): """ Audit the filesystem looking for various failure conditions: * The presence of core dumps from corosync or Pacemaker daemons * Stale IPC files """ def __init__(self, cm): """ Create a new FileAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.known = [] self.name = "FileAudit" def __call__(self): result = True self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) for node in self._cm.Env["nodes"]: (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Corosync core file on %s: %s" % (node, line)) - if node in self._cm.ShouldBeStatus and self._cm.ShouldBeStatus[node] == "down": + if self._cm.ShouldBeStatus.get(node) == "down": clean = False (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) for line in lsout: result = False clean = True self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) for line in lsout: self._cm.debug("ps[%s]: %s" % (node, line)) self._cm.rsh(node, "rm -rf /dev/shm/qb-*") else: self._cm.debug("Skipping %s" % node) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ return True class AuditResource: """ A base class for storing information about a cluster resource """ def __init__(self, cm, line): """ Create a new AuditResource instance Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single resource """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None @property def unique(self): """ Is this resource unique? """ return self.flags & 0x20 @property def orphan(self): """ Is this resource an orphan? """ return self.flags & 0x01 @property def managed(self): """ Is this resource managed by the cluster? """ return self.flags & 0x02 class AuditConstraint: """ A base class for storing information about a cluster constraint """ def __init__(self, cm, line): """ Create a new AuditConstraint instance Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single constraint """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): """ Audit primitive resources to verify a variety of conditions, including that they are active and managed only when expected; they are active on the expected clusted node; and that they are not orphaned. """ def __init__(self, cm): """ Create a new PrimitiveAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PrimitiveAudit" self._active_nodes = [] self._constraints = [] self._inactive_nodes = [] self._resources = [] self._target = None def _audit_resource(self, resource, quorum): """ Perform the audit of a single resource """ rc = True active = self._cm.ResourceLocation(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %s" % (resource.id, repr(active))) elif resource.needs_quorum == 1: self._cm.log("Resource %s active without quorum: %s" % (resource.id, repr(active))) rc = False elif not resource.managed: self._cm.log("Resource %s not managed. Active on %s" % (resource.id, repr(active))) elif not resource.unique: # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %s" % (resource.id, repr(active))) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self._cm.log("Resource %s is active multiple times: %s" % (resource.id, repr(active))) rc = False elif resource.orphan: self.debug("Resource %s is an inactive orphan" % resource.id) elif not self._inactive_nodes: self._cm.log("WARN: Resource %s not served anywhere" % resource.id) rc = False elif self._cm.Env["warn-inactive"]: if quorum or not resource.needs_quorum: self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self._inactive_nodes))) else: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self._inactive_nodes))) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self._inactive_nodes))) return rc def _setup(self): """ Verify cluster nodes are active, and collect resource and colocation information used for performing the audit. """ for node in self._cm.Env["nodes"]: if self._cm.ShouldBeStatus[node] == "up": self._active_nodes.append(node) else: self._inactive_nodes.append(node) for node in self._cm.Env["nodes"]: if self._target is None and self._cm.ShouldBeStatus[node] == "up": self._target = node if not self._target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name) return False (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): self._resources.append(AuditResource(self._cm, line)) elif re.search("^Constraint", line): self._constraints.append(AuditConstraint(self._cm, line)) else: self._cm.log("Unknown entry: %s" % line) return True def __call__(self): result = True if not self._setup(): return result quorum = self._cm.HasQuorum(None) for resource in self._resources: if resource.type == "primitive" and not self._audit_resource(resource, quorum): result = False return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class GroupAudit(PrimitiveAudit): """ Audit group resources to verify that each of its child primitive resources is active on the expected cluster node. """ def __init__(self, cm): """ Create a new GroupAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "GroupAudit" def __call__(self): result = True if not self._setup(): return result for group in self._resources: if group.type != "group": continue first_match = True group_location = None for child in self._resources: if child.parent != group.id: continue nodes = self._cm.ResourceLocation(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = False if len(nodes) > 1: result = False self._cm.log("Child %s of %s is active more than once: %s" % (child.id, group.id, repr(nodes))) elif not nodes: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: result = False self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return result class CloneAudit(PrimitiveAudit): """ Audit clone resources. NOTE: Currently, this class does not perform any actual audit functions. """ def __init__(self, cm): """ Create a new CloneAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "CloneAudit" def __call__(self): result = True if not self._setup(): return result for clone in self._resources: if clone.type != "clone": continue for child in self._resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return result class ColocationAudit(PrimitiveAudit): """ Audit cluster resources to verify that those that should be colocated with each other actually are. """ def __init__(self, cm): """ Create a new ColocationAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "ColocationAudit" def _crm_location(self, resource): """ Return a list of cluster nodes where a given resource is running """ (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): result = True if not self._setup(): return result for coloc in self._constraints: if coloc.type != "rsc_colocation": continue source = self._crm_location(coloc.rsc) target = self._crm_location(coloc.target) if not source: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: result = False self._cm.log("Colocation audit (%s): %s running on %s (not in %s)" % (coloc.id, coloc.rsc, node, repr(target))) else: self.debug("Colocation audit (%s): %s running on %s (in %s)" % (coloc.id, coloc.rsc, node, repr(target))) return result class ControllerStateAudit(ClusterAudit): """ Audit cluster nodes to verify that those we expect to be active are active, and those that are expected to be inactive are inactive. """ def __init__(self, cm): """ Create a new ControllerStateAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "ControllerStateAudit" def __call__(self): result = True up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self._cm.Env["nodes"]: should_be = self._cm.ShouldBeStatus[node] rc = self._cm.test_node_CM(node) if rc > 0: if should_be == "down": down_are_up += 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down += 1 if len(unstable_list) > 0: result = False self._cm.log("Cluster is not stable: %d (of %d): %s" % (len(unstable_list), self._cm.upcount(), repr(unstable_list))) if up_are_down > 0: result = False self._cm.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self._cm.Env["nodes"]))) if down_are_up > 0: result = False self._cm.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self._cm.Env["nodes"]))) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class CIBAudit(ClusterAudit): """ Audit the CIB by verifying that it is identical across cluster nodes """ def __init__(self, cm): """ Create a new CIBAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "CibAudit" def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: self.debug("\tNo partitions to audit") return result for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) if self._audit_cib_contents(partition) == 0: result = False return result def _audit_cib_contents(self, hostlist): """ Perform the CIB audit on the given hosts """ passed = True node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self._store_remote_cib(node, node0) if node_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node) passed = False elif node0 is None: node0 = node node0_xml = node_xml elif node0_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node0) passed = False else: (rc, result) = self._cm.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) if rc != 0: self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = False for line in result: if not re.search("", line): passed = False self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) return passed def _store_remote_cib(self, node, target): """ Store a copy of the given node's CIB on the given target node. If no target is given, store the CIB on the given node. """ filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1) if rc != 0: self._cm.log("Could not retrieve configuration") return None self._cm.rsh("localhost", "rm -f %s" % filename) for line in lines: self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self._cm.log("Could not store configuration") return None return filename def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class PartitionAudit(ClusterAudit): """ Audit each partition in a cluster to verify a variety of conditions: * The number of partitions and the nodes in each is as expected * Each node is active when it should be active and inactive when it should be inactive * The status and epoch of each node is as expected * A partition has quorum * A partition has a DC when expected """ def __init__(self, cm): """ Create a new PartitionAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PartitionAudit" self._node_epoch = {} self._node_state = {} self._node_quorum = {} def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: return result self._cm.cluster_stable(double_check=True) if len(ccm_partitions) != self._cm.partitions_expected: self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) result = False for partition in ccm_partitions: self._cm.log("\t %s" % partition) for partition in ccm_partitions: if self._audit_partition(partition) == 0: result = False return result def _trim_string(self, avalue): """ Remove the last character from a multi-character string """ if not avalue: return None if len(avalue) > 1: return avalue[:-1] return avalue def _trim2int(self, avalue): """ Remove the last character from a multi-character string and convert the result to an int. """ trimmed = self._trim_string(avalue) if trimmed: return int(trimmed) return None def _audit_partition(self, partition): """ Perform the audit of a single partition """ passed = True dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % partition) for node in node_list: if self._cm.ShouldBeStatus[node] != "up": self._cm.log("Warn: Node %s appeared out of nowhere" % node) self._cm.ShouldBeStatus[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1) self._node_state[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1) self._node_epoch[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1) self._node_quorum[node] = out[0].strip() self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node])) self._node_state[node] = self._trim_string(self._node_state[node]) self._node_epoch[node] = self._trim2int(self._node_epoch[node]) self._node_quorum[node] = self._trim_string(self._node_quorum[node]) if not self._node_epoch[node]: self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node) self._cm.ShouldBeStatus[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch: lowest_epoch = self._node_epoch[node] if not lowest_epoch: self._cm.log("Lowest epoch not determined in %s" % partition) passed = False for node in node_list: if self._cm.ShouldBeStatus[node] != "up": continue if self._cm.is_node_dc(node, self._node_state[node]): dc_found.append(node) if self._node_epoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self._node_epoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self._cm.log("DC %s is not the oldest node (%d vs. %d)" % (node, self._node_epoch[node], lowest_epoch)) passed = False if not dc_found: self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self._cm.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = False if not passed: for node in node_list: if self._cm.ShouldBeStatus[node] == "up": self._cm.log("epoch %s : %s" % (self._node_epoch[node], self._node_state[node])) return passed def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False # pylint: disable=invalid-name def audit_list(cm): """ Return a list of instances of applicable audits that can be performed for the given ClusterManager. """ result = [] for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit, PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit, ColocationAudit, CIBAudit]: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/python/pacemaker/_cts/remote.py b/python/pacemaker/_cts/remote.py index 99d2ed7aca..0ad792bd15 100644 --- a/python/pacemaker/_cts/remote.py +++ b/python/pacemaker/_cts/remote.py @@ -1,288 +1,288 @@ """ Remote command runner for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["RemoteExec", "RemoteFactory"] __copyright__ = "Copyright 2014-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import os from subprocess import Popen,PIPE from threading import Thread from pacemaker._cts.logging import LogFactory def convert2string(lines): """ Convert a byte string to a UTF-8 string, and a list of byte strings to a list of UTF-8 strings. All other text formats are passed through. """ if isinstance(lines, bytes): return lines.decode("utf-8") if isinstance(lines, list): lst = [] for line in lines: if isinstance(line, bytes): line = line.decode("utf-8") lst.append(line) return lst return lines class AsyncCmd(Thread): """ A class for doing the hard work of running a command on another machine """ def __init__(self, node, command, proc=None, delegate=None): """ Create a new AsyncCmd instance Arguments: node -- The remote machine to run on command -- The ssh command string to use for remote execution proc -- If not None, a process object previously created with Popen. Instead of spawning a new process, we will then wait on this process to finish and handle its output. delegate -- When the command completes, call the async_complete method on this object """ self._command = command self._delegate = delegate self._logger = LogFactory() self._node = node self._proc = proc Thread.__init__(self) def run(self): """ Run the previously instantiated AsyncCmd object """ out = None err = None if not self._proc: # pylint: disable=consider-using-with self._proc = Popen(self._command, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True) self._logger.debug("cmd: async: target=%s, pid=%d: %s" % (self._node, self._proc.pid, self._command)) self._proc.wait() if self._delegate: self._logger.debug("cmd: pid %d returned %d to %s" % (self._proc.pid, self._proc.returncode, repr(self._delegate))) else: self._logger.debug("cmd: pid %d returned %d" % (self._proc.pid, self._proc.returncode)) if self._proc.stderr: err = self._proc.stderr.readlines() self._proc.stderr.close() for line in err: self._logger.debug("cmd: stderr[%d]: %s" % (self._proc.pid, line)) err = convert2string(err) if self._proc.stdout: out = self._proc.stdout.readlines() self._proc.stdout.close() out = convert2string(out) if self._delegate: self._delegate.async_complete(self._proc.pid, self._proc.returncode, out, err) class RemoteExec: """ An abstract class for remote execution. It runs a command on another machine using ssh and scp. """ def __init__(self, command, cp_command, silent=False): """ Create a new RemoteExec instance Arguments: command -- The ssh command string to use for remote execution cp_command -- The scp command string to use for copying files silent -- Should we log command status? """ self._command = command self._cp_command = cp_command self._logger = LogFactory() self._silent = silent self._our_node = os.uname()[1].lower() def _fixcmd(self, cmd): """ Perform shell escapes on certain characters in the input cmd string """ return re.sub("\'", "'\\''", cmd) def _cmd(self, args): """ Given a list of arguments, return the string that will be run on the remote system """ sysname = args[0] command = args[1] - if sysname is None or sysname.lower() == self._our_node or sysname == "localhost": + if sysname is None or sysname.lower() in [self._our_node, "localhost"]: ret = command else: ret = "%s %s '%s'" % (self._command, sysname, self._fixcmd(command)) return ret def _log(self, args): """ Log a message """ if not self._silent: self._logger.log(args) def _debug(self, args): """ Log a message at the debug level """ if not self._silent: self._logger.debug(args) def call_async(self, node, command, delegate=None): """ Run the given command on the given remote system and do not wait for it to complete. Arguments: node -- The remote machine to run on command -- The command to run, as a string delegate -- When the command completes, call the async_complete method on this object Returns: The running process object """ aproc = AsyncCmd(node, self._cmd([node, command]), delegate=delegate) aproc.start() return aproc def __call__(self, node, command, synchronous=True, verbose=2): """ Run the given command on the given remote system. If you call this class like a function, this is what gets called. It's approximately the same as a system() call on the remote machine. Arguments: node -- The remote machine to run on command -- The command to run, as a string synchronous -- Should we wait for the command to complete? verbose -- If 0, do not lo:g anything. If 1, log the command and its return code but not its output. If 2, additionally log command output. Returns: A tuple of (return code, command output) """ rc = 0 result = None # pylint: disable=consider-using-with proc = Popen(self._cmd([node, command]), stdout = PIPE, stderr = PIPE, close_fds = True, shell = True) if not synchronous and proc.pid > 0 and not self._silent: aproc = AsyncCmd(node, command, proc=proc) aproc.start() return (rc, result) if proc.stdout: result = proc.stdout.readlines() proc.stdout.close() else: self._log("No stdout stream") rc = proc.wait() if verbose > 0: self._debug("cmd: target=%s, rc=%d: %s" % (node, rc, command)) result = convert2string(result) if proc.stderr: errors = proc.stderr.readlines() proc.stderr.close() for err in errors: self._debug("cmd: stderr: %s" % err) if verbose == 2: for line in result: self._debug("cmd: stdout: %s" % line) return (rc, result) def copy(self, source, target, silent=False): """ Perform a copy of the source file to the remote target, using the cp_command provided when the RemoteExec object was created. Returns: The return code of the cp_command """ cmd = "%s '%s' '%s'" % (self._cp_command, source, target) rc = os.system(cmd) if not silent: self._debug("cmd: rc=%d: %s" % (rc, cmd)) return rc def exists_on_all(self, filename, hosts): """ Return True if specified file exists on all specified hosts. """ for host in hosts: rc = self(host, "test -r %s" % filename) if rc != 0: return False return True class RemoteFactory: """ A class for constructing a singleton instance of a RemoteExec object """ # Class variables # -n: no stdin, -x: no X11, # -o ServerAliveInterval=5: disconnect after 3*5s if the server # stops responding command = ("ssh -l root -n -x -o ServerAliveInterval=5 " "-o ConnectTimeout=10 -o TCPKeepAlive=yes " "-o ServerAliveCountMax=3 ") # -B: batch mode, -q: no stats (quiet) cp_command = "scp -B -q" instance = None # pylint: disable=invalid-name def getInstance(self): """ Returns the previously created instance of RemoteExec, or creates a new instance if one does not already exist. """ if not RemoteFactory.instance: RemoteFactory.instance = RemoteExec(RemoteFactory.command, RemoteFactory.cp_command, False) return RemoteFactory.instance def enable_qarsh(self): """ Enable the QA remote shell """ # http://nstraz.wordpress.com/2008/12/03/introducing-qarsh/ print("Using QARSH for connections to cluster nodes") RemoteFactory.command = "qarsh -t 300 -l root" RemoteFactory.cp_command = "qacp -q" diff --git a/python/pacemaker/_cts/test.py b/python/pacemaker/_cts/test.py index df484dc864..b2a90a6e08 100644 --- a/python/pacemaker/_cts/test.py +++ b/python/pacemaker/_cts/test.py @@ -1,594 +1,601 @@ """ A module providing base classes for defining regression tests and groups of regression tests. Everything exported here should be considered an abstract class that needs to be subclassed in order to do anything useful. Various functions will raise NotImplementedError if not overridden by a subclass. """ __copyright__ = "Copyright 2009-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+)" __all__ = ["Test", "Tests"] import io import os import re import shlex import signal import subprocess import sys import time from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError from pacemaker._cts.process import pipe_communicate from pacemaker.buildoptions import BuildOptions from pacemaker.exitstatus import ExitStatus def find_validator(rng_file): """ Return the command line used to validate XML output, or None if the validator is not installed. """ if os.access("/usr/bin/xmllint", os.X_OK): if rng_file is None: return ["xmllint", "-"] return ["xmllint", "--relaxng", rng_file, "-"] return None def rng_directory(): """ Which directory contains the RNG schema files? """ if "PCMK_schema_directory" in os.environ: return os.environ["PCMK_schema_directory"] if os.path.exists("%s/cts-fencing.in" % sys.path[0]): return "xml" return BuildOptions.SCHEMA_DIR class Pattern: """ A class for checking log files for a given pattern """ def __init__(self, pat, negative=False, regex=False): """ Create a new Pattern instance Arguments: pat -- The string to search for negative -- If True, pat must not be found in any input regex -- If True, pat is a regex and not a substring """ self._pat = pat self.negative = negative self.regex = regex def __str__(self): return self._pat def match(self, line): """ Is this pattern found in the given line? """ if self.regex: return re.search(self._pat, line) is not None return self._pat in line class Test: """ The base class for a single regression test. A single regression test may still run multiple commands as part of its execution. """ def __init__(self, name, description, **kwargs): """ Create a new Test instance. This method must be provided by all subclasses, which must call Test.__init__ first. Arguments: description -- A user-readable description of the test, helpful in identifying what test is running or has failed. name -- The name of the test. Command line tools use this attribute to allow running only tests with the exact name, or tests whose name matches a given pattern. This should be unique among all tests. Keyword arguments: force_wait -- logdir -- The base directory under which to create a directory to store output and temporary data. timeout -- How long to wait for the test to complete. verbose -- Whether to print additional information, including verbose command output and daemon log files. """ self.description = description self.executed = False self.name = name self.force_wait = kwargs.get("force_wait", False) self.logdir = kwargs.get("logdir", "/tmp") self.timeout = kwargs.get("timeout", 2) self.verbose = kwargs.get("verbose", False) self._cmds = [] self._patterns = [] self._daemon_location = None self._daemon_output = "" self._daemon_process = None self._result_exitcode = ExitStatus.OK self._result_txt = "" ### ### PROPERTIES ### @property def exitcode(self): """ The final exitcode of the Test. If all commands pass, this property will be ExitStatus.OK. Otherwise, this property will be the exitcode of the first command to fail. """ return self._result_exitcode @exitcode.setter def exitcode(self, value): self._result_exitcode = value @property def logpath(self): """ The path to the log for whatever daemon is being tested. Note that this requires all subclasses to set self._daemon_location before accessing this property or an exception will be raised. """ return os.path.join(self.logdir, "%s.log" % self._daemon_location) ### ### PRIVATE METHODS ### def _kill_daemons(self): """ Kill any running daemons in preparation for executing the test """ raise NotImplementedError("_kill_daemons not provided by subclass") def _match_log_patterns(self): """ Check test output for expected patterns, setting self.exitcode and self._result_txt as appropriate. Not all subclass will need to do this. """ if len(self._patterns) == 0: return n_failed_matches = 0 n_negative_matches = 0 output = self._daemon_output.split("\n") for pat in self._patterns: positive_match = False for line in output: if pat.match(line): if pat.negative: n_negative_matches += 1 if self.verbose: print("This pattern should not have matched = '%s" % pat) break positive_match = True break if not pat.negative and not positive_match: n_failed_matches += 1 print("Pattern Not Matched = '%s'" % pat) if n_failed_matches > 0 or n_negative_matches > 0: msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches." self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches) self.exitcode = ExitStatus.ERROR def _new_cmd(self, cmd, args, exitcode, **kwargs): """ Add a command to be executed as part of this test. Arguments: cmd -- The program to run. args -- Commands line arguments to pass to cmd, as a string. exitcode -- The expected exit code of cmd. This can be used to run a command that is expected to fail. Keyword arguments: stdout_match -- If not None, a string that is expected to be present in the stdout of cmd. This can be a regular expression. no_wait -- Do not wait for cmd to complete. stdout_negative_match -- If not None, a string that is expected to be missing in the stdout of cmd. This can be a regualr expression. kill -- A command to be run after cmd, typically in order to kill a failed process. This should be the entire command line including arguments as a single string. validate -- If True, the output of cmd will be passed to xmllint for validation. If validation fails, XmlValidationError will be raised. check_rng -- If True and validate is True, command output will additionally be checked against the api-result.rng file. check_stderr -- If True, the stderr of cmd will be included in output. env -- If not None, variables to set in the environment """ self._cmds.append( { "args": args, "check_rng": kwargs.get("check_rng", True), "check_stderr": kwargs.get("check_stderr", True), "cmd": cmd, "expected_exitcode": exitcode, "kill": kwargs.get("kill", None), "no_wait": kwargs.get("no_wait", False), "stdout_match": kwargs.get("stdout_match", None), "stdout_negative_match": kwargs.get("stdout_negative_match", None), "validate": kwargs.get("validate", True), "env": kwargs.get("env", None), } ) def _start_daemons(self): """ Start any necessary daemons in preparation for executing the test """ raise NotImplementedError("_start_daemons not provided by subclass") ### ### PUBLIC METHODS ### def add_cmd(self, cmd, args, validate=True, check_rng=True, check_stderr=True, env=None): """ Add a simple command to be executed as part of this test """ self._new_cmd(cmd, args, ExitStatus.OK, validate=validate, check_rng=check_rng, check_stderr=check_stderr, env=env) def add_cmd_and_kill(self, cmd, args, kill_proc): """ Add a command and system command to be executed as part of this test """ self._new_cmd(cmd, args, ExitStatus.OK, kill=kill_proc) def add_cmd_check_stdout(self, cmd, args, match, no_match=None, env=None): """ Add a simple command with expected output to be executed as part of this test """ self._new_cmd(cmd, args, ExitStatus.OK, stdout_match=match, stdout_negative_match=no_match, env=env) def add_cmd_expected_fail(self, cmd, args, exitcode=ExitStatus.ERROR): """ Add a command that is expected to fail to be executed as part of this test """ self._new_cmd(cmd, args, exitcode) def add_cmd_no_wait(self, cmd, args): """ Add a simple command to be executed (without waiting) as part of this test """ self._new_cmd(cmd, args, ExitStatus.OK, no_wait=True) def add_log_pattern(self, pattern, negative=False, regex=False): """ Add a pattern that should appear in the test's logs """ self._patterns.append(Pattern(pattern, negative=negative, regex=regex)) + def _signal_dict(self): + """ Return a dictionary mapping signal numbers to their names """ + + # FIXME: When we support python >= 3.5, this function can be replaced with: + # signal.Signals(self.daemon_process.returncode).name + return { getattr(signal, _signame): _signame + for _signame in dir(signal) + if _signame.startswith("SIG") and not _signame.startswith("SIG_") } + def clean_environment(self): """ Clean up the host after executing a test """ if self._daemon_process: if self._daemon_process.poll() is None: self._daemon_process.terminate() self._daemon_process.wait() else: - return_code = { - getattr(signal, _signame): _signame - for _signame in dir(signal) - if _signame.startswith('SIG') and not _signame.startswith("SIG_") - }.get(-self._daemon_process.returncode, "RET=%d" % (self._daemon_process.returncode)) + rc = self._daemon_process.returncode + signame = self._signal_dict().get(-rc, "RET=%s" % rc) msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)." - self._result_txt = msg % (self.name, self._daemon_location, return_code) + + self._result_txt = msg % (self.name, self._daemon_location, signame) self.exitcode = ExitStatus.ERROR self._daemon_process = None self._daemon_output = "" # the default for utf-8 encoding would error out if e.g. memory corruption # makes fenced output any kind of 8 bit value - while still interesting # for debugging and we'd still like the regression-test to go over the # full set of test-cases with open(self.logpath, 'rt', encoding = "ISO-8859-1") as logfile: for line in logfile.readlines(): self._daemon_output += line if self.verbose: print("Daemon Output Start") print(self._daemon_output) print("Daemon Output End") def print_result(self, filler): """ Print the result of the last test execution """ print("%s%s" % (filler, self._result_txt)) def run(self): """ Execute this test """ i = 1 self.start_environment() if self.verbose: print("\n--- START TEST - %s" % self.name) self._result_txt = "SUCCESS - '%s'" % (self.name) self.exitcode = ExitStatus.OK for cmd in self._cmds: try: self.run_cmd(cmd) except ExitCodeError as e: print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode'])) self.set_error(i, cmd) break except OutputNotFoundError as e: print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e)) self.set_error(i, cmd) break except OutputFoundError as e: print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e)) self.set_error(i, cmd) break except XmlValidationError as e: print("Step %d FAILED - xmllint failed: %s" % (i, e)) self.set_error(i, cmd) break if self.verbose: print("Step %d SUCCESS" % (i)) i += 1 self.clean_environment() if self.exitcode == ExitStatus.OK: self._match_log_patterns() print(self._result_txt) if self.verbose: print("--- END TEST - %s\n" % self.name) self.executed = True def run_cmd(self, args): """ Execute a command as part of this test """ cmd = shlex.split(args['args']) cmd.insert(0, args['cmd']) if self.verbose: print("\n\nRunning: %s" % " ".join(cmd)) # FIXME: Using "with" here breaks fencing merge tests. # pylint: disable=consider-using-with if args['env']: new_env = os.environ.copy() new_env.update(args['env']) test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=new_env) else: test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if args['kill']: if self.verbose: print("Also running: %s" % args['kill']) ### Typically, the kill argument is used to detect some sort of ### failure. Without yielding for a few seconds here, the process ### launched earlier that is listening for the failure may not have ### time to connect to pacemaker-execd. time.sleep(2) subprocess.Popen(shlex.split(args['kill'])) if not args['no_wait']: test.wait() else: return ExitStatus.OK output = pipe_communicate(test, check_stderr=args['check_stderr']) if self.verbose: print(output) if test.returncode != args['expected_exitcode']: raise ExitCodeError(test.returncode) if args['stdout_match'] is not None and \ re.search(args['stdout_match'], output) is None: raise OutputNotFoundError(output) if args['stdout_negative_match'] is not None and \ re.search(args['stdout_negative_match'], output) is not None: raise OutputFoundError(output) if args['validate']: if args['check_rng']: rng_file = "%s/api/api-result.rng" % rng_directory() else: rng_file = None cmd = find_validator(rng_file) if not cmd: raise XmlValidationError("Could not find validator for %s" % rng_file) if self.verbose: print("\nRunning: %s" % " ".join(cmd)) with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator: output = pipe_communicate(validator, check_stderr=True, stdin=output) if self.verbose: print(output) if validator.returncode != 0: raise XmlValidationError(output) return ExitStatus.OK def set_error(self, step, cmd): """ Record failure of this test """ msg = "FAILURE - '%s' failed at step %d. Command: %s %s" self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args']) self.exitcode = ExitStatus.ERROR def start_environment(self): """ Prepare the host for executing a test """ if os.path.exists(self.logpath): os.remove(self.logpath) self._kill_daemons() self._start_daemons() logfile = None init_time = time.time() update_time = init_time while True: # FIXME: Eventually use 'with' here, which seems complicated given # everything happens in a loop. # pylint: disable=consider-using-with time.sleep(0.1) if not self.force_wait and logfile is None \ and os.path.exists(self.logpath): logfile = io.open(self.logpath, 'rt', encoding = "ISO-8859-1") if not self.force_wait and logfile is not None: for line in logfile.readlines(): if "successfully started" in line: return now = time.time() if self.timeout > 0 and (now - init_time) >= self.timeout: if not self.force_wait: print("\tDaemon %s doesn't seem to have been initialized within %fs." "\n\tConsider specifying a longer '--timeout' value." %(self._daemon_location, self.timeout)) return if self.verbose and (now - update_time) >= 5: print("Waiting for %s to be initialized: %fs ..." %(self._daemon_location, now - init_time)) update_time = now class Tests: """ The base class for a collection of regression tests """ def __init__(self, **kwargs): """ Create a new Tests instance. This method must be provided by all subclasses, which must call Tests.__init__ first. Keywork arguments: force_wait -- logdir -- The base directory under which to create a directory to store output and temporary data. timeout -- How long to wait for the test to complete. verbose -- Whether to print additional information, including verbose command output and daemon log files. """ self.force_wait = kwargs.get("force_wait", False) self.logdir = kwargs.get("logdir", "/tmp") self.timeout = kwargs.get("timeout", 2) self.verbose = kwargs.get("verbose", False) self._tests = [] def exit(self): """ Exit (with error status code if any test failed) """ for test in self._tests: if not test.executed: continue if test.exitcode != ExitStatus.OK: sys.exit(ExitStatus.ERROR) sys.exit(ExitStatus.OK) def print_list(self): """ List all registered tests """ print("\n==== %d TESTS FOUND ====" % len(self._tests)) print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION")) print("%35s - %s" % ("--------------------", "--------------------")) for test in self._tests: print("%35s - %s" % (test.name, test.description)) print("==== END OF LIST ====\n") def print_results(self): """ Print summary of results of executed tests """ failures = 0 success = 0 print("\n\n======= FINAL RESULTS ==========") print("\n--- FAILURE RESULTS:") for test in self._tests: if not test.executed: continue if test.exitcode != ExitStatus.OK: failures += 1 test.print_result(" ") else: success += 1 if failures == 0: print(" None") print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures)) def run_single(self, name): """ Run a single named test """ for test in self._tests: if test.name == name: test.run() break def run_tests(self): """ Run all tests """ for test in self._tests: test.run() def run_tests_matching(self, pattern): """ Run all tests whose name matches a pattern """ for test in self._tests: if test.name.count(pattern) != 0: test.run()