diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index d140a15a6e..a790ee9806 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1052 +1,1052 @@
"""Auditing classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
"""
The base class for various kinds of auditors.
Specific audit implementations should be built on top of this one. Audits
can do all kinds of checks on the system. The basic interface for callers
is the `__call__` method, which returns True if the audit passes and False
if it fails.
"""
def __init__(self, cm):
"""
Create a new ClusterAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
"""Perform the audit action."""
raise NotImplementedError
def is_applicable(self):
"""
Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
"""Log a message."""
self._cm.log(f"audit: {args}")
def debug(self, args):
"""Log a debug message."""
self._cm.debug(f"audit: {args}")
class LogAudit(ClusterAudit):
"""
Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that we
can read back that test message using logging tools.
"""
def __init__(self, cm):
"""
Create a new LogAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
"""Restart logging on the given nodes, or all if none are given."""
if not nodes:
nodes = self._cm.env["nodes"]
self._cm.debug(f"Restarting logging on: {nodes!r}")
for node in nodes:
if self._cm.env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log(f"ERROR: Cannot stop 'systemd-journald' on {node}")
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log(f"ERROR: Cannot start 'systemd-journald' on {node}")
if "syslogd" in self._cm.env:
(rc, _) = self._cm.rsh(node, f"service {self._cm.env['syslogd']} restart")
if rc != 0:
self._cm.log(f"""ERROR: Cannot restart '{self._cm.env["syslogd"]}' on {node}""")
def _create_watcher(self, patterns, kind):
"""Create a new LogWatcher instance for the given patterns."""
watch = LogWatcher(self._cm.env["LogFileName"], patterns,
self._cm.env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
"""Perform the log audit."""
patterns = []
prefix = "Test message from"
suffix = str(uuid.uuid4())
watch = {}
for node in self._cm.env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append(f"{simple}.*{prefix} {node} {suffix}")
watch_pref = self._cm.env["log_kind"]
if watch_pref is None:
kinds = [LogKind.LOCAL_FILE]
if self._cm.env["have_systemd"]:
kinds.append(LogKind.JOURNAL)
kinds.append(LogKind.REMOTE_FILE)
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log(f"Logging test message with identifier {suffix}")
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
for node in self._cm.env["nodes"]:
cmd = f"logger -p {self._cm.env['SyslogFacility']}.info {prefix} {node} {suffix}"
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log(f"ERROR: Cannot execute remote command [{cmd}] on {node}")
for k, w in watch.items():
if watch_pref is None:
self._cm.log(f"Checking for test message in {k} logs")
w.look_for_all(silent=True)
if not w.unmatched:
if watch_pref is None:
self._cm.log(f"Found test message in {k} logs")
self._cm.env["log_kind"] = k
return True
for regex in w.unmatched:
self._cm.log(f"Test message [{regex}] not found in {w.kind} logs")
return False
def __call__(self):
"""Perform the audit action."""
max_attempts = 3
attempt = 0
passed = True
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60 * attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
passed = False
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
if self._cm.env["LogAuditDisabled"]:
return False
return True
class DiskAudit(ClusterAudit):
"""
Audit disk usage on cluster nodes.
Verify that there is enough free space left on whichever mounted file
system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
"""
Create a new DiskAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
"""Perform the audit action."""
passed = True
# @TODO Use directory of PCMK_logfile if set on host
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
self._cm.log(f"ERROR: Cannot execute remote df command [{dfcmd}] on {node}")
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log(f"Warning: df output '{dfout}' from {node} was invalid [{used}, {remain}]")
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log(f"CRIT: Out of log disk space on {node} ({used_percent}% / {remaining_mb}MB)")
passed = False
if not should_continue(self._cm.env):
raise ValueError(f"Disk full on {node}")
elif remaining_mb < 100 or used_percent > 90:
self._cm.log(f"WARN: Low on log disk space ({remaining_mb}MB) on {node}")
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
return True
class FileAudit(ClusterAudit):
"""
Audit the filesystem looking for various failure conditions.
Check for:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
"""
Create a new FileAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def _output_has_core(self, output, node):
"""Check output for any lines that would indicate the presence of a core dump."""
found = False
for line in output:
line = line.strip()
if line in self.known:
continue
found = True
self.known.append(line)
self._cm.log(f"Warning: core file on {node}: {line}")
return found
def _find_core_with_coredumpctl(self, node):
"""Use coredumpctl to find core dumps on the given node."""
(_, lsout) = self._cm.rsh(node, "coredumpctl --no-legend --no-pager")
return self._output_has_core(lsout, node)
def _find_core_on_fs(self, node, paths):
"""Check for core dumps on the given node, under any of the given paths."""
(_, lsout) = self._cm.rsh(node, f"ls -al {' '.join(paths)} | grep core.[0-9]",
verbose=1)
return self._output_has_core(lsout, node)
def __call__(self):
"""Perform the audit action."""
passed = True
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
found = False
# If systemd is present, first see if coredumpctl logged any core dumps.
if self._cm.env["have_systemd"]:
found = self._find_core_with_coredumpctl(node)
if found:
passed = False
# If we didn't find any core dumps, it's for one of three reasons:
# (1) Nothing crashed
# (2) systemd is not present
# (3) systemd is present but coredumpctl is not enabled
#
# To handle the last two cases, check the other filesystem locations.
if not found:
found = self._find_core_on_fs(node, ["/var/lib/pacemaker/cores/*",
"/var/lib/corosync"])
if found:
passed = False
if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
passed = False
clean = True
self._cm.log(f"Warning: Stale IPC file on {node}: {line}")
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug(f"ps[{node}]: {line}")
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug(f"Skipping {node}")
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
return True
class AuditResource:
"""A base class for storing information about a cluster resource."""
def __init__(self, cm, line):
"""
Create a new AuditResource instance.
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
@property
def unique(self):
"""Return True if this resource is unique."""
return self.flags & 0x20
@property
def orphan(self):
"""Return True if this resource is an orphan."""
return self.flags & 0x01
@property
def managed(self):
"""Return True if this resource is managed by the cluster."""
return self.flags & 0x02
class AuditConstraint:
"""A base class for storing information about a cluster constraint."""
def __init__(self, cm, line):
"""
Create a new AuditConstraint instance.
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
"""
Audit primitive resources to verify a variety of conditions.
Check that:
* Resources are active and managed only when expected
* Resources are active on the expected cluster node
* Resources are not orphaned
"""
def __init__(self, cm):
"""
Create a new PrimitiveAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
"""Perform the audit of a single resource."""
rc = True
active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug(f"Resource {resource.id} active on {active!r}")
elif resource.needs_quorum == 1:
self._cm.log(f"Resource {resource.id} active without quorum: {active!r}")
rc = False
elif not resource.managed:
self._cm.log(f"Resource {resource.id} not managed. Active on {active!r}")
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug(f"Non-unique resource {resource.id} is active on: {active!r}")
else:
self.debug(f"Non-unique resource {resource.id} is not active")
elif len(active) > 1:
self._cm.log(f"Resource {resource.id} is active multiple times: {active!r}")
rc = False
elif resource.orphan:
self.debug(f"Resource {resource.id} is an inactive orphan")
elif not self._inactive_nodes:
self._cm.log(f"WARN: Resource {resource.id} not served anywhere")
rc = False
elif self._cm.env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log(f"WARN: Resource {resource.id} not served anywhere "
f"(Inactive nodes: {self._inactive_nodes!r})")
else:
self.debug(f"Resource {resource.id} not served anywhere "
f"(Inactive nodes: {self._inactive_nodes!r})")
elif quorum or not resource.needs_quorum:
self.debug(f"Resource {resource.id} not served anywhere "
f"(Inactive nodes: {self._inactive_nodes!r})")
return rc
def _setup(self):
"""
Verify cluster nodes are active.
Collect resource and colocation information used for performing the audit.
"""
for node in self._cm.env["nodes"]:
if self._cm.expected_status[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
for node in self._cm.env["nodes"]:
if self._target is None and self._cm.expected_status[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug(f"No nodes active - skipping {self.name}")
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource --list-cts",
verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log(f"Unknown entry: {line}")
return True
def __call__(self):
"""Perform the audit action."""
passed = True
if not self._setup():
return passed
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
passed = False
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- # if self._cm["Name"] == "crm-corosync":
+ # if self._cm.name == "crm-corosync":
# return True
return False
class GroupAudit(PrimitiveAudit):
"""
Audit group resources.
Check that:
* Each of its child primitive resources is active on the expected cluster node
"""
def __init__(self, cm):
"""
Create a new GroupAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
passed = True
if not self._setup():
return passed
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
passed = False
self._cm.log(f"Child {child.id} of {group.id} is active more than once: {nodes!r}")
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug(f"Child {child.id} of {group.id} is stopped")
elif nodes[0] != group_location:
passed = False
self._cm.log(f"Child {child.id} of {group.id} is active on the wrong "
f"node ({nodes[0]}) expected {group_location}")
else:
self.debug(f"Child {child.id} of {group.id} is active on {nodes[0]}")
return passed
class CloneAudit(PrimitiveAudit):
"""
Audit clone resources.
NOTE: Currently, this class does not perform any actual audit functions.
"""
def __init__(self, cm):
"""
Create a new CloneAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
passed = True
if not self._setup():
return passed
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug(f"Checking child {child.id} of {clone.id}...")
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return passed
class ColocationAudit(PrimitiveAudit):
"""
Audit cluster resources.
Check that:
* Resources are colocated with the expected resource
"""
def __init__(self, cm):
"""
Create a new ColocationAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
"""Return a list of cluster nodes where a given resource is running."""
(rc, lines) = self._cm.rsh(self._target,
f"crm_resource --locate -r {resource} -Q",
verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
passed = True
if not self._setup():
return passed
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug(f"Colocation audit ({coloc.id}): {coloc.rsc} not running")
else:
for node in source:
if node not in target:
passed = False
self._cm.log(f"Colocation audit ({coloc.id}): {coloc.rsc} running "
f"on {node} (not in {target!r})")
else:
self.debug(f"Colocation audit ({coloc.id}): {coloc.rsc} running "
f"on {node} (in {target!r})")
return passed
class ControllerStateAudit(ClusterAudit):
"""Verify active and inactive resources."""
def __init__(self, cm):
"""
Create a new ControllerStateAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
passed = True
up_are_down = 0
down_are_up = 0
unstable_list = []
for node in self._cm.env["nodes"]:
should_be = self._cm.expected_status[node]
rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
passed = False
self._cm.log(f"Cluster is not stable: {len(unstable_list)} (of "
f"{self._cm.upcount()}): {unstable_list!r}")
if up_are_down > 0:
passed = False
self._cm.log(f"{up_are_down} (of {len(self._cm.env['nodes'])}) nodes "
"expected to be up were down.")
if down_are_up > 0:
passed = False
self._cm.log(f"{down_are_up} (of {len(self._cm.env['nodes'])}) nodes "
"expected to be down were up.")
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- # if self._cm["Name"] == "crm-corosync":
+ # if self._cm.name == "crm-corosync":
# return True
return False
class CIBAudit(ClusterAudit):
"""Audit the CIB by verifying that it is identical across cluster nodes."""
def __init__(self, cm):
"""
Create a new CIBAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
passed = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return passed
for partition in ccm_partitions:
self.debug(f"\tAuditing CIB consistency for: {partition}")
if self._audit_cib_contents(partition) == 0:
passed = False
return passed
def _audit_cib_contents(self, hostlist):
"""Perform the CIB audit on the given hosts."""
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log(f"Could not perform audit: No configuration from {node}")
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log(f"Could not perform audit: No configuration from {node0}")
passed = False
else:
(rc, result) = self._cm.rsh(
node0, f"crm_diff -VV -cf --new {node_xml} --original {node0_xml}", verbose=1)
if rc != 0:
self._cm.log(f"Diff between {node0_xml} and {node_xml} failed: {rc}")
passed = False
for line in result:
if not re.search("", line):
passed = False
self.debug(f"CibDiff[{node0}-{node}]: {line}")
else:
self.debug(f"CibDiff[{node0}-{node}] Ignoring: {line}")
return passed
def _store_remote_cib(self, node, target):
"""
Store a copy of the given node's CIB on the given target node.
If no target is given, store the CIB on the given node.
"""
filename = f"/tmp/ctsaudit.{node}.xml"
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", f"rm -f {filename}")
for line in lines:
self._cm.rsh("localhost", f"echo \'{line[:-1]}\' >> {filename}", verbose=0)
if self._cm.rsh.copy(filename, f"root@{target}:{filename}", silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- # if self._cm["Name"] == "crm-corosync":
+ # if self._cm.name == "crm-corosync":
# return True
return False
class PartitionAudit(ClusterAudit):
"""
Audit each partition in a cluster to verify a variety of conditions.
Check that:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
"""
Create a new PartitionAudit instance.
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
passed = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return passed
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log(f"ERROR: {len(ccm_partitions)} cluster partitions detected:")
passed = False
for partition in ccm_partitions:
self._cm.log(f"\t {partition}")
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
passed = False
return passed
def _trim_string(self, avalue):
"""Remove the last character from a multi-character string."""
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
"""Remove the last character from a multi-character string and convert the result to an int."""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
def _audit_partition(self, partition):
"""Perform the audit of a single partition."""
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug(f"Auditing partition: {partition}")
for node in node_list:
if self._cm.expected_status[node] != "up":
self._cm.log(f"Warn: Node {node} appeared out of nowhere")
self._cm.expected_status[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug(f"Node {node}: {self._node_state[node]} - {self._node_epoch[node]} - {self._node_quorum[node]}.")
self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log(f"Warn: Node {node} disappeared: can't determine epoch")
self._cm.expected_status[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log(f"Lowest epoch not determined in {partition}")
passed = False
for node in node_list:
if self._cm.expected_status[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug(f"{node}: OK")
elif not self._node_epoch[node]:
self.debug(f"Check on {node} ignored: no node epoch")
elif not lowest_epoch:
self.debug(f"Check on {node} ignored: no lowest epoch")
else:
self._cm.log(f"DC {node} is not the oldest node "
f"({self._node_epoch[node]} vs. {lowest_epoch})")
passed = False
if not dc_found:
self._cm.log(f"DC not found on any of the {len(dc_allowed_list)} allowed "
f"nodes: {dc_allowed_list} (of {node_list})")
elif len(dc_found) > 1:
self._cm.log(f"{len(dc_found)} DCs ({dc_found}) found in cluster partition: {node_list}")
passed = False
if not passed:
for node in node_list:
if self._cm.expected_status[node] == "up":
self._cm.log(f"epoch {self._node_epoch[node]} : {self._node_state[node]}")
return passed
def is_applicable(self):
"""Return True if this audit is applicable in the current test configuration."""
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
- # if self._cm["Name"] == "crm-corosync":
+ # if self._cm.name == "crm-corosync":
# return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
"""Return a list of instances of applicable audits that can be performed."""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py
index cb1e17b26c..70be04f23a 100644
--- a/python/pacemaker/_cts/clustermanager.py
+++ b/python/pacemaker/_cts/clustermanager.py
@@ -1,899 +1,890 @@
"""ClusterManager class for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["ClusterManager"]
__copyright__ = """Copyright 2000-2025 the Pacemaker project contributors.
Certain portions by Huang Zhen are copyright 2004
International Business Machines. The version control history for this file
may have further details."""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import time
from collections import UserDict
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.CTS import NodeStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.cib import ConfigFactory
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogWatcher
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory).
# @TODO See if type annotations fix this.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
# ClusterManager has a lot of methods.
# pylint: disable=too-many-public-methods
class ClusterManager(UserDict):
"""
An abstract base class for managing the cluster.
This class implements high-level operations on the cluster and/or its cluster
managers. Actual cluster-specific management classes should be subclassed
from this one.
Among other things, this class tracks the state every node is expected to be in.
"""
def _final_conditions(self):
"""Check all keys to make sure they have a non-None value."""
for (key, val) in self._data.items():
if val is None:
raise ValueError(f"Improper derivation: self[{key}] must be overridden by subclass.")
def __init__(self):
"""
Create a new ClusterManager instance.
This class can be treated kind of like a dictionary due to the process
of certain dict functions like __getitem__ and __setitem__. This is
because it contains a lot of name/value pairs. However, it is not
actually a dictionary so do not rely on standard dictionary behavior.
"""
# Eventually, ClusterManager should not be a UserDict subclass. Until
# that point...
# pylint: disable=super-init-not-called
self.__instance_errors_to_ignore = []
self._cib_installed = False
self._data = {}
self._logger = LogFactory()
self.env = EnvFactory().getInstance()
self.expected_status = {}
self.name = self.env["Name"]
# pylint: disable=invalid-name
self.ns = NodeStatus(self.env)
self.our_node = os.uname()[1].lower()
self.partitions_expected = 1
self.rsh = RemoteFactory().getInstance()
self.templates = PatternSelector(self.name)
self._final_conditions()
self._cib_factory = ConfigFactory(self)
self._cib = self._cib_factory.create_config(self.env["Schema"])
self._cib_sync = {}
def __getitem__(self, key):
"""
- Return the given key, checking for it in several places.
+ Return the given key, checking for it in two places.
- If key is "Name", return the name of the cluster manager. If the key
- was previously added to the dictionary via __setitem__, return that.
- Otherwise, return the template pattern for the key.
+ If the key was previously added to the dictionary via
+ __setitem__, return that. Otherwise, return the template
+ pattern for the key.
This method should not be used and may be removed in the future.
"""
- if key == "Name":
- return self.name
-
print(f"FIXME: Getting {key} from {self!r}")
if key in self._data:
return self._data[key]
return self.templates.get_patterns(key)
def __setitem__(self, key, value):
"""
Set the given key to the given value, overriding any previous value.
This method should not be used and may be removed in the future.
"""
print(f"FIXME: Setting {key}={value} on {self!r}")
self._data[key] = value
def clear_instance_errors_to_ignore(self):
"""Reset instance-specific errors to ignore on each iteration."""
self.__instance_errors_to_ignore = []
@property
def instance_errors_to_ignore(self):
"""Return a list of known errors that should be ignored for a specific test instance."""
return self.__instance_errors_to_ignore
@property
def errors_to_ignore(self):
"""Return a list of known error messages that should be ignored."""
return self.templates.get_patterns("BadNewsIgnore")
def log(self, args):
"""Log a message."""
self._logger.log(args)
def debug(self, args):
"""Log a debug message."""
self._logger.debug(args)
def upcount(self):
"""Return how many nodes are up."""
count = 0
for node in self.env["nodes"]:
if self.expected_status[node] == "up":
count += 1
return count
def install_support(self, command="install"):
"""
Install or uninstall the CTS support files.
This includes various init scripts and data, daemons, fencing agents, etc.
"""
for node in self.env["nodes"]:
self.rsh(node, f"{BuildOptions.DAEMON_DIR}/cts-support {command}")
def prepare_fencing_watcher(self):
"""Return a LogWatcher object that watches for fencing log messages."""
# If we don't have quorum now but get it as a result of starting this node,
# then a bunch of nodes might get fenced
if self.has_quorum(None):
self.debug("Have quorum")
return None
if not self.templates["Pat:Fencing_start"]:
print("No start pattern")
return None
if not self.templates["Pat:Fencing_ok"]:
print("No ok pattern")
return None
stonith = None
stonith_pats = []
for peer in self.env["nodes"]:
if self.expected_status[peer] == "up":
continue
stonith_pats.extend([
self.templates["Pat:Fencing_ok"] % peer,
self.templates["Pat:Fencing_start"] % peer,
])
stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"],
self.env["log_kind"], "StartupFencing", 0)
stonith.set_watch()
return stonith
def fencing_cleanup(self, node, stonith):
"""Wait for a previously fenced node to return to the cluster."""
peer_list = []
peer_state = {}
self.debug(f"Looking for nodes that were fenced as a result of {node} starting")
# If we just started a node, we may now have quorum (and permission to fence)
if not stonith:
self.debug("Nothing to do")
return peer_list
q = self.has_quorum(None)
if not q and len(self.env["nodes"]) > 2:
# We didn't gain quorum - we shouldn't have shot anyone
self.debug(f"Quorum: {q} Len: {len(self.env['nodes'])}")
return peer_list
for n in self.env["nodes"]:
peer_state[n] = "unknown"
# Now see if any states need to be updated
self.debug(f"looking for: {stonith.regexes!r}")
shot = stonith.look(0)
while shot:
self.debug(f"Found: {shot!r}")
del stonith.regexes[stonith.whichmatch]
# Extract node name
for n in self.env["nodes"]:
if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
peer = n
peer_state[peer] = "complete"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer)
elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
# TODO: Correctly detect multiple fencing operations for the same host
peer = n
peer_state[peer] = "in-progress"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer)
if not peer:
self._logger.log(f"ERROR: Unknown stonith match: {shot!r}")
elif peer not in peer_list:
self.debug(f"Found peer: {peer}")
peer_list.append(peer)
# Get the next one
shot = stonith.look(60)
for peer in peer_list:
self.debug(f" Peer {peer} was fenced as a result of {node} starting: {peer_state[peer]}")
if self.env["at-boot"]:
self.expected_status[peer] = "up"
else:
self.expected_status[peer] = "down"
if peer_state[peer] == "in-progress":
# Wait for any in-progress operations to complete
shot = stonith.look(60)
while stonith.regexes and shot:
self.debug(f"Found: {shot!r}")
del stonith.regexes[stonith.whichmatch]
shot = stonith.look(60)
# Now make sure the node is alive too
self.ns.wait_for_node(peer, self.env["DeadTime"])
# Poll until it comes up
if self.env["at-boot"]:
if not self.stat_cm(peer):
time.sleep(self.env["StartTime"])
if not self.stat_cm(peer):
self._logger.log(f"ERROR: Peer {peer} failed to restart after being fenced")
return None
return peer_list
def start_cm(self, node, verbose=False):
"""Start up the cluster manager on a given node."""
- if verbose:
- self._logger.log(f"Starting {self.templates['Name']} on node {node}")
- else:
- self.debug(f"Starting {self.templates['Name']} on node {node}")
+ log_fn = self._logger.log if verbose else self.debug
+ log_fn(f"Starting {self.name} on node {node}")
if node not in self.expected_status:
self.expected_status[node] = "down"
if self.expected_status[node] != "down":
return True
# Technically we should always be able to notice ourselves starting
patterns = [
self.templates["Pat:Local_started"] % node,
]
if self.upcount() == 0:
patterns.append(self.templates["Pat:DC_started"] % node)
else:
patterns.append(self.templates["Pat:NonDC_started"] % node)
watch = LogWatcher(self.env["LogFileName"], patterns,
self.env["nodes"], self.env["log_kind"],
"StartaCM", self.env["StartTime"] + 10)
self.install_config(node)
self.expected_status[node] = "any"
if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
self._logger.log(f"{node} was already started")
return True
stonith = self.prepare_fencing_watcher()
watch.set_watch()
(rc, _) = self.rsh(node, self.templates["StartCmd"])
if rc != 0:
self._logger.log(f"Warn: Start command failed on node {node}")
self.fencing_cleanup(node, stonith)
return False
self.expected_status[node] = "up"
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self._logger.log(f"Warn: Startup pattern not found: {regex}")
if watch_result and self.cluster_stable(self.env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
self._logger.log(f"Warn: Start failed for node {node}")
return False
def start_cm_async(self, node, verbose=False):
"""Start up the cluster manager on a given node without blocking."""
- if verbose:
- self._logger.log(f"Starting {self['Name']} on node {node}")
- else:
- self.debug(f"Starting {self['Name']} on node {node}")
+ log_fn = self._logger.log if verbose else self.debug
+ log_fn(f"Starting {self.name} on node {node}")
self.install_config(node)
self.rsh(node, self.templates["StartCmd"], synchronous=False)
self.expected_status[node] = "up"
def stop_cm(self, node, verbose=False, force=False):
"""Stop the cluster manager on a given node."""
- if verbose:
- self._logger.log(f"Stopping {self['Name']} on node {node}")
- else:
- self.debug(f"Stopping {self['Name']} on node {node}")
+ log_fn = self._logger.log if verbose else self.debug
+ log_fn(f"Stopping {self.name} on node {node}")
if self.expected_status[node] != "up" and not force:
return True
(rc, _) = self.rsh(node, self.templates["StopCmd"])
if rc == 0:
# Make sure we can continue even if corosync leaks
self.expected_status[node] = "down"
self.cluster_stable(self.env["DeadTime"])
return True
- self._logger.log(f"ERROR: Could not stop {self['Name']} on node {node}")
+ self._logger.log(f"ERROR: Could not stop {self.name} on node {node}")
return False
def stop_cm_async(self, node):
"""Stop the cluster manager on a given node without blocking."""
- self.debug(f"Stopping {self['Name']} on node {node}")
+ self.debug(f"Stopping {self.name} on node {node}")
self.rsh(node, self.templates["StopCmd"], synchronous=False)
self.expected_status[node] = "down"
def startall(self, nodelist=None, verbose=False, quick=False):
"""Start the cluster manager on every node in the cluster, or on every node in nodelist."""
if not nodelist:
nodelist = self.env["nodes"]
for node in nodelist:
if self.expected_status[node] == "down":
self.ns.wait_for_all_nodes(nodelist, 300)
if not quick:
# This is used for "basic sanity checks", so only start one node ...
return self.start_cm(nodelist[0], verbose=verbose)
# Approximation of SimulStartList for --boot
watchpats = [
self.templates["Pat:DC_IDLE"],
]
for node in nodelist:
watchpats.extend([
self.templates["Pat:InfraUp"] % node,
self.templates["Pat:PacemakerUp"] % node,
self.templates["Pat:Local_started"] % node,
self.templates["Pat:They_up"] % (nodelist[0], node),
])
# Start all the nodes - at about the same time...
watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"],
self.env["log_kind"], "fast-start",
self.env["DeadTime"] + 10)
watch.set_watch()
if not self.start_cm(nodelist[0], verbose=verbose):
return False
for node in nodelist:
self.start_cm_async(node, verbose=verbose)
watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self._logger.log(f"Warn: Startup pattern not found: {regex}")
if not self.cluster_stable():
self._logger.log("Cluster did not stabilize")
return False
return True
def stopall(self, nodelist=None, verbose=False, force=False):
"""Stop the cluster manager on every node in the cluster, or on every node in nodelist."""
ret = True
if not nodelist:
nodelist = self.env["nodes"]
for node in self.env["nodes"]:
if self.expected_status[node] == "up" or force:
if not self.stop_cm(node, verbose=verbose, force=force):
ret = False
return ret
def statall(self, nodelist=None):
"""Return the status of the cluster manager on every node in the cluster, or on every node in nodelist."""
result = {}
if not nodelist:
nodelist = self.env["nodes"]
for node in nodelist:
if self.stat_cm(node):
result[node] = "up"
else:
result[node] = "down"
return result
def isolate_node(self, target, nodes=None):
"""Break communication between the target node and all other nodes in the cluster, or nodes."""
if not nodes:
nodes = self.env["nodes"]
for node in nodes:
if node == target:
continue
(rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node)
if rc != 0:
self._logger.log(f"Could not break the communication between {target} and {node}: {rc}")
return False
self.debug(f"Communication cut between {target} and {node}")
return True
def unisolate_node(self, target, nodes=None):
"""Re-establish communication between the target node and all other nodes in the cluster, or nodes."""
if not nodes:
nodes = self.env["nodes"]
for node in nodes:
if node == target:
continue
# Limit the amount of time we have asynchronous connectivity for
# Restore both sides as simultaneously as possible
self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False)
self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False)
self.debug(f"Communication restored between {target} and {node}")
def oprofile_start(self, node=None):
"""Start profiling on the given node, or all nodes in the cluster."""
if not node:
for n in self.env["oprofile"]:
self.oprofile_start(n)
elif node in self.env["oprofile"]:
self.debug(f"Enabling oprofile on {node}")
self.rsh(node, "opcontrol --init")
self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
self.rsh(node, "opcontrol --start")
self.rsh(node, "opcontrol --reset")
def oprofile_save(self, test, node=None):
"""Save profiling data and restart profiling on the given node, or all nodes in the cluster."""
if not node:
for n in self.env["oprofile"]:
self.oprofile_save(test, n)
elif node in self.env["oprofile"]:
self.rsh(node, "opcontrol --dump")
self.rsh(node, f"opcontrol --save=cts.{test}")
# Read back with: opreport -l session:cts.0 image:/c*
self.oprofile_stop(node)
self.oprofile_start(node)
def oprofile_stop(self, node=None):
"""
Start profiling on the given node, or all nodes in the cluster.
This does not save profiling data, so call oprofile_save first if needed.
"""
if not node:
for n in self.env["oprofile"]:
self.oprofile_stop(n)
elif node in self.env["oprofile"]:
self.debug(f"Stopping oprofile on {node}")
self.rsh(node, "opcontrol --reset")
self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
def install_config(self, node):
"""Remove and re-install the CIB on the first node in the cluster."""
if not self.ns.wait_for_node(node):
self.log(f"Node {node} is not up.")
return
if node in self._cib_sync or not self.env["ClobberCIB"]:
return
self._cib_sync[node] = True
self.rsh(node, f"rm -f {BuildOptions.CIB_DIR}/cib*")
# Only install the CIB on the first node, all the other ones will pick it up from there
if self._cib_installed:
return
self._cib_installed = True
if self.env["CIBfilename"]:
self.log(f"Installing CIB ({self.env['CIBfilename']}) on node {node}")
rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
if rc != 0:
raise ValueError(f"Can not scp file to {node} {rc}")
else:
self.log(f"Installing Generated CIB on node {node}")
self._cib.install(node)
self.rsh(node, f"chown {BuildOptions.DAEMON_USER} {BuildOptions.CIB_DIR}/cib.xml")
def prepare(self):
"""
Finish initialization.
Clear out the expected status and record the current status of every
node in the cluster.
"""
self.partitions_expected = 1
for node in self.env["nodes"]:
self.expected_status[node] = ""
if self.env["experimental-tests"]:
self.unisolate_node(node)
self.stat_cm(node)
def test_node_cm(self, node):
"""
Check the status of a given node.
Returns 0 if the node is down, 1 if the node is up but unstable, and 2
if the node is up and stable.
"""
watchpats = [
"Current ping state: (S_IDLE|S_NOT_DC)",
self.templates["Pat:NonDC_started"] % node,
self.templates["Pat:DC_started"] % node,
]
idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node],
self.env["log_kind"], "ClusterIdle")
idle_watch.set_watch()
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if not out:
out = ""
else:
out = out[0].strip()
self.debug(f"Node {node} status: '{out}'")
if out.find('ok') < 0:
if self.expected_status[node] == "up":
self.log(f"Node status for {node} is down but we think it should be {self.expected_status[node]}")
self.expected_status[node] = "down"
return 0
if self.expected_status[node] == "down":
self.log(f"Node status for {node} is up but we think it should be {self.expected_status[node]}: {out}")
self.expected_status[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug(f"Warn: Node {node} is unstable: {out}")
return 1
# Up and stable
return 2
def stat_cm(self, node):
"""Report the status of the cluster manager on a given node."""
return self.test_node_cm(node) > 0
# Being up and being stable is not the same question...
def node_stable(self, node):
"""Return whether or not the given node is stable."""
if self.test_node_cm(node) == 2:
return True
self.log(f"Warn: Node {node} not stable")
return False
def partition_stable(self, nodes, timeout=None):
"""Return whether or not all nodes in the given partition are stable."""
watchpats = [
"Current ping state: S_IDLE",
self.templates["Pat:DC_IDLE"],
]
self.debug("Waiting for cluster stability...")
if timeout is None:
timeout = self.env["DeadTime"]
if len(nodes) < 3:
self.debug("Cluster is inactive")
return True
idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(),
self.env["log_kind"], "ClusterStable", timeout)
idle_watch.set_watch()
for node in nodes.split():
# have each node dump its current state
self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
ret = idle_watch.look()
while ret:
self.debug(ret)
for node in nodes.split():
if re.search(node, ret):
return True
ret = idle_watch.look()
self.debug(f"Warn: Partition {nodes!r} not IDLE after {timeout}s")
return False
def cluster_stable(self, timeout=None, double_check=False):
"""Return whether or not all nodes in the cluster are stable."""
partitions = self.find_partitions()
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
if not double_check:
return True
# Make sure we are really stable and that all resources,
# including those that depend on transient node attributes,
# are started if they were going to be
time.sleep(5)
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
return True
def is_node_dc(self, node, status_line=None):
"""
Return whether or not the given node is the cluster DC.
Check the given status_line, or query the cluster if None.
"""
if not status_line:
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if out:
status_line = out[0].strip()
if not status_line:
return False
if status_line.find('S_IDLE') != -1:
return True
if status_line.find('S_INTEGRATION') != -1:
return True
if status_line.find('S_FINALIZE_JOIN') != -1:
return True
if status_line.find('S_POLICY_ENGINE') != -1:
return True
if status_line.find('S_TRANSITION_ENGINE') != -1:
return True
return False
def active_resources(self, node):
"""Return a list of primitive resources active on the given node."""
(_, output) = self.rsh(node, "crm_resource -c", verbose=1)
resources = []
for line in output:
if not re.search("^Resource", line):
continue
tmp = AuditResource(self, line)
if tmp.type == "primitive" and tmp.host == node:
resources.append(tmp.id)
return resources
def resource_location(self, rid):
"""Return a list of nodes on which the given resource is running."""
resource_nodes = []
for node in self.env["nodes"]:
if self.expected_status[node] != "up":
continue
cmd = self.templates["RscRunning"] % rid
(rc, lines) = self.rsh(node, cmd)
if rc == 127:
self.log(f"Command '{cmd}' failed. Binary or pacemaker-cts package not installed?")
for line in lines:
self.log(f"Output: {line} ")
elif rc == 0:
resource_nodes.append(node)
return resource_nodes
def find_partitions(self):
"""
Return a list of all partitions in the cluster.
Each element of the list is itself a list of all active nodes in that
partition.
"""
ccm_partitions = []
for node in self.env["nodes"]:
if self.expected_status[node] != "up":
self.debug(f"Node {node} is down... skipping")
continue
(_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
if not out:
self.log(f"no partition details for {node}")
continue
partition = out[0].strip()
if len(partition) <= 2:
self.log(f"bad partition details for {node}")
continue
nodes = partition.split()
nodes.sort()
partition = ' '.join(nodes)
found = 0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug(f"Adding partition from {node}: {partition}")
ccm_partitions.append(partition)
else:
self.debug(f"Partition '{partition}' from {node} is consistent with existing entries")
self.debug(f"Found partitions: {ccm_partitions!r}")
return ccm_partitions
def has_quorum(self, node_list):
"""Return whether or not the cluster has quorum."""
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
node_list = self.env["nodes"]
for node in node_list:
if self.expected_status[node] != "up":
continue
(rc, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
if rc != ExitStatus.OK:
self.debug(f"WARN: Quorum check on {node} returned error ({rc})")
continue
quorum = quorum[0].strip()
if quorum.find("1") != -1:
return True
if quorum.find("0") != -1:
return False
self.debug(f"WARN: Unexpected quorum test result from {node}:{quorum}")
return False
@property
def components(self):
"""
Return a list of all patterns that should be ignored for the cluster's components.
This must be provided by all subclasses.
"""
raise NotImplementedError
def in_standby_mode(self, node):
"""Return whether or not the node is in Standby."""
(_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
if not out:
return False
out = out[0].strip()
self.debug(f"Standby result: {out}")
return out == "on"
def set_standby_mode(self, node, status):
"""
Set node to Standby if status is True, or Active if status is False.
Return whether the node is now in the requested status.
"""
current_status = self.in_standby_mode(node)
if current_status == status:
return True
if status:
cmd = self.templates["StandbyCmd"] % (node, "on")
else:
cmd = self.templates["StandbyCmd"] % (node, "off")
(rc, _) = self.rsh(node, cmd)
return rc == 0
def add_dummy_rsc(self, node, rid):
"""Add a dummy resource with the given ID to the given node."""
rsc_xml = f""" '
'"""
constraint_xml = f""" '
'"""
self.rsh(node, self.templates['CibAddXml'] % rsc_xml)
self.rsh(node, self.templates['CibAddXml'] % constraint_xml)
def remove_dummy_rsc(self, node, rid):
"""Remove the previously added dummy resource given by rid on the given node."""
constraint = f"\"//rsc_location[@rsc='{rid}']\""
rsc = f"\"//primitive[@id='{rid}']\""
self.rsh(node, self.templates['CibDelXpath'] % constraint)
self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/python/pacemaker/_cts/tests/ctstest.py b/python/pacemaker/_cts/tests/ctstest.py
index 16d0f23e7b..9e78baa4fe 100644
--- a/python/pacemaker/_cts/tests/ctstest.py
+++ b/python/pacemaker/_cts/tests/ctstest.py
@@ -1,242 +1,242 @@
"""Base classes for CTS tests."""
__all__ = ["CTSTest"]
__copyright__ = "Copyright 2000-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.timer import Timer
from pacemaker._cts.watcher import LogWatcher
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
class CTSTest:
"""
The base class for all cluster tests.
This implements a basic set of properties and behaviors like setup, tear
down, time keeping, and statistics tracking. It is up to specific tests
to implement their own specialized behavior on top of this class.
"""
def __init__(self, cm):
"""
Create a new CTSTest instance.
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self.audits = []
self.name = None
- self.templates = PatternSelector(cm["Name"])
+ self.templates = PatternSelector(cm.name)
self.stats = {
"auditfail": 0,
"calls": 0,
"failure": 0,
"skipped": 0,
"success": 0
}
self._cm = cm
self._env = EnvFactory().getInstance()
self._rsh = RemoteFactory().getInstance()
self._logger = LogFactory()
self._timers = {}
self.benchmark = True # which tests to benchmark
self.failed = False
self.is_experimental = False
self.is_loop = False
self.is_unsafe = False
self.passed = True
def log(self, args):
"""Log a message."""
self._logger.log(args)
def debug(self, args):
"""Log a debug message."""
self._logger.debug(args)
def get_timer(self, key="test"):
"""Get the start time of the given timer."""
try:
return self._timers[key].start_time
except KeyError:
return 0
def set_timer(self, key="test"):
"""Set the start time of the given timer to now, and return that time."""
if key not in self._timers:
self._timers[key] = Timer(self._logger, self.name, key)
self._timers[key].start()
return self._timers[key].start_time
def log_timer(self, key="test"):
"""Log the elapsed time of the given timer."""
if key not in self._timers:
return
elapsed = self._timers[key].elapsed
self.debug(f"{self.name}:{key} runtime: {elapsed:.2f}")
del self._timers[key]
def incr(self, name):
"""Increment the given stats key."""
if name not in self.stats:
self.stats[name] = 0
self.stats[name] += 1
# Reset the test passed boolean
if name == "calls":
self.passed = True
def failure(self, reason="none"):
"""Increment the failure count, with an optional failure reason."""
self.passed = False
self.incr("failure")
self._logger.log(f"{f'Test {self.name}':<35} FAILED: {reason}")
return False
def success(self):
"""Increment the success count."""
self.incr("success")
return True
def skipped(self):
"""Increment the skipped count."""
self.incr("skipped")
return True
def __call__(self, node):
"""Perform this test."""
raise NotImplementedError
def audit(self):
"""Perform all the relevant audits (see ClusterAudit), returning whether or not they all passed."""
passed = True
for audit in self.audits:
if not audit():
self._logger.log(f"Internal {self.name} Audit {audit.name} FAILED.")
self.incr("auditfail")
passed = False
return passed
def setup(self, node):
"""Set up this test."""
# node is used in subclasses
# pylint: disable=unused-argument
return self.success()
def teardown(self, node):
"""Tear down this test."""
# node is used in subclasses
# pylint: disable=unused-argument
return self.success()
def create_watch(self, patterns, timeout, name=None):
"""
Create a new LogWatcher object.
This object can be used to search log files for matching patterns
during this test's run.
Arguments:
patterns -- A list of regular expressions to match against the log
timeout -- Default number of seconds to watch a log file at a time;
this can be overridden by the timeout= parameter to
self.look on an as-needed basis
name -- A unique name to use when logging about this watch
"""
if not name:
name = self.name
return LogWatcher(self._env["LogFileName"], patterns,
self._env["nodes"], self._env["log_kind"], name,
timeout)
def local_badnews(self, prefix, watch, local_ignore=None):
"""
Search through log files for messages.
Arguments:
prefix -- The string to look for at the beginning of lines,
or "LocalBadNews:" if None.
watch -- The LogWatcher object to use for searching.
local_ignore -- A list of regexes that, if found in a line, will
cause that line to be ignored.
Return the number of matches found.
"""
errcount = 0
if not prefix:
prefix = "LocalBadNews:"
ignorelist = [" CTS: ", prefix]
if local_ignore:
ignorelist += local_ignore
while errcount < 100:
match = watch.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._logger.log(f"{prefix} {match}")
errcount += 1
else:
break
else:
self._logger.log("Too many errors!")
watch.end()
return errcount
def is_applicable(self):
"""
Return True if this test is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
if self.is_loop and not self._env["loop-tests"]:
return False
if self.is_unsafe and not self._env["unsafe-tests"]:
return False
if self.is_experimental and not self._env["experimental-tests"]:
return False
if self._env["benchmark"] and not self.benchmark:
return False
return True
@property
def errors_to_ignore(self):
"""Return a list of errors which should be ignored."""
return []