diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index 57d3b1c33c..9a53d910f6 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1022 +1,1022 @@
"""Auditing classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
"""
The base class for various kinds of auditors.
Specific audit implementations should be built on top of this one. Audits
can do all kinds of checks on the system. The basic interface for callers
is the `__call__` method, which returns True if the audit passes and False
if it fails.
"""
def __init__(self, cm):
"""
Create a new ClusterAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
"""Perform the audit action."""
raise NotImplementedError
def is_applicable(self):
"""
Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
"""Log a message."""
self._cm.log("audit: %s" % args)
def debug(self, args):
"""Log a debug message."""
self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
"""
Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that we
can read back that test message using logging tools.
"""
def __init__(self, cm):
"""
Create a new LogAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
"""Restart logging on the given nodes, or all if none are given."""
if not nodes:
nodes = self._cm.env["nodes"]
self._cm.debug("Restarting logging on: %r" % nodes)
for node in nodes:
if self._cm.env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log("ERROR: Cannot start 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"])
if rc != 0:
self._cm.log("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node))
def _create_watcher(self, patterns, kind):
"""Create a new LogWatcher instance for the given patterns."""
watch = LogWatcher(self._cm.env["LogFileName"], patterns,
self._cm.env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
"""Perform the log audit."""
patterns = []
prefix = "Test message from"
suffix = str(uuid.uuid4())
watch = {}
for node in self._cm.env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
watch_pref = self._cm.env["LogWatcher"]
if watch_pref == LogKind.ANY:
kinds = [LogKind.FILE]
if self._cm.env["have_systemd"]:
kinds += [LogKind.JOURNAL]
kinds += [LogKind.REMOTE_FILE]
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log("Logging test message with identifier %s" % suffix)
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
for node in self._cm.env["nodes"]:
cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in list(watch.keys()):
w = watch[k]
if watch_pref == LogKind.ANY:
self._cm.log("Checking for test message in %s logs" % k)
w.look_for_all(silent=True)
if w.unmatched:
for regex in w.unmatched:
self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
else:
if watch_pref == LogKind.ANY:
self._cm.log("Found test message in %s logs" % k)
self._cm.env["LogWatcher"] = k
return True
return False
def __call__(self):
"""Perform the audit action."""
max_attempts = 3
attempt = 0
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
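# Retry the log test with a linearly increasing delay, restarting the
# cluster logging daemons between attempts.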
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60 * attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
return False
return True
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
if self._cm.env["LogAuditDisabled"]:
return False
return True
class DiskAudit(ClusterAudit):
"""
Audit disk usage on cluster nodes.
Verify that there is enough free space left on whichever mounted file
system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
"""
Create a new DiskAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
"""Perform the audit action."""
result = True
# @TODO Use directory of PCMK_logfile if set on host
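# Assuming df's usual column layout, the awk/tr pipeline below extracts the
# use percentage and the available megabytes (in that order) for the log
# directory's filesystem, stripping the 'M' and '%' suffixes.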
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
self._cm.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
% (dfout, node, used, remain))
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
% (node, used_percent, remaining_mb))
result = False
if not should_continue(self._cm.env):
raise ValueError("Disk full on %s" % node)
elif remaining_mb < 100 or used_percent > 90:
self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
return result
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
return True
class FileAudit(ClusterAudit):
"""
Audit the filesystem looking for various failure conditions.
Check for:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
"""
Create a new FileAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def __call__(self):
"""Perform the audit action."""
result = True
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
result = False
clean = True
self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug("ps[%s]: %s" % (node, line))
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug("Skipping %s" % node)
return result
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
return True
class AuditResource:
"""A base class for storing information about a cluster resource."""
def __init__(self, cm, line):
"""
Create a new AuditResource instance.
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
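# The bit tests below assume the flag encoding printed by `crm_resource -c`:
# 0x01 = orphan, 0x02 = managed, 0x20 = globally unique.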
@property
def unique(self):
"""Return True if this resource is unique."""
return self.flags & 0x20
@property
def orphan(self):
"""Return True if this resource is an orphan."""
return self.flags & 0x01
@property
def managed(self):
"""Return True if this resource is managed by the cluster."""
return self.flags & 0x02
class AuditConstraint:
"""A base class for storing information about a cluster constraint."""
def __init__(self, cm, line):
"""
Create a new AuditConstraint instance.
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
"""
Audit primitive resources to verify a variety of conditions.
Check that:
* Resources are active and managed only when expected
* Resources are active on the expected cluster node
* Resources are not orphaned
"""
def __init__(self, cm):
"""
Create a new PrimitiveAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
"""Perform the audit of a single resource."""
rc = True
active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %r" % (resource.id, active))
elif resource.needs_quorum == 1:
self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
rc = False
elif not resource.managed:
self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
rc = False
elif resource.orphan:
self.debug("Resource %s is an inactive orphan" % resource.id)
elif not self._inactive_nodes:
self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
rc = False
elif self._cm.env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
return rc
def _setup(self):
"""
Verify cluster nodes are active.
Collect resource and colocation information used for performing the audit.
"""
for node in self._cm.env["nodes"]:
if self._cm.expected_status[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
for node in self._cm.env["nodes"]:
if self._target is None and self._cm.expected_status[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name)
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log("Unknown entry: %s" % line)
return True
def __call__(self):
"""Perform the audit action."""
result = True
if not self._setup():
return result
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
result = False
return result
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- #if self._cm["Name"] == "crm-corosync":
- # return True
+ # if self._cm["Name"] == "crm-corosync":
+ # return True
return False
class GroupAudit(PrimitiveAudit):
"""
Audit group resources.
Check that:
* Each of its child primitive resources is active on the expected cluster node
"""
def __init__(self, cm):
"""
Create a new GroupAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
result = True
if not self._setup():
return result
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
result = False
self._cm.log("Child %s of %s is active more than once: %r"
% (child.id, group.id, nodes))
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
result = False
self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
% (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return result
class CloneAudit(PrimitiveAudit):
"""
Audit clone resources.
NOTE: Currently, this class does not perform any actual audit functions.
"""
def __init__(self, cm):
"""
Create a new CloneAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
result = True
if not self._setup():
return result
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug("Checking child %s of %s..." % (child.id, clone.id))
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return result
class ColocationAudit(PrimitiveAudit):
"""
Audit resource colocation.
Check that:
* Resources are colocated with the expected resource
"""
def __init__(self, cm):
"""
Create a new ColocationAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
"""Return a list of cluster nodes where a given resource is running."""
(rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
result = True
if not self._setup():
return result
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
else:
for node in source:
if node not in target:
result = False
self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
% (coloc.id, coloc.rsc, node, target))
else:
self.debug("Colocation audit (%s): %s running on %s (in %r)"
% (coloc.id, coloc.rsc, node, target))
return result
class ControllerStateAudit(ClusterAudit):
"""Verify active and inactive resources."""
def __init__(self, cm):
"""
Create a new ControllerStateAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
result = True
up_are_down = 0
down_are_up = 0
unstable_list = []
for node in self._cm.env["nodes"]:
should_be = self._cm.expected_status[node]
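# test_node_cm() is treated here as returning 0 for down, 1 for up but
# unstable, and anything higher for up and stable (inferred from the
# checks below).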
rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
result = False
self._cm.log("Cluster is not stable: %d (of %d): %r"
% (len(unstable_list), self._cm.upcount(), unstable_list))
if up_are_down > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be up were down."
% (up_are_down, len(self._cm.env["nodes"])))
if down_are_up > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be down were up."
% (down_are_up, len(self._cm.env["nodes"])))
return result
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- #if self._cm["Name"] == "crm-corosync":
- # return True
+ # if self._cm["Name"] == "crm-corosync":
+ # return True
return False
class CIBAudit(ClusterAudit):
"""Audit the CIB by verifying that it is identical across cluster nodes."""
def __init__(self, cm):
"""
Create a new CIBAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return result
for partition in ccm_partitions:
self.debug("\tAuditing CIB consistency for: %s" % partition)
if self._audit_cib_contents(partition) == 0:
result = False
return result
def _audit_cib_contents(self, hostlist):
"""Perform the CIB audit on the given hosts."""
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node)
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node0)
passed = False
else:
(rc, result) = self._cm.rsh(
node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
if rc != 0:
self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
passed = False
for line in result:
if not re.search("", line):
passed = False
self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
else:
self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
return passed
def _store_remote_cib(self, node, target):
"""
Store a copy of the given node's CIB on the given target node.
If no target is given, store the CIB on the given node.
"""
filename = "/tmp/ctsaudit.%s.xml" % node
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", "rm -f %s" % filename)
for line in lines:
self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- #if self._cm["Name"] == "crm-corosync":
- # return True
+ # if self._cm["Name"] == "crm-corosync":
+ # return True
return False
class PartitionAudit(ClusterAudit):
"""
Audit each partition in a cluster to verify a variety of conditions.
Check that:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
"""
Create a new PartitionAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return result
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
result = False
for partition in ccm_partitions:
self._cm.log("\t %s" % partition)
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
result = False
return result
def _trim_string(self, avalue):
"""Remove the last character from a multi-character string."""
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
"""Remove the last character from a multi-character string and convert the result to an int."""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
def _audit_partition(self, partition):
"""Perform the audit of a single partition."""
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug("Auditing partition: %s" % partition)
for node in node_list:
if self._cm.expected_status[node] != "up":
self._cm.log("Warn: Node %s appeared out of nowhere" % node)
self._cm.expected_status[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
self._cm.expected_status[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log("Lowest epoch not determined in %s" % partition)
passed = False
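# The DC is expected to be the longest-standing member of the partition,
# i.e. the node with the lowest epoch.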
for node in node_list:
if self._cm.expected_status[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug("%s: OK" % node)
elif not self._node_epoch[node]:
self.debug("Check on %s ignored: no node epoch" % node)
elif not lowest_epoch:
self.debug("Check on %s ignored: no lowest epoch" % node)
else:
self._cm.log("DC %s is not the oldest node (%d vs. %d)"
% (node, self._node_epoch[node], lowest_epoch))
passed = False
if not dc_found:
self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
% (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
self._cm.log("%d DCs (%s) found in cluster partition: %s"
% (len(dc_found), str(dc_found), str(node_list)))
passed = False
if not passed:
for node in node_list:
if self._cm.expected_status[node] == "up":
self._cm.log("epoch %s : %s"
% (self._node_epoch[node], self._node_state[node]))
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- #if self._cm["Name"] == "crm-corosync":
- # return True
+ # if self._cm["Name"] == "crm-corosync":
+ # return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
"""Return a list of instances of applicable audits that can be performed."""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py
index d26c88a33b..7de8c0e37d 100644
--- a/python/pacemaker/_cts/scenarios.py
+++ b/python/pacemaker/_cts/scenarios.py
@@ -1,424 +1,424 @@
"""Test scenario classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = [
"AllOnce",
"Boot",
"BootCluster",
"LeaveBooted",
"RandomTests",
"Sequence",
]
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker._cts.audits import ClusterAudit
from pacemaker._cts.input import should_continue
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.watcher import LogWatcher
class ScenarioComponent:
"""
The base class for all scenario components.
A scenario component is one single step in a scenario. Each component is
basically just a setup and teardown method.
"""
def __init__(self, cm, env):
"""
Create a new ScenarioComponent instance.
Arguments:
cm -- A ClusterManager instance
env -- An Environment instance
"""
# pylint: disable=invalid-name
self._cm = cm
self._env = env
def is_applicable(self):
"""
Return True if this component is applicable in the given Environment.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def setup(self):
"""
Set up the component, returning True on success.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def teardown(self):
"""
Tear down the given component.
This method must be provided by all subclasses.
"""
raise NotImplementedError
class Scenario:
"""
The base class for scenarios.
A scenario is an ordered list of ScenarioComponent objects. A scenario
proceeds by setting up all its components in sequence, running a list of
tests and audits, and then tearing down its components in reverse.
"""
def __init__(self, cm, components, audits, tests):
"""
Create a new Scenario instance.
Arguments:
cm -- A ClusterManager instance
components -- A list of ScenarioComponents comprising this Scenario
audits -- A list of ClusterAudits that will be performed as
part of this Scenario
tests -- A list of CTSTests that will be run
"""
# pylint: disable=invalid-name
self.stats = {
"success": 0,
"failure": 0,
"BadNews": 0,
"skipped": 0
}
self.tests = tests
self._audits = audits
self._bad_news = None
self._cm = cm
self._components = components
for comp in components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def is_applicable(self):
"""Return True if all ScenarioComponents are applicable."""
for comp in self._components:
if not comp.is_applicable():
return False
return True
def setup(self):
"""
Set up the scenario, returning True on success.
If setup fails at some point, tear down those components that did
successfully set up.
"""
self._cm.prepare()
- self.audit() # Also detects remote/local log config
+ self.audit() # Also detects remote/local log config
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
self.audit()
self._cm.install_support()
self._bad_news = LogWatcher(self._cm.env["LogFileName"],
self._cm.templates.get_patterns("BadNews"),
self._cm.env["nodes"],
self._cm.env["LogWatcher"],
"BadNews", 0)
- self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
+ self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self._components):
if not self._components[j].setup():
# OOPS! We failed. Tear partial setups down.
self.audit()
self._cm.log("Tearing down partial setup")
self.teardown(j)
return False
j += 1
self.audit()
return True
def teardown(self, n_components=None):
"""
Tear down the scenario in the reverse order it was set up.
If n_components is not None, only tear down that many components.
"""
if not n_components:
n_components = len(self._components) - 1
j = n_components
while j >= 0:
self._components[j].teardown()
j -= 1
self.audit()
self._cm.install_support("uninstall")
def incr(self, name):
"""Increment the given stats key."""
if name not in self.stats:
self.stats[name] = 0
self.stats[name] += 1
def run(self, iterations):
"""Run all tests in the scenario the given number of times."""
self._cm.oprofile_start()
try:
self._run_loop(iterations)
self._cm.oprofile_stop()
except:
self._cm.oprofile_stop()
raise
def _run_loop(self, iterations):
"""Run all the tests the given number of times."""
raise NotImplementedError
def run_test(self, test, testcount):
"""
Run the given test.
testcount is the number of tests (including this one) that have been
run across all iterations.
"""
nodechoice = self._cm.env.random_node()
ret = True
did_run = False
self._cm.clear_instance_errors_to_ignore()
choice = "(%s)" % nodechoice
self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount))
starttime = test.set_timer()
if not test.setup(nodechoice):
self._cm.log("Setup failed")
ret = False
else:
did_run = True
ret = test(nodechoice)
if not test.teardown(nodechoice):
self._cm.log("Teardown failed")
if not should_continue(self._cm.env):
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = False
stoptime = time.time()
self._cm.oprofile_save(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if "min_time" not in test.stats:
test.stats["elapsed_time"] = elapsed_time
test.stats["min_time"] = test_time
test.stats["max_time"] = test_time
else:
test.stats["elapsed_time"] += elapsed_time
if test_time < test.stats["min_time"]:
test.stats["min_time"] = test_time
if test_time > test.stats["max_time"]:
test.stats["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self._cm.statall()
did_run = True # Force the test count to be incremented anyway so test extraction works
self.audit(test.errors_to_ignore)
return did_run
def summarize(self):
"""Output scenario results."""
self._cm.log("****************")
self._cm.log("Overall Results:%r" % self.stats)
self._cm.log("****************")
stat_filter = {
"calls": 0,
"failure": 0,
"skipped": 0,
"auditfail": 0,
}
self._cm.log("Test Summary")
for test in self.tests:
for key in stat_filter:
stat_filter[key] = test.stats[key]
name = "Test %s:" % test.name
self._cm.log("{:<25} {!r}".format(name, stat_filter))
self._cm.debug("Detailed Results")
for test in self.tests:
name = "Test %s:" % test.name
self._cm.debug("{:<25} {!r}".format(name, stat_filter))
self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, local_ignore=None):
"""
Perform all scenario audits and log results.
If there are too many failures, prompt the user to confirm that the
scenario should continue running.
"""
errcount = 0
ignorelist = ["CTS:"]
if local_ignore:
ignorelist.extend(local_ignore)
ignorelist.extend(self._cm.errors_to_ignore)
ignorelist.extend(self._cm.instance_errors_to_ignore)
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self._audits:
if not audit():
self._cm.log("Audit %s FAILED." % audit.name)
failed += 1
else:
self._cm.debug("Audit %s passed." % audit.name)
while errcount < 1000:
match = None
if self._bad_news:
match = self._bad_news.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._cm.log("BadNews: %s" % match)
self.incr("BadNews")
errcount += 1
else:
break
else:
print("Big problems")
if not should_continue(self._cm.env):
self._cm.log("Shutting down.")
self.summarize()
self.teardown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self._bad_news:
self._bad_news.end()
return failed
class AllOnce(Scenario):
"""Every Test Once."""
def _run_loop(self, iterations):
testcount = 1
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
"""Random Test Execution."""
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
test = self._cm.env.random_gen.choice(self.tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
"""Named Tests in Sequence."""
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
"""Start the Cluster."""
def _run_loop(self, iterations):
return
class BootCluster(ScenarioComponent):
"""
Start the cluster manager on all nodes.
Wait for each to come up before starting in order to account for the
possibility that a given node might have been rebooted or crashed
beforehand.
"""
def is_applicable(self):
"""Return whether this scenario is applicable."""
return True
def setup(self):
"""Set up the component, returning True on success."""
self._cm.prepare()
# Clear out the cobwebs ;-)
self._cm.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
self._cm.log("Starting Cluster Manager on all nodes.")
return self._cm.startall(verbose=True, quick=True)
def teardown(self):
"""Tear down the component."""
self._cm.log("Stopping Cluster Manager on all nodes")
self._cm.stopall(verbose=True, force=False)
class LeaveBooted(BootCluster):
"""Leave all nodes up when the scenario is complete."""
def teardown(self):
"""Tear down the component."""
self._cm.log("Leaving Cluster running on all nodes")
diff --git a/python/pacemaker/_cts/test.py b/python/pacemaker/_cts/test.py
index 0498060b11..86bffc5683 100644
--- a/python/pacemaker/_cts/test.py
+++ b/python/pacemaker/_cts/test.py
@@ -1,594 +1,594 @@
"""
A module providing base classes.
These classes are used for defining regression tests and groups of regression
tests. Everything exported here should be considered an abstract class that
needs to be subclassed in order to do anything useful. Various functions
will raise NotImplementedError if not overridden by a subclass.
"""
__copyright__ = "Copyright 2009-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+)"
__all__ = ["Test", "Tests"]
import io
import os
import re
import shlex
import signal
import subprocess
import sys
import time
from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError
from pacemaker._cts.process import pipe_communicate
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
def find_validator(rng_file):
"""
Return the command line used to validate XML output.
If no validator is found, return None.
"""
if os.access("/usr/bin/xmllint", os.X_OK):
if rng_file is None:
return ["xmllint", "-"]
return ["xmllint", "--relaxng", rng_file, "-"]
return None
def rng_directory():
"""Return the directory containing RNG schema files."""
if "PCMK_schema_directory" in os.environ:
return os.environ["PCMK_schema_directory"]
if os.path.exists("%s/cts-fencing.in" % sys.path[0]):
return "xml"
return BuildOptions.SCHEMA_DIR
class Pattern:
"""A class for checking log files for a given pattern."""
def __init__(self, pat, negative=False, regex=False):
"""
Create a new Pattern instance.
Arguments:
pat -- The string to search for
negative -- If True, pat must not be found in any input
regex -- If True, pat is a regex and not a substring
"""
self._pat = pat
self.negative = negative
self.regex = regex
def __str__(self):
return self._pat
def match(self, line):
"""Return True if this pattern is found in the given line."""
if self.regex:
return re.search(self._pat, line) is not None
return self._pat in line
class Test:
"""
The base class for a single regression test.
A single regression test may still run multiple commands as part of its
execution.
"""
def __init__(self, name, description, **kwargs):
"""
Create a new Test instance.
This method must be provided by all subclasses, which must call
Test.__init__ first.
Arguments:
description -- A user-readable description of the test, helpful in
identifying what test is running or has failed.
name -- The name of the test. Command line tools use this
attribute to allow running only tests with the exact
name, or tests whose name matches a given pattern.
This should be unique among all tests.
Keyword arguments:
force_wait --
logdir -- The base directory under which to create a directory
to store output and temporary data.
timeout -- How long to wait for the test to complete.
verbose -- Whether to print additional information, including
verbose command output and daemon log files.
"""
self.description = description
self.executed = False
self.name = name
self.force_wait = kwargs.get("force_wait", False)
self.logdir = kwargs.get("logdir", "/tmp")
self.timeout = kwargs.get("timeout", 2)
self.verbose = kwargs.get("verbose", False)
self._cmds = []
self._patterns = []
self._daemon_location = None
self._daemon_output = ""
self._daemon_process = None
self._result_exitcode = ExitStatus.OK
self._result_txt = ""
- ###
- ### PROPERTIES
- ###
+ #
+ # PROPERTIES
+ #
@property
def exitcode(self):
"""
Return the final exitcode of the Test.
If all commands pass, this property will be ExitStatus.OK. Otherwise,
this property will be the exitcode of the first command to fail.
"""
return self._result_exitcode
@exitcode.setter
def exitcode(self, value):
self._result_exitcode = value
@property
def logpath(self):
"""
Return the path to the log for whatever daemon is being tested.
Note that this requires all subclasses to set self._daemon_location
before accessing this property or an exception will be raised.
"""
return os.path.join(self.logdir, "%s.log" % self._daemon_location)
- ###
- ### PRIVATE METHODS
- ###
+ #
+ # PRIVATE METHODS
+ #
def _kill_daemons(self):
"""Kill any running daemons in preparation for executing the test."""
raise NotImplementedError("_kill_daemons not provided by subclass")
def _match_log_patterns(self):
"""
Check test output for expected patterns.
Set self.exitcode and self._result_txt as appropriate. Not all subclasses
will need to do this.
"""
if len(self._patterns) == 0:
return
n_failed_matches = 0
n_negative_matches = 0
output = self._daemon_output.split("\n")
for pat in self._patterns:
positive_match = False
for line in output:
if pat.match(line):
if pat.negative:
n_negative_matches += 1
if self.verbose:
print("This pattern should not have matched = '%s" % pat)
break
positive_match = True
break
if not pat.negative and not positive_match:
n_failed_matches += 1
print("Pattern Not Matched = '%s'" % pat)
if n_failed_matches > 0 or n_negative_matches > 0:
msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches)
self.exitcode = ExitStatus.ERROR
def _new_cmd(self, cmd, args, exitcode, **kwargs):
"""
Add a command to be executed as part of this test.
Arguments:
cmd -- The program to run.
args -- Commands line arguments to pass to cmd, as a string.
exitcode -- The expected exit code of cmd. This can be used to
run a command that is expected to fail.
Keyword arguments:
stdout_match -- If not None, a string that is expected to be
present in the stdout of cmd. This can be a
regular expression.
no_wait -- Do not wait for cmd to complete.
stdout_negative_match -- If not None, a string that is expected to be
missing in the stdout of cmd. This can be a
regular expression.
kill -- A command to be run after cmd, typically in
order to kill a failed process. This should be
the entire command line including arguments as
a single string.
validate -- If True, the output of cmd will be passed to
xmllint for validation. If validation fails,
XmlValidationError will be raised.
check_rng -- If True and validate is True, command output
will additionally be checked against the
api-result.rng file.
check_stderr -- If True, the stderr of cmd will be included in
output.
env -- If not None, variables to set in the environment
"""
self._cmds.append(
{
"args": args,
"check_rng": kwargs.get("check_rng", True),
"check_stderr": kwargs.get("check_stderr", True),
"cmd": cmd,
"expected_exitcode": exitcode,
"kill": kwargs.get("kill", None),
"no_wait": kwargs.get("no_wait", False),
"stdout_match": kwargs.get("stdout_match", None),
"stdout_negative_match": kwargs.get("stdout_negative_match", None),
"validate": kwargs.get("validate", True),
"env": kwargs.get("env", None),
}
)
def _start_daemons(self):
"""Start any necessary daemons in preparation for executing the test."""
raise NotImplementedError("_start_daemons not provided by subclass")
- ###
- ### PUBLIC METHODS
- ###
+ #
+ # PUBLIC METHODS
+ #
def add_cmd(self, cmd, args, validate=True, check_rng=True, check_stderr=True,
env=None):
"""Add a simple command to be executed as part of this test."""
self._new_cmd(cmd, args, ExitStatus.OK, validate=validate, check_rng=check_rng,
check_stderr=check_stderr, env=env)
def add_cmd_and_kill(self, cmd, args, kill_proc):
"""Add a command and system command to be executed as part of this test."""
self._new_cmd(cmd, args, ExitStatus.OK, kill=kill_proc)
def add_cmd_check_stdout(self, cmd, args, match, no_match=None, env=None):
"""Add a simple command with expected output to be executed as part of this test."""
self._new_cmd(cmd, args, ExitStatus.OK, stdout_match=match,
stdout_negative_match=no_match, env=env)
def add_cmd_expected_fail(self, cmd, args, exitcode=ExitStatus.ERROR):
"""Add a command that is expected to fail to be executed as part of this test."""
self._new_cmd(cmd, args, exitcode)
def add_cmd_no_wait(self, cmd, args):
"""Add a simple command to be executed (without waiting) as part of this test."""
self._new_cmd(cmd, args, ExitStatus.OK, no_wait=True)
def add_log_pattern(self, pattern, negative=False, regex=False):
"""Add a pattern that should appear in the test's logs."""
self._patterns.append(Pattern(pattern, negative=negative, regex=regex))
def _signal_dict(self):
"""Return a dictionary mapping signal numbers to their names."""
# FIXME: When we support python >= 3.5, this function can be replaced with:
# signal.Signals(self.daemon_process.returncode).name
return {
getattr(signal, _signame): _signame
for _signame in dir(signal)
if _signame.startswith("SIG") and not _signame.startswith("SIG_")
}
def clean_environment(self):
"""Clean up the host after executing a test."""
if self._daemon_process:
if self._daemon_process.poll() is None:
self._daemon_process.terminate()
self._daemon_process.wait()
else:
rc = self._daemon_process.returncode
signame = self._signal_dict().get(-rc, "RET=%s" % rc)
msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)."
self._result_txt = msg % (self.name, self._daemon_location, signame)
self.exitcode = ExitStatus.ERROR
self._daemon_process = None
self._daemon_output = ""
# The default utf-8 encoding would error out if (for instance) memory
# corruption makes fenced output an arbitrary 8-bit value. That output is
# still interesting for debugging, and we'd still like the regression test
# to run the full set of test cases.
with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile:
for line in logfile.readlines():
self._daemon_output += line
if self.verbose:
print("Daemon Output Start")
print(self._daemon_output)
print("Daemon Output End")
def print_result(self, filler):
"""Print the result of the last test execution."""
print("%s%s" % (filler, self._result_txt))
def run(self):
"""Execute this test."""
i = 1
self.start_environment()
if self.verbose:
print("\n--- START TEST - %s" % self.name)
self._result_txt = "SUCCESS - '%s'" % (self.name)
self.exitcode = ExitStatus.OK
for cmd in self._cmds:
try:
self.run_cmd(cmd)
except ExitCodeError as e:
print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
self.set_error(i, cmd)
break
except OutputNotFoundError as e:
print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
self.set_error(i, cmd)
break
except OutputFoundError as e:
print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e))
self.set_error(i, cmd)
break
except XmlValidationError as e:
print("Step %d FAILED - xmllint failed: %s" % (i, e))
self.set_error(i, cmd)
break
if self.verbose:
print("Step %d SUCCESS" % (i))
i += 1
self.clean_environment()
if self.exitcode == ExitStatus.OK:
self._match_log_patterns()
print(self._result_txt)
if self.verbose:
print("--- END TEST - %s\n" % self.name)
self.executed = True
def run_cmd(self, args):
"""Execute a command as part of this test."""
cmd = shlex.split(args['args'])
cmd.insert(0, args['cmd'])
if self.verbose:
print("\n\nRunning: %s" % " ".join(cmd))
# FIXME: Using "with" here breaks fencing merge tests.
# pylint: disable=consider-using-with
if args['env']:
new_env = os.environ.copy()
new_env.update(args['env'])
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=new_env)
else:
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if args['kill']:
if self.verbose:
print("Also running: %s" % args['kill'])
- ### Typically, the kill argument is used to detect some sort of
- ### failure. Without yielding for a few seconds here, the process
- ### launched earlier that is listening for the failure may not have
- ### time to connect to pacemaker-execd.
+ # Typically, the kill argument is used to detect some sort of
+ # failure. Without yielding for a few seconds here, the process
+ # launched earlier that is listening for the failure may not have
+ # time to connect to pacemaker-execd.
time.sleep(2)
subprocess.Popen(shlex.split(args['kill']))
if not args['no_wait']:
test.wait()
else:
return ExitStatus.OK
output = pipe_communicate(test, check_stderr=args['check_stderr'])
if self.verbose:
print(output)
if test.returncode != args['expected_exitcode']:
raise ExitCodeError(test.returncode)
if args['stdout_match'] is not None and \
re.search(args['stdout_match'], output) is None:
raise OutputNotFoundError(output)
if args['stdout_negative_match'] is not None and \
re.search(args['stdout_negative_match'], output) is not None:
raise OutputFoundError(output)
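# Optionally validate the command's XML output with xmllint, checking it
# against the API result schema when requested.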
if args['validate']:
if args['check_rng']:
rng_file = "%s/api/api-result.rng" % rng_directory()
else:
rng_file = None
cmd = find_validator(rng_file)
if not cmd:
raise XmlValidationError("Could not find validator for %s" % rng_file)
if self.verbose:
print("\nRunning: %s" % " ".join(cmd))
with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator:
output = pipe_communicate(validator, check_stderr=True, stdin=output)
if self.verbose:
print(output)
if validator.returncode != 0:
raise XmlValidationError(output)
return ExitStatus.OK
def set_error(self, step, cmd):
"""Record failure of this test."""
msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
self.exitcode = ExitStatus.ERROR
def start_environment(self):
"""Prepare the host for executing a test."""
if os.path.exists(self.logpath):
os.remove(self.logpath)
self._kill_daemons()
self._start_daemons()
logfile = None
init_time = time.time()
update_time = init_time
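# Poll every 0.1s until the daemon log reports a successful start or the
# timeout expires (force_wait skips the log check and waits out the timeout
# instead).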
while True:
# FIXME: Eventually use 'with' here, which seems complicated given
# everything happens in a loop.
# pylint: disable=consider-using-with
time.sleep(0.1)
if not self.force_wait and logfile is None \
and os.path.exists(self.logpath):
logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1")
if not self.force_wait and logfile is not None:
for line in logfile.readlines():
if "successfully started" in line:
return
now = time.time()
if self.timeout > 0 and (now - init_time) >= self.timeout:
if not self.force_wait:
print("\tDaemon %s doesn't seem to have been initialized within %fs."
"\n\tConsider specifying a longer '--timeout' value."
% (self._daemon_location, self.timeout))
return
if self.verbose and (now - update_time) >= 5:
print("Waiting for %s to be initialized: %fs ..."
% (self._daemon_location, now - init_time))
update_time = now
class Tests:
"""The base class for a collection of regression tests."""
def __init__(self, **kwargs):
"""
Create a new Tests instance.
This method must be provided by all subclasses, which must call
Tests.__init__ first.
Keyword arguments:
force_wait --
logdir -- The base directory under which to create a directory
to store output and temporary data.
timeout -- How long to wait for the test to complete.
verbose -- Whether to print additional information, including
verbose command output and daemon log files.
"""
self.force_wait = kwargs.get("force_wait", False)
self.logdir = kwargs.get("logdir", "/tmp")
self.timeout = kwargs.get("timeout", 2)
self.verbose = kwargs.get("verbose", False)
self._tests = []
def exit(self):
"""Exit (with error status code if any test failed)."""
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
sys.exit(ExitStatus.ERROR)
sys.exit(ExitStatus.OK)
def print_list(self):
"""List all registered tests."""
print("\n==== %d TESTS FOUND ====" % len(self._tests))
print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION"))
print("%35s - %s" % ("--------------------", "--------------------"))
for test in self._tests:
print("%35s - %s" % (test.name, test.description))
print("==== END OF LIST ====\n")
def print_results(self):
"""Print summary of results of executed tests."""
failures = 0
success = 0
print("\n\n======= FINAL RESULTS ==========")
print("\n--- FAILURE RESULTS:")
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
failures += 1
test.print_result(" ")
else:
success += 1
if failures == 0:
print(" None")
print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures))
def run_single(self, name):
"""Run a single named test."""
for test in self._tests:
if test.name == name:
test.run()
break
def run_tests(self):
"""Run all tests."""
for test in self._tests:
test.run()
def run_tests_matching(self, pattern):
"""Run all tests whose name matches a pattern."""
for test in self._tests:
if test.name.count(pattern) != 0:
test.run()
diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py
index 1f4ac44246..343cc6e162 100644
--- a/python/pacemaker/_cts/tests/nearquorumpointtest.py
+++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py
@@ -1,121 +1,121 @@
"""Randomly start and stop nodes to bring the cluster close to the quorum point."""
__all__ = ["NearQuorumPointTest"]
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class NearQuorumPointTest(CTSTest):
"""Randomly start and stop nodes to bring the cluster close to the quorum point."""
def __init__(self, cm):
"""
Create a new NearQuorumPointTest instance.
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "NearQuorumPoint"
def __call__(self, dummy):
"""Perform this test."""
self.incr("calls")
startset = []
stopset = []
stonith = self._cm.prepare_fencing_watcher()
- #decide what to do with each node
+ # decide what to do with each node
for node in self._env["nodes"]:
action = self._env.random_gen.choice(["start", "stop"])
if action == "start":
startset.append(node)
elif action == "stop":
stopset.append(node)
self.debug("start nodes:%r" % startset)
self.debug("stop nodes:%r" % stopset)
- #add search patterns
+ # add search patterns
watchpats = []
for node in stopset:
if self._cm.expected_status[node] == "up":
watchpats.append(self.templates["Pat:We_stopped"] % node)
for node in startset:
if self._cm.expected_status[node] == "down":
watchpats.append(self.templates["Pat:Local_started"] % node)
else:
for stopping in stopset:
if self._cm.expected_status[stopping] == "up":
watchpats.append(self.templates["Pat:They_stopped"] % (node, stopping))
if not watchpats:
return self.skipped()
if startset:
watchpats.append(self.templates["Pat:DC_IDLE"])
watch = self.create_watch(watchpats, self._env["DeadTime"] + 10)
watch.set_watch()
- #begin actions
+ # begin actions
for node in stopset:
if self._cm.expected_status[node] == "up":
self._cm.stop_cm_async(node)
for node in startset:
if self._cm.expected_status[node] == "down":
self._cm.start_cm_async(node)
- #get the result
+ # get the result
if watch.look_for_all():
self._cm.cluster_stable()
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
return self.success()
self._logger.log("Warn: Patterns not found: %r" % watch.unmatched)
- #get the "bad" nodes
+ # get the "bad" nodes
upnodes = []
for node in stopset:
if self._cm.stat_cm(node):
upnodes.append(node)
downnodes = []
for node in startset:
if not self._cm.stat_cm(node):
downnodes.append(node)
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
if not upnodes and not downnodes:
self._cm.cluster_stable()
# Make sure they're completely down with no residue
for node in stopset:
self._rsh(node, self.templates["StopCmd"])
return self.success()
if upnodes:
self._logger.log("Warn: Unstoppable nodes: %r" % upnodes)
if downnodes:
self._logger.log("Warn: Unstartable nodes: %r" % downnodes)
return self.failure()
diff --git a/python/pacemaker/_cts/tests/reattach.py b/python/pacemaker/_cts/tests/reattach.py
index b34ec30e24..ca3b541f46 100644
--- a/python/pacemaker/_cts/tests/reattach.py
+++ b/python/pacemaker/_cts/tests/reattach.py
@@ -1,210 +1,210 @@
"""Restart the cluster and verify resources remain running."""
__all__ = ["Reattach"]
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.simulstoplite import SimulStopLite
from pacemaker._cts.tests.starttest import StartTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class Reattach(CTSTest):
"""Restart the cluster and verify that resources remain running throughout."""
def __init__(self, cm):
"""
Create a new Reattach instance.
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "Reattach"
self._startall = SimulStartLite(cm)
self._stopall = SimulStopLite(cm)
def _is_managed(self, node):
"""Return whether resources are managed by the cluster."""
(_, is_managed) = self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
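# With -q (--quiet), crm_attribute prints only the value, so the first output line is the setting (defaulting to "true" if it is unset)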
is_managed = is_managed[0].strip()
return is_managed == "true"
def _set_unmanaged(self, node):
"""Disable resource management."""
self.debug("Disable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
def _set_managed(self, node):
"""Enable resource management."""
self.debug("Re-enable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
def _disable_incompatible_rscs(self, node):
"""
Disable resources that are incompatible with this test.
Starts and stops of stonith-class resources are implemented internally
by Pacemaker, which means that they must stop when Pacemaker is
stopped, even if unmanaged. Disable them before running the Reattach
test so they don't affect resource placement.
OCFS2 resources must be disabled too for some reason.
Set target-role to "Stopped" for any of these resources in the CIB.
"""
self.debug("Disable incompatible (stonith/OCFS2) resources")
xml = """'
' --scope rsc_defaults"""
return self._rsh(node, self._cm.templates['CibAddXml'] % xml)
def _enable_incompatible_rscs(self, node):
"""Re-enable resources that were incompatible with this test."""
self.debug("Re-enable incompatible (stonith/OCFS2) resources")
xml = """"""
return self._rsh(node, """cibadmin --delete --xml-text '%s'""" % xml)
def _reprobe(self, node):
"""
Reprobe all resources.
The placement of some resources (such as promotable-1 in the
lab-generated CIB) is affected by constraints using node-attribute-based
rules. An earlier test may have erased the relevant node attribute, so
do a reprobe, which should add the attribute back.
"""
return self._rsh(node, """crm_resource --refresh""")
def setup(self, node):
"""Set up this test."""
if not self._startall(None):
return self.failure("Startall failed")
(rc, _) = self._disable_incompatible_rscs(node)
if rc != ExitStatus.OK:
return self.failure("Couldn't modify CIB to stop incompatible resources")
(rc, _) = self._reprobe(node)
if rc != ExitStatus.OK:
return self.failure("Couldn't reprobe resources")
if not self._cm.cluster_stable(double_check=True):
return self.failure("Cluster did not stabilize after setup")
return self.success()
def teardown(self, node):
"""Tear down this test."""
# Make sure 'node' is up
start = StartTest(self._cm)
start(node)
if not self._is_managed(node):
self._set_managed(node)
(rc, _) = self._enable_incompatible_rscs(node)
if rc != ExitStatus.OK:
return self.failure("Couldn't modify CIB to re-enable incompatible resources")
if not self._cm.cluster_stable():
return self.failure("Cluster did not stabilize after teardown")
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
return self.success()
def __call__(self, node):
"""Perform this test."""
self.incr("calls")
# Conveniently, the scheduler will display this message when disabling
# management, even if fencing is not enabled, so we can rely on it.
managed = self.create_watch(["No fencing will be done"], 60)
managed.set_watch()
self._set_unmanaged(node)
if not managed.look_for_all():
self._logger.log("Patterns not found: %r" % managed.unmatched)
return self.failure("Resource management not disabled")
pats = [
self.templates["Pat:RscOpOK"] % ("start", ".*"),
self.templates["Pat:RscOpOK"] % ("stop", ".*"),
self.templates["Pat:RscOpOK"] % ("promote", ".*"),
self.templates["Pat:RscOpOK"] % ("demote", ".*"),
self.templates["Pat:RscOpOK"] % ("migrate", ".*")
]
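# Any start/stop/promote/demote/migrate result logged while the cluster is unmanaged indicates a problem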
watch = self.create_watch(pats, 60, "ShutdownActivity")
watch.set_watch()
self.debug("Shutting down the cluster")
ret = self._stopall(None)
if not ret:
self._set_managed(node)
return self.failure("Couldn't shut down the cluster")
self.debug("Bringing the cluster back up")
ret = self._startall(None)
- time.sleep(5) # allow ping to update the CIB
+ time.sleep(5) # allow ping to update the CIB
if not ret:
self._set_managed(node)
return self.failure("Couldn't restart the cluster")
if self.local_badnews("ResourceActivity:", watch):
self._set_managed(node)
return self.failure("Resources stopped or started during cluster restart")
watch = self.create_watch(pats, 60, "StartupActivity")
watch.set_watch()
# Re-enable resource management (and verify it happened).
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
# Ignore actions for STONITH resources
ignore = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.debug("Ignoring start actions for %s" % r.id)
ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
if self.local_badnews("ResourceActivity:", watch, ignore):
return self.failure("Resources stopped or started after resource management was re-enabled")
return ret
@property
def errors_to_ignore(self):
"""Return a list of errors which should be ignored."""
return [
r"resource( was|s were) active at shutdown"
]
diff --git a/python/pacemaker/_cts/tests/resourcerecover.py b/python/pacemaker/_cts/tests/resourcerecover.py
index 625d084e8c..b6069846f7 100644
--- a/python/pacemaker/_cts/tests/resourcerecover.py
+++ b/python/pacemaker/_cts/tests/resourcerecover.py
@@ -1,169 +1,170 @@
"""Fail a random resource and verify its fail count increases."""
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class ResourceRecover(CTSTest):
"""Fail a random resource."""
def __init__(self, cm):
"""
Create a new ResourceRecover instance.
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "ResourceRecover"
self._action = "asyncmon"
self._interval = 0
self._rid = None
self._rid_alt = None
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def __call__(self, node):
"""Perform this test."""
self.incr("calls")
if not self._startall(None):
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self._cm.active_resources(node)
if not resourcelist:
self._logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self._choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self._rid)
if rsc.id == rsc.clone_id:
self.debug("Failing %s" % rsc.id)
else:
self.debug("Failing %s (also known as %s)" % (rsc.id, rsc.clone_id))
# Log patterns to watch for (failure, plus restart if managed)
pats = [
self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id)
]
if rsc.managed:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._rid))
if rsc.unique:
pats.append(self.templates["Pat:RscOpOK"] % ("start", self._rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self._fail_resource(rsc, node, pats) is None:
# self.failure() already called
return None
return self.success()
def _choose_resource(self, node, resourcelist):
"""Choose a random resource to target."""
self._rid = self._env.random_gen.choice(resourcelist)
self._rid_alt = self._rid
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self._cm, line)
if rsc.id == self._rid:
# Handle anonymous clones that get renamed
self._rid = rsc.clone_id
return rsc
return None
def _get_failcount(self, node):
"""Check the fail count of targeted resource on given node."""
cmd = "crm_failcount --quiet --query --resource %s --operation %s --interval %d --node %s"
(rc, lines) = self._rsh(node, cmd % (self._rid, self._action, self._interval, node),
verbose=1)
if rc != 0 or len(lines) != 1:
lines = [l.strip() for l in lines]
self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(lines)))
return -1
try:
failcount = int(lines[0])
except (IndexError, ValueError):
self._logger.log("crm_failcount output on %s unparseable: %s" % (node, " ".join(lines)))
return -1
return failcount
def _fail_resource(self, rsc, node, pats):
"""Fail the targeted resource, and verify as expected."""
orig_failcount = self._get_failcount(node)
watch = self.create_watch(pats, 60)
watch.set_watch()
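# crm_resource -F (--fail) asks the controller to record a failure of the resource on this node, which triggers recovery if the resource is managed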
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
with Timer(self._logger, self.name, "recover"):
watch.look_for_all()
self._cm.cluster_stable()
recovered = self._cm.resource_location(self._rid)
if watch.unmatched:
return self.failure("Patterns not found: %r" % watch.unmatched)
if rsc.unique and len(recovered) > 1:
return self.failure("%s is now active on more than one node: %r" % (self._rid, recovered))
if recovered:
self.debug("%s is running on: %r" % (self._rid, recovered))
elif rsc.managed:
return self.failure("%s was not recovered and is inactive" % self._rid)
new_failcount = self._get_failcount(node)
if new_failcount != orig_failcount + 1:
return self.failure("%s fail count is %d not %d"
% (self._rid, new_failcount, orig_failcount + 1))
- return 0 # Anything but None is success
+ # Anything but None is success
+ return 0
@property
def errors_to_ignore(self):
"""Return a list of errors which should be ignored."""
return [
r"Updating failcount for %s" % self._rid,
r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt),
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self._action, self._rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval)
]