diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index 26187db71e..c5521aaded 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1029 +1,1029 @@
""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
""" The base class for various kinds of auditors. Specific audit implementations
should be built on top of this one. Audits can do all kinds of checks on the
system. The basic interface for callers is the `__call__` method, which
returns True if the audit passes and False if it fails.
"""
def __init__(self, cm):
""" Create a new ClusterAudit instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
raise NotImplementedError
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
""" Log a message """
self._cm.log("audit: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
""" Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that
we can read back that test message using logging tools.
"""
def __init__(self, cm):
""" Create a new LogAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
""" Restart logging on the given nodes, or all if none are given """
if not nodes:
nodes = self._cm.Env["nodes"]
self._cm.debug("Restarting logging on: %r" % nodes)
for node in nodes:
if self._cm.Env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"])
if rc != 0:
self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node))
def _create_watcher(self, patterns, kind):
""" Create a new LogWatcher instance for the given patterns """
watch = LogWatcher(self._cm.Env["LogFileName"], patterns,
self._cm.Env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
""" Perform the log audit """
patterns = []
prefix = "Test message from"
suffix = str(uuid.uuid4())
watch = {}
for node in self._cm.Env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
watch_pref = self._cm.Env["LogWatcher"]
if watch_pref == LogKind.ANY:
kinds = [ LogKind.FILE ]
if self._cm.Env["have_systemd"]:
kinds += [ LogKind.JOURNAL ]
kinds += [ LogKind.REMOTE_FILE ]
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log("Logging test message with identifier %s" % suffix)
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
for node in self._cm.Env["nodes"]:
cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in list(watch.keys()):
w = watch[k]
if watch_pref == LogKind.ANY:
self._cm.log("Checking for test message in %s logs" % k)
w.look_for_all(silent=True)
if w.unmatched:
for regex in w.unmatched:
self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
else:
if watch_pref == LogKind.ANY:
self._cm.log("Found test message in %s logs" % k)
self._cm.Env["LogWatcher"] = k
return 1
return False
def __call__(self):
max_attempts = 3
attempt = 0
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60*attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
return False
return True
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
if self._cm.Env["LogAuditDisabled"]:
return False
return True
class DiskAudit(ClusterAudit):
""" Audit disk usage on cluster nodes to verify that there is enough free
space left on whichever mounted file system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
""" Create a new DiskAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
result = True
# @TODO Use directory of PCMK_logfile if set on host
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
for node in self._cm.Env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
% (dfout, node, used, remain))
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
% (node, used_percent, remaining_mb))
result = False
if not should_continue(self._cm.Env):
raise ValueError("Disk full on %s" % node)
elif remaining_mb < 100 or used_percent > 90:
self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class FileAudit(ClusterAudit):
""" Audit the filesystem looking for various failure conditions:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
""" Create a new FileAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def __call__(self):
result = True
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
for node in self._cm.Env["nodes"]:
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
- if self._cm.ShouldBeStatus.get(node) == "down":
+ if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
result = False
clean = True
self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug("ps[%s]: %s" % (node, line))
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug("Skipping %s" % node)
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class AuditResource:
""" A base class for storing information about a cluster resource """
def __init__(self, cm, line):
""" Create a new AuditResource instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
@property
def unique(self):
""" Is this resource unique? """
return self.flags & 0x20
@property
def orphan(self):
""" Is this resource an orphan? """
return self.flags & 0x01
@property
def managed(self):
""" Is this resource managed by the cluster? """
return self.flags & 0x02
class AuditConstraint:
""" A base class for storing information about a cluster constraint """
def __init__(self, cm, line):
""" Create a new AuditConstraint instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
""" Audit primitive resources to verify a variety of conditions, including that
they are active and managed only when expected; they are active on the
expected clusted node; and that they are not orphaned.
"""
def __init__(self, cm):
""" Create a new PrimitiveAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
""" Perform the audit of a single resource """
rc = True
active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %r" % (resource.id, active))
elif resource.needs_quorum == 1:
self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
rc = False
elif not resource.managed:
self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
rc = False
elif resource.orphan:
self.debug("Resource %s is an inactive orphan" % resource.id)
elif not self._inactive_nodes:
self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
rc = False
elif self._cm.Env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
return rc
def _setup(self):
""" Verify cluster nodes are active, and collect resource and colocation
information used for performing the audit.
"""
for node in self._cm.Env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
for node in self._cm.Env["nodes"]:
- if self._target is None and self._cm.ShouldBeStatus[node] == "up":
+ if self._target is None and self._cm.expected_status[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name)
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log("Unknown entry: %s" % line)
return True
def __call__(self):
result = True
if not self._setup():
return result
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
result = False
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class GroupAudit(PrimitiveAudit):
""" Audit group resources to verify that each of its child primitive
resources is active on the expected cluster node.
"""
def __init__(self, cm):
""" Create a new GroupAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
result = True
if not self._setup():
return result
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
result = False
self._cm.log("Child %s of %s is active more than once: %r"
% (child.id, group.id, nodes))
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
result = False
self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
% (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return result
class CloneAudit(PrimitiveAudit):
""" Audit clone resources. NOTE: Currently, this class does not perform
any actual audit functions.
"""
def __init__(self, cm):
""" Create a new CloneAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
result = True
if not self._setup():
return result
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug("Checking child %s of %s..." % (child.id, clone.id))
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return result
class ColocationAudit(PrimitiveAudit):
""" Audit cluster resources to verify that those that should be colocated
with each other actually are.
"""
def __init__(self, cm):
""" Create a new ColocationAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
""" Return a list of cluster nodes where a given resource is running """
(rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
result = True
if not self._setup():
return result
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
else:
for node in source:
if not node in target:
result = False
self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
% (coloc.id, coloc.rsc, node, target))
else:
self.debug("Colocation audit (%s): %s running on %s (in %r)"
% (coloc.id, coloc.rsc, node, target))
return result
class ControllerStateAudit(ClusterAudit):
""" Audit cluster nodes to verify that those we expect to be active are
active, and those that are expected to be inactive are inactive.
"""
def __init__(self, cm):
""" Create a new ControllerStateAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
result = True
up_are_down = 0
down_are_up = 0
unstable_list = []
for node in self._cm.Env["nodes"]:
- should_be = self._cm.ShouldBeStatus[node]
+ should_be = self._cm.expected_status[node]
rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
result = False
self._cm.log("Cluster is not stable: %d (of %d): %r"
% (len(unstable_list), self._cm.upcount(), unstable_list))
if up_are_down > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be up were down."
% (up_are_down, len(self._cm.Env["nodes"])))
if down_are_up > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be down were up."
% (down_are_up, len(self._cm.Env["nodes"])))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class CIBAudit(ClusterAudit):
""" Audit the CIB by verifying that it is identical across cluster nodes """
def __init__(self, cm):
""" Create a new CIBAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return result
for partition in ccm_partitions:
self.debug("\tAuditing CIB consistency for: %s" % partition)
if self._audit_cib_contents(partition) == 0:
result = False
return result
def _audit_cib_contents(self, hostlist):
""" Perform the CIB audit on the given hosts """
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node)
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node0)
passed = False
else:
(rc, result) = self._cm.rsh(
node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
if rc != 0:
self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
passed = False
for line in result:
if not re.search("", line):
passed = False
self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
else:
self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
return passed
def _store_remote_cib(self, node, target):
""" Store a copy of the given node's CIB on the given target node. If
no target is given, store the CIB on the given node.
"""
filename = "/tmp/ctsaudit.%s.xml" % node
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", "rm -f %s" % filename)
for line in lines:
self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class PartitionAudit(ClusterAudit):
""" Audit each partition in a cluster to verify a variety of conditions:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
""" Create a new PartitionAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return result
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
result = False
for partition in ccm_partitions:
self._cm.log("\t %s" % partition)
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
result = False
return result
def _trim_string(self, avalue):
""" Remove the last character from a multi-character string """
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
""" Remove the last character from a multi-character string and convert
the result to an int.
"""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
def _audit_partition(self, partition):
""" Perform the audit of a single partition """
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug("Auditing partition: %s" % partition)
for node in node_list:
- if self._cm.ShouldBeStatus[node] != "up":
+ if self._cm.expected_status[node] != "up":
self._cm.log("Warn: Node %s appeared out of nowhere" % node)
- self._cm.ShouldBeStatus[node] = "up"
+ self._cm.expected_status[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
- self._cm.ShouldBeStatus[node] = "down"
+ self._cm.expected_status[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log("Lowest epoch not determined in %s" % partition)
passed = False
for node in node_list:
- if self._cm.ShouldBeStatus[node] != "up":
+ if self._cm.expected_status[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug("%s: OK" % node)
elif not self._node_epoch[node]:
self.debug("Check on %s ignored: no node epoch" % node)
elif not lowest_epoch:
self.debug("Check on %s ignored: no lowest epoch" % node)
else:
self._cm.log("DC %s is not the oldest node (%d vs. %d)"
% (node, self._node_epoch[node], lowest_epoch))
passed = False
if not dc_found:
self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
% (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
self._cm.log("%d DCs (%s) found in cluster partition: %s"
% (len(dc_found), str(dc_found), str(node_list)))
passed = False
if not passed:
for node in node_list:
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
self._cm.log("epoch %s : %s"
% (self._node_epoch[node], self._node_state[node]))
return passed
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
""" Return a list of instances of applicable audits that can be performed
for the given ClusterManager.
"""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py
index a1fa1df5b2..b7ed1eb666 100644
--- a/python/pacemaker/_cts/clustermanager.py
+++ b/python/pacemaker/_cts/clustermanager.py
@@ -1,806 +1,806 @@
""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS)
"""
__all__ = ["ClusterManager"]
__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
Certain portions by Huang Zhen are copyright 2004
International Business Machines. The version control history for this file
may have further details."""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import time
from collections import UserDict
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.CTS import NodeStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.cib import ConfigFactory
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogWatcher
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
# ClusterManager has a lot of methods.
# pylint: disable=too-many-public-methods
class ClusterManager(UserDict):
'''The Cluster Manager class.
This is an subclass of the Python dictionary class.
(this is because it contains lots of {name,value} pairs,
not because it's behavior is that terribly similar to a
dictionary in other ways.)
This is an abstract class which class implements high-level
operations on the cluster and/or its cluster managers.
Actual cluster managers classes are subclassed from this type.
One of the things we do is track the state we think every node should
be in.
'''
def _final_conditions(self):
for key in list(self.keys()):
if self[key] is None:
raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key)
def __init__(self):
# Eventually, ClusterManager should not be a UserDict subclass. Until
# that point...
# pylint: disable=super-init-not-called
self.Env = EnvFactory().getInstance()
self.templates = PatternSelector(self.Env["Name"])
self.logger = LogFactory()
self.data = {}
self.name = self.Env["Name"]
self.rsh = RemoteFactory().getInstance()
- self.ShouldBeStatus={}
+ self.expected_status = {}
# pylint: disable=invalid-name
self.ns = NodeStatus(self.Env)
self.OurNode = os.uname()[1].lower()
self.__instance_errors_to_ignore = []
self.cib_installed = False
self._final_conditions()
self.CIBsync = {}
self.CibFactory = ConfigFactory(self)
self.cib = self.CibFactory.create_config(self.Env["Schema"])
def __getitem__(self, key):
if key == "Name":
return self.name
print("FIXME: Getting %s from %r" % (key, self))
if key in self.data:
return self.data[key]
return self.templates.get_patterns(key)
def __setitem__(self, key, value):
print("FIXME: Setting %s=%s on %r" % (key, value, self))
self.data[key] = value
def key_for_node(self, node):
return node
def clear_instance_errors_to_ignore(self):
""" Reset instance-specific errors to ignore on each iteration """
self.__instance_errors_to_ignore = []
@property
def instance_errors_to_ignore(self):
""" Return a list of known errors that should be ignored for a specific
test instance
"""
return self.__instance_errors_to_ignore
@property
def errors_to_ignore(self):
""" Return a list of known error messages that should be ignored """
return self.templates.get_patterns("BadNewsIgnore")
def log(self, args):
self.logger.log(args)
def debug(self, args):
self.logger.debug(args)
def upcount(self):
'''How many nodes are up?'''
count = 0
for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] == "up":
+ if self.expected_status[node] == "up":
count = count + 1
return count
def install_support(self, command="install"):
for node in self.Env["nodes"]:
self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command))
def prepare_fencing_watcher(self):
# If we don't have quorum now but get it as a result of starting this node,
# then a bunch of nodes might get fenced
if self.has_quorum(None):
self.debug("Have quorum")
return None
if not self.templates["Pat:Fencing_start"]:
print("No start pattern")
return None
if not self.templates["Pat:Fencing_ok"]:
print("No ok pattern")
return None
stonith = None
stonithPats = []
for peer in self.Env["nodes"]:
- if self.ShouldBeStatus[peer] == "up":
+ if self.expected_status[peer] == "up":
continue
stonithPats.extend([ self.templates["Pat:Fencing_ok"] % peer,
self.templates["Pat:Fencing_start"] % peer ])
stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0)
stonith.set_watch()
return stonith
def fencing_cleanup(self, node, stonith):
peer_list = []
peer_state = {}
self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
# If we just started a node, we may now have quorum (and permission to fence)
if not stonith:
self.debug("Nothing to do")
return peer_list
q = self.has_quorum(None)
if not q and len(self.Env["nodes"]) > 2:
# We didn't gain quorum - we shouldn't have shot anyone
self.debug("Quorum: %s Len: %d" % (q, len(self.Env["nodes"])))
return peer_list
for n in self.Env["nodes"]:
peer_state[n] = "unknown"
# Now see if any states need to be updated
self.debug("looking for: %r" % stonith.regexes)
shot = stonith.look(0)
while shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
# Extract node name
for n in self.Env["nodes"]:
if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
peer = n
peer_state[peer] = "complete"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer)
elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
# TODO: Correctly detect multiple fencing operations for the same host
peer = n
peer_state[peer] = "in-progress"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer)
if not peer:
self.logger.log("ERROR: Unknown stonith match: %r" % shot)
elif not peer in peer_list:
self.debug("Found peer: %s" % peer)
peer_list.append(peer)
# Get the next one
shot = stonith.look(60)
for peer in peer_list:
self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
if self.Env["at-boot"]:
- self.ShouldBeStatus[peer] = "up"
+ self.expected_status[peer] = "up"
else:
- self.ShouldBeStatus[peer] = "down"
+ self.expected_status[peer] = "down"
if peer_state[peer] == "in-progress":
# Wait for any in-progress operations to complete
shot = stonith.look(60)
while stonith.regexes and shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
shot = stonith.look(60)
# Now make sure the node is alive too
self.ns.wait_for_node(peer, self.Env["DeadTime"])
# Poll until it comes up
if self.Env["at-boot"]:
if not self.stat_cm(peer):
time.sleep(self.Env["StartTime"])
if not self.stat_cm(peer):
self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
return None
return peer_list
def start_cm(self, node, verbose=False):
""" Start up the cluster manager on a given node """
if verbose:
self.logger.log("Starting %s on node %s" % (self.templates["Name"], node))
else:
self.debug("Starting %s on node %s" % (self.templates["Name"], node))
- if not node in self.ShouldBeStatus:
- self.ShouldBeStatus[node] = "down"
+ if not node in self.expected_status:
+ self.expected_status[node] = "down"
- if self.ShouldBeStatus[node] != "down":
+ if self.expected_status[node] != "down":
return True
# Technically we should always be able to notice ourselves starting
patterns = [ self.templates["Pat:Local_started"] % node ]
if self.upcount() == 0:
patterns.append(self.templates["Pat:DC_started"] % node)
else:
patterns.append(self.templates["Pat:NonDC_started"] % node)
watch = LogWatcher(
self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10)
self.install_config(node)
- self.ShouldBeStatus[node] = "any"
+ self.expected_status[node] = "any"
if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]):
self.logger.log ("%s was already started" % node)
return True
stonith = self.prepare_fencing_watcher()
watch.set_watch()
(rc, _) = self.rsh(node, self.templates["StartCmd"])
if rc != 0:
self.logger.log ("Warn: Start command failed on node %s" % node)
self.fencing_cleanup(node, stonith)
return False
- self.ShouldBeStatus[node] = "up"
+ self.expected_status[node] = "up"
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % regex)
if watch_result and self.cluster_stable(self.Env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
self.logger.log ("Warn: Start failed for node %s" % node)
return False
def start_cm_async(self, node, verbose=False):
""" Start up the cluster manager on a given node without blocking """
if verbose:
self.logger.log("Starting %s on node %s" % (self["Name"], node))
else:
self.debug("Starting %s on node %s" % (self["Name"], node))
self.install_config(node)
self.rsh(node, self.templates["StartCmd"], synchronous=False)
- self.ShouldBeStatus[node] = "up"
+ self.expected_status[node] = "up"
def stop_cm(self, node, verbose=False, force=False):
""" Stop the cluster manager on a given node """
if verbose:
self.logger.log("Stopping %s on node %s" % (self["Name"], node))
else:
self.debug("Stopping %s on node %s" % (self["Name"], node))
- if self.ShouldBeStatus[node] != "up" and not force:
+ if self.expected_status[node] != "up" and not force:
return True
(rc, _) = self.rsh(node, self.templates["StopCmd"])
if rc == 0:
# Make sure we can continue even if corosync leaks
- self.ShouldBeStatus[node] = "down"
+ self.expected_status[node] = "down"
self.cluster_stable(self.Env["DeadTime"])
return True
self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
return False
def stop_cm_async(self, node):
""" Stop the cluster manager on a given node without blocking """
self.debug("Stopping %s on node %s" % (self["Name"], node))
self.rsh(node, self.templates["StopCmd"], synchronous=False)
- self.ShouldBeStatus[node] = "down"
+ self.expected_status[node] = "down"
def startall(self, nodelist=None, verbose=False, quick=False):
""" Start the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
if not nodelist:
nodelist = self.Env["nodes"]
for node in nodelist:
- if self.ShouldBeStatus[node] == "down":
+ if self.expected_status[node] == "down":
self.ns.wait_for_all_nodes(nodelist, 300)
if not quick:
# This is used for "basic sanity checks", so only start one node ...
return self.start_cm(nodelist[0], verbose=verbose)
# Approximation of SimulStartList for --boot
watchpats = [ self.templates["Pat:DC_IDLE"] ]
for node in nodelist:
watchpats.extend([ self.templates["Pat:InfraUp"] % node,
self.templates["Pat:PacemakerUp"] % node,
self.templates["Pat:Local_started"] % node,
self.templates["Pat:They_up"] % (nodelist[0], node) ])
# Start all the nodes - at about the same time...
watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10)
watch.set_watch()
if not self.start_cm(nodelist[0], verbose=verbose):
return False
for node in nodelist:
self.start_cm_async(node, verbose=verbose)
watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % regex)
if not self.cluster_stable():
self.logger.log("Cluster did not stabilize")
return False
return True
def stopall(self, nodelist=None, verbose=False, force=False):
""" Stop the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
ret = True
if not nodelist:
nodelist = self.Env["nodes"]
for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] == "up" or force:
+ if self.expected_status[node] == "up" or force:
if not self.stop_cm(node, verbose=verbose, force=force):
ret = False
return ret
def statall(self, nodelist=None):
'''Return the status of the cluster managers in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
result = {}
if not nodelist:
nodelist = self.Env["nodes"]
for node in nodelist:
if self.stat_cm(node):
result[node] = "up"
else:
result[node] = "down"
return result
def isolate_node(self, target, nodes=None):
'''isolate the communication between the nodes'''
if not nodes:
nodes = self.Env["nodes"]
for node in nodes:
if node == target:
continue
(rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node))
if rc != 0:
self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
return False
self.debug("Communication cut between %s and %s" % (target, node))
return True
def unisolate_node(self, target, nodes=None):
'''fix the communication between the nodes'''
if not nodes:
nodes = self.Env["nodes"]
for node in nodes:
if node == target:
continue
# Limit the amount of time we have asynchronous connectivity for
# Restore both sides as simultaneously as possible
self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False)
self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False)
self.debug("Communication restored between %s and %s" % (target, node))
def oprofile_start(self, node=None):
if not node:
for n in self.Env["oprofile"]:
self.oprofile_start(n)
elif node in self.Env["oprofile"]:
self.debug("Enabling oprofile on %s" % node)
self.rsh(node, "opcontrol --init")
self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
self.rsh(node, "opcontrol --start")
self.rsh(node, "opcontrol --reset")
def oprofile_save(self, test, node=None):
if not node:
for n in self.Env["oprofile"]:
self.oprofile_save(test, n)
elif node in self.Env["oprofile"]:
self.rsh(node, "opcontrol --dump")
self.rsh(node, "opcontrol --save=cts.%d" % test)
# Read back with: opreport -l session:cts.0 image:/c*
self.oprofile_stop(node)
self.oprofile_start(node)
def oprofile_stop(self, node=None):
if not node:
for n in self.Env["oprofile"]:
self.oprofile_stop(n)
elif node in self.Env["oprofile"]:
self.debug("Stopping oprofile on %s" % node)
self.rsh(node, "opcontrol --reset")
self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
def install_config(self, node):
if not self.ns.wait_for_node(node):
self.log("Node %s is not up." % node)
return
if node in self.CIBsync or not self.Env["ClobberCIB"]:
return
self.CIBsync[node] = True
self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
# Only install the CIB on the first node, all the other ones will pick it up from there
if self.cib_installed:
return
self.cib_installed = True
if self.Env["CIBfilename"] is None:
self.log("Installing Generated CIB on node %s" % node)
self.cib.install(node)
else:
self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node))
rc = self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
if rc != 0:
raise ValueError("Can not scp file to %s %d" % (node, rc))
self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR))
def prepare(self):
'''Finish the Initialization process. Prepare to test...'''
self.partitions_expected = 1
for node in self.Env["nodes"]:
- self.ShouldBeStatus[node] = ""
+ self.expected_status[node] = ""
if self.Env["experimental-tests"]:
self.unisolate_node(node)
self.stat_cm(node)
def test_node_cm(self, node):
'''Report the status of the cluster manager on a given node'''
watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)",
self.templates["Pat:NonDC_started"] % node,
self.templates["Pat:DC_started"] % node ]
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle")
idle_watch.set_watch()
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if not out:
out = ""
else:
out = out[0].strip()
self.debug("Node %s status: '%s'" % (node, out))
if out.find('ok') < 0:
- if self.ShouldBeStatus[node] == "up":
+ if self.expected_status[node] == "up":
self.log(
"Node status for %s is %s but we think it should be %s"
- % (node, "down", self.ShouldBeStatus[node]))
- self.ShouldBeStatus[node] = "down"
+ % (node, "down", self.expected_status[node]))
+ self.expected_status[node] = "down"
return 0
- if self.ShouldBeStatus[node] == "down":
+ if self.expected_status[node] == "down":
self.log(
"Node status for %s is %s but we think it should be %s: %s"
- % (node, "up", self.ShouldBeStatus[node], out))
+ % (node, "up", self.expected_status[node], out))
- self.ShouldBeStatus[node] = "up"
+ self.expected_status[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" % (node, out))
return 1
# Up and stable
return 2
def stat_cm(self, node):
""" Report the status of the cluster manager on a given node """
return self.test_node_cm(node) > 0
# Being up and being stable is not the same question...
def node_stable(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_cm(node) == 2:
return True
self.log("Warn: Node %s not stable" % node)
return False
def partition_stable(self, nodes, timeout=None):
watchpats = [ "Current ping state: S_IDLE",
self.templates["Pat:DC_IDLE"] ]
self.debug("Waiting for cluster stability...")
if timeout is None:
timeout = self.Env["DeadTime"]
if len(nodes) < 3:
self.debug("Cluster is inactive")
return True
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout)
idle_watch.set_watch()
for node in nodes.split():
# have each node dump its current state
self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
ret = idle_watch.look()
while ret:
self.debug(ret)
for node in nodes.split():
if re.search(node, ret):
return True
ret = idle_watch.look()
self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout))
return False
def cluster_stable(self, timeout=None, double_check=False):
partitions = self.find_partitions()
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
if not double_check:
return True
# Make sure we are really stable and that all resources,
# including those that depend on transient node attributes,
# are started if they were going to be
time.sleep(5)
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
return True
def is_node_dc(self, node, status_line=None):
if not status_line:
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if out:
status_line = out[0].strip()
if not status_line:
return False
if status_line.find('S_IDLE') != -1:
return True
if status_line.find('S_INTEGRATION') != -1:
return True
if status_line.find('S_FINALIZE_JOIN') != -1:
return True
if status_line.find('S_POLICY_ENGINE') != -1:
return True
if status_line.find('S_TRANSITION_ENGINE') != -1:
return True
return False
def active_resources(self, node):
(_, output) = self.rsh(node, "crm_resource -c", verbose=1)
resources = []
for line in output:
if not re.search("^Resource", line):
continue
tmp = AuditResource(self, line)
if tmp.type == "primitive" and tmp.host == node:
resources.append(tmp.id)
return resources
def resource_location(self, rid):
ResourceNodes = []
for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] != "up":
+ if self.expected_status[node] != "up":
continue
cmd = self.templates["RscRunning"] % rid
(rc, lines) = self.rsh(node, cmd)
if rc == 127:
self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
for line in lines:
self.log("Output: %s " % line)
elif rc == 0:
ResourceNodes.append(node)
return ResourceNodes
def find_partitions(self):
ccm_partitions = []
for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] != "up":
+ if self.expected_status[node] != "up":
self.debug("Node %s is down... skipping" % node)
continue
(_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
if not out:
self.log("no partition details for %s" % node)
continue
partition = out[0].strip()
if len(partition) <= 2:
self.log("bad partition details for %s" % node)
continue
nodes = partition.split()
nodes.sort()
partition = ' '.join(nodes)
found = 0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug("Adding partition from %s: %s" % (node, partition))
ccm_partitions.append(partition)
else:
self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
self.debug("Found partitions: %r" % ccm_partitions)
return ccm_partitions
def has_quorum(self, node_list):
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
node_list = self.Env["nodes"]
for node in node_list:
- if self.ShouldBeStatus[node] != "up":
+ if self.expected_status[node] != "up":
continue
(_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
quorum = quorum[0].strip()
if quorum.find("1") != -1:
return True
if quorum.find("0") != -1:
return False
self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum))
return False
@property
def components(self):
raise NotImplementedError
def standby_status(self, node):
(_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
if not out:
return "off"
out = out[0].strip()
self.debug("Standby result: %s" % out)
return out
# status == "on" : Enter Standby mode
# status == "off": Enter Active mode
def set_standby_mode(self, node, status):
current_status = self.standby_status(node)
if current_status == status:
return True
cmd = self.templates["StandbyCmd"] % (node, status)
(rc, _) = self.rsh(node, cmd)
return rc == 0
def add_dummy_rsc(self, node, rid):
rsc_xml = """ '
'""" % (rid, rid)
constraint_xml = """ '
'
""" % (rid, node, node, rid)
self.rsh(node, self.templates['CibAddXml'] % rsc_xml)
self.rsh(node, self.templates['CibAddXml'] % constraint_xml)
def remove_dummy_rsc(self, node, rid):
constraint = "\"//rsc_location[@rsc='%s']\"" % rid
rsc = "\"//primitive[@id='%s']\"" % rid
self.rsh(node, self.templates['CibDelXpath'] % constraint)
self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py
index aa323b3b9b..fba883dc7a 100644
--- a/python/pacemaker/_cts/tests/componentfail.py
+++ b/python/pacemaker/_cts/tests/componentfail.py
@@ -1,159 +1,159 @@
""" Kill a pacemaker daemon and test how the cluster recovers """
__all__ = ["ComponentFail"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class ComponentFail(CTSTest):
""" A concrete test that kills a random pacemaker daemon and waits for the
cluster to recover
"""
def __init__(self, cm):
""" Create a new ComponentFail instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.is_unsafe = True
self.name = "ComponentFail"
self._complist = cm.components
self._okerrpatterns = []
self._patterns = []
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
self._patterns = []
self._okerrpatterns = []
# start all nodes
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
if not self._cm.cluster_stable(self._env["StableTime"]):
return self.failure("Setup failed - unstable")
node_is_dc = self._cm.is_node_dc(node, None)
# select a component to kill
chosen = self._env.random_gen.choice(self._complist)
while chosen.dc_only and not node_is_dc:
chosen = self._env.random_gen.choice(self._complist)
self.debug("...component %s (dc=%s)" % (chosen.name, node_is_dc))
self.incr(chosen.name)
if chosen.name != "corosync":
self._patterns.extend([ self.templates["Pat:ChildKilled"] % (node, chosen.name),
self.templates["Pat:ChildRespawn"] % (node, chosen.name) ])
self._patterns.extend(chosen.pats)
if node_is_dc:
self._patterns.extend(chosen.dc_pats)
# @TODO this should be a flag in the Component
if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]:
# Ignore actions for fence devices if fencer will respawn
# (their registration will be lost, and probes will fail)
self._okerrpatterns = [ self.templates["Pat:Fencing_active"] ]
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self._okerrpatterns.extend([ self.templates["Pat:Fencing_recover"] % r.id,
self.templates["Pat:Fencing_probe"] % r.id ])
# supply a copy so self.patterns doesn't end up empty
tmp_pats = self._patterns.copy()
self._patterns.extend(chosen.badnews_ignore)
# Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
stonith_pats = [ self.templates["Pat:Fencing_ok"] % node ]
stonith = self.create_watch(stonith_pats, 0)
stonith.set_watch()
# set the watch for stable
watch = self.create_watch(
tmp_pats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
# kill the component
chosen.kill(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for any fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
self._cm.cluster_stable(self._env["StartTime"])
self.debug("Checking if %s was shot" % node)
shot = stonith.look(60)
if shot:
self.debug("Found: %r" % shot)
self._okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)
if not self._env["at-boot"]:
- self._cm.ShouldBeStatus[node] = "down"
+ self._cm.expected_status[node] = "down"
# If fencing occurred, chances are many (if not all) the expected logs
# will not be sent - or will be lost when the node reboots
return self.success()
# check for logs indicating a graceful recovery
matched = watch.look_for_all(allow_multiple_matches=True)
if watch.unmatched:
self._logger.log("Patterns not found: %r" % watch.unmatched)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected %s patterns" % chosen.name)
if not is_stable:
return self.failure("Cluster did not become stable after killing %s" % chosen.name)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Note that okerrpatterns refers to the last time we ran this test
# The good news is that this works fine for us...
self._okerrpatterns.extend(self._patterns)
return self._okerrpatterns
diff --git a/python/pacemaker/_cts/tests/fliptest.py b/python/pacemaker/_cts/tests/fliptest.py
index d3f1571419..5e779367e4 100644
--- a/python/pacemaker/_cts/tests/fliptest.py
+++ b/python/pacemaker/_cts/tests/fliptest.py
@@ -1,61 +1,61 @@
""" Stop running nodes, and start stopped nodes """
__all__ = ["FlipTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import time
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.tests.stoptest import StopTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class FlipTest(CTSTest):
""" A concrete test that stops running nodes and starts stopped nodes """
def __init__(self, cm):
""" Create a new FlipTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "Flip"
self._start = StartTest(cm)
self._stop = StopTest(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
self.incr("stopped")
ret = self._stop(node)
kind = "up->down"
# Give the cluster time to recognize it's gone...
time.sleep(self._env["StableTime"])
- elif self._cm.ShouldBeStatus[node] == "down":
+ elif self._cm.expected_status[node] == "down":
self.incr("started")
ret = self._start(node)
kind = "down->up"
else:
return self.skipped()
self.incr(kind)
if ret:
return self.success()
return self.failure("%s failure" % kind)
diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py
index 94b1abdcbf..abbd82599e 100644
--- a/python/pacemaker/_cts/tests/nearquorumpointtest.py
+++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py
@@ -1,125 +1,125 @@
""" Randomly start and stop nodes to bring the cluster close to the quorum point """
__all__ = ["NearQuorumPointTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class NearQuorumPointTest(CTSTest):
""" A concrete test that randomly starts and stops nodes to bring the
cluster close to the quorum point
"""
def __init__(self, cm):
""" Create a new NearQuorumPointTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "NearQuorumPoint"
def __call__(self, dummy):
""" Perform this test """
self.incr("calls")
startset = []
stopset = []
stonith = self._cm.prepare_fencing_watcher()
#decide what to do with each node
for node in self._env["nodes"]:
action = self._env.random_gen.choice(["start", "stop"])
if action == "start" :
startset.append(node)
elif action == "stop" :
stopset.append(node)
self.debug("start nodes:%r" % startset)
self.debug("stop nodes:%r" % stopset)
#add search patterns
watchpats = [ ]
for node in stopset:
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
watchpats.append(self.templates["Pat:We_stopped"] % node)
for node in startset:
- if self._cm.ShouldBeStatus[node] == "down":
+ if self._cm.expected_status[node] == "down":
watchpats.append(self.templates["Pat:Local_started"] % node)
else:
for stopping in stopset:
- if self._cm.ShouldBeStatus[stopping] == "up":
+ if self._cm.expected_status[stopping] == "up":
watchpats.append(self.templates["Pat:They_stopped"] % (node, self._cm.key_for_node(stopping)))
if not watchpats:
return self.skipped()
if startset:
watchpats.append(self.templates["Pat:DC_IDLE"])
watch = self.create_watch(watchpats, self._env["DeadTime"] + 10)
watch.set_watch()
#begin actions
for node in stopset:
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
self._cm.stop_cm_async(node)
for node in startset:
- if self._cm.ShouldBeStatus[node] == "down":
+ if self._cm.expected_status[node] == "down":
self._cm.start_cm_async(node)
#get the result
if watch.look_for_all():
self._cm.cluster_stable()
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
return self.success()
self._logger.log("Warn: Patterns not found: %r" % watch.unmatched)
#get the "bad" nodes
upnodes = []
for node in stopset:
if self._cm.stat_cm(node):
upnodes.append(node)
downnodes = []
for node in startset:
if not self._cm.stat_cm(node):
downnodes.append(node)
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
if not upnodes and not downnodes:
self._cm.cluster_stable()
# Make sure they're completely down with no residule
for node in stopset:
self._rsh(node, self.templates["StopCmd"])
return self.success()
if upnodes:
self._logger.log("Warn: Unstoppable nodes: %r" % upnodes)
if downnodes:
self._logger.log("Warn: Unstartable nodes: %r" % downnodes)
return self.failure()
diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py
index 05f07c9014..16cf67472c 100644
--- a/python/pacemaker/_cts/tests/simulstartlite.py
+++ b/python/pacemaker/_cts/tests/simulstartlite.py
@@ -1,131 +1,131 @@
""" Simultaneously start stopped nodes """
__all__ = ["SimulStartLite"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class SimulStartLite(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class starts any stopped nodes more or less
simultaneously.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new SimulStartLite instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self,cm)
self.name = "SimulStartLite"
def __call__(self, dummy):
""" Start all stopped nodes more or less simultaneously, returning
whether this succeeded or not.
"""
self.incr("calls")
self.debug("Setup: %s" % self.name)
# We ignore the "node" parameter...
node_list = []
for node in self._env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "down":
+ if self._cm.expected_status[node] == "down":
self.incr("WasStopped")
node_list.append(node)
self.set_timer()
while len(node_list) > 0:
# Repeat until all nodes come up
uppat = self.templates["Pat:NonDC_started"]
if self._cm.upcount() == 0:
uppat = self.templates["Pat:Local_started"]
watchpats = [ self.templates["Pat:DC_IDLE"] ]
for node in node_list:
watchpats.extend([uppat % node,
self.templates["Pat:InfraUp"] % node,
self.templates["Pat:PacemakerUp"] % node])
# Start all the nodes - at about the same time...
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
stonith = self._cm.prepare_fencing_watcher()
for node in node_list:
self._cm.start_cm_async(node)
watch.look_for_all()
node_list = self._cm.fencing_cleanup(self.name, stonith)
if node_list is None:
return self.failure("Cluster did not stabilize")
# Remove node_list messages from watch.unmatched
for node in node_list:
self._logger.debug("Dealing with stonith operations for %s" % node_list)
if watch.unmatched:
try:
watch.unmatched.remove(uppat % node)
except ValueError:
self.debug("Already matched: %s" % (uppat % node))
try:
watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
except ValueError:
self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))
try:
watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
except ValueError:
self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))
if watch.unmatched:
for regex in watch.unmatched:
self._logger.log ("Warn: Startup pattern not found: %s" % regex)
if not self._cm.cluster_stable():
return self.failure("Cluster did not stabilize")
did_fail = False
unstable = []
for node in self._env["nodes"]:
if not self._cm.stat_cm(node):
did_fail = True
unstable.append(node)
if did_fail:
return self.failure("Unstarted nodes exist: %s" % unstable)
unstable = []
for node in self._env["nodes"]:
if not self._cm.node_stable(node):
did_fail = True
unstable.append(node)
if did_fail:
return self.failure("Unstable cluster nodes exist: %s" % unstable)
return self.success()
def is_applicable(self):
""" SimulStartLite is a setup test and never applicable """
return False
diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py
index 2f51ac655a..9dbc1f2d2d 100644
--- a/python/pacemaker/_cts/tests/simulstoplite.py
+++ b/python/pacemaker/_cts/tests/simulstoplite.py
@@ -1,91 +1,91 @@
""" Simultaneously stop running nodes """
__all__ = ["SimulStopLite"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class SimulStopLite(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class stops any running nodes more or less
simultaneously. It can be used both to set up a test or to clean up
a test.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new SimulStopLite instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self,cm)
self.name = "SimulStopLite"
def __call__(self, dummy):
""" Stop all running nodes more or less simultaneously, returning
whether this succeeded or not.
"""
self.incr("calls")
self.debug("Setup: %s" % self.name)
# We ignore the "node" parameter...
watchpats = []
for node in self._env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
self.incr("WasStarted")
watchpats.append(self.templates["Pat:We_stopped"] % node)
if len(watchpats) == 0:
return self.success()
# Stop all the nodes - at about the same time...
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self.set_timer()
for node in self._env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "up":
+ if self._cm.expected_status[node] == "up":
self._cm.stop_cm_async(node)
if watch.look_for_all():
# Make sure they're completely down with no residule
for node in self._env["nodes"]:
self._rsh(node, self.templates["StopCmd"])
return self.success()
did_fail = False
up_nodes = []
for node in self._env["nodes"]:
if self._cm.stat_cm(node):
did_fail = True
up_nodes.append(node)
if did_fail:
return self.failure("Active nodes exist: %s" % up_nodes)
self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched)
return self.failure("Missing log message: %s " % watch.unmatched)
def is_applicable(self):
""" SimulStopLite is a setup test and never applicable """
return False
diff --git a/python/pacemaker/_cts/tests/starttest.py b/python/pacemaker/_cts/tests/starttest.py
index 3b0b18f7a3..53a347a392 100644
--- a/python/pacemaker/_cts/tests/starttest.py
+++ b/python/pacemaker/_cts/tests/starttest.py
@@ -1,54 +1,54 @@
""" Start the cluster manager on a given node """
__all__ = ["StartTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StartTest(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class starts the cluster manager on a given
node.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new StartTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self,cm)
self.name = "Start"
def __call__(self, node):
""" Start the given node, returning whether this succeeded or not """
self.incr("calls")
if self._cm.upcount() == 0:
self.incr("us")
else:
self.incr("them")
- if self._cm.ShouldBeStatus[node] != "down":
+ if self._cm.expected_status[node] != "down":
return self.skipped()
if self._cm.start_cm(node):
return self.success()
return self.failure("Startup %s on node %s failed"
% (self._env["Name"], node))
diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py
index da3848ad0c..e2fa341227 100644
--- a/python/pacemaker/_cts/tests/stonithdtest.py
+++ b/python/pacemaker/_cts/tests/stonithdtest.py
@@ -1,139 +1,139 @@
""" Fence a running node and wait for it to restart """
__all__ = ["StonithdTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StonithdTest(CTSTest):
""" A concrete test that fences a running node and waits for it to restart """
def __init__(self, cm):
""" Create a new StonithdTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "Stonithd"
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
if len(self._env["nodes"]) < 2:
return self.skipped()
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
watchpats = [ self.templates["Pat:Fencing_ok"] % node,
self.templates["Pat:NodeFenced"] % node ]
if not self._env["at-boot"]:
self.debug("Expecting %s to stay down" % node)
- self._cm.ShouldBeStatus[node] = "down"
+ self._cm.expected_status[node] = "down"
else:
self.debug("Expecting %s to come up again %d" % (node, self._env["at-boot"]))
watchpats.extend([ "%s.* S_STARTING -> S_PENDING" % node,
"%s.* S_PENDING -> S_NOT_DC" % node ])
watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
origin = self._env.random_gen.choice(self._env["nodes"])
(rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)
if rc == ExitStatus.TIMEOUT:
# Look for the patterns, usually this means the required
# device was running on the node to be fenced - or that
# the required devices were in the process of being loaded
# and/or moved
#
# Effectively the node committed suicide so there will be
# no confirmation, but pacemaker should be watching and
# fence the node again
self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node))
elif origin != node and rc != 0:
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))
elif origin == node and rc != 255:
# 255 == broken pipe, ie. the node was fenced as expected
self._logger.log("Locally originated fencing returned %d" % rc)
with Timer(self._logger, self.name, "fence"):
matched = watch.look_for_all()
self.set_timer("reform")
if watch.unmatched:
self._logger.log("Patterns not found: %r" % watch.unmatched)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected patterns")
if not is_stable:
return self.failure("Cluster did not become stable")
self.log_timer("reform")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ self.templates["Pat:Fencing_start"] % ".*",
self.templates["Pat:Fencing_ok"] % ".*",
self.templates["Pat:Fencing_active"],
r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ]
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not CTSTest.is_applicable(self):
return False
# pylint gets confused because of EnvFactory here.
# pylint: disable=unsupported-membership-test
if "DoFencing" in self._env:
return self._env["DoFencing"]
return True
diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py
index 237e928c20..7c864e67cf 100644
--- a/python/pacemaker/_cts/tests/stoptest.py
+++ b/python/pacemaker/_cts/tests/stoptest.py
@@ -1,97 +1,97 @@
""" Stop the cluster manager on a given node """
__all__ = ["StopTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StopTest(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class stops the cluster manager on a given
node.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new StopTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "Stop"
def __call__(self, node):
""" Stop the given node, returning whether this succeeded or not """
self.incr("calls")
- if self._cm.ShouldBeStatus[node] != "up":
+ if self._cm.expected_status[node] != "up":
return self.skipped()
# Technically we should always be able to notice ourselves stopping
patterns = [ self.templates["Pat:We_stopped"] % node ]
# Any active node needs to notice this one left
# (note that this won't work if we have multiple partitions)
for other in self._env["nodes"]:
- if self._cm.ShouldBeStatus[other] == "up" and other != node:
+ if self._cm.expected_status[other] == "up" and other != node:
patterns.append(self.templates["Pat:They_stopped"] %(other, self._cm.key_for_node(node)))
watch = self.create_watch(patterns, self._env["DeadTime"])
watch.set_watch()
if node == self._cm.OurNode:
self.incr("us")
else:
if self._cm.upcount() <= 1:
self.incr("all")
else:
self.incr("them")
self._cm.stop_cm(node)
watch.look_for_all()
failreason = None
unmatched_str = "||"
if watch.unmatched:
(_, output) = self._rsh(node, "/bin/ps axf", verbose=1)
for line in output:
self.debug(line)
(_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
for line in output:
self.debug(line)
for regex in watch.unmatched:
self._logger.log ("ERROR: Shutdown pattern not found: %s" % regex)
unmatched_str += "%s||" % regex
failreason = "Missing shutdown pattern"
self._cm.cluster_stable(self._env["DeadTime"])
if not watch.unmatched or self._cm.upcount() == 0:
return self.success()
if len(watch.unmatched) >= self._cm.upcount():
return self.failure("no match against (%s)" % unmatched_str)
if failreason is None:
return self.success()
return self.failure(failreason)