diff --git a/cts/lab/CTSaudits.py b/cts/lab/CTSaudits.py deleted file mode 100755 index 51a04f8c19..0000000000 --- a/cts/lab/CTSaudits.py +++ /dev/null @@ -1,879 +0,0 @@ -""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) -""" - -__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" -__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" - -import time, re, uuid - -from pacemaker.buildoptions import BuildOptions -from pacemaker._cts.watcher import LogKind, LogWatcher - -class ClusterAudit(object): - - def __init__(self, cm): - self.CM = cm - - def __call__(self): - raise ValueError("Abstract Class member (__call__)") - - def is_applicable(self): - '''Return TRUE if we are applicable in the current test configuration''' - raise ValueError("Abstract Class member (is_applicable)") - return 1 - - def log(self, args): - self.CM.log("audit: %s" % args) - - def debug(self, args): - self.CM.debug("audit: %s" % args) - - def name(self): - raise ValueError("Abstract Class member (name)") - -AllAuditClasses = [ ] - - -class LogAudit(ClusterAudit): - - def name(self): - return "LogAudit" - - def __init__(self, cm): - self.CM = cm - - def RestartClusterLogging(self, nodes=None): - if not nodes: - nodes = self.CM.Env["nodes"] - - self.CM.debug("Restarting logging on: %s" % repr(nodes)) - - for node in nodes: - if self.CM.Env["have_systemd"]: - (rc, _) = self.CM.rsh(node, "systemctl stop systemd-journald.socket") - if rc != 0: - self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) - - (rc, _) = self.CM.rsh(node, "systemctl start systemd-journald.service") - if rc != 0: - self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) - - (rc, _) = self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"]) - if rc != 0: - self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node)) - - def _create_watcher(self, patterns, kind): - watch = LogWatcher(self.CM.Env["LogFileName"], patterns, - self.CM.Env["nodes"], kind, "LogAudit", 5, - silent=True) - watch.set_watch() - return watch - - def TestLogging(self): - patterns = [] - prefix = "Test message from" - suffix = str(uuid.uuid4()) - watch = {} - - for node in self.CM.Env["nodes"]: - # Look for the node name in two places to make sure - # that syslog is logging with the correct hostname - m = re.search("^([^.]+).*", node) - if m: - simple = m.group(1) - else: - simple = node - patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) - - watch_pref = self.CM.Env["LogWatcher"] - if watch_pref == LogKind.ANY: - kinds = [ LogKind.FILE ] - if self.CM.Env["have_systemd"]: - kinds += [ LogKind.JOURNAL ] - kinds += [ LogKind.REMOTE_FILE ] - for k in kinds: - watch[k] = self._create_watcher(patterns, k) - self.CM.log("Logging test message with identifier %s" % (suffix)) - else: - watch[watch_pref] = self._create_watcher(patterns, watch_pref) - - for node in self.CM.Env["nodes"]: - cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix) - - (rc, _) = self.CM.rsh(node, cmd, synchronous=False, verbose=0) - if rc != 0: - self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) - - for k in list(watch.keys()): - w = watch[k] - if watch_pref == LogKind.ANY: - self.CM.log("Checking for test message in %s logs" % (k)) - w.look_for_all(silent=True) - if w.unmatched: - for regex in w.unmatched: - self.CM.log("Test message [%s] not found in %s logs" % (regex, w.kind)) - else: - if watch_pref == LogKind.ANY: - self.CM.log("Found test message in %s logs" % (k)) - self.CM.Env["LogWatcher"] = k - return 1 - - return 0 - - def __call__(self): - max = 3 - attempt = 0 - - self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"]) - while attempt <= max and self.TestLogging() == 0: - attempt = attempt + 1 - self.RestartClusterLogging() - time.sleep(60*attempt) - - if attempt > max: - self.CM.log("ERROR: Cluster logging unrecoverable.") - return 0 - - return 1 - - def is_applicable(self): - if self.CM.Env["DoBSC"]: - return 0 - if self.CM.Env["LogAuditDisabled"]: - return 0 - return 1 - - -class DiskAudit(ClusterAudit): - - def name(self): - return "DiskspaceAudit" - - def __init__(self, cm): - self.CM = cm - - def __call__(self): - result = 1 - # @TODO Use directory of PCMK_logfile if set on host - dfcmd = "df -BM " + BuildOptions.LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'" - - self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"]) - for node in self.CM.Env["nodes"]: - (_, dfout) = self.CM.rsh(node, dfcmd, verbose=1) - if not dfout: - self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) - else: - dfout = dfout[0].strip() - - try: - (used, remain) = dfout.split() - used_percent = int(used) - remaining_mb = int(remain) - except (ValueError, TypeError): - self.CM.log("Warning: df output '%s' from %s was invalid [%s, %s]" - % (dfout, node, used, remain)) - else: - if remaining_mb < 10 or used_percent > 95: - self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" - % (node, used_percent, remaining_mb)) - result = None - if self.CM.Env["continue"]: - answer = "Y" - else: - try: - answer = input('Continue? [nY]') - except EOFError as e: - answer = "n" - - if answer and answer == "n": - raise ValueError("Disk full on %s" % (node)) - - elif remaining_mb < 100 or used_percent > 90: - self.CM.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) - return result - - def is_applicable(self): - if self.CM.Env["DoBSC"]: - return 0 - return 1 - - -class FileAudit(ClusterAudit): - - def name(self): - return "FileAudit" - - def __init__(self, cm): - self.CM = cm - self.known = [] - - def __call__(self): - result = 1 - - self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"]) - for node in self.CM.Env["nodes"]: - - (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) - for line in lsout: - line = line.strip() - if line not in self.known: - result = 0 - self.known.append(line) - self.CM.log("Warning: Pacemaker core file on %s: %s" % (node, line)) - - (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) - for line in lsout: - line = line.strip() - if line not in self.known: - result = 0 - self.known.append(line) - self.CM.log("Warning: Corosync core file on %s: %s" % (node, line)) - - if node in self.CM.ShouldBeStatus and self.CM.ShouldBeStatus[node] == "down": - clean = 0 - (_, lsout) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) - for line in lsout: - result = 0 - clean = 1 - self.CM.log("Warning: Stale IPC file on %s: %s" % (node, line)) - - if clean: - (_, lsout) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) - for line in lsout: - self.CM.debug("ps[%s]: %s" % (node, line)) - - self.CM.rsh(node, "rm -rf /dev/shm/qb-*") - - else: - self.CM.debug("Skipping %s" % node) - - return result - - def is_applicable(self): - return 1 - - -class AuditResource(object): - def __init__(self, cm, line): - fields = line.split() - self.CM = cm - self.line = line - self.type = fields[1] - self.id = fields[2] - self.clone_id = fields[3] - self.parent = fields[4] - self.rprovider = fields[5] - self.rclass = fields[6] - self.rtype = fields[7] - self.host = fields[8] - self.needs_quorum = fields[9] - self.flags = int(fields[10]) - self.flags_s = fields[11] - - if self.parent == "NA": - self.parent = None - - def unique(self): - if self.flags & int("0x00000020", 16): - return 1 - return 0 - - def orphan(self): - if self.flags & int("0x00000001", 16): - return 1 - return 0 - - def managed(self): - if self.flags & int("0x00000002", 16): - return 1 - return 0 - - -class AuditConstraint(object): - def __init__(self, cm, line): - fields = line.split() - self.CM = cm - self.line = line - self.type = fields[1] - self.id = fields[2] - self.rsc = fields[3] - self.target = fields[4] - self.score = fields[5] - self.rsc_role = fields[6] - self.target_role = fields[7] - - if self.rsc_role == "NA": - self.rsc_role = None - if self.target_role == "NA": - self.target_role = None - - -class PrimitiveAudit(ClusterAudit): - def name(self): - return "PrimitiveAudit" - - def __init__(self, cm): - self.CM = cm - - def doResourceAudit(self, resource, quorum): - rc = 1 - active = self.CM.ResourceLocation(resource.id) - - if len(active) == 1: - if quorum: - self.debug("Resource %s active on %s" % (resource.id, repr(active))) - - elif resource.needs_quorum == 1: - self.CM.log("Resource %s active without quorum: %s" - % (resource.id, repr(active))) - rc = 0 - - elif not resource.managed(): - self.CM.log("Resource %s not managed. Active on %s" - % (resource.id, repr(active))) - - elif not resource.unique(): - # TODO: Figure out a clever way to actually audit these resource types - if len(active) > 1: - self.debug("Non-unique resource %s is active on: %s" - % (resource.id, repr(active))) - else: - self.debug("Non-unique resource %s is not active" % resource.id) - - elif len(active) > 1: - self.CM.log("Resource %s is active multiple times: %s" - % (resource.id, repr(active))) - rc = 0 - - elif resource.orphan(): - self.debug("Resource %s is an inactive orphan" % resource.id) - - elif len(self.inactive_nodes) == 0: - self.CM.log("WARN: Resource %s not served anywhere" % resource.id) - rc = 0 - - elif self.CM.Env["warn-inactive"]: - if quorum or not resource.needs_quorum: - self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" - % (resource.id, repr(self.inactive_nodes))) - else: - self.debug("Resource %s not served anywhere (Inactive nodes: %s)" - % (resource.id, repr(self.inactive_nodes))) - - elif quorum or not resource.needs_quorum: - self.debug("Resource %s not served anywhere (Inactive nodes: %s)" - % (resource.id, repr(self.inactive_nodes))) - - return rc - - def setup(self): - self.target = None - self.resources = [] - self.constraints = [] - self.active_nodes = [] - self.inactive_nodes = [] - - for node in self.CM.Env["nodes"]: - if self.CM.ShouldBeStatus[node] == "up": - self.active_nodes.append(node) - else: - self.inactive_nodes.append(node) - - for node in self.CM.Env["nodes"]: - if self.target == None and self.CM.ShouldBeStatus[node] == "up": - self.target = node - - if not self.target: - # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource - # with CIB_file=/path/to/cib.xml even when the cluster isn't running - self.debug("No nodes active - skipping %s" % self.name()) - return 0 - - (_, lines) = self.CM.rsh(self.target, "crm_resource -c", verbose=1) - - for line in lines: - if re.search("^Resource", line): - self.resources.append(AuditResource(self.CM, line)) - elif re.search("^Constraint", line): - self.constraints.append(AuditConstraint(self.CM, line)) - else: - self.CM.log("Unknown entry: %s" % line); - - return 1 - - def __call__(self): - rc = 1 - - if not self.setup(): - return 1 - - quorum = self.CM.HasQuorum(None) - for resource in self.resources: - if resource.type == "primitive": - if self.doResourceAudit(resource, quorum) == 0: - rc = 0 - return rc - - def is_applicable(self): - # @TODO Due to long-ago refactoring, this name test would never match, - # so this audit (and those derived from it) would never run. - # Uncommenting the next lines fixes the name test, but that then - # exposes pre-existing bugs that need to be fixed. - #if self.CM["Name"] == "crm-corosync": - # return 1 - return 0 - - -class GroupAudit(PrimitiveAudit): - def name(self): - return "GroupAudit" - - def __call__(self): - rc = 1 - if not self.setup(): - return 1 - - for group in self.resources: - if group.type == "group": - first_match = 1 - group_location = None - for child in self.resources: - if child.parent == group.id: - nodes = self.CM.ResourceLocation(child.id) - - if first_match and len(nodes) > 0: - group_location = nodes[0] - - first_match = 0 - - if len(nodes) > 1: - rc = 0 - self.CM.log("Child %s of %s is active more than once: %s" - % (child.id, group.id, repr(nodes))) - - elif len(nodes) == 0: - # Groups are allowed to be partially active - # However we do need to make sure later children aren't running - group_location = None - self.debug("Child %s of %s is stopped" % (child.id, group.id)) - - elif nodes[0] != group_location: - rc = 0 - self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s" - % (child.id, group.id, nodes[0], group_location)) - else: - self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) - - return rc - - -class CloneAudit(PrimitiveAudit): - def name(self): - return "CloneAudit" - - def __call__(self): - rc = 1 - if not self.setup(): - return 1 - - for clone in self.resources: - if clone.type == "clone": - for child in self.resources: - if child.parent == clone.id and child.type == "primitive": - self.debug("Checking child %s of %s..." % (child.id, clone.id)) - # Check max and node_max - # Obtain with: - # crm_resource -g clone_max --meta -r child.id - # crm_resource -g clone_node_max --meta -r child.id - - return rc - - -class ColocationAudit(PrimitiveAudit): - def name(self): - return "ColocationAudit" - - def crm_location(self, resource): - (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q"%resource, verbose=1) - hosts = [] - if rc == 0: - for line in lines: - fields = line.split() - hosts.append(fields[0]) - - return hosts - - def __call__(self): - rc = 1 - if not self.setup(): - return 1 - - for coloc in self.constraints: - if coloc.type == "rsc_colocation": - source = self.crm_location(coloc.rsc) - target = self.crm_location(coloc.target) - if len(source) == 0: - self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) - else: - for node in source: - if not node in target: - rc = 0 - self.CM.log("Colocation audit (%s): %s running on %s (not in %s)" - % (coloc.id, coloc.rsc, node, repr(target))) - else: - self.debug("Colocation audit (%s): %s running on %s (in %s)" - % (coloc.id, coloc.rsc, node, repr(target))) - - return rc - - -class ControllerStateAudit(ClusterAudit): - def __init__(self, cm): - self.CM = cm - self.Stats = {"calls":0 - , "success":0 - , "failure":0 - , "skipped":0 - , "auditfail":0} - - def has_key(self, key): - return key in self.Stats - - def __setitem__(self, key, value): - self.Stats[key] = value - - def __getitem__(self, key): - return self.Stats[key] - - def incr(self, name): - '''Increment (or initialize) the value associated with the given name''' - if not name in self.Stats: - self.Stats[name] = 0 - self.Stats[name] = self.Stats[name]+1 - - def __call__(self): - passed = 1 - up_are_down = 0 - down_are_up = 0 - unstable_list = [] - - for node in self.CM.Env["nodes"]: - should_be = self.CM.ShouldBeStatus[node] - rc = self.CM.test_node_CM(node) - if rc > 0: - if should_be == "down": - down_are_up = down_are_up + 1 - if rc == 1: - unstable_list.append(node) - elif should_be == "up": - up_are_down = up_are_down + 1 - - if len(unstable_list) > 0: - passed = 0 - self.CM.log("Cluster is not stable: %d (of %d): %s" - % (len(unstable_list), self.CM.upcount(), repr(unstable_list))) - - if up_are_down > 0: - passed = 0 - self.CM.log("%d (of %d) nodes expected to be up were down." - % (up_are_down, len(self.CM.Env["nodes"]))) - - if down_are_up > 0: - passed = 0 - self.CM.log("%d (of %d) nodes expected to be down were up." - % (down_are_up, len(self.CM.Env["nodes"]))) - - return passed - - def name(self): - return "ControllerStateAudit" - - def is_applicable(self): - # @TODO Due to long-ago refactoring, this name test would never match, - # so this audit (and those derived from it) would never run. - # Uncommenting the next lines fixes the name test, but that then - # exposes pre-existing bugs that need to be fixed. - #if self.CM["Name"] == "crm-corosync": - # return 1 - return 0 - - -class CIBAudit(ClusterAudit): - def __init__(self, cm): - self.CM = cm - self.Stats = {"calls":0 - , "success":0 - , "failure":0 - , "skipped":0 - , "auditfail":0} - - def has_key(self, key): - return key in self.Stats - - def __setitem__(self, key, value): - self.Stats[key] = value - - def __getitem__(self, key): - return self.Stats[key] - - def incr(self, name): - '''Increment (or initialize) the value associated with the given name''' - if not name in self.Stats: - self.Stats[name] = 0 - self.Stats[name] = self.Stats[name]+1 - - def __call__(self): - passed = 1 - ccm_partitions = self.CM.find_partitions() - - if len(ccm_partitions) == 0: - self.debug("\tNo partitions to audit") - return 1 - - for partition in ccm_partitions: - self.debug("\tAuditing CIB consistency for: %s" % partition) - partition_passed = 0 - if self.audit_cib_contents(partition) == 0: - passed = 0 - - return passed - - def audit_cib_contents(self, hostlist): - passed = 1 - node0 = None - node0_xml = None - - partition_hosts = hostlist.split() - for node in partition_hosts: - node_xml = self.store_remote_cib(node, node0) - - if node_xml == None: - self.CM.log("Could not perform audit: No configuration from %s" % node) - passed = 0 - - elif node0 == None: - node0 = node - node0_xml = node_xml - - elif node0_xml == None: - self.CM.log("Could not perform audit: No configuration from %s" % node0) - passed = 0 - - else: - (rc, result) = self.CM.rsh( - node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) - - if rc != 0: - self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) - passed = 0 - - for line in result: - if not re.search("", line): - passed = 0 - self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) - else: - self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) - -# self.CM.rsh(node0, "rm -f %s" % node_xml) -# self.CM.rsh(node0, "rm -f %s" % node0_xml) - return passed - - def store_remote_cib(self, node, target): - combined = "" - filename = "/tmp/ctsaudit.%s.xml" % node - - if not target: - target = node - - (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], verbose=1) - if rc != 0: - self.CM.log("Could not retrieve configuration") - return None - - self.CM.rsh("localhost", "rm -f %s" % filename) - for line in lines: - self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) - - if self.CM.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: - self.CM.log("Could not store configuration") - return None - return filename - - def name(self): - return "CibAudit" - - def is_applicable(self): - # @TODO Due to long-ago refactoring, this name test would never match, - # so this audit (and those derived from it) would never run. - # Uncommenting the next lines fixes the name test, but that then - # exposes pre-existing bugs that need to be fixed. - #if self.CM["Name"] == "crm-corosync": - # return 1 - return 0 - - -class PartitionAudit(ClusterAudit): - def __init__(self, cm): - self.CM = cm - self.Stats = {"calls":0 - , "success":0 - , "failure":0 - , "skipped":0 - , "auditfail":0} - self.NodeEpoch = {} - self.NodeState = {} - self.NodeQuorum = {} - - def has_key(self, key): - return key in self.Stats - - def __setitem__(self, key, value): - self.Stats[key] = value - - def __getitem__(self, key): - return self.Stats[key] - - def incr(self, name): - '''Increment (or initialize) the value associated with the given name''' - if not name in self.Stats: - self.Stats[name] = 0 - self.Stats[name] = self.Stats[name]+1 - - def __call__(self): - passed = 1 - ccm_partitions = self.CM.find_partitions() - - if ccm_partitions == None or len(ccm_partitions) == 0: - return 1 - - self.CM.cluster_stable(double_check=True) - - if len(ccm_partitions) != self.CM.partitions_expected: - self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) - passed = 0 - for partition in ccm_partitions: - self.CM.log("\t %s" % partition) - - for partition in ccm_partitions: - partition_passed = 0 - if self.audit_partition(partition) == 0: - passed = 0 - - return passed - - def trim_string(self, avalue): - if not avalue: - return None - if len(avalue) > 1: - return avalue[:-1] - - def trim2int(self, avalue): - if not avalue: - return None - if len(avalue) > 1: - return int(avalue[:-1]) - - def audit_partition(self, partition): - passed = 1 - dc_found = [] - dc_allowed_list = [] - lowest_epoch = None - node_list = partition.split() - - self.debug("Auditing partition: %s" % (partition)) - for node in node_list: - if self.CM.ShouldBeStatus[node] != "up": - self.CM.log("Warn: Node %s appeared out of nowhere" % (node)) - self.CM.ShouldBeStatus[node] = "up" - # not in itself a reason to fail the audit (not what we're - # checking for in this audit) - - (_, out) = self.CM.rsh(node, self.CM["StatusCmd"] % node, verbose=1) - self.NodeState[node] = out[0].strip() - - (_, out) = self.CM.rsh(node, self.CM["EpochCmd"], verbose=1) - self.NodeEpoch[node] = out[0].strip() - - (_, out) = self.CM.rsh(node, self.CM["QuorumCmd"], verbose=1) - self.NodeQuorum[node] = out[0].strip() - - self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoch[node], self.NodeQuorum[node])) - self.NodeState[node] = self.trim_string(self.NodeState[node]) - self.NodeEpoch[node] = self.trim2int(self.NodeEpoch[node]) - self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node]) - - if not self.NodeEpoch[node]: - self.CM.log("Warn: Node %s dissappeared: cant determin epoch" % (node)) - self.CM.ShouldBeStatus[node] = "down" - # not in itself a reason to fail the audit (not what we're - # checking for in this audit) - elif lowest_epoch == None or self.NodeEpoch[node] < lowest_epoch: - lowest_epoch = self.NodeEpoch[node] - - if not lowest_epoch: - self.CM.log("Lowest epoch not determined in %s" % (partition)) - passed = 0 - - for node in node_list: - if self.CM.ShouldBeStatus[node] == "up": - if self.CM.is_node_dc(node, self.NodeState[node]): - dc_found.append(node) - if self.NodeEpoch[node] == lowest_epoch: - self.debug("%s: OK" % node) - elif not self.NodeEpoch[node]: - self.debug("Check on %s ignored: no node epoch" % node) - elif not lowest_epoch: - self.debug("Check on %s ignored: no lowest epoch" % node) - else: - self.CM.log("DC %s is not the oldest node (%d vs. %d)" - % (node, self.NodeEpoch[node], lowest_epoch)) - passed = 0 - - if len(dc_found) == 0: - self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)" - % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) - - elif len(dc_found) > 1: - self.CM.log("%d DCs (%s) found in cluster partition: %s" - % (len(dc_found), str(dc_found), str(node_list))) - passed = 0 - - if passed == 0: - for node in node_list: - if self.CM.ShouldBeStatus[node] == "up": - self.CM.log("epoch %s : %s" - % (self.NodeEpoch[node], self.NodeState[node])) - - return passed - - def name(self): - return "PartitionAudit" - - def is_applicable(self): - # @TODO Due to long-ago refactoring, this name test would never match, - # so this audit (and those derived from it) would never run. - # Uncommenting the next lines fixes the name test, but that then - # exposes pre-existing bugs that need to be fixed. - #if self.CM["Name"] == "crm-corosync": - # return 1 - return 0 - -AllAuditClasses.append(DiskAudit) -AllAuditClasses.append(FileAudit) -AllAuditClasses.append(LogAudit) -AllAuditClasses.append(ControllerStateAudit) -AllAuditClasses.append(PartitionAudit) -AllAuditClasses.append(PrimitiveAudit) -AllAuditClasses.append(GroupAudit) -AllAuditClasses.append(CloneAudit) -AllAuditClasses.append(ColocationAudit) -AllAuditClasses.append(CIBAudit) - - -def AuditList(cm): - result = [] - for auditclass in AllAuditClasses: - a = auditclass(cm) - if a.is_applicable(): - result.append(a) - return result diff --git a/cts/lab/CTSlab.py.in b/cts/lab/CTSlab.py.in index 71d3215a67..8734d3644f 100644 --- a/cts/lab/CTSlab.py.in +++ b/cts/lab/CTSlab.py.in @@ -1,137 +1,137 @@ #!@PYTHON@ """ Command-line interface to Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = "Copyright 2001-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import sys, signal, os pdir = os.path.dirname(sys.path[0]) sys.path.insert(0, pdir) # So that things work from the source directory try: from cts.CM_corosync import * - from cts.CTSaudits import AuditList from cts.CTStests import TestList from cts.CTSscenarios import * from pacemaker._cts.CTS import CtsLab + from pacemaker._cts.audits import audit_list from pacemaker._cts.logging import LogFactory except ImportError as e: sys.stderr.write("abort: %s\n" % e) sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" % ' '.join(sys.path)) sys.exit(1) # These are globals so they can be used by the signal handler. scenario = None LogFactory().add_stderr() def sig_handler(signum, frame) : LogFactory().log("Interrupted by signal %d"%signum) if scenario: scenario.summarize() if signum == 15 : if scenario: scenario.TearDown() sys.exit(1) def plural_s(n, uppercase=False): if n == 1: return "" elif uppercase: return "S" else: return "s" if __name__ == '__main__': Environment = CtsLab(sys.argv[1:]) NumIter = Environment["iterations"] if NumIter is None: NumIter = 1 Tests = [] # Set the signal handler signal.signal(15, sig_handler) signal.signal(10, sig_handler) # Create the Cluster Manager object cm = None if Environment["Stack"] == "corosync 2+": cm = crm_corosync() else: LogFactory().log("Unknown stack: "+Environment["stack"]) sys.exit(1) if Environment["TruncateLog"]: if Environment["OutputFile"] is None: LogFactory().log("Ignoring truncate request because no output file specified") else: LogFactory().log("Truncating %s" % Environment["OutputFile"]) with open(Environment["OutputFile"], "w") as outputfile: outputfile.truncate(0) - Audits = AuditList(cm) + Audits = audit_list(cm) if Environment["ListTests"]: Tests = TestList(cm, Audits) LogFactory().log("Total %d tests"%len(Tests)) for test in Tests : LogFactory().log(str(test.name)); sys.exit(0) elif len(Environment["tests"]) == 0: Tests = TestList(cm, Audits) else: Chosen = Environment["tests"] for TestCase in Chosen: match = None for test in TestList(cm, Audits): if test.name == TestCase: match = test if not match: LogFactory().log("--choose: No applicable/valid tests chosen") sys.exit(1) else: Tests.append(match) # Scenario selection if Environment["scenario"] == "basic-sanity": scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests) elif Environment["scenario"] == "all-once": NumIter = len(Tests) scenario = AllOnce( cm, [ BootCluster(Environment) ], Audits, Tests) elif Environment["scenario"] == "sequence": scenario = Sequence( cm, [ BootCluster(Environment) ], Audits, Tests) elif Environment["scenario"] == "boot": scenario = Boot(cm, [ LeaveBooted(Environment)], Audits, []) else: scenario = RandomTests( cm, [ BootCluster(Environment) ], Audits, Tests) LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TEST" + plural_s(NumIter, True) + " ") LogFactory().log("Stack: %s (%s)" % (Environment["Stack"], Environment["Name"])) LogFactory().log("Schema: %s" % Environment["Schema"]) LogFactory().log("Scenario: %s" % scenario.__doc__) LogFactory().log("CTS Exerciser: %s" % Environment["cts-exerciser"]) LogFactory().log("CTS Logfile: %s" % Environment["OutputFile"]) LogFactory().log("Random Seed: %s" % Environment["RandSeed"]) LogFactory().log("Syslog variant: %s" % Environment["syslogd"].strip()) LogFactory().log("System log files: %s" % Environment["LogFileName"]) if Environment.has_key("IPBase"): LogFactory().log("Base IP for resources: %s" % Environment["IPBase"]) LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"]) Environment.dump() rc = Environment.run(scenario, NumIter) sys.exit(rc) diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py index 37cb094cbe..1d17f448ac 100644 --- a/cts/lab/CTSscenarios.py +++ b/cts/lab/CTSscenarios.py @@ -1,563 +1,563 @@ """ Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import sys import time from cts.CTStests import CTSTest -from cts.CTSaudits import ClusterAudit +from pacemaker._cts.audits import ClusterAudit from pacemaker._cts.watcher import LogWatcher class ScenarioComponent(object): def __init__(self, Env): self.Env = Env def IsApplicable(self): '''Return True if the current ScenarioComponent is applicable in the given LabEnvironment given to the constructor. ''' raise ValueError("Abstract Class member (IsApplicable)") def SetUp(self, CM): '''Set up the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") def TearDown(self, CM): '''Tear down (undo) the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") class Scenario(object): ( '''The basic idea of a scenario is that of an ordered list of ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn, and then after the tests have been run, they are torn down using TearDown() (in reverse order). A Scenario is applicable to a particular cluster manager iff each ScenarioComponent is applicable. A partially set up scenario is torn down if it fails during setup. ''') def __init__(self, ClusterManager, Components, Audits, Tests): "Initialize the Scenario from the list of ScenarioComponents" self.ClusterManager = ClusterManager self.Components = Components self.Audits = Audits self.Tests = Tests self.BadNews = None self.TestSets = [] self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0} self.Sets = [] #self.ns=CTS.NodeStatus(self.Env) for comp in Components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of ScenarioComponent") for audit in Audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be subclass of ClusterAudit") for test in Tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") def IsApplicable(self): ( '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable() ''' ) for comp in self.Components: if not comp.IsApplicable(): return None return True def SetUp(self): '''Set up the Scenario. Return TRUE on success.''' self.ClusterManager.prepare() self.audit() # Also detects remote/local log config self.ClusterManager.ns.wait_for_all_nodes(self.ClusterManager.Env["nodes"]) self.audit() self.ClusterManager.install_support() self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"], self.ClusterManager.templates.get_patterns("BadNews"), self.ClusterManager.Env["nodes"], self.ClusterManager.Env["LogWatcher"], "BadNews", 0) self.BadNews.set_watch() # Call after we've figured out what type of log watching to do in LogAudit j = 0 while j < len(self.Components): if not self.Components[j].SetUp(self.ClusterManager): # OOPS! We failed. Tear partial setups down. self.audit() self.ClusterManager.log("Tearing down partial setup") self.TearDown(j) return None j = j + 1 self.audit() return 1 def TearDown(self, max=None): '''Tear Down the Scenario - in reverse order.''' if max == None: max = len(self.Components)-1 j = max while j >= 0: self.Components[j].TearDown(self.ClusterManager) j = j - 1 self.audit() self.ClusterManager.install_support("uninstall") def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def run(self, Iterations): self.ClusterManager.oprofileStart() try: self.run_loop(Iterations) self.ClusterManager.oprofileStop() except: self.ClusterManager.oprofileStop() raise def run_loop(self, Iterations): raise ValueError("Abstract Class member (run_loop)") def run_test(self, test, testcount): nodechoice = self.ClusterManager.Env.random_node() ret = 1 where = "" did_run = 0 self.ClusterManager.instance_errorstoignore_clear() self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]") starttime = test.set_timer() if not test.setup(nodechoice): self.ClusterManager.log("Setup failed") ret = 0 elif not test.canrunnow(nodechoice): self.ClusterManager.log("Skipped") test.skipped() else: did_run = 1 ret = test(nodechoice) if not test.teardown(nodechoice): self.ClusterManager.log("Teardown failed") if self.ClusterManager.Env["continue"]: answer = "Y" else: try: answer = input('Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice)) ret = 0 stoptime = time.time() self.ClusterManager.oprofileSave(testcount) elapsed_time = stoptime - starttime test_time = stoptime - test.get_timer() if not test["min_time"]: test["elapsed_time"] = elapsed_time test["min_time"] = test_time test["max_time"] = test_time else: test["elapsed_time"] = test["elapsed_time"] + elapsed_time if test_time < test["min_time"]: test["min_time"] = test_time if test_time > test["max_time"]: test["max_time"] = test_time if ret: self.incr("success") test.log_timer() else: self.incr("failure") self.ClusterManager.statall() did_run = 1 # Force the test count to be incremented anyway so test extraction works self.audit(test.errorstoignore()) return did_run def summarize(self): self.ClusterManager.log("****************") self.ClusterManager.log("Overall Results:" + repr(self.Stats)) self.ClusterManager.log("****************") stat_filter = { "calls":0, "failure":0, "skipped":0, "auditfail":0, } self.ClusterManager.log("Test Summary") for test in self.Tests: for key in list(stat_filter.keys()): stat_filter[key] = test.Stats[key] self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter)) self.ClusterManager.debug("Detailed Results") for test in self.Tests: self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.Stats)) self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def audit(self, LocalIgnore=[]): errcount = 0 ignorelist = [] ignorelist.append("CTS:") ignorelist.extend(LocalIgnore) ignorelist.extend(self.ClusterManager.errorstoignore()) ignorelist.extend(self.ClusterManager.instance_errorstoignore()) # This makes sure everything is stabilized before starting... failed = 0 for audit in self.Audits: if not audit(): - self.ClusterManager.log("Audit " + audit.name() + " FAILED.") + self.ClusterManager.log("Audit " + audit.name + " FAILED.") failed += 1 else: - self.ClusterManager.debug("Audit " + audit.name() + " passed.") + self.ClusterManager.debug("Audit " + audit.name + " passed.") while errcount < 1000: match = None if self.BadNews: match = self.BadNews.look(0) if match: add_err = 1 for ignore in ignorelist: if add_err == 1 and re.search(ignore, match): add_err = 0 if add_err == 1: self.ClusterManager.log("BadNews: " + match) self.incr("BadNews") errcount = errcount + 1 else: break else: if self.ClusterManager.Env["continue"]: answer = "Y" else: try: answer = input('Big problems. Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": self.ClusterManager.log("Shutting down.") self.summarize() self.TearDown() raise ValueError("Looks like we hit a BadNews jackpot!") if self.BadNews: self.BadNews.end() return failed class AllOnce(Scenario): '''Every Test Once''' # Accessable as __doc__ def run_loop(self, Iterations): testcount = 1 for test in self.Tests: self.run_test(test, testcount) testcount += 1 class RandomTests(Scenario): '''Random Test Execution''' def run_loop(self, Iterations): testcount = 1 while testcount <= Iterations: test = self.ClusterManager.Env.random_gen.choice(self.Tests) self.run_test(test, testcount) testcount += 1 class BasicSanity(Scenario): '''Basic Cluster Sanity''' def run_loop(self, Iterations): testcount = 1 while testcount <= Iterations: test = self.Environment.random_gen.choice(self.Tests) self.run_test(test, testcount) testcount += 1 class Sequence(Scenario): '''Named Tests in Sequence''' def run_loop(self, Iterations): testcount = 1 while testcount <= Iterations: for test in self.Tests: self.run_test(test, testcount) testcount += 1 class Boot(Scenario): '''Start the Cluster''' def run_loop(self, Iterations): testcount = 0 class BootCluster(ScenarioComponent): ( '''BootCluster is the most basic of ScenarioComponents. This ScenarioComponent simply starts the cluster manager on all the nodes. It is fairly robust as it waits for all nodes to come up before starting as they might have been rebooted or crashed for some reason beforehand. ''') def __init__(self, Env): pass def IsApplicable(self): '''BootCluster is so generic it is always Applicable''' return True def SetUp(self, CM): '''Basic Cluster Manager startup. Start everything''' CM.prepare() # Clear out the cobwebs ;-) CM.stopall(verbose=True, force=True) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on all nodes.") return CM.startall(verbose=True, quick=True) def TearDown(self, CM, force=False): '''Set up the given ScenarioComponent''' # Stop the cluster manager everywhere CM.log("Stopping Cluster Manager on all nodes") return CM.stopall(verbose=True, force=force) class LeaveBooted(BootCluster): def TearDown(self, CM): '''Set up the given ScenarioComponent''' # Stop the cluster manager everywhere CM.log("Leaving Cluster running on all nodes") return 1 class PingFest(ScenarioComponent): ( '''PingFest does a flood ping to each node in the cluster from the test machine. If the LabEnvironment Parameter PingSize is set, it will be used as the size of ping packet requested (via the -s option). If it is not set, it defaults to 1024 bytes. According to the manual page for ping: Outputs packets as fast as they come back or one hundred times per second, whichever is more. For every ECHO_REQUEST sent a period ``.'' is printed, while for every ECHO_REPLY received a backspace is printed. This provides a rapid display of how many packets are being dropped. Only the super-user may use this option. This can be very hard on a net- work and should be used with caution. ''' ) def __init__(self, Env): self.Env = Env def IsApplicable(self): '''PingFests are always applicable ;-) ''' return True def SetUp(self, CM): '''Start the PingFest!''' self.PingSize = 1024 if "PingSize" in list(CM.Env.keys()): self.PingSize = CM.Env["PingSize"] CM.log("Starting %d byte flood pings" % self.PingSize) self.PingPids = [] for node in CM.Env["nodes"]: self.PingPids.append(self._pingchild(node)) CM.log("Ping PIDs: " + repr(self.PingPids)) return 1 def TearDown(self, CM): '''Stop it right now! My ears are pinging!!''' for pid in self.PingPids: if pid != None: CM.log("Stopping ping process %d" % pid) os.kill(pid, signal.SIGKILL) def _pingchild(self, node): Args = ["ping", "-qfn", "-s", str(self.PingSize), node] sys.stdin.flush() sys.stdout.flush() sys.stderr.flush() pid = os.fork() if pid < 0: self.Env.log("Cannot fork ping child") return None if pid > 0: return pid # Otherwise, we're the child process. os.execvp("ping", Args) self.Env.log("Cannot execvp ping: " + repr(Args)) sys.exit(1) class BasicSanityCheck(ScenarioComponent): ( ''' ''') def IsApplicable(self): return self.Env["DoBSC"] def SetUp(self, CM): CM.prepare() # Clear out the cobwebs self.TearDown(CM) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on BSC node(s).") return CM.startall() def TearDown(self, CM): CM.log("Stopping Cluster Manager on BSC node(s).") return CM.stopall() class Benchmark(ScenarioComponent): ( ''' ''') def IsApplicable(self): return self.Env["benchmark"] def SetUp(self, CM): CM.prepare() # Clear out the cobwebs self.TearDown(CM, force=True) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on all node(s).") return CM.startall() def TearDown(self, CM): CM.log("Stopping Cluster Manager on all node(s).") return CM.stopall() class RollingUpgrade(ScenarioComponent): ( ''' Test a rolling upgrade between two versions of the stack ''') def __init__(self, Env): self.Env = Env def IsApplicable(self): if not self.Env["rpm-dir"]: return None if not self.Env["current-version"]: return None if not self.Env["previous-version"]: return None return True def install(self, node, version): target_dir = "/tmp/rpm-%s" % version src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version) self.CM.rsh(node, "mkdir -p %s" % target_dir) rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir)) self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir)) return self.success() def upgrade(self, node): return self.install(node, self.CM.Env["current-version"]) def downgrade(self, node): return self.install(node, self.CM.Env["previous-version"]) def SetUp(self, CM): print(repr(self)+"prepare") CM.prepare() # Clear out the cobwebs CM.stopall(force=True) CM.log("Downgrading all nodes to %s." % self.Env["previous-version"]) for node in self.Env["nodes"]: if not self.downgrade(node): CM.log("Couldn't downgrade %s" % node) return None return 1 def TearDown(self, CM): # Stop everything CM.log("Stopping Cluster Manager on Upgrade nodes.") CM.stopall() CM.log("Upgrading all nodes to %s." % self.Env["current-version"]) for node in self.Env["nodes"]: if not self.upgrade(node): CM.log("Couldn't upgrade %s" % node) return None return 1 diff --git a/cts/lab/CTStests.py b/cts/lab/CTStests.py index 7a85be55e6..f09a1e9584 100644 --- a/cts/lab/CTStests.py +++ b/cts/lab/CTStests.py @@ -1,3178 +1,3178 @@ """ Test-specific classes for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" # # SPECIAL NOTE: # # Tests may NOT implement any cluster-manager-specific code in them. # EXTEND the ClusterManager object to provide the base capabilities # the test needs if you need to do something that the current CM classes # do not. Otherwise you screw up the whole point of the object structure # in CTS. # # Thank you. # import os import re import time import subprocess import tempfile from stat import * -from cts.CTSaudits import * from pacemaker import BuildOptions from pacemaker._cts.CTS import NodeStatus +from pacemaker._cts.audits import AuditConstraint, AuditResource from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogWatcher AllTestClasses = [ ] class CTSTest(object): ''' A Cluster test. We implement the basic set of properties and behaviors for a generic cluster test. Cluster tests track their own statistics. We keep each of the kinds of counts we track as separate {name,value} pairs. ''' def __init__(self, cm): #self.name="the unnamed test" self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} # if not issubclass(cm.__class__, ClusterManager): # raise ValueError("Must be a ClusterManager object") self.CM = cm self.Env = EnvFactory().getInstance() self.rsh = RemoteFactory().getInstance() self.logger = LogFactory() self.templates = PatternSelector(cm["Name"]) self.Audits = [] self.timeout = 120 self.passed = 1 self.is_loop = 0 self.is_unsafe = 0 self.is_experimental = 0 self.is_container = 0 self.is_valgrind = 0 self.benchmark = 0 # which tests to benchmark self.timer = {} # timers def log(self, args): self.logger.log(args) def debug(self, args): self.logger.debug(args) def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): if str(key) == "0": raise ValueError("Bad call to 'foo in X', should reference 'foo in X.Stats' instead") if key in self.Stats: return self.Stats[key] return None def log_mark(self, msg): self.debug("MARK: test %s %s %d" % (self.name,msg,time.time())) return def get_timer(self,key = "test"): try: return self.timer[key] except: return 0 def set_timer(self,key = "test"): self.timer[key] = time.time() return self.timer[key] def log_timer(self,key = "test"): elapsed = 0 if key in self.timer: elapsed = time.time() - self.timer[key] s = key == "test" and self.name or "%s:%s" % (self.name,key) self.debug("%s runtime: %.2f" % (s, elapsed)) del self.timer[key] return elapsed def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 # Reset the test passed boolean if name == "calls": self.passed = 1 def failure(self, reason="none"): '''Increment the failure count''' self.passed = 0 self.incr("failure") self.logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason) return None def success(self): '''Increment the success count''' self.incr("success") return 1 def skipped(self): '''Increment the skipped count''' self.incr("skipped") return 1 def __call__(self, node): '''Perform the given test''' raise ValueError("Abstract Class member (__call__)") self.incr("calls") return self.failure() def audit(self): passed = 1 if len(self.Audits) > 0: for audit in self.Audits: if not audit(): - self.logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name())) + self.logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name)) self.incr("auditfail") passed = 0 return passed def setup(self, node): '''Setup the given test''' return self.success() def teardown(self, node): '''Tear down the given test''' return self.success() def create_watch(self, patterns, timeout, name=None): if not name: name = self.name return LogWatcher(self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], name, timeout) def local_badnews(self, prefix, watch, local_ignore=[]): errcount = 0 if not prefix: prefix = "LocalBadNews:" ignorelist = [] ignorelist.append(" CTS: ") ignorelist.append(prefix) ignorelist.extend(local_ignore) while errcount < 100: match = watch.look(0) if match: add_err = 1 for ignore in ignorelist: if add_err == 1 and re.search(ignore, match): add_err = 0 if add_err == 1: self.logger.log(prefix + " " + match) errcount = errcount + 1 else: break else: self.logger.log("Too many errors!") watch.end() return errcount def is_applicable(self): return self.is_applicable_common() def is_applicable_common(self): '''Return True if we are applicable in the current test configuration''' #raise ValueError("Abstract Class member (is_applicable)") if self.is_loop and not self.Env["loop-tests"]: return False elif self.is_unsafe and not self.Env["unsafe-tests"]: return False elif self.is_valgrind and not self.Env["valgrind-tests"]: return False elif self.is_experimental and not self.Env["experimental-tests"]: return False elif self.is_container and not self.Env["container-tests"]: return False elif self.Env["benchmark"] and self.benchmark == 0: return False return True def find_ocfs2_resources(self, node): self.r_o2cb = None self.r_ocfs2 = [] (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self.CM, line) if r.rtype == "o2cb" and r.parent != "NA": self.debug("Found o2cb: %s" % self.r_o2cb) self.r_o2cb = r.parent if re.search("^Constraint", line): c = AuditConstraint(self.CM, line) if c.type == "rsc_colocation" and c.target == self.r_o2cb: self.r_ocfs2.append(c.rsc) self.debug("Found ocfs2 filesystems: %s" % repr(self.r_ocfs2)) return len(self.r_ocfs2) def canrunnow(self, node): '''Return TRUE if we can meaningfully run right now''' return 1 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] class StopTest(CTSTest): '''Stop (deactivate) the cluster manager on a node''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Stop" def __call__(self, node): '''Perform the 'stop' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] != "up": return self.skipped() patterns = [] # Technically we should always be able to notice ourselves stopping patterns.append(self.templates["Pat:We_stopped"] % node) # Any active node needs to notice this one left # (note that this won't work if we have multiple partitions) for other in self.Env["nodes"]: if self.CM.ShouldBeStatus[other] == "up" and other != node: patterns.append(self.templates["Pat:They_stopped"] %(other, self.CM.key_for_node(node))) #self.debug("Checking %s will notice %s left"%(other, node)) watch = self.create_watch(patterns, self.Env["DeadTime"]) watch.set_watch() if node == self.CM.OurNode: self.incr("us") else: if self.CM.upcount() <= 1: self.incr("all") else: self.incr("them") self.CM.StopaCM(node) watch_result = watch.look_for_all() failreason = None UnmatchedList = "||" if watch.unmatched: (_, output) = self.rsh(node, "/bin/ps axf", verbose=1) for line in output: self.debug(line) (_, output) = self.rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1) for line in output: self.debug(line) for regex in watch.unmatched: self.logger.log ("ERROR: Shutdown pattern not found: %s" % (regex)) UnmatchedList += regex + "||"; failreason = "Missing shutdown pattern" self.CM.cluster_stable(self.Env["DeadTime"]) if not watch.unmatched or self.CM.upcount() == 0: return self.success() if len(watch.unmatched) >= self.CM.upcount(): return self.failure("no match against (%s)" % UnmatchedList) if failreason == None: return self.success() else: return self.failure(failreason) # # We don't register StopTest because it's better when called by # another test... # class StartTest(CTSTest): '''Start (activate) the cluster manager on a node''' def __init__(self, cm, debug=None): CTSTest.__init__(self,cm) self.name = "start" self.debug = debug def __call__(self, node): '''Perform the 'start' test. ''' self.incr("calls") if self.CM.upcount() == 0: self.incr("us") else: self.incr("them") if self.CM.ShouldBeStatus[node] != "down": return self.skipped() elif self.CM.StartaCM(node): return self.success() else: return self.failure("Startup %s on node %s failed" % (self.Env["Name"], node)) # # We don't register StartTest because it's better when called by # another test... # class FlipTest(CTSTest): '''If it's running, stop it. If it's stopped start it. Overthrow the status quo... ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "Flip" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'Flip' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == "up": self.incr("stopped") ret = self.stop(node) type = "up->down" # Give the cluster time to recognize it's gone... time.sleep(self.Env["StableTime"]) elif self.CM.ShouldBeStatus[node] == "down": self.incr("started") ret = self.start(node) type = "down->up" else: return self.skipped() self.incr(type) if ret: return self.success() else: return self.failure("%s failure" % type) # Register FlipTest as a good test to run AllTestClasses.append(FlipTest) class RestartTest(CTSTest): '''Stop and restart a node''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "Restart" self.start = StartTest(cm) self.stop = StopTest(cm) self.benchmark = 1 def __call__(self, node): '''Perform the 'restart' test. ''' self.incr("calls") self.incr("node:" + node) ret1 = 1 if self.CM.StataCM(node): self.incr("WasStopped") if not self.start(node): return self.failure("start (setup) failure: "+node) self.set_timer() if not self.stop(node): return self.failure("stop failure: "+node) if not self.start(node): return self.failure("start failure: "+node) return self.success() # Register RestartTest as a good test to run AllTestClasses.append(RestartTest) class StonithdTest(CTSTest): def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Stonithd" self.startall = SimulStartLite(cm) self.benchmark = 1 def __call__(self, node): self.incr("calls") if len(self.Env["nodes"]) < 2: return self.skipped() ret = self.startall(None) if not ret: return self.failure("Setup failed") is_dc = self.CM.is_node_dc(node) watchpats = [] watchpats.append(self.templates["Pat:Fencing_ok"] % node) watchpats.append(self.templates["Pat:NodeFenced"] % node) if not self.Env["at-boot"]: self.debug("Expecting %s to stay down" % node) self.CM.ShouldBeStatus[node] = "down" else: self.debug("Expecting %s to come up again %d" % (node, self.Env["at-boot"])) watchpats.append("%s.* S_STARTING -> S_PENDING" % node) watchpats.append("%s.* S_PENDING -> S_NOT_DC" % node) watch = self.create_watch(watchpats, 30 + self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"]) watch.set_watch() origin = self.Env.random_gen.choice(self.Env["nodes"]) (rc, _) = self.rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node) if rc == 124: # CRM_EX_TIMEOUT # Look for the patterns, usually this means the required # device was running on the node to be fenced - or that # the required devices were in the process of being loaded # and/or moved # # Effectively the node committed suicide so there will be # no confirmation, but pacemaker should be watching and # fence the node again self.logger.log("Fencing command on %s to fence %s timed out" % (origin, node)) elif origin != node and rc != 0: self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() self.debug("Waiting for fenced node to come back up") self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600) self.logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc)) elif origin == node and rc != 255: # 255 == broken pipe, ie. the node was fenced as expected self.logger.log("Locally originated fencing returned %d" % rc) self.set_timer("fence") matched = watch.look_for_all() self.log_timer("fence") self.set_timer("reform") if watch.unmatched: self.logger.log("Patterns not found: " + repr(watch.unmatched)) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() self.debug("Waiting for fenced node to come back up") self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self.CM.cluster_stable(self.Env["StartTime"]) if not matched: return self.failure("Didn't find all expected patterns") elif not is_stable: return self.failure("Cluster did not become stable") self.log_timer("reform") return self.success() def errorstoignore(self): return [ self.templates["Pat:Fencing_start"] % ".*", self.templates["Pat:Fencing_ok"] % ".*", self.templates["Pat:Fencing_active"], r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired", ] def is_applicable(self): if not self.is_applicable_common(): return False if "DoFencing" in list(self.Env.keys()): return self.Env["DoFencing"] return True AllTestClasses.append(StonithdTest) class StartOnebyOne(CTSTest): '''Start all the nodes ~ one by one''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "StartOnebyOne" self.stopall = SimulStopLite(cm) self.start = StartTest(cm) self.ns = NodeStatus(cm.Env) def __call__(self, dummy): '''Perform the 'StartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Test setup failed") failed = [] self.set_timer() for node in self.Env["nodes"]: if not self.start(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to start: " + repr(failed)) return self.success() # Register StartOnebyOne as a good test to run AllTestClasses.append(StartOnebyOne) class SimulStart(CTSTest): '''Start all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "SimulStart" self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'SimulStart' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Setup failed") if not self.startall(None): return self.failure("Startall failed") return self.success() # Register SimulStart as a good test to run AllTestClasses.append(SimulStart) class SimulStop(CTSTest): '''Stop all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "SimulStop" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) def __call__(self, dummy): '''Perform the 'SimulStop' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.stopall(None): return self.failure("Stopall failed") return self.success() # Register SimulStop as a good test to run AllTestClasses.append(SimulStop) class StopOnebyOne(CTSTest): '''Stop all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "StopOnebyOne" self.startall = SimulStartLite(cm) self.stop = StopTest(cm) def __call__(self, dummy): '''Perform the 'StopOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") failed = [] self.set_timer() for node in self.Env["nodes"]: if not self.stop(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to stop: " + repr(failed)) return self.success() # Register StopOnebyOne as a good test to run AllTestClasses.append(StopOnebyOne) class RestartOnebyOne(CTSTest): '''Restart all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "RestartOnebyOne" self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'RestartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") did_fail = [] self.set_timer() self.restart = RestartTest(self.CM) for node in self.Env["nodes"]: if not self.restart(node): did_fail.append(node) if did_fail: return self.failure("Could not restart %d nodes: %s" % (len(did_fail), repr(did_fail))) return self.success() # Register StopOnebyOne as a good test to run AllTestClasses.append(RestartOnebyOne) class PartialStart(CTSTest): '''Start a node - but tell it to stop before it finishes starting up''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "PartialStart" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) self.stop = StopTest(cm) #self.is_unsafe = 1 def __call__(self, node): '''Perform the 'PartialStart' test. ''' self.incr("calls") ret = self.stopall(None) if not ret: return self.failure("Setup failed") watchpats = [] watchpats.append("pacemaker-controld.*Connecting to .* cluster infrastructure") watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) watch.set_watch() self.CM.StartaCMnoBlock(node) ret = watch.look_for_all() if not ret: self.logger.log("Patterns not found: " + repr(watch.unmatched)) return self.failure("Setup of %s failed" % node) ret = self.stop(node) if not ret: return self.failure("%s did not stop in time" % node) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' # We might do some fencing in the 2-node case if we make it up far enough return [ r"Executing reboot fencing operation", r"Requesting fencing \([^)]+\) of node ", ] # Register StopOnebyOne as a good test to run AllTestClasses.append(PartialStart) class StandbyTest(CTSTest): def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "Standby" self.benchmark = 1 self.start = StartTest(cm) self.startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources, none resource should be running on the node # set the node to active mode # check resouces, resources should have been migrated back (SHOULD THEY?) def __call__(self, node): self.incr("calls") ret = self.startall(None) if not ret: return self.failure("Start all nodes failed") self.debug("Make sure node %s is active" % node) if self.CM.StandbyStatus(node) != "off": if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.debug("Getting resources running on node %s" % node) rsc_on_node = self.CM.active_resources(node) watchpats = [] watchpats.append(r"State transition .* -> S_POLICY_ENGINE") watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) watch.set_watch() self.debug("Setting node %s to standby mode" % node) if not self.CM.SetStandbyMode(node, "on"): return self.failure("can't set node %s to standby mode" % node) self.set_timer("on") ret = watch.look_for_all() if not ret: self.logger.log("Patterns not found: " + repr(watch.unmatched)) self.CM.SetStandbyMode(node, "off") return self.failure("cluster didn't react to standby change on %s" % node) self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "on": return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status)) self.log_timer("on") self.debug("Checking resources") bad_run = self.CM.active_resources(node) if len(bad_run) > 0: rc = self.failure("%s set to standby, %s is still running on it" % (node, repr(bad_run))) self.debug("Setting node %s to active mode" % node) self.CM.SetStandbyMode(node, "off") return rc self.debug("Setting node %s to active mode" % node) if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) self.set_timer("off") self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.log_timer("off") return self.success() AllTestClasses.append(StandbyTest) class ValgrindTest(CTSTest): '''Check for memory leaks''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "Valgrind" self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) self.is_valgrind = 1 self.is_loop = 1 def setup(self, node): self.incr("calls") ret = self.stopall(None) if not ret: return self.failure("Stop all nodes failed") # @TODO Edit /etc/sysconfig/pacemaker on all nodes to enable valgrind, # and clear any valgrind logs from previous runs. For now, we rely on # the user to do this manually. ret = self.startall(None) if not ret: return self.failure("Start all nodes failed") return self.success() def teardown(self, node): # Return all nodes to normal # @TODO Edit /etc/sysconfig/pacemaker on all nodes to disable valgrind ret = self.stopall(None) if not ret: return self.failure("Stop all nodes failed") return self.success() def find_leaks(self): # Check for leaks # (no longer used but kept in case feature is restored) leaked = [] self.stop = StopTest(self.CM) for node in self.Env["nodes"]: rc = self.stop(node) if not rc: self.failure("Couldn't shut down %s" % node) (rc, _) = self.rsh(node, "grep -e indirectly.*lost:.*[1-9] -e definitely.*lost:.*[1-9] -e (ERROR|error).*SUMMARY:.*[1-9].*errors %s" % self.logger.logPat) if rc != 1: leaked.append(node) self.failure("Valgrind errors detected on %s" % node) (_, output) = self.rsh(node, "grep -e lost: -e SUMMARY: %s" % self.logger.logPat, verbose=1) for line in output: self.logger.log(line) (_, output) = self.rsh(node, "cat %s" % self.logger.logPat, verbose=1) for line in output: self.debug(line) self.rsh(node, "rm -f %s" % self.logger.logPat, verbose=1) return leaked def __call__(self, node): #leaked = self.find_leaks() #if len(leaked) > 0: # return self.failure("Nodes %s leaked" % repr(leaked)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [ r"pacemaker-based.*: \*\*\*\*\*\*\*\*\*\*\*\*\*", r"pacemaker-based.*: .* avoid confusing Valgrind", r"HA_VALGRIND_ENABLED", ] class StandbyLoopTest(ValgrindTest): '''Check for memory leaks by putting a node in and out of standby for an hour''' # @TODO This is not a useful test for memory leaks def __init__(self, cm): ValgrindTest.__init__(self,cm) self.name = "StandbyLoop" def __call__(self, node): lpc = 0 delay = 2 failed = 0 done = time.time() + self.Env["loop-minutes"] * 60 while time.time() <= done and not failed: lpc = lpc + 1 time.sleep(delay) if not self.CM.SetStandbyMode(node, "on"): self.failure("can't set node %s to standby mode" % node) failed = lpc time.sleep(delay) if not self.CM.SetStandbyMode(node, "off"): self.failure("can't set node %s to active mode" % node) failed = lpc leaked = self.find_leaks() if failed: return self.failure("Iteration %d failed" % failed) elif len(leaked) > 0: return self.failure("Nodes %s leaked" % repr(leaked)) return self.success() #AllTestClasses.append(StandbyLoopTest) class BandwidthTest(CTSTest): # Tests should not be cluster-manager-specific # If you need to find out cluster manager configuration to do this, then # it should be added to the generic cluster manager API. '''Test the bandwidth which the cluster uses''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Bandwidth" self.start = StartTest(cm) self.__setitem__("min",0) self.__setitem__("max",0) self.__setitem__("totalbandwidth",0) (handle, self.tempfile) = tempfile.mkstemp(".cts") os.close(handle) self.startall = SimulStartLite(cm) def __call__(self, node): '''Perform the Bandwidth test''' self.incr("calls") if self.CM.upcount() < 1: return self.skipped() Path = self.CM.InternalCommConfig() if "ip" not in Path["mediatype"]: return self.skipped() port = Path["port"][0] port = int(port) ret = self.startall(None) if not ret: return self.failure("Test setup failed") time.sleep(5) # We get extra messages right after startup. fstmpfile = "/var/run/band_estimate" dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \ % (port, fstmpfile) (rc, _) = self.rsh(node, dumpcmd) if rc == 0: farfile = "root@%s:%s" % (node, fstmpfile) self.rsh.copy(farfile, self.tempfile) Bandwidth = self.countbandwidth(self.tempfile) if not Bandwidth: self.logger.log("Could not compute bandwidth.") return self.success() intband = int(Bandwidth + 0.5) self.logger.log("...bandwidth: %d bits/sec" % intband) self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth if self.Stats["min"] == 0: self.Stats["min"] = Bandwidth if Bandwidth > self.Stats["max"]: self.Stats["max"] = Bandwidth if Bandwidth < self.Stats["min"]: self.Stats["min"] = Bandwidth self.rsh(node, "rm -f %s" % fstmpfile) os.unlink(self.tempfile) return self.success() else: return self.failure("no response from tcpdump command [%d]!" % rc) def countbandwidth(self, file): fp = open(file, "r") fp.seek(0) count = 0 sum = 0 while 1: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count = count + 1 linesplit = line.split(" ") for j in range(len(linesplit)-1): if linesplit[j] == "udp": break if linesplit[j] == "length:": break try: sum = sum + int(linesplit[j+1]) except ValueError: self.logger.log("Invalid tcpdump line: %s" % line) return None T1 = linesplit[0] timesplit = T1.split(":") time2split = timesplit[2].split(".") time1 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001 break while count < 100: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count = count+1 linessplit = line.split(" ") for j in range(len(linessplit)-1): if linessplit[j] == "udp": break if linessplit[j] == "length:": break try: sum = int(linessplit[j+1]) + sum except ValueError: self.logger.log("Invalid tcpdump line: %s" % line) return None T2 = linessplit[0] timesplit = T2.split(":") time2split = timesplit[2].split(".") time2 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001 time = time2-time1 if (time <= 0): return 0 return int((sum*8)/time) def is_applicable(self): '''BandwidthTest never applicable''' return False AllTestClasses.append(BandwidthTest) ################################################################### class MaintenanceMode(CTSTest): ################################################################### def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "MaintenanceMode" self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.max = 30 #self.is_unsafe = 1 self.benchmark = 1 self.action = "asyncmon" self.interval = 0 self.rid = "maintenanceDummy" def toggleMaintenanceMode(self, node, action): pats = [] pats.append(self.templates["Pat:DC_IDLE"]) # fail the resource right after turning Maintenance mode on # verify it is not recovered until maintenance mode is turned off if action == "On": pats.append(self.templates["Pat:RscOpFail"] % (self.action, self.rid)) else: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid)) pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid)) watch = self.create_watch(pats, 60) watch.set_watch() self.debug("Turning maintenance mode %s" % action) self.rsh(node, self.templates["MaintenanceMode%s" % (action)]) if (action == "On"): self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node)) self.set_timer("recover%s" % (action)) watch.look_for_all() self.log_timer("recover%s" % (action)) if watch.unmatched: self.debug("Failed to find patterns when turning maintenance mode %s" % action) return repr(watch.unmatched) return "" def insertMaintenanceDummy(self, node): pats = [] pats.append(("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self.rid))) watch = self.create_watch(pats, 60) watch.set_watch() self.CM.AddDummyRsc(node, self.rid) self.set_timer("addDummy") watch.look_for_all() self.log_timer("addDummy") if watch.unmatched: self.debug("Failed to find patterns when adding maintenance dummy resource") return repr(watch.unmatched) return "" def removeMaintenanceDummy(self, node): pats = [] pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid)) watch = self.create_watch(pats, 60) watch.set_watch() self.CM.RemoveDummyRsc(node, self.rid) self.set_timer("removeDummy") watch.look_for_all() self.log_timer("removeDummy") if watch.unmatched: self.debug("Failed to find patterns when removing maintenance dummy resource") return repr(watch.unmatched) return "" def managedRscList(self, node): rscList = [] (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): tmp = AuditResource(self.CM, line) - if tmp.managed(): + if tmp.managed: rscList.append(tmp.id) return rscList def verifyResources(self, node, rscList, managed): managedList = list(rscList) managed_str = "managed" if not managed: managed_str = "unmanaged" (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): tmp = AuditResource(self.CM, line) - if managed and not tmp.managed(): + if managed and not tmp.managed: continue - elif not managed and tmp.managed(): + elif not managed and tmp.managed: continue elif managedList.count(tmp.id): managedList.remove(tmp.id) if len(managedList) == 0: self.debug("Found all %s resources on %s" % (managed_str, node)) return True self.logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managedList)) return False def __call__(self, node): '''Perform the 'MaintenanceMode' test. ''' self.incr("calls") verify_managed = False verify_unmanaged = False failPat = "" ret = self.startall(None) if not ret: return self.failure("Setup failed") # get a list of all the managed resources. We use this list # after enabling maintenance mode to verify all managed resources # become un-managed. After maintenance mode is turned off, we use # this list to verify all the resources become managed again. managedResources = self.managedRscList(node) if len(managedResources) == 0: self.logger.log("No managed resources on %s" % node) return self.skipped() # insert a fake resource we can fail during maintenance mode # so we can verify recovery does not take place until after maintenance # mode is disabled. failPat = failPat + self.insertMaintenanceDummy(node) # toggle maintenance mode ON, then fail dummy resource. failPat = failPat + self.toggleMaintenanceMode(node, "On") # verify all the resources are now unmanaged if self.verifyResources(node, managedResources, False): verify_unmanaged = True # Toggle maintenance mode OFF, verify dummy is recovered. failPat = failPat + self.toggleMaintenanceMode(node, "Off") # verify all the resources are now managed again if self.verifyResources(node, managedResources, True): verify_managed = True # Remove our maintenance dummy resource. failPat = failPat + self.removeMaintenanceDummy(node) self.CM.cluster_stable() if failPat != "": return self.failure("Unmatched patterns: %s" % (failPat)) elif verify_unmanaged is False: return self.failure("Failed to verify resources became unmanaged during maintenance mode") elif verify_managed is False: return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [ r"Updating failcount for %s" % self.rid, r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self.rid, r"Unknown operation: fail", self.templates["Pat:RscOpOK"] % (self.action, self.rid), r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval), ] AllTestClasses.append(MaintenanceMode) class ResourceRecover(CTSTest): def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "ResourceRecover" self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.max = 30 self.rid = None self.rid_alt = None #self.is_unsafe = 1 self.benchmark = 1 # these are the values used for the new LRM API call self.action = "asyncmon" self.interval = 0 def __call__(self, node): '''Perform the 'ResourceRecover' test. ''' self.incr("calls") ret = self.startall(None) if not ret: return self.failure("Setup failed") # List all resources active on the node (skip test if none) resourcelist = self.CM.active_resources(node) if len(resourcelist) == 0: self.logger.log("No active resources on %s" % node) return self.skipped() # Choose one resource at random rsc = self.choose_resource(node, resourcelist) if rsc is None: return self.failure("Could not get details of resource '%s'" % self.rid) if rsc.id == rsc.clone_id: self.debug("Failing " + rsc.id) else: self.debug("Failing " + rsc.id + " (also known as " + rsc.clone_id + ")") # Log patterns to watch for (failure, plus restart if managed) pats = [] pats.append(self.templates["Pat:CloneOpFail"] % (self.action, rsc.id, rsc.clone_id)) - if rsc.managed(): + if rsc.managed: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid)) - if rsc.unique(): + if rsc.unique: pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid)) else: # Anonymous clones may get restarted with a different clone number pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*")) # Fail resource. (Ideally, we'd fail it twice, to ensure the fail count # is incrementing properly, but it might restart on a different node. # We'd have to temporarily ban it from all other nodes and ensure the # migration-threshold hasn't been reached.) if self.fail_resource(rsc, node, pats) is None: return None # self.failure() already called return self.success() def choose_resource(self, node, resourcelist): """ Choose a random resource to target """ self.rid = self.Env.random_gen.choice(resourcelist) self.rid_alt = self.rid (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if line.startswith("Resource: "): rsc = AuditResource(self.CM, line) if rsc.id == self.rid: # Handle anonymous clones that get renamed self.rid = rsc.clone_id return rsc return None def get_failcount(self, node): """ Check the fail count of targeted resource on given node """ (rc, lines) = self.rsh(node, "crm_failcount --quiet --query --resource %s " "--operation %s --interval %d " "--node %s" % (self.rid, self.action, self.interval, node), verbose=1) if rc != 0 or len(lines) != 1: self.logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(map(str.strip, lines)))) return -1 try: failcount = int(lines[0]) except (IndexError, ValueError): self.logger.log("crm_failcount output on %s unparseable: %s" % (node, ' '.join(lines))) return -1 return failcount def fail_resource(self, rsc, node, pats): """ Fail the targeted resource, and verify as expected """ orig_failcount = self.get_failcount(node) watch = self.create_watch(pats, 60) watch.set_watch() self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node)) self.set_timer("recover") watch.look_for_all() self.log_timer("recover") self.CM.cluster_stable() recovered = self.CM.ResourceLocation(self.rid) if watch.unmatched: return self.failure("Patterns not found: %s" % repr(watch.unmatched)) - elif rsc.unique() and len(recovered) > 1: + elif rsc.unique and len(recovered) > 1: return self.failure("%s is now active on more than one node: %s"%(self.rid, repr(recovered))) elif len(recovered) > 0: self.debug("%s is running on: %s" % (self.rid, repr(recovered))) - elif rsc.managed(): + elif rsc.managed: return self.failure("%s was not recovered and is inactive" % self.rid) new_failcount = self.get_failcount(node) if new_failcount != (orig_failcount + 1): return self.failure("%s fail count is %d not %d" % (self.rid, new_failcount, orig_failcount + 1)) return 0 # Anything but None is success def errorstoignore(self): '''Return list of errors which should be ignored''' return [ r"Updating failcount for %s" % self.rid, r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self.rid, self.rid_alt), r"Unknown operation: fail", self.templates["Pat:RscOpOK"] % (self.action, self.rid), r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval), ] AllTestClasses.append(ResourceRecover) class ComponentFail(CTSTest): def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "ComponentFail" self.startall = SimulStartLite(cm) self.complist = cm.Components() self.patterns = [] self.okerrpatterns = [] self.is_unsafe = 1 def __call__(self, node): '''Perform the 'ComponentFail' test. ''' self.incr("calls") self.patterns = [] self.okerrpatterns = [] # start all nodes ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.CM.cluster_stable(self.Env["StableTime"]): return self.failure("Setup failed - unstable") node_is_dc = self.CM.is_node_dc(node, None) # select a component to kill chosen = self.Env.random_gen.choice(self.complist) while chosen.dc_only and node_is_dc == 0: chosen = self.Env.random_gen.choice(self.complist) self.debug("...component %s (dc=%d)" % (chosen.name, node_is_dc)) self.incr(chosen.name) if chosen.name != "corosync": self.patterns.append(self.templates["Pat:ChildKilled"] %(node, chosen.name)) self.patterns.append(self.templates["Pat:ChildRespawn"] %(node, chosen.name)) self.patterns.extend(chosen.pats) if node_is_dc: self.patterns.extend(chosen.dc_pats) # @TODO this should be a flag in the Component if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]: # Ignore actions for fence devices if fencer will respawn # (their registration will be lost, and probes will fail) self.okerrpatterns = [ self.templates["Pat:Fencing_active"] ] (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self.CM, line) if r.rclass == "stonith": self.okerrpatterns.append(self.templates["Pat:Fencing_recover"] % r.id) self.okerrpatterns.append(self.templates["Pat:Fencing_probe"] % r.id) # supply a copy so self.patterns doesn't end up empty tmpPats = [] tmpPats.extend(self.patterns) self.patterns.extend(chosen.badnews_ignore) # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status stonithPats = [] stonithPats.append(self.templates["Pat:Fencing_ok"] % node) stonith = self.create_watch(stonithPats, 0) stonith.set_watch() # set the watch for stable watch = self.create_watch( tmpPats, self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"]) watch.set_watch() # kill the component chosen.kill(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() self.debug("Waiting for any fenced node to come back up") self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") self.CM.cluster_stable(self.Env["StartTime"]) self.debug("Checking if %s was shot" % node) shot = stonith.look(60) if shot: self.debug("Found: " + repr(shot)) self.okerrpatterns.append(self.templates["Pat:Fencing_start"] % node) if not self.Env["at-boot"]: self.CM.ShouldBeStatus[node] = "down" # If fencing occurred, chances are many (if not all) the expected logs # will not be sent - or will be lost when the node reboots return self.success() # check for logs indicating a graceful recovery matched = watch.look_for_all(allow_multiple_matches=True) if watch.unmatched: self.logger.log("Patterns not found: " + repr(watch.unmatched)) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self.CM.cluster_stable(self.Env["StartTime"]) if not matched: return self.failure("Didn't find all expected %s patterns" % chosen.name) elif not is_stable: return self.failure("Cluster did not become stable after killing %s" % chosen.name) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' # Note that okerrpatterns refers to the last time we ran this test # The good news is that this works fine for us... self.okerrpatterns.extend(self.patterns) return self.okerrpatterns AllTestClasses.append(ComponentFail) class SplitBrainTest(CTSTest): '''It is used to test split-brain. when the path between the two nodes break check the two nodes both take over the resource''' def __init__(self,cm): CTSTest.__init__(self,cm) self.name = "SplitBrain" self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.is_experimental = 1 def isolate_partition(self, partition): other_nodes = [] other_nodes.extend(self.Env["nodes"]) for node in partition: try: other_nodes.remove(node) except ValueError: self.logger.log("Node "+node+" not in " + repr(self.Env["nodes"]) + " from " +repr(partition)) if len(other_nodes) == 0: return 1 self.debug("Creating partition: " + repr(partition)) self.debug("Everyone else: " + repr(other_nodes)) for node in partition: if not self.CM.isolate_node(node, other_nodes): self.logger.log("Could not isolate %s" % node) return 0 return 1 def heal_partition(self, partition): other_nodes = [] other_nodes.extend(self.Env["nodes"]) for node in partition: try: other_nodes.remove(node) except ValueError: self.logger.log("Node "+node+" not in " + repr(self.Env["nodes"])) if len(other_nodes) == 0: return 1 self.debug("Healing partition: " + repr(partition)) self.debug("Everyone else: " + repr(other_nodes)) for node in partition: self.CM.unisolate_node(node, other_nodes) def __call__(self, node): '''Perform split-brain test''' self.incr("calls") self.passed = 1 partitions = {} ret = self.startall(None) if not ret: return self.failure("Setup failed") while 1: # Retry until we get multiple partitions partitions = {} p_max = len(self.Env["nodes"]) for node in self.Env["nodes"]: p = self.Env.random_gen.randint(1, p_max) if not p in partitions: partitions[p] = [] partitions[p].append(node) p_max = len(list(partitions.keys())) if p_max > 1: break # else, try again self.debug("Created %d partitions" % p_max) for key in list(partitions.keys()): self.debug("Partition["+str(key)+"]:\t"+repr(partitions[key])) # Disabling STONITH to reduce test complexity for now self.rsh(node, "crm_attribute -V -n stonith-enabled -v false") for key in list(partitions.keys()): self.isolate_partition(partitions[key]) count = 30 while count > 0: if len(self.CM.find_partitions()) != p_max: time.sleep(10) else: break else: self.failure("Expected partitions were not created") # Target number of partitions formed - wait for stability if not self.CM.cluster_stable(): self.failure("Partitioned cluster not stable") # Now audit the cluster state self.CM.partitions_expected = p_max if not self.audit(): self.failure("Audits failed") self.CM.partitions_expected = 1 # And heal them again for key in list(partitions.keys()): self.heal_partition(partitions[key]) # Wait for a single partition to form count = 30 while count > 0: if len(self.CM.find_partitions()) != 1: time.sleep(10) count -= 1 else: break else: self.failure("Cluster did not reform") # Wait for it to have the right number of members count = 30 while count > 0: members = [] partitions = self.CM.find_partitions() if len(partitions) > 0: members = partitions[0].split() if len(members) != len(self.Env["nodes"]): time.sleep(10) count -= 1 else: break else: self.failure("Cluster did not completely reform") # Wait up to 20 minutes - the delay is more preferable than # trying to continue with in a messed up state if not self.CM.cluster_stable(1200): self.failure("Reformed cluster not stable") if self.Env["continue"]: answer = "Y" else: try: answer = input('Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": raise ValueError("Reformed cluster not stable") # Turn fencing back on if self.Env["DoFencing"]: self.rsh(node, "crm_attribute -V -D -n stonith-enabled") self.CM.cluster_stable() if self.passed: return self.success() return self.failure("See previous errors") def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ r"Another DC detected:", r"(ERROR|error).*: .*Application of an update diff failed", r"pacemaker-controld.*:.*not in our membership list", r"CRIT:.*node.*returning after partition", ] def is_applicable(self): if not self.is_applicable_common(): return False return len(self.Env["nodes"]) > 2 AllTestClasses.append(SplitBrainTest) class Reattach(CTSTest): def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "Reattach" self.startall = SimulStartLite(cm) self.restart1 = RestartTest(cm) self.stopall = SimulStopLite(cm) self.is_unsafe = 0 # Handled by canrunnow() def _is_managed(self, node): (_, is_managed) = self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1) is_managed = is_managed[0].strip() return is_managed == "true" def _set_unmanaged(self, node): self.debug("Disable resource management") self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false") def _set_managed(self, node): self.debug("Re-enable resource management") self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D") def setup(self, node): attempt = 0 if not self.startall(None): return None # Make sure we are really _really_ stable and that all # resources, including those that depend on transient node # attributes, are started while not self.CM.cluster_stable(double_check=True): if attempt < 5: attempt += 1 self.debug("Not stable yet, re-testing") else: self.logger.log("Cluster is not stable") return None return 1 def teardown(self, node): # Make sure 'node' is up start = StartTest(self.CM) start(node) if not self._is_managed(node): self.logger.log("Attempting to re-enable resource management on %s" % node) self._set_managed(node) self.CM.cluster_stable() if not self._is_managed(node): self.logger.log("Could not re-enable resource management") return 0 return 1 def canrunnow(self, node): '''Return TRUE if we can meaningfully run right now''' if self.find_ocfs2_resources(node): self.logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present") return 0 return 1 def __call__(self, node): self.incr("calls") pats = [] # Conveniently, the scheduler will display this message when disabling # management, even if fencing is not enabled, so we can rely on it. managed = self.create_watch(["No fencing will be done"], 60) managed.set_watch() self._set_unmanaged(node) if not managed.look_for_all(): self.logger.log("Patterns not found: " + repr(managed.unmatched)) return self.failure("Resource management not disabled") pats = [] pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*")) pats.append(self.templates["Pat:RscOpOK"] % ("stop", ".*")) pats.append(self.templates["Pat:RscOpOK"] % ("promote", ".*")) pats.append(self.templates["Pat:RscOpOK"] % ("demote", ".*")) pats.append(self.templates["Pat:RscOpOK"] % ("migrate", ".*")) watch = self.create_watch(pats, 60, "ShutdownActivity") watch.set_watch() self.debug("Shutting down the cluster") ret = self.stopall(None) if not ret: self._set_managed(node) return self.failure("Couldn't shut down the cluster") self.debug("Bringing the cluster back up") ret = self.startall(None) time.sleep(5) # allow ping to update the CIB if not ret: self._set_managed(node) return self.failure("Couldn't restart the cluster") if self.local_badnews("ResourceActivity:", watch): self._set_managed(node) return self.failure("Resources stopped or started during cluster restart") watch = self.create_watch(pats, 60, "StartupActivity") watch.set_watch() # Re-enable resource management (and verify it happened). self._set_managed(node) self.CM.cluster_stable() if not self._is_managed(node): return self.failure("Could not re-enable resource management") # Ignore actions for STONITH resources ignore = [] (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self.CM, line) if r.rclass == "stonith": self.debug("Ignoring start actions for %s" % r.id) ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id)) if self.local_badnews("ResourceActivity:", watch, ignore): return self.failure("Resources stopped or started after resource management was re-enabled") return ret def errorstoignore(self): '''Return list of errors which should be ignored''' return [ r"resource( was|s were) active at shutdown", ] def is_applicable(self): return True AllTestClasses.append(Reattach) class SpecialTest1(CTSTest): '''Set up a custom test to cause quorum failure issues for Andrew''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "SpecialTest1" self.startall = SimulStartLite(cm) self.restart1 = RestartTest(cm) self.stopall = SimulStopLite(cm) def __call__(self, node): '''Perform the 'SpecialTest1' test for Andrew. ''' self.incr("calls") # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Could not stop all nodes") # Test config recovery when the other nodes come up self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*") # Start the selected node ret = self.restart1(node) if not ret: return self.failure("Could not start "+node) # Start all remaining nodes ret = self.startall(None) if not ret: return self.failure("Could not start the remaining nodes") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' # Errors that occur as a result of the CIB being wiped return [ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed", r"error.*: Resource start-up disabled since no STONITH resources have been defined", r"error.*: Either configure some or disable STONITH with the stonith-enabled option", r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity", ] AllTestClasses.append(SpecialTest1) class HAETest(CTSTest): '''Set up a custom test to cause quorum failure issues for Andrew''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "HAETest" self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) self.is_loop = 1 def setup(self, node): # Start all remaining nodes ret = self.startall(None) if not ret: return self.failure("Couldn't start all nodes") return self.success() def teardown(self, node): # Stop everything ret = self.stopall(None) if not ret: return self.failure("Couldn't stop all nodes") return self.success() def wait_on_state(self, node, resource, expected_clones, attempts=240): while attempts > 0: active = 0 (rc, lines) = self.rsh(node, "crm_resource -r %s -W -Q" % resource, verbose=1) # Hack until crm_resource does the right thing if rc == 0 and lines: active = len(lines) if len(lines) == expected_clones: return 1 elif rc == 1: self.debug("Resource %s is still inactive" % resource) elif rc == 234: self.logger.log("Unknown resource %s" % resource) return 0 elif rc == 246: self.logger.log("Cluster is inactive") return 0 elif rc != 0: self.logger.log("Call to crm_resource failed, rc=%d" % rc) return 0 else: self.debug("Resource %s is active on %d times instead of %d" % (resource, active, expected_clones)) attempts -= 1 time.sleep(1) return 0 def find_dlm(self, node): self.r_dlm = None (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self.CM, line) if r.rtype == "controld" and r.parent != "NA": self.debug("Found dlm: %s" % self.r_dlm) self.r_dlm = r.parent return 1 return 0 def find_hae_resources(self, node): self.r_dlm = None self.r_o2cb = None self.r_ocfs2 = [] if self.find_dlm(node): self.find_ocfs2_resources(node) def is_applicable(self): if not self.is_applicable_common(): return False if self.Env["Schema"] == "hae": return True return None class HAERoleTest(HAETest): def __init__(self, cm): '''Lars' mount/unmount test for the HA extension. ''' HAETest.__init__(self,cm) self.name = "HAERoleTest" def change_state(self, node, resource, target): (rc, _) = self.rsh(node, "crm_resource -V -r %s -p target-role -v %s --meta" % (resource, target)) return rc def __call__(self, node): self.incr("calls") lpc = 0 failed = 0 delay = 2 done = time.time() + self.Env["loop-minutes"]*60 self.find_hae_resources(node) clone_max = len(self.Env["nodes"]) while time.time() <= done and not failed: lpc = lpc + 1 self.change_state(node, self.r_dlm, "Stopped") if not self.wait_on_state(node, self.r_dlm, 0): self.failure("%s did not go down correctly" % self.r_dlm) failed = lpc self.change_state(node, self.r_dlm, "Started") if not self.wait_on_state(node, self.r_dlm, clone_max): self.failure("%s did not come up correctly" % self.r_dlm) failed = lpc if not self.wait_on_state(node, self.r_o2cb, clone_max): self.failure("%s did not come up correctly" % self.r_o2cb) failed = lpc for fs in self.r_ocfs2: if not self.wait_on_state(node, fs, clone_max): self.failure("%s did not come up correctly" % fs) failed = lpc if failed: return self.failure("iteration %d failed" % failed) return self.success() AllTestClasses.append(HAERoleTest) class HAEStandbyTest(HAETest): '''Set up a custom test to cause quorum failure issues for Andrew''' def __init__(self, cm): HAETest.__init__(self,cm) self.name = "HAEStandbyTest" def change_state(self, node, resource, target): (rc, _) = self.rsh(node, "crm_standby -V -l reboot -v %s" % (target)) return rc def __call__(self, node): self.incr("calls") lpc = 0 failed = 0 done = time.time() + self.Env["loop-minutes"]*60 self.find_hae_resources(node) clone_max = len(self.Env["nodes"]) while time.time() <= done and not failed: lpc = lpc + 1 self.change_state(node, self.r_dlm, "true") if not self.wait_on_state(node, self.r_dlm, clone_max-1): self.failure("%s did not go down correctly" % self.r_dlm) failed = lpc self.change_state(node, self.r_dlm, "false") if not self.wait_on_state(node, self.r_dlm, clone_max): self.failure("%s did not come up correctly" % self.r_dlm) failed = lpc if not self.wait_on_state(node, self.r_o2cb, clone_max): self.failure("%s did not come up correctly" % self.r_o2cb) failed = lpc for fs in self.r_ocfs2: if not self.wait_on_state(node, fs, clone_max): self.failure("%s did not come up correctly" % fs) failed = lpc if failed: return self.failure("iteration %d failed" % failed) return self.success() AllTestClasses.append(HAEStandbyTest) class NearQuorumPointTest(CTSTest): ''' This test brings larger clusters near the quorum point (50%). In addition, it will test doing starts and stops at the same time. Here is how I think it should work: - loop over the nodes and decide randomly which will be up and which will be down Use a 50% probability for each of up/down. - figure out what to do to get into that state from the current state - in parallel, bring up those going up and bring those going down. ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "NearQuorumPoint" def __call__(self, dummy): '''Perform the 'NearQuorumPoint' test. ''' self.incr("calls") startset = [] stopset = [] stonith = self.CM.prepare_fencing_watcher("NearQuorumPoint") #decide what to do with each node for node in self.Env["nodes"]: action = self.Env.random_gen.choice(["start","stop"]) #action = self.Env.random_gen.choice(["start","stop","no change"]) if action == "start" : startset.append(node) elif action == "stop" : stopset.append(node) self.debug("start nodes:" + repr(startset)) self.debug("stop nodes:" + repr(stopset)) #add search patterns watchpats = [ ] for node in stopset: if self.CM.ShouldBeStatus[node] == "up": watchpats.append(self.templates["Pat:We_stopped"] % node) for node in startset: if self.CM.ShouldBeStatus[node] == "down": #watchpats.append(self.templates["Pat:NonDC_started"] % node) watchpats.append(self.templates["Pat:Local_started"] % node) else: for stopping in stopset: if self.CM.ShouldBeStatus[stopping] == "up": watchpats.append(self.templates["Pat:They_stopped"] % (node, self.CM.key_for_node(stopping))) if len(watchpats) == 0: return self.skipped() if len(startset) != 0: watchpats.append(self.templates["Pat:DC_IDLE"]) watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) watch.set_watch() #begin actions for node in stopset: if self.CM.ShouldBeStatus[node] == "up": self.CM.StopaCMnoBlock(node) for node in startset: if self.CM.ShouldBeStatus[node] == "down": self.CM.StartaCMnoBlock(node) #get the result if watch.look_for_all(): self.CM.cluster_stable() self.CM.fencing_cleanup("NearQuorumPoint", stonith) return self.success() self.logger.log("Warn: Patterns not found: " + repr(watch.unmatched)) #get the "bad" nodes upnodes = [] for node in stopset: if self.CM.StataCM(node) == 1: upnodes.append(node) downnodes = [] for node in startset: if self.CM.StataCM(node) == 0: downnodes.append(node) self.CM.fencing_cleanup("NearQuorumPoint", stonith) if upnodes == [] and downnodes == []: self.CM.cluster_stable() # Make sure they're completely down with no residule for node in stopset: self.rsh(node, self.templates["StopCmd"]) return self.success() if len(upnodes) > 0: self.logger.log("Warn: Unstoppable nodes: " + repr(upnodes)) if len(downnodes) > 0: self.logger.log("Warn: Unstartable nodes: " + repr(downnodes)) return self.failure() def is_applicable(self): return True AllTestClasses.append(NearQuorumPointTest) class RollingUpgradeTest(CTSTest): '''Perform a rolling upgrade of the cluster''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "RollingUpgrade" self.start = StartTest(cm) self.stop = StopTest(cm) self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) def setup(self, node): # Start all remaining nodes ret = self.stopall(None) if not ret: return self.failure("Couldn't stop all nodes") for node in self.Env["nodes"]: if not self.downgrade(node, None): return self.failure("Couldn't downgrade %s" % node) ret = self.startall(None) if not ret: return self.failure("Couldn't start all nodes") return self.success() def teardown(self, node): # Stop everything ret = self.stopall(None) if not ret: return self.failure("Couldn't stop all nodes") for node in self.Env["nodes"]: if not self.upgrade(node, None): return self.failure("Couldn't upgrade %s" % node) return self.success() def install(self, node, version, start=1, flags="--force"): target_dir = "/tmp/rpm-%s" % version src_dir = "%s/%s" % (self.Env["rpm-dir"], version) self.logger.log("Installing %s on %s with %s" % (version, node, flags)) if not self.stop(node): return self.failure("stop failure: "+node) self.rsh(node, "mkdir -p %s" % target_dir) self.rsh(node, "rm -f %s/*.rpm" % target_dir) (_, lines) = self.rsh(node, "ls -1 %s/*.rpm" % src_dir, verbose=1) for line in lines: line = line[:-1] rc = self.rsh.copy("%s" % (line), "%s:%s/" % (node, target_dir)) self.rsh(node, "rpm -Uvh %s %s/*.rpm" % (flags, target_dir)) if start and not self.start(node): return self.failure("start failure: "+node) return self.success() def upgrade(self, node, start=1): return self.install(node, self.Env["current-version"], start) def downgrade(self, node, start=1): return self.install(node, self.Env["previous-version"], start, "--force --nodeps") def __call__(self, node): '''Perform the 'Rolling Upgrade' test. ''' self.incr("calls") for node in self.Env["nodes"]: if self.upgrade(node): return self.failure("Couldn't upgrade %s" % node) self.CM.cluster_stable() return self.success() def is_applicable(self): if not self.is_applicable_common(): return None if not "rpm-dir" in list(self.Env.keys()): return None if not "current-version" in list(self.Env.keys()): return None if not "previous-version" in list(self.Env.keys()): return None return 1 # Register RestartTest as a good test to run AllTestClasses.append(RollingUpgradeTest) class BSC_AddResource(CTSTest): '''Add a resource to the cluster''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "AddResource" self.resource_offset = 0 self.cib_cmd = """cibadmin -C -o %s -X '%s' """ def __call__(self, node): self.incr("calls") self.resource_offset = self.resource_offset + 1 r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset) start_pat = "pacemaker-controld.*%s_start_0.*confirmed.*ok" patterns = [] patterns.append(start_pat % r_id) watch = self.create_watch(patterns, self.Env["DeadTime"]) watch.set_watch() ip = self.NextIP() if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip): return self.failure("Make resource %s failed" % r_id) failed = 0 watch_result = watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Pattern not found: %s" % (regex)) failed = 1 if failed: return self.failure("Resource pattern(s) not found") if not self.CM.cluster_stable(self.Env["DeadTime"]): return self.failure("Unstable cluster") return self.success() def NextIP(self): ip = self.Env["IPBase"] if ":" in ip: fields = ip.rpartition(":") fields[2] = str(hex(int(fields[2], 16)+1)) print(str(hex(int(f[2], 16)+1))) else: fields = ip.rpartition('.') fields[2] = str(int(fields[2])+1) ip = fields[0] + fields[1] + fields[3]; self.Env["IPBase"] = ip return ip.strip() def make_ip_resource(self, node, id, rclass, type, ip): self.logger.log("Creating %s:%s:%s (%s) on %s" % (rclass,type,id,ip,node)) rsc_xml=""" """ % (id, rclass, type, id, id, ip) node_constraint = """ """ % (id, id, id, id, node) rc = 0 (rc, _) = self.rsh(node, self.cib_cmd % ("constraints", node_constraint), verbose=1) if rc != 0: self.logger.log("Constraint creation failed: %d" % rc) return None (rc, _) = self.rsh(node, self.cib_cmd % ("resources", rsc_xml), verbose=1) if rc != 0: self.logger.log("Resource creation failed: %d" % rc) return None return 1 def is_applicable(self): if self.Env["DoBSC"]: return True return None AllTestClasses.append(BSC_AddResource) class SimulStopLite(CTSTest): '''Stop any active nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "SimulStopLite" def __call__(self, dummy): '''Perform the 'SimulStopLite' setup work. ''' self.incr("calls") self.debug("Setup: " + self.name) # We ignore the "node" parameter... watchpats = [ ] for node in self.Env["nodes"]: if self.CM.ShouldBeStatus[node] == "up": self.incr("WasStarted") watchpats.append(self.templates["Pat:We_stopped"] % node) if len(watchpats) == 0: return self.success() # Stop all the nodes - at about the same time... watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) watch.set_watch() self.set_timer() for node in self.Env["nodes"]: if self.CM.ShouldBeStatus[node] == "up": self.CM.StopaCMnoBlock(node) if watch.look_for_all(): # Make sure they're completely down with no residule for node in self.Env["nodes"]: self.rsh(node, self.templates["StopCmd"]) return self.success() did_fail = 0 up_nodes = [] for node in self.Env["nodes"]: if self.CM.StataCM(node) == 1: did_fail = 1 up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: " + repr(up_nodes)) self.logger.log("Warn: All nodes stopped but CTS didn't detect: " + repr(watch.unmatched)) return self.failure("Missing log message: "+repr(watch.unmatched)) def is_applicable(self): '''SimulStopLite is a setup test and never applicable''' return False class SimulStartLite(CTSTest): '''Start any stopped nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "SimulStartLite" def __call__(self, dummy): '''Perform the 'SimulStartList' setup work. ''' self.incr("calls") self.debug("Setup: " + self.name) # We ignore the "node" parameter... node_list = [] for node in self.Env["nodes"]: if self.CM.ShouldBeStatus[node] == "down": self.incr("WasStopped") node_list.append(node) self.set_timer() while len(node_list) > 0: # Repeat until all nodes come up watchpats = [ ] uppat = self.templates["Pat:NonDC_started"] if self.CM.upcount() == 0: uppat = self.templates["Pat:Local_started"] watchpats.append(self.templates["Pat:DC_IDLE"]) for node in node_list: watchpats.append(uppat % node) watchpats.append(self.templates["Pat:InfraUp"] % node) watchpats.append(self.templates["Pat:PacemakerUp"] % node) # Start all the nodes - at about the same time... watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) watch.set_watch() stonith = self.CM.prepare_fencing_watcher(self.name) for node in node_list: self.CM.StartaCMnoBlock(node) watch.look_for_all() node_list = self.CM.fencing_cleanup(self.name, stonith) if node_list == None: return self.failure("Cluster did not stabilize") # Remove node_list messages from watch.unmatched for node in node_list: self.logger.debug("Dealing with stonith operations for %s" % repr(node_list)) if watch.unmatched: try: watch.unmatched.remove(uppat % node) except: self.debug("Already matched: %s" % (uppat % node)) try: watch.unmatched.remove(self.templates["Pat:InfraUp"] % node) except: self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node)) try: watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node) except: self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node)) if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" %(regex)) if not self.CM.cluster_stable(): return self.failure("Cluster did not stabilize") did_fail = 0 unstable = [] for node in self.Env["nodes"]: if self.CM.StataCM(node) == 0: did_fail = 1 unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: " + repr(unstable)) unstable = [] for node in self.Env["nodes"]: if not self.CM.node_stable(node): did_fail = 1 unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: " + repr(unstable)) return self.success() def is_applicable(self): '''SimulStartLite is a setup test and never applicable''' return False def TestList(cm, audits): result = [] for testclass in AllTestClasses: bound_test = testclass(cm) if bound_test.is_applicable(): bound_test.Audits = audits result.append(bound_test) return result class RemoteLXC(CTSTest): def __init__(self, cm): CTSTest.__init__(self,cm) self.name = "RemoteLXC" self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.num_containers = 2 self.is_container = 1 self.failed = 0 self.fail_string = "" def start_lxc_simple(self, node): # restore any artifacts laying around from a previous test. self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null") # generate the containers, put them in the config, add some resources to them pats = [ ] watch = self.create_watch(pats, 120) watch.set_watch() pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc1")) pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc2")) pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc-ms")) pats.append(self.templates["Pat:RscOpOK"] % ("promote", "lxc-ms")) self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -g -a -m -s -c %d &>/dev/null" % self.num_containers) self.set_timer("remoteSimpleInit") watch.look_for_all() self.log_timer("remoteSimpleInit") if watch.unmatched: self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched)) self.failed = 1 def cleanup_lxc_simple(self, node): pats = [ ] # if the test failed, attempt to clean up the cib and libvirt environment # as best as possible if self.failed == 1: # restore libvirt and cib self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null") return watch = self.create_watch(pats, 120) watch.set_watch() pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container1")) pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container2")) self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -p &>/dev/null") self.set_timer("remoteSimpleCleanup") watch.look_for_all() self.log_timer("remoteSimpleCleanup") if watch.unmatched: self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched)) self.failed = 1 # cleanup libvirt self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null") def __call__(self, node): '''Perform the 'RemoteLXC' test. ''' self.incr("calls") ret = self.startall(None) if not ret: return self.failure("Setup failed, start all nodes failed.") (rc, _) = self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -v &>/dev/null") if rc == 1: self.log("Environment test for lxc support failed.") return self.skipped() self.start_lxc_simple(node) self.cleanup_lxc_simple(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() if self.failed == 1: return self.failure(self.fail_string) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [ r"Updating failcount for ping", r"schedulerd.*: Recover\s+(ping|lxc-ms|container)\s+\(.*\)", # The orphaned lxc-ms resource causes an expected transition error # that is a result of the scheduler not having knowledge that the # promotable resource used to be a clone. As a result, it looks like that # resource is running in multiple locations when it shouldn't... But in # this instance we know why this error is occurring and that it is expected. r"Calculated [Tt]ransition .*pe-error", r"Resource lxc-ms .* is active on 2 nodes attempting recovery", r"Unknown operation: fail", r"VirtualDomain.*ERROR: Unable to determine emulator", ] AllTestClasses.append(RemoteLXC) class RemoteDriver(CTSTest): def __init__(self, cm): CTSTest.__init__(self,cm) self.name = self.__class__.__name__ self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.stop = StopTest(cm) self.remote_rsc = "remote-rsc" self.cib_cmd = """cibadmin -C -o %s -X '%s' """ self.reset() def reset(self): self.pcmk_started = 0 self.failed = False self.fail_string = "" self.remote_node_added = 0 self.remote_rsc_added = 0 self.remote_use_reconnect_interval = self.Env.random_gen.choice([True,False]) def fail(self, msg): """ Mark test as failed. """ self.failed = True # Always log the failure. self.logger.log(msg) # Use first failure as test status, as it's likely to be most useful. if not self.fail_string: self.fail_string = msg def get_othernode(self, node): for othernode in self.Env["nodes"]: if othernode == node: # we don't want to try and use the cib that we just shutdown. # find a cluster node that is not our soon to be remote-node. continue else: return othernode def del_rsc(self, node, rsc): othernode = self.get_othernode(node) (rc, _) = self.rsh(othernode, "crm_resource -D -r %s -t primitive" % (rsc)) if rc != 0: self.fail("Removal of resource '%s' failed" % rsc) def add_rsc(self, node, rsc_xml): othernode = self.get_othernode(node) (rc, _) = self.rsh(othernode, self.cib_cmd % ("resources", rsc_xml)) if rc != 0: self.fail("resource creation failed") def add_primitive_rsc(self, node): rsc_xml = """ """ % { "node": self.remote_rsc } self.add_rsc(node, rsc_xml) if not self.failed: self.remote_rsc_added = 1 def add_connection_rsc(self, node): rsc_xml = """ """ % { "node": self.remote_node, "server": node } if self.remote_use_reconnect_interval: # Set reconnect interval on resource rsc_xml = rsc_xml + """ """ % (self.remote_node) rsc_xml = rsc_xml + """ """ % { "node": self.remote_node } self.add_rsc(node, rsc_xml) if not self.failed: self.remote_node_added = 1 def disable_services(self, node): self.corosync_enabled = self.Env.service_is_enabled(node, "corosync") if self.corosync_enabled: self.Env.disable_service(node, "corosync") self.pacemaker_enabled = self.Env.service_is_enabled(node, "pacemaker") if self.pacemaker_enabled: self.Env.disable_service(node, "pacemaker") def restore_services(self, node): if self.corosync_enabled: self.Env.enable_service(node, "corosync") if self.pacemaker_enabled: self.Env.enable_service(node, "pacemaker") def stop_pcmk_remote(self, node): # disable pcmk remote for i in range(10): (rc, _) = self.rsh(node, "service pacemaker_remote stop") if rc != 0: time.sleep(6) else: break def start_pcmk_remote(self, node): for i in range(10): (rc, _) = self.rsh(node, "service pacemaker_remote start") if rc != 0: time.sleep(6) else: self.pcmk_started = 1 break def freeze_pcmk_remote(self, node): """ Simulate a Pacemaker Remote daemon failure. """ # We freeze the process. self.rsh(node, "killall -STOP pacemaker-remoted") def resume_pcmk_remote(self, node): # We resume the process. self.rsh(node, "killall -CONT pacemaker-remoted") def start_metal(self, node): # Cluster nodes are reused as remote nodes in remote tests. If cluster # services were enabled at boot, in case the remote node got fenced, the # cluster node would join instead of the expected remote one. Meanwhile # pacemaker_remote would not be able to start. Depending on the chances, # the situations might not be able to be orchestrated gracefully any more. # # Temporarily disable any enabled cluster serivces. self.disable_services(node) pcmk_started = 0 # make sure the resource doesn't already exist for some reason self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_rsc)) self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_node)) if not self.stop(node): self.fail("Failed to shutdown cluster node %s" % node) return self.start_pcmk_remote(node) if self.pcmk_started == 0: self.fail("Failed to start pacemaker_remote on node %s" % node) return # Convert node to baremetal now that it has shutdown the cluster stack pats = [ ] watch = self.create_watch(pats, 120) watch.set_watch() pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node)) pats.append(self.templates["Pat:DC_IDLE"]) self.add_connection_rsc(node) self.set_timer("remoteMetalInit") watch.look_for_all() self.log_timer("remoteMetalInit") if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) def migrate_connection(self, node): if self.failed: return pats = [ ] pats.append(self.templates["Pat:RscOpOK"] % ("migrate_to", self.remote_node)) pats.append(self.templates["Pat:RscOpOK"] % ("migrate_from", self.remote_node)) pats.append(self.templates["Pat:DC_IDLE"]) watch = self.create_watch(pats, 120) watch.set_watch() (rc, _) = self.rsh(node, "crm_resource -M -r %s" % (self.remote_node), verbose=1) if rc != 0: self.fail("failed to move remote node connection resource") return self.set_timer("remoteMetalMigrate") watch.look_for_all() self.log_timer("remoteMetalMigrate") if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) return def fail_rsc(self, node): if self.failed: return watchpats = [ ] watchpats.append(self.templates["Pat:RscRemoteOpOK"] % ("stop", self.remote_rsc, self.remote_node)) watchpats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node)) watchpats.append(self.templates["Pat:DC_IDLE"]) watch = self.create_watch(watchpats, 120) watch.set_watch() self.debug("causing dummy rsc to fail.") self.rsh(node, "rm -f /var/run/resource-agents/Dummy*") self.set_timer("remoteRscFail") watch.look_for_all() self.log_timer("remoteRscFail") if watch.unmatched: self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched) def fail_connection(self, node): if self.failed: return watchpats = [ ] watchpats.append(self.templates["Pat:Fencing_ok"] % self.remote_node) watchpats.append(self.templates["Pat:NodeFenced"] % self.remote_node) watch = self.create_watch(watchpats, 120) watch.set_watch() # freeze the pcmk remote daemon. this will result in fencing self.debug("Force stopped active remote node") self.freeze_pcmk_remote(node) self.debug("Waiting for remote node to be fenced.") self.set_timer("remoteMetalFence") watch.look_for_all() self.log_timer("remoteMetalFence") if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) return self.debug("Waiting for the remote node to come back up") self.CM.ns.wait_for_node(node, 120); pats = [ ] watch = self.create_watch(pats, 240) watch.set_watch() pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node)) if self.remote_rsc_added == 1: pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node)) # start the remote node again watch it integrate back into cluster. self.start_pcmk_remote(node) if self.pcmk_started == 0: self.fail("Failed to start pacemaker_remote on node %s" % node) return self.debug("Waiting for remote node to rejoin cluster after being fenced.") self.set_timer("remoteMetalRestart") watch.look_for_all() self.log_timer("remoteMetalRestart") if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) return def add_dummy_rsc(self, node): if self.failed: return # verify we can put a resource on the remote node pats = [ ] watch = self.create_watch(pats, 120) watch.set_watch() pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node)) pats.append(self.templates["Pat:DC_IDLE"]) # Add a resource that must live on remote-node self.add_primitive_rsc(node) # force that rsc to prefer the remote node. (rc, _) = self.CM.rsh(node, "crm_resource -M -r %s -N %s -f" % (self.remote_rsc, self.remote_node), verbose=1) if rc != 0: self.fail("Failed to place remote resource on remote node.") return self.set_timer("remoteMetalRsc") watch.look_for_all() self.log_timer("remoteMetalRsc") if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) def test_attributes(self, node): if self.failed: return # This verifies permanent attributes can be set on a remote-node. It also # verifies the remote-node can edit its own cib node section remotely. (rc, line) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % (self.remote_node), verbose=1) if rc != 0: self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line)) return (rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % (self.remote_node), verbose=1) if rc != 0: self.fail("Failed to get remote-node attribute") return (rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % (self.remote_node), verbose=1) if rc != 0: self.fail("Failed to delete remote-node attribute") return def cleanup_metal(self, node): self.restore_services(node) if self.pcmk_started == 0: return pats = [ ] watch = self.create_watch(pats, 120) watch.set_watch() if self.remote_rsc_added == 1: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.remote_rsc)) if self.remote_node_added == 1: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.remote_node)) self.set_timer("remoteMetalCleanup") self.resume_pcmk_remote(node) if self.remote_rsc_added == 1: # Remove dummy resource added for remote node tests self.debug("Cleaning up dummy rsc put on remote node") self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % self.remote_rsc) self.del_rsc(node, self.remote_rsc) if self.remote_node_added == 1: # Remove remote node's connection resource self.debug("Cleaning up remote node connection resource") self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % (self.remote_node)) self.del_rsc(node, self.remote_node) watch.look_for_all() self.log_timer("remoteMetalCleanup") if watch.unmatched: self.fail("Unmatched patterns: %s" % watch.unmatched) self.stop_pcmk_remote(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() if self.remote_node_added == 1: # Remove remote node itself self.debug("Cleaning up node entry for remote node") self.rsh(self.get_othernode(node), "crm_node --force --remove %s" % self.remote_node) def setup_env(self, node): self.remote_node = "remote-%s" % (node) # we are assuming if all nodes have a key, that it is # the right key... If any node doesn't have a remote # key, we regenerate it everywhere. if self.rsh.exists_on_all("/etc/pacemaker/authkey", self.Env["nodes"]): return # create key locally (handle, keyfile) = tempfile.mkstemp(".cts") os.close(handle) subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # sync key throughout the cluster for node in self.Env["nodes"]: self.rsh(node, "mkdir -p --mode=0750 /etc/pacemaker") self.rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % node) self.rsh(node, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey") self.rsh(node, "chmod 0640 /etc/pacemaker/authkey") os.unlink(keyfile) def is_applicable(self): if not self.is_applicable_common(): return False for node in self.Env["nodes"]: (rc, _) = self.rsh(node, "which pacemaker-remoted >/dev/null 2>&1") if rc != 0: return False return True def start_new_test(self, node): self.incr("calls") self.reset() ret = self.startall(None) if not ret: return self.failure("setup failed: could not start all nodes") self.setup_env(node) self.start_metal(node) self.add_dummy_rsc(node) return True def __call__(self, node): return self.failure("This base class is not meant to be called directly.") def errorstoignore(self): '''Return list of errors which should be ignored''' return [ r"""is running on remote.*which isn't allowed""", r"""Connection terminated""", r"""Could not send remote""", ] # RemoteDriver is just a base class for other tests, so it is not added to AllTestClasses class RemoteBasic(RemoteDriver): def __call__(self, node): '''Perform the 'RemoteBaremetal' test. ''' if not self.start_new_test(node): return self.failure(self.fail_string) self.test_attributes(node) self.cleanup_metal(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() if self.failed: return self.failure(self.fail_string) return self.success() AllTestClasses.append(RemoteBasic) class RemoteStonithd(RemoteDriver): def __call__(self, node): '''Perform the 'RemoteStonithd' test. ''' if not self.start_new_test(node): return self.failure(self.fail_string) self.fail_connection(node) self.cleanup_metal(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() if self.failed: return self.failure(self.fail_string) return self.success() def is_applicable(self): if not RemoteDriver.is_applicable(self): return False if "DoFencing" in list(self.Env.keys()): return self.Env["DoFencing"] return True def errorstoignore(self): ignore_pats = [ r"Lost connection to Pacemaker Remote node", r"Software caused connection abort", r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor", r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*", r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)", r"error: Result of monitor operation for .* on remote-.*: Internal communication failure", ] ignore_pats.extend(RemoteDriver.errorstoignore(self)) return ignore_pats AllTestClasses.append(RemoteStonithd) class RemoteMigrate(RemoteDriver): def __call__(self, node): '''Perform the 'RemoteMigrate' test. ''' if not self.start_new_test(node): return self.failure(self.fail_string) self.migrate_connection(node) self.cleanup_metal(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() if self.failed: return self.failure(self.fail_string) return self.success() def is_applicable(self): if not RemoteDriver.is_applicable(self): return 0 # This test requires at least three nodes: one to convert to a # remote node, one to host the connection originally, and one # to migrate the connection to. if len(self.Env["nodes"]) < 3: return 0 return 1 AllTestClasses.append(RemoteMigrate) class RemoteRscFailure(RemoteDriver): def __call__(self, node): '''Perform the 'RemoteRscFailure' test. ''' if not self.start_new_test(node): return self.failure(self.fail_string) # This is an important step. We are migrating the connection # before failing the resource. This verifies that the migration # has properly maintained control over the remote-node. self.migrate_connection(node) self.fail_rsc(node) self.cleanup_metal(node) self.debug("Waiting for the cluster to recover") self.CM.cluster_stable() if self.failed: return self.failure(self.fail_string) return self.success() def errorstoignore(self): ignore_pats = [ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)", r"Dummy.*: No process state file found", ] ignore_pats.extend(RemoteDriver.errorstoignore(self)) return ignore_pats def is_applicable(self): if not RemoteDriver.is_applicable(self): return 0 # This test requires at least three nodes: one to convert to a # remote node, one to host the connection originally, and one # to migrate the connection to. if len(self.Env["nodes"]) < 3: return 0 return 1 AllTestClasses.append(RemoteRscFailure) # vim:ts=4:sw=4:et: diff --git a/cts/lab/ClusterManager.py b/cts/lab/ClusterManager.py index fda4cfb0bb..34a3d3edfe 100644 --- a/cts/lab/ClusterManager.py +++ b/cts/lab/ClusterManager.py @@ -1,940 +1,940 @@ """ ClusterManager class for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = """Copyright 2000-2023 the Pacemaker project contributors. Certain portions by Huang Zhen are copyright 2004 International Business Machines. The version control history for this file may have further details.""" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import time from collections import UserDict from cts.CIB import ConfigFactory -from cts.CTStests import AuditResource from pacemaker.buildoptions import BuildOptions from pacemaker._cts.CTS import NodeStatus, Process +from pacemaker._cts.audits import AuditResource from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogWatcher class ClusterManager(UserDict): '''The Cluster Manager class. This is an subclass of the Python dictionary class. (this is because it contains lots of {name,value} pairs, not because it's behavior is that terribly similar to a dictionary in other ways.) This is an abstract class which class implements high-level operations on the cluster and/or its cluster managers. Actual cluster managers classes are subclassed from this type. One of the things we do is track the state we think every node should be in. ''' def __InitialConditions(self): #if os.geteuid() != 0: # raise ValueError("Must Be Root!") None def _finalConditions(self): for key in list(self.keys()): if self[key] == None: raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") def __init__(self): self.Env = EnvFactory().getInstance() self.templates = PatternSelector(self.Env["Name"]) self.__InitialConditions() self.logger = LogFactory() self.TestLoggingLevel=0 self.data = {} self.name = self.Env["Name"] self.rsh = RemoteFactory().getInstance() self.ShouldBeStatus={} self.ns = NodeStatus(self.Env) self.OurNode = os.uname()[1].lower() self.__instance_errorstoignore = [] self.cib_installed = 0 self.config = None self.cluster_monitor = 0 self.use_short_names = 1 if self.Env["DoBSC"]: del self.templates["Pat:They_stopped"] self._finalConditions() self.check_transitions = 0 self.check_elections = 0 self.CIBsync = {} self.CibFactory = ConfigFactory(self) self.cib = self.CibFactory.createConfig(self.Env["Schema"]) def __getitem__(self, key): if key == "Name": return self.name print("FIXME: Getting %s from %s" % (key, repr(self))) if key in self.data: return self.data[key] return self.templates.get_patterns(key) def __setitem__(self, key, value): print("FIXME: Setting %s=%s on %s" % (key, value, repr(self))) self.data[key] = value def key_for_node(self, node): return node def instance_errorstoignore_clear(self): '''Allows the test scenario to reset instance errors to ignore on each iteration.''' self.__instance_errorstoignore = [] def instance_errorstoignore(self): '''Return list of errors which are 'normal' for a specific test instance''' return self.__instance_errorstoignore def log(self, args): self.logger.log(args) def debug(self, args): self.logger.debug(args) def upcount(self): '''How many nodes are up?''' count = 0 for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": count = count + 1 return count def install_support(self, command="install"): for node in self.Env["nodes"]: self.rsh(node, BuildOptions.DAEMON_DIR + "/cts-support " + command) def prepare_fencing_watcher(self, name): # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced upnode = None if self.HasQuorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonithPats = [] for peer in self.Env["nodes"]: if self.ShouldBeStatus[peer] != "up": stonithPats.append(self.templates["Pat:Fencing_ok"] % peer) stonithPats.append(self.templates["Pat:Fencing_start"] % peer) stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0) stonith.set_watch() return stonith def fencing_cleanup(self, node, stonith): peer_list = [] peer_state = {} self.debug("Looking for nodes that were fenced as a result of %s starting" % node) # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.HasQuorum(None) if not q and len(self.Env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"]))) return peer_list for n in self.Env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug("looking for: " + repr(stonith.regexes)) shot = stonith.look(0) while shot: line = repr(shot) self.debug("Found: " + line) del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.Env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self.logger.log("ERROR: Unknown stonith match: %s" % line) elif not peer in peer_list: self.debug("Found peer: " + peer) peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) if self.Env["at-boot"]: self.ShouldBeStatus[peer] = "up" else: self.ShouldBeStatus[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while len(stonith.regexes) and shot: line = repr(shot) self.debug("Found: " + line) del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too self.ns.wait_for_node(peer, self.Env["DeadTime"]) # Poll until it comes up if self.Env["at-boot"]: if not self.StataCM(peer): time.sleep(self.Env["StartTime"]) if not self.StataCM(peer): self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) return None return peer_list def StartaCM(self, node, verbose=False): '''Start up the cluster manager on a given node''' if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node)) else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) ret = 1 if not node in self.ShouldBeStatus: self.ShouldBeStatus[node] = "down" if self.ShouldBeStatus[node] != "down": return 1 patterns = [] # Technically we should always be able to notice ourselves starting patterns.append(self.templates["Pat:Local_started"] % node) if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) watch = LogWatcher( self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10) self.install_config(node) self.ShouldBeStatus[node] = "any" if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): self.logger.log ("%s was already started" % (node)) return 1 stonith = self.prepare_fencing_watcher(node) watch.set_watch() (rc, _) = self.rsh(node, self.templates["StartCmd"]) if rc != 0: self.logger.log ("Warn: Start command failed on node %s" % (node)) self.fencing_cleanup(node, stonith) return None self.ShouldBeStatus[node] = "up" watch_result = watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) if watch_result and self.cluster_stable(self.Env["DeadTime"]): #self.debug("Found match: "+ repr(watch_result)) self.fencing_cleanup(node, stonith) return 1 elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): self.fencing_cleanup(node, stonith) return 1 self.logger.log ("Warn: Start failed for node %s" % (node)) return None def StartaCMnoBlock(self, node, verbose=False): '''Start up the cluster manager on a given node with none-block mode''' if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node)) else: self.debug("Starting %s on node %s" % (self["Name"], node)) self.install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=False) self.ShouldBeStatus[node] = "up" return 1 def StopaCM(self, node, verbose=False, force=False): '''Stop the cluster manager on a given node''' if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node)) else: self.debug("Stopping %s on node %s" % (self["Name"], node)) if self.ShouldBeStatus[node] != "up" and force == False: return 1 (rc, _) = self.rsh(node, self.templates["StopCmd"]) if rc == 0: # Make sure we can continue even if corosync leaks # fdata-* is the old name #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*") self.ShouldBeStatus[node] = "down" self.cluster_stable(self.Env["DeadTime"]) return 1 else: self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) return None def StopaCMnoBlock(self, node): '''Stop the cluster manager on a given node with none-block mode''' self.debug("Stopping %s on node %s" % (self["Name"], node)) self.rsh(node, self.templates["StopCmd"], synchronous=False) self.ShouldBeStatus[node] = "down" return 1 def RereadCM(self, node): '''Force the cluster manager on a given node to reread its config This may be a no-op on certain cluster managers. ''' (rc, _) = self.rsh(node, self.templates["RereadCmd"]) if rc == 0: return 1 else: self.logger.log ("Could not force %s on node %s to reread its config" % (self["Name"], node)) return None def startall(self, nodelist=None, verbose=False, quick=False): '''Start the cluster manager on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.ShouldBeStatus[node] == "down": self.ns.wait_for_all_nodes(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... if not self.StartaCM(node, verbose=verbose): return 0 return 1 # Approximation of SimulStartList for --boot watchpats = [ ] watchpats.append(self.templates["Pat:DC_IDLE"]) for node in nodelist: watchpats.append(self.templates["Pat:InfraUp"] % node) watchpats.append(self.templates["Pat:PacemakerUp"] % node) watchpats.append(self.templates["Pat:Local_started"] % node) watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node)) # Start all the nodes - at about the same time... watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10) watch.set_watch() if not self.StartaCM(nodelist[0], verbose=verbose): return 0 for node in nodelist: self.StartaCMnoBlock(node, verbose=verbose) watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) if not self.cluster_stable(): self.logger.log("Cluster did not stabilize") return 0 return 1 def stopall(self, nodelist=None, verbose=False, force=False): '''Stop the cluster managers on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up" or force == True: if not self.StopaCM(node, verbose=verbose, force=force): ret = 0 return ret def rereadall(self, nodelist=None): '''Force the cluster managers on every node in the cluster to reread their config files. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": self.RereadCM(node) def statall(self, nodelist=None): '''Return the status of the cluster managers in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' result = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.StataCM(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): '''isolate the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node != target: rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node)) if rc != 0: self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) return None else: self.debug("Communication cut between %s and %s" % (target, node)) return 1 def unisolate_node(self, target, nodes=None): '''fix the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node != target: restored = 0 # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False) self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False) self.debug("Communication restored between %s and %s" % (target, node)) def oprofileStart(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileStart(n) elif node in self.Env["oprofile"]: self.debug("Enabling oprofile on %s" % node) self.rsh(node, "opcontrol --init") self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") self.rsh(node, "opcontrol --start") self.rsh(node, "opcontrol --reset") def oprofileSave(self, test, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileSave(test, n) elif node in self.Env["oprofile"]: self.rsh(node, "opcontrol --dump") self.rsh(node, "opcontrol --save=cts.%d" % test) # Read back with: opreport -l session:cts.0 image:/c* if None: self.rsh(node, "opcontrol --reset") else: self.oprofileStop(node) self.oprofileStart(node) def oprofileStop(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileStop(n) elif node in self.Env["oprofile"]: self.debug("Stopping oprofile on %s" % node) self.rsh(node, "opcontrol --reset") self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") def errorstoignore(self): # At some point implement a more elegant solution that # also produces a report at the end """ Return a list of known error messages that should be ignored """ return self.templates.get_patterns("BadNewsIgnore") def install_config(self, node): if not self.ns.wait_for_node(node): self.log("Node %s is not up." % node) return None if not node in self.CIBsync and self.Env["ClobberCIB"]: self.CIBsync[node] = 1 self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*") # Only install the CIB on the first node, all the other ones will pick it up from there if self.cib_installed == 1: return None self.cib_installed = 1 if self.Env["CIBfilename"] == None: self.log("Installing Generated CIB on node %s" % (node)) self.cib.install(node) else: self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node)) if self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) != 0: raise ValueError("Can not scp file to %s %d"%(node)) self.rsh(node, "chown " + BuildOptions.DAEMON_USER + " " + BuildOptions.CIB_DIR + "/cib.xml") def prepare(self): '''Finish the Initialization process. Prepare to test...''' self.partitions_expected = 1 for node in self.Env["nodes"]: self.ShouldBeStatus[node] = "" if self.Env["experimental-tests"]: self.unisolate_node(node) self.StataCM(node) def test_node_CM(self, node): '''Report the status of the cluster manager on a given node''' watchpats = [ ] watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") watchpats.append(self.templates["Pat:NonDC_started"] % node) watchpats.append(self.templates["Pat:DC_started"] % node) idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle") idle_watch.set_watch() (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) if not out: out = "" else: out = out[0].strip() self.debug("Node %s status: '%s'" %(node, out)) if out.find('ok') < 0: if self.ShouldBeStatus[node] == "up": self.log( "Node status for %s is %s but we think it should be %s" % (node, "down", self.ShouldBeStatus[node])) self.ShouldBeStatus[node] = "down" return 0 if self.ShouldBeStatus[node] == "down": self.log( "Node status for %s is %s but we think it should be %s: %s" % (node, "up", self.ShouldBeStatus[node], out)) self.ShouldBeStatus[node] = "up" # check the output first - because syslog-ng loses messages if out.find('S_NOT_DC') != -1: # Up and stable return 2 if out.find('S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" % (node, out)) return 1 # Up and stable return 2 # Is the node up or is the node down def StataCM(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) > 0: return 1 return None # Being up and being stable is not the same question... def node_stable(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) == 2: return 1 self.log("Warn: Node %s not stable" % (node)) return None def partition_stable(self, nodes, timeout=None): watchpats = [ ] watchpats.append("Current ping state: S_IDLE") watchpats.append(self.templates["Pat:DC_IDLE"]) self.debug("Waiting for cluster stability...") if timeout == None: timeout = self.Env["DeadTime"] if len(nodes) < 3: self.debug("Cluster is inactive") return 1 idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout) idle_watch.set_watch() for node in nodes.split(): # have each node dump its current state self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) ret = idle_watch.look() while ret: self.debug(ret) for node in nodes.split(): if re.search(node, ret): return 1 ret = idle_watch.look() self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout)) return None def cluster_stable(self, timeout=None, double_check=False): partitions = self.find_partitions() for partition in partitions: if not self.partition_stable(partition, timeout): return None if double_check: # Make sure we are really stable and that all resources, # including those that depend on transient node attributes, # are started if they were going to be time.sleep(5) for partition in partitions: if not self.partition_stable(partition, timeout): return None return 1 def is_node_dc(self, node, status_line=None): rc = 0 if not status_line: (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) if out: status_line = out[0].strip() if not status_line: rc = 0 elif status_line.find('S_IDLE') != -1: rc = 1 elif status_line.find('S_INTEGRATION') != -1: rc = 1 elif status_line.find('S_FINALIZE_JOIN') != -1: rc = 1 elif status_line.find('S_POLICY_ENGINE') != -1: rc = 1 elif status_line.find('S_TRANSITION_ENGINE') != -1: rc = 1 return rc def active_resources(self, node): (_, output) = self.rsh(node, "crm_resource -c", verbose=1) resources = [] for line in output: if re.search("^Resource", line): tmp = AuditResource(self, line) if tmp.type == "primitive" and tmp.host == node: resources.append(tmp.id) return resources def ResourceLocation(self, rid): ResourceNodes = [] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": cmd = self.templates["RscRunning"] % (rid) (rc, lines) = self.rsh(node, cmd) if rc == 127: self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd) for line in lines: self.log("Output: "+line) elif rc == 0: ResourceNodes.append(node) return ResourceNodes def find_partitions(self): ccm_partitions = [] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) if not out: self.log("no partition details for %s" % node) continue partition = out[0].strip() if len(partition) > 2: nodes = partition.split() nodes.sort() partition = ' '.join(nodes) found = 0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug("Adding partition from %s: %s" % (node, partition)) ccm_partitions.append(partition) else: self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) else: self.log("bad partition details for %s" % node) else: self.debug("Node %s is down... skipping" % node) self.debug("Found partitions: %s" % repr(ccm_partitions) ) return ccm_partitions def HasQuorum(self, node_list): # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.Env["nodes"] for node in node_list: if self.ShouldBeStatus[node] == "up": (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) quorum = quorum[0].strip() if quorum.find("1") != -1: return 1 elif quorum.find("0") != -1: return 0 else: self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum) return 0 def Components(self): complist = [] common_ignore = [ "Pending action:", "(ERROR|error): crm_log_message_adv:", "(ERROR|error): MSG: No message to dump", "pending LRM operations at shutdown", "Lost connection to the CIB manager", "Connection to the CIB terminated...", "Sending message to the CIB manager FAILED", "Action A_RECOVER .* not supported", "(ERROR|error): stonithd_op_result_ready: not signed on", "pingd.*(ERROR|error): send_update: Could not send update", "send_ipc_message: IPC Channel to .* is not connected", "unconfirmed_actions: Waiting on .* unconfirmed actions", "cib_native_msgready: Message pending on command channel", r": Performing A_EXIT_1 - forcefully exiting ", r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.", ] stonith_ignore = [ r"Updating failcount for child_DoFencing", r"error.*: Fencer connection failed \(will retry\)", "pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.", ] stonith_ignore.extend(common_ignore) ccm = Process(self, "ccm", pats = [ "State transition .* S_RECOVERY", "pacemaker-controld.*Action A_RECOVER .* not supported", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", r"pacemaker-controld.*: Could not recover from internal error", "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", # these status numbers are likely wrong now r"pacemaker-controld.*exited with status 2", r"attrd.*exited with status 1", r"cib.*exited with status 2", # Not if it was fenced # "A new node joined the cluster", # "WARN: determine_online_status: Node .* is unclean", # "Scheduling node .* for fencing", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", # "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE", # "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN", "State transition S_STARTING -> S_PENDING", ], badnews_ignore = common_ignore) based = Process(self, "pacemaker-based", pats = [ "State transition .* S_RECOVERY", "Lost connection to the CIB manager", "Connection to the CIB manager terminated", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", r"pacemaker-controld.*: Could not recover from internal error", # these status numbers are likely wrong now r"pacemaker-controld.*exited with status 2", r"attrd.*exited with status 1", ], badnews_ignore = common_ignore) execd = Process(self, "pacemaker-execd", pats = [ "State transition .* S_RECOVERY", "LRM Connection failed", "pacemaker-controld.*I_ERROR.*lrm_connection_destroy", "State transition S_STARTING -> S_PENDING", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", r"pacemaker-controld.*: Could not recover from internal error", # this status number is likely wrong now r"pacemaker-controld.*exited with status 2", ], badnews_ignore = common_ignore) controld = Process(self, "pacemaker-controld", pats = [ # "WARN: determine_online_status: Node .* is unclean", # "Scheduling node .* for fencing", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", "State transition .* S_IDLE", "State transition S_STARTING -> S_PENDING", ], badnews_ignore = common_ignore) schedulerd = Process(self, "pacemaker-schedulerd", pats = [ "State transition .* S_RECOVERY", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", r"pacemaker-controld.*: Could not recover from internal error", r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed", "pacemaker-controld.*I_ERROR.*save_cib_contents", # this status number is likely wrong now r"pacemaker-controld.*exited with status 2", ], badnews_ignore = common_ignore, dc_only=True) if self.Env["DoFencing"]: complist.append(Process(self, "stoniths", dc_pats = [ r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed", "Attempting connection to fencing daemon", ], badnews_ignore = stonith_ignore)) ccm.pats.extend([ # these status numbers are likely wrong now r"attrd.*exited with status 1", r"pacemaker-(based|controld).*exited with status 2", ]) based.pats.extend([ # these status numbers are likely wrong now r"attrd.*exited with status 1", r"pacemaker-controld.*exited with status 2", ]) execd.pats.extend([ # these status numbers are likely wrong now r"pacemaker-controld.*exited with status 2", ]) complist.append(ccm) complist.append(based) complist.append(execd) complist.append(controld) complist.append(schedulerd) return complist def StandbyStatus(self, node): (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) if not out: return "off" out = out[0].strip() self.debug("Standby result: "+out) return out # status == "on" : Enter Standby mode # status == "off": Enter Active mode def SetStandbyMode(self, node, status): current_status = self.StandbyStatus(node) cmd = self.templates["StandbyCmd"] % (node, status) self.rsh(node, cmd) return True def AddDummyRsc(self, node, rid): rsc_xml = """ ' '""" % (rid, rid) constraint_xml = """ ' ' """ % (rid, node, node, rid) self.rsh(node, self.templates['CibAddXml'] % (rsc_xml)) self.rsh(node, self.templates['CibAddXml'] % (constraint_xml)) def RemoveDummyRsc(self, node, rid): constraint = "\"//rsc_location[@rsc='%s']\"" % (rid) rsc = "\"//primitive[@id='%s']\"" % (rid) self.rsh(node, self.templates['CibDelXpath'] % constraint) self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/cts/lab/Makefile.am b/cts/lab/Makefile.am index 27e39b3b40..f620266d65 100644 --- a/cts/lab/Makefile.am +++ b/cts/lab/Makefile.am @@ -1,31 +1,30 @@ # # Copyright 2001-2023 the Pacemaker project contributors # # The version control history for this file may have further details. # # This source code is licensed under the GNU General Public License version 2 # or later (GPLv2+) WITHOUT ANY WARRANTY. # MAINTAINERCLEANFILES = Makefile.in noinst_SCRIPTS = cluster_test \ OCFIPraTest.py # Commands intended to be run only via other commands halibdir = $(CRM_DAEMON_DIR) dist_halib_SCRIPTS = cts-log-watcher ctslibdir = $(pythondir)/cts ctslib_PYTHON = __init__.py \ CIB.py \ cib_xml.py \ ClusterManager.py \ CM_corosync.py \ - CTSaudits.py \ CTSscenarios.py \ CTStests.py ctsdir = $(datadir)/$(PACKAGE)/tests/cts cts_SCRIPTS = CTSlab.py \ cts diff --git a/cts/lab/__init__.py b/cts/lab/__init__.py index abed502d1e..ba6a429ea2 100644 --- a/cts/lab/__init__.py +++ b/cts/lab/__init__.py @@ -1,15 +1,14 @@ """Python modules for Pacemaker's Cluster Test Suite (CTS) This package provides the following modules: CIB cib_xml CM_common CM_corosync -CTSaudits CTS CTSscenarios CTStests patterns watcher """ diff --git a/python/pacemaker/_cts/CTS.py b/python/pacemaker/_cts/CTS.py index 4ca7e59b91..f31fd9016e 100644 --- a/python/pacemaker/_cts/CTS.py +++ b/python/pacemaker/_cts/CTS.py @@ -1,244 +1,237 @@ """ Main classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["CtsLab", "NodeStatus", "Process"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import sys import time import traceback from pacemaker.exitstatus import ExitStatus from pacemaker._cts.environment import EnvFactory +from pacemaker._cts.input import should_continue from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory class CtsLab: """ A class that defines the Lab Environment for the Cluster Test System. It defines those things which are expected to change from test environment to test environment for the same cluster manager. This is where you define the set of nodes that are in your test lab, what kind of reset mechanism you use, etc. All this data is stored as key/value pairs in an Environment instance constructed from arguments passed to this class. The CTS code ignores names it doesn't know about or need. Individual tests have access to this information, and it is perfectly acceptable to provide hints, tweaks, fine-tuning directions, or other information to the tests through this mechanism. """ def __init__(self, args=None): """ Create a new CtsLab instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like has_key, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. """ self._env = EnvFactory().getInstance(args) self._logger = LogFactory() def dump(self): """ Print the current environment """ self._env.dump() def has_key(self, key): """ Does the given environment key exist? """ return key in list(self._env.keys()) def __getitem__(self, key): """ Return the given environment key, or raise KeyError if it does not exist """ # Throughout this file, pylint has trouble understanding that EnvFactory # and RemoteFactory are singleton instances that can be treated as callable # and subscriptable objects. Various warnings are disabled because of this. # See also a comment about self._rsh in environment.py. # pylint: disable=unsubscriptable-object return self._env[key] def __setitem__(self, key, value): """ Set the given environment key to the given value, overriding any previous value """ # pylint: disable=unsupported-assignment-operation self._env[key] = value def run(self, scenario, iterations): """ Run the given scenario the given number of times. Returns: ExitStatus.OK on success, or ExitStatus.ERROR on error """ if not scenario: self._logger.log("No scenario was defined") return ExitStatus.ERROR self._logger.log("Cluster nodes: ") # pylint: disable=unsubscriptable-object for node in self._env["nodes"]: self._logger.log(" * %s" % (node)) if not scenario.SetUp(): return ExitStatus.ERROR # We want to alert on any exceptions caused by running a scenario, so # here it's okay to disable the pylint warning. # pylint: disable=bare-except try: scenario.run(iterations) except: self._logger.log("Exception by %s" % sys.exc_info()[0]) self._logger.traceback(traceback) scenario.summarize() scenario.TearDown() return ExitStatus.ERROR scenario.TearDown() scenario.summarize() if scenario.Stats["failure"] > 0: return ExitStatus.ERROR if scenario.Stats["success"] != iterations: self._logger.log("No failure count but success != requested iterations") return ExitStatus.ERROR return ExitStatus.OK class NodeStatus: """ A class for querying the status of cluster nodes - are nodes up? Do they respond to SSH connections? """ def __init__(self, env): """ Create a new NodeStatus instance Arguments: env -- An Environment instance """ self._env = env def _node_booted(self, node): """ Return True if the given node is booted (responds to pings) """ # pylint: disable=not-callable (rc, _) = RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, verbose=0) return rc == 0 def _sshd_up(self, node): """ Return true if sshd responds on the given node """ # pylint: disable=not-callable (rc, _) = RemoteFactory().getInstance()(node, "true", verbose=0) return rc == 0 def wait_for_node(self, node, timeout=300): """ Wait for a node to become available. Should the timeout be reached, the user will be given a choice whether to continue or not. If not, ValueError will be raised. Returns: True when the node is available, or False if the timeout is reached. """ initial_timeout = timeout anytimeouts = False while timeout > 0: if self._node_booted(node) and self._sshd_up(node): if anytimeouts: # Fudge to wait for the system to finish coming up time.sleep(30) LogFactory().debug("Node %s now up" % node) return True time.sleep(30) if not anytimeouts: LogFactory().debug("Waiting for node %s to come up" % node) anytimeouts = True timeout -= 1 LogFactory().log("%s did not come up within %d tries" % (node, initial_timeout)) - if self._env["continue"]: - answer = "Y" - else: - try: - answer = input('Continue? [nY]') - except EOFError: - answer = "n" - - if answer and answer == "n": + if not should_continue(self._env["continue"]): raise ValueError("%s did not come up within %d tries" % (node, initial_timeout)) return False def wait_for_all_nodes(self, nodes, timeout=300): """ Return True when all nodes come up, or False if the timeout is reached """ for node in nodes: if not self.wait_for_node(node, timeout): return False return True class Process: """ A class for managing a Pacemaker daemon """ # pylint: disable=invalid-name def __init__(self, cm, name, dc_only=False, pats=None, dc_pats=None, badnews_ignore=None): """ Create a new Process instance. Arguments: cm -- A ClusterManager instance name -- The command being run dc_only -- Should this daemon be killed only on the DC? pats -- Regexes we expect to find in log files dc_pats -- Additional DC-specific regexes we expect to find in log files badnews_ignore -- Regexes for lines in the log that can be ignored """ self._cm = cm self.badnews_ignore = badnews_ignore self.dc_only = dc_only self.dc_pats = dc_pats self.name = name self.pats = pats if self.badnews_ignore is None: self.badnews_ignore = [] if self.dc_pats is None: self.dc_pats = [] if self.pats is None: self.pats = [] def kill(self, node): """ Kill the instance of this process running on the given node """ (rc, _) = self._cm.rsh(node, "killall -9 %s" % self.name) if rc != 0: self._cm.log ("ERROR: Kill %s failed on node %s" % (self.name, node)) diff --git a/python/pacemaker/_cts/Makefile.am b/python/pacemaker/_cts/Makefile.am index 3b3e3f8e11..c3ded161e5 100644 --- a/python/pacemaker/_cts/Makefile.am +++ b/python/pacemaker/_cts/Makefile.am @@ -1,24 +1,26 @@ # # Copyright 2023 the Pacemaker project contributors # # The version control history for this file may have further details. # # This source code is licensed under the GNU General Public License version 2 # or later (GPLv2+) WITHOUT ANY WARRANTY. # MAINTAINERCLEANFILES = Makefile.in pkgpythondir = $(pythondir)/$(PACKAGE)/_cts pkgpython_PYTHON = CTS.py \ __init__.py \ + audits.py \ corosync.py \ environment.py \ errors.py \ + input.py \ logging.py \ patterns.py \ process.py \ remote.py \ test.py \ watcher.py diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py new file mode 100644 index 0000000000..fe8a9e552a --- /dev/null +++ b/python/pacemaker/_cts/audits.py @@ -0,0 +1,1033 @@ +""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """ + +__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"] +__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import re +import time +import uuid + +from pacemaker.buildoptions import BuildOptions +from pacemaker._cts.input import should_continue +from pacemaker._cts.watcher import LogKind, LogWatcher + + +class ClusterAudit: + """ The base class for various kinds of auditors. Specific audit implementations + should be built on top of this one. Audits can do all kinds of checks on the + system. The basic interface for callers is the `__call__` method, which + returns True if the audit passes and False if it fails. + """ + + def __init__(self, cm): + """ Create a new ClusterAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + # pylint: disable=invalid-name + self._cm = cm + self.name = None + + def __call__(self): + raise NotImplementedError + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. + This method must be implemented by all subclasses. + """ + + raise NotImplementedError + + def log(self, args): + """ Log a message """ + + self._cm.log("audit: %s" % args) + + def debug(self, args): + """ Log a debug message """ + + self._cm.debug("audit: %s" % args) + + +class LogAudit(ClusterAudit): + """ Audit each cluster node to verify that some logging system is usable. + This is done by logging a unique test message and then verifying that + we can read back that test message using logging tools. + """ + + def __init__(self, cm): + """ Create a new LogAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.name = "LogAudit" + + def _restart_cluster_logging(self, nodes=None): + """ Restart logging on the given nodes, or all if none are given """ + + if not nodes: + nodes = self._cm.Env["nodes"] + + self._cm.debug("Restarting logging on: %s" % repr(nodes)) + + for node in nodes: + if self._cm.Env["have_systemd"]: + (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket") + if rc != 0: + self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) + + (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service") + if rc != 0: + self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) + + (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"]) + if rc != 0: + self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node)) + + def _create_watcher(self, patterns, kind): + """ Create a new LogWatcher instance for the given patterns """ + + watch = LogWatcher(self._cm.Env["LogFileName"], patterns, + self._cm.Env["nodes"], kind, "LogAudit", 5, + silent=True) + watch.set_watch() + return watch + + def _test_logging(self): + """ Perform the log audit """ + + patterns = [] + prefix = "Test message from" + suffix = str(uuid.uuid4()) + watch = {} + + for node in self._cm.Env["nodes"]: + # Look for the node name in two places to make sure + # that syslog is logging with the correct hostname + m = re.search("^([^.]+).*", node) + if m: + simple = m.group(1) + else: + simple = node + + patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) + + watch_pref = self._cm.Env["LogWatcher"] + if watch_pref == LogKind.ANY: + kinds = [ LogKind.FILE ] + if self._cm.Env["have_systemd"]: + kinds += [ LogKind.JOURNAL ] + kinds += [ LogKind.REMOTE_FILE ] + for k in kinds: + watch[k] = self._create_watcher(patterns, k) + self._cm.log("Logging test message with identifier %s" % suffix) + else: + watch[watch_pref] = self._create_watcher(patterns, watch_pref) + + for node in self._cm.Env["nodes"]: + cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix) + + (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0) + if rc != 0: + self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) + + for k in list(watch.keys()): + w = watch[k] + if watch_pref == LogKind.ANY: + self._cm.log("Checking for test message in %s logs" % k) + w.look_for_all(silent=True) + if w.unmatched: + for regex in w.unmatched: + self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind)) + else: + if watch_pref == LogKind.ANY: + self._cm.log("Found test message in %s logs" % k) + self._cm.Env["LogWatcher"] = k + return 1 + + return False + + def __call__(self): + max_attempts = 3 + attempt = 0 + + self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) + while attempt <= max_attempts and not self._test_logging(): + attempt += 1 + self._restart_cluster_logging() + time.sleep(60*attempt) + + if attempt > max_attempts: + self._cm.log("ERROR: Cluster logging unrecoverable.") + return False + + return True + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + if self._cm.Env["DoBSC"] or self._cm.Env["LogAuditDisabled"]: + return False + + return True + + +class DiskAudit(ClusterAudit): + """ Audit disk usage on cluster nodes to verify that there is enough free + space left on whichever mounted file system holds the logs. + + Warn on: less than 100 MB or 10% of free space + Error on: less than 10 MB or 5% of free space + """ + + def __init__(self, cm): + """ Create a new DiskAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.name = "DiskspaceAudit" + + def __call__(self): + result = True + + # @TODO Use directory of PCMK_logfile if set on host + dfcmd = "df -BM " + BuildOptions.LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'" + + self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) + for node in self._cm.Env["nodes"]: + (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1) + if not dfout: + self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) + continue + + dfout = dfout[0].strip() + + try: + (used, remain) = dfout.split() + used_percent = int(used) + remaining_mb = int(remain) + except (ValueError, TypeError): + self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]" + % (dfout, node, used, remain)) + else: + if remaining_mb < 10 or used_percent > 95: + self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" + % (node, used_percent, remaining_mb)) + result = False + + if not should_continue(self._cm.Env): + raise ValueError("Disk full on %s" % node) + + elif remaining_mb < 100 or used_percent > 90: + self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) + + return result + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + return not self._cm.Env["DoBSC"] + + +class FileAudit(ClusterAudit): + """ Audit the filesystem looking for various failure conditions: + + * The presence of core dumps from corosync or Pacemaker daemons + * Stale IPC files + """ + + def __init__(self, cm): + """ Create a new FileAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.known = [] + self.name = "FileAudit" + + def __call__(self): + result = True + + self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) + for node in self._cm.Env["nodes"]: + + (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) + for line in lsout: + line = line.strip() + + if line not in self.known: + result = False + self.known.append(line) + self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line)) + + (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) + for line in lsout: + line = line.strip() + + if line not in self.known: + result = False + self.known.append(line) + self._cm.log("Warning: Corosync core file on %s: %s" % (node, line)) + + if node in self._cm.ShouldBeStatus and self._cm.ShouldBeStatus[node] == "down": + clean = False + (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) + + for line in lsout: + result = False + clean = True + self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line)) + + if clean: + (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) + + for line in lsout: + self._cm.debug("ps[%s]: %s" % (node, line)) + + self._cm.rsh(node, "rm -rf /dev/shm/qb-*") + + else: + self._cm.debug("Skipping %s" % node) + + return result + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + return True + + +class AuditResource: + """ A base class for storing information about a cluster resource """ + + def __init__(self, cm, line): + """ Create a new AuditResource instance + + Arguments: + + cm -- A ClusterManager instance + line -- One line of output from `crm_resource` describing a single + resource + """ + + # pylint: disable=invalid-name + fields = line.split() + self._cm = cm + self.line = line + self.type = fields[1] + self.id = fields[2] + self.clone_id = fields[3] + self.parent = fields[4] + self.rprovider = fields[5] + self.rclass = fields[6] + self.rtype = fields[7] + self.host = fields[8] + self.needs_quorum = fields[9] + self.flags = int(fields[10]) + self.flags_s = fields[11] + + if self.parent == "NA": + self.parent = None + + @property + def unique(self): + """ Is this resource unique? """ + + return self.flags & 0x20 + + @property + def orphan(self): + """ Is this resource an orphan? """ + + return self.flags & 0x01 + + @property + def managed(self): + """ Is this resource managed by the cluster? """ + + return self.flags & 0x02 + + +class AuditConstraint: + """ A base class for storing information about a cluster constraint """ + + def __init__(self, cm, line): + """ Create a new AuditConstraint instance + + Arguments: + + cm -- A ClusterManager instance + line -- One line of output from `crm_resource` describing a single + constraint + """ + + # pylint: disable=invalid-name + fields = line.split() + self._cm = cm + self.line = line + self.type = fields[1] + self.id = fields[2] + self.rsc = fields[3] + self.target = fields[4] + self.score = fields[5] + self.rsc_role = fields[6] + self.target_role = fields[7] + + if self.rsc_role == "NA": + self.rsc_role = None + + if self.target_role == "NA": + self.target_role = None + + +class PrimitiveAudit(ClusterAudit): + """ Audit primitive resources to verify a variety of conditions, including that + they are active and managed only when expected; they are active on the + expected clusted node; and that they are not orphaned. + """ + + def __init__(self, cm): + """ Create a new PrimitiveAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.name = "PrimitiveAudit" + + self._active_nodes = [] + self._constraints = [] + self._inactive_nodes = [] + self._resources = [] + self._target = None + + def _audit_resource(self, resource, quorum): + """ Perform the audit of a single resource """ + + rc = True + active = self._cm.ResourceLocation(resource.id) + + if len(active) == 1: + if quorum: + self.debug("Resource %s active on %s" % (resource.id, repr(active))) + + elif resource.needs_quorum == 1: + self._cm.log("Resource %s active without quorum: %s" + % (resource.id, repr(active))) + rc = False + + elif not resource.managed: + self._cm.log("Resource %s not managed. Active on %s" + % (resource.id, repr(active))) + + elif not resource.unique: + # TODO: Figure out a clever way to actually audit these resource types + if len(active) > 1: + self.debug("Non-unique resource %s is active on: %s" + % (resource.id, repr(active))) + else: + self.debug("Non-unique resource %s is not active" % resource.id) + + elif len(active) > 1: + self._cm.log("Resource %s is active multiple times: %s" + % (resource.id, repr(active))) + rc = False + + elif resource.orphan: + self.debug("Resource %s is an inactive orphan" % resource.id) + + elif not self._inactive_nodes: + self._cm.log("WARN: Resource %s not served anywhere" % resource.id) + rc = False + + elif self._cm.Env["warn-inactive"]: + if quorum or not resource.needs_quorum: + self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" + % (resource.id, repr(self._inactive_nodes))) + else: + self.debug("Resource %s not served anywhere (Inactive nodes: %s)" + % (resource.id, repr(self._inactive_nodes))) + + elif quorum or not resource.needs_quorum: + self.debug("Resource %s not served anywhere (Inactive nodes: %s)" + % (resource.id, repr(self._inactive_nodes))) + + return rc + + def _setup(self): + """ Verify cluster nodes are active, and collect resource and colocation + information used for performing the audit. + """ + + for node in self._cm.Env["nodes"]: + if self._cm.ShouldBeStatus[node] == "up": + self._active_nodes.append(node) + else: + self._inactive_nodes.append(node) + + for node in self._cm.Env["nodes"]: + if self._target is None and self._cm.ShouldBeStatus[node] == "up": + self._target = node + + if not self._target: + # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource + # with CIB_file=/path/to/cib.xml even when the cluster isn't running + self.debug("No nodes active - skipping %s" % self.name) + return False + + (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1) + + for line in lines: + if re.search("^Resource", line): + self._resources.append(AuditResource(self._cm, line)) + elif re.search("^Constraint", line): + self._constraints.append(AuditConstraint(self._cm, line)) + else: + self._cm.log("Unknown entry: %s" % line) + + return True + + def __call__(self): + result = True + + if not self._setup(): + return result + + quorum = self._cm.HasQuorum(None) + for resource in self._resources: + if resource.type == "primitive" and not self._audit_resource(resource, quorum): + result = False + + return result + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self._cm["Name"] == "crm-corosync": + # return True + return False + + +class GroupAudit(PrimitiveAudit): + """ Audit group resources to verify that each of its child primitive + resources is active on the expected cluster node. + """ + + def __init__(self, cm): + """ Create a new GroupAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + PrimitiveAudit.__init__(self, cm) + self.name = "GroupAudit" + + def __call__(self): + result = True + + if not self._setup(): + return result + + for group in self._resources: + if group.type != "group": + continue + + first_match = True + group_location = None + + for child in self._resources: + if child.parent != group.id: + continue + + nodes = self._cm.ResourceLocation(child.id) + + if first_match and len(nodes) > 0: + group_location = nodes[0] + + first_match = False + + if len(nodes) > 1: + result = False + self._cm.log("Child %s of %s is active more than once: %s" + % (child.id, group.id, repr(nodes))) + + elif not nodes: + # Groups are allowed to be partially active + # However we do need to make sure later children aren't running + group_location = None + self.debug("Child %s of %s is stopped" % (child.id, group.id)) + + elif nodes[0] != group_location: + result = False + self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s" + % (child.id, group.id, nodes[0], group_location)) + else: + self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) + + return result + + +class CloneAudit(PrimitiveAudit): + """ Audit clone resources. NOTE: Currently, this class does not perform + any actual audit functions. + """ + + def __init__(self, cm): + """ Create a new CloneAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + PrimitiveAudit.__init__(self, cm) + self.name = "CloneAudit" + + def __call__(self): + result = True + + if not self._setup(): + return result + + for clone in self._resources: + if clone.type != "clone": + continue + + for child in self._resources: + if child.parent == clone.id and child.type == "primitive": + self.debug("Checking child %s of %s..." % (child.id, clone.id)) + # Check max and node_max + # Obtain with: + # crm_resource -g clone_max --meta -r child.id + # crm_resource -g clone_node_max --meta -r child.id + + return result + + +class ColocationAudit(PrimitiveAudit): + """ Audit cluster resources to verify that those that should be colocated + with each other actually are. + """ + + def __init__(self, cm): + """ Create a new ColocationAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + PrimitiveAudit.__init__(self, cm) + self.name = "ColocationAudit" + + def _crm_location(self, resource): + """ Return a list of cluster nodes where a given resource is running """ + + (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1) + hosts = [] + + if rc == 0: + for line in lines: + fields = line.split() + hosts.append(fields[0]) + + return hosts + + def __call__(self): + result = True + + if not self._setup(): + return result + + for coloc in self._constraints: + if coloc.type != "rsc_colocation": + continue + + source = self._crm_location(coloc.rsc) + target = self._crm_location(coloc.target) + + if not source: + self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) + else: + for node in source: + if not node in target: + result = False + self._cm.log("Colocation audit (%s): %s running on %s (not in %s)" + % (coloc.id, coloc.rsc, node, repr(target))) + else: + self.debug("Colocation audit (%s): %s running on %s (in %s)" + % (coloc.id, coloc.rsc, node, repr(target))) + + return result + + +class ControllerStateAudit(ClusterAudit): + """ Audit cluster nodes to verify that those we expect to be active are + active, and those that are expected to be inactive are inactive. + """ + + def __init__(self, cm): + """ Create a new ControllerStateAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.name = "ControllerStateAudit" + + def __call__(self): + result = True + up_are_down = 0 + down_are_up = 0 + unstable_list = [] + + for node in self._cm.Env["nodes"]: + should_be = self._cm.ShouldBeStatus[node] + rc = self._cm.test_node_CM(node) + + if rc > 0: + if should_be == "down": + down_are_up += 1 + + if rc == 1: + unstable_list.append(node) + + elif should_be == "up": + up_are_down += 1 + + if len(unstable_list) > 0: + result = False + self._cm.log("Cluster is not stable: %d (of %d): %s" + % (len(unstable_list), self._cm.upcount(), repr(unstable_list))) + + if up_are_down > 0: + result = False + self._cm.log("%d (of %d) nodes expected to be up were down." + % (up_are_down, len(self._cm.Env["nodes"]))) + + if down_are_up > 0: + result = False + self._cm.log("%d (of %d) nodes expected to be down were up." + % (down_are_up, len(self._cm.Env["nodes"]))) + + return result + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self._cm["Name"] == "crm-corosync": + # return True + return False + + +class CIBAudit(ClusterAudit): + """ Audit the CIB by verifying that it is identical across cluster nodes """ + + def __init__(self, cm): + """ Create a new CIBAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.name = "CibAudit" + + def __call__(self): + result = True + ccm_partitions = self._cm.find_partitions() + + if not ccm_partitions: + self.debug("\tNo partitions to audit") + return result + + for partition in ccm_partitions: + self.debug("\tAuditing CIB consistency for: %s" % partition) + + if self._audit_cib_contents(partition) == 0: + result = False + + return result + + def _audit_cib_contents(self, hostlist): + """ Perform the CIB audit on the given hosts """ + + passed = True + node0 = None + node0_xml = None + + partition_hosts = hostlist.split() + for node in partition_hosts: + node_xml = self._store_remote_cib(node, node0) + + if node_xml is None: + self._cm.log("Could not perform audit: No configuration from %s" % node) + passed = False + + elif node0 is None: + node0 = node + node0_xml = node_xml + + elif node0_xml is None: + self._cm.log("Could not perform audit: No configuration from %s" % node0) + passed = False + + else: + (rc, result) = self._cm.rsh( + node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) + + if rc != 0: + self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) + passed = False + + for line in result: + if not re.search("", line): + passed = False + self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) + else: + self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) + + return passed + + def _store_remote_cib(self, node, target): + """ Store a copy of the given node's CIB on the given target node. If + no target is given, store the CIB on the given node. + """ + + filename = "/tmp/ctsaudit.%s.xml" % node + + if not target: + target = node + + (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1) + if rc != 0: + self._cm.log("Could not retrieve configuration") + return None + + self._cm.rsh("localhost", "rm -f %s" % filename) + for line in lines: + self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) + + if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: + self._cm.log("Could not store configuration") + return None + + return filename + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self._cm["Name"] == "crm-corosync": + # return True + return False + + +class PartitionAudit(ClusterAudit): + """ Audit each partition in a cluster to verify a variety of conditions: + + * The number of partitions and the nodes in each is as expected + * Each node is active when it should be active and inactive when it + should be inactive + * The status and epoch of each node is as expected + * A partition has quorum + * A partition has a DC when expected + """ + + def __init__(self, cm): + """ Create a new PartitionAudit instance + + Arguments: + + cm -- A ClusterManager instance + """ + + ClusterAudit.__init__(self, cm) + self.name = "PartitionAudit" + + self._node_epoch = {} + self._node_state = {} + self._node_quorum = {} + + def __call__(self): + result = True + ccm_partitions = self._cm.find_partitions() + + if not ccm_partitions: + return result + + self._cm.cluster_stable(double_check=True) + + if len(ccm_partitions) != self._cm.partitions_expected: + self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) + result = False + + for partition in ccm_partitions: + self._cm.log("\t %s" % partition) + + for partition in ccm_partitions: + if self._audit_partition(partition) == 0: + result = False + + return result + + def _trim_string(self, avalue): + """ Remove the last character from a multi-character string """ + + if not avalue: + return None + + if len(avalue) > 1: + return avalue[:-1] + + return avalue + + def _trim2int(self, avalue): + """ Remove the last character from a multi-character string and convert + the result to an int. + """ + + trimmed = self._trim_string(avalue) + if trimmed: + return int(trimmed) + + return None + + def _audit_partition(self, partition): + """ Perform the audit of a single partition """ + + passed = True + dc_found = [] + dc_allowed_list = [] + lowest_epoch = None + node_list = partition.split() + + self.debug("Auditing partition: %s" % partition) + for node in node_list: + if self._cm.ShouldBeStatus[node] != "up": + self._cm.log("Warn: Node %s appeared out of nowhere" % node) + self._cm.ShouldBeStatus[node] = "up" + # not in itself a reason to fail the audit (not what we're + # checking for in this audit) + + (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1) + self._node_state[node] = out[0].strip() + + (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1) + self._node_epoch[node] = out[0].strip() + + (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1) + self._node_quorum[node] = out[0].strip() + + self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node])) + self._node_state[node] = self._trim_string(self._node_state[node]) + self._node_epoch[node] = self._trim2int(self._node_epoch[node]) + self._node_quorum[node] = self._trim_string(self._node_quorum[node]) + + if not self._node_epoch[node]: + self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node) + self._cm.ShouldBeStatus[node] = "down" + # not in itself a reason to fail the audit (not what we're + # checking for in this audit) + elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch: + lowest_epoch = self._node_epoch[node] + + if not lowest_epoch: + self._cm.log("Lowest epoch not determined in %s" % partition) + passed = False + + for node in node_list: + if self._cm.ShouldBeStatus[node] != "up": + continue + + if self._cm.is_node_dc(node, self._node_state[node]): + dc_found.append(node) + if self._node_epoch[node] == lowest_epoch: + self.debug("%s: OK" % node) + elif not self._node_epoch[node]: + self.debug("Check on %s ignored: no node epoch" % node) + elif not lowest_epoch: + self.debug("Check on %s ignored: no lowest epoch" % node) + else: + self._cm.log("DC %s is not the oldest node (%d vs. %d)" + % (node, self._node_epoch[node], lowest_epoch)) + passed = False + + if not dc_found: + self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)" + % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) + + elif len(dc_found) > 1: + self._cm.log("%d DCs (%s) found in cluster partition: %s" + % (len(dc_found), str(dc_found), str(node_list))) + passed = False + + if not passed: + for node in node_list: + if self._cm.ShouldBeStatus[node] == "up": + self._cm.log("epoch %s : %s" + % (self._node_epoch[node], self._node_state[node])) + + return passed + + def is_applicable(self): + """ Return True if this audit is applicable in the current test configuration. """ + + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self._cm["Name"] == "crm-corosync": + # return True + return False + + +# pylint: disable=invalid-name +def audit_list(cm): + """ Return a list of instances of applicable audits that can be performed + for the given ClusterManager. + """ + + result = [] + + for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit, + PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit, + ColocationAudit, CIBAudit]: + a = auditclass(cm) + if a.is_applicable(): + result.append(a) + + return result diff --git a/python/pacemaker/_cts/input.py b/python/pacemaker/_cts/input.py new file mode 100644 index 0000000000..7e734f6ffb --- /dev/null +++ b/python/pacemaker/_cts/input.py @@ -0,0 +1,18 @@ +""" User input related utilities for CTS """ + +__all__ = ["should_continue"] +__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +def should_continue(env): + """ On failure, prompt the user to see if we should continue """ + + if env["continue"]: + return True + + try: + answer = input("Continue? [yN]") + except EOFError: + answer = "n" + + return answer in ["y", "Y"]