diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index 377a42a120..26187db71e 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1029 +1,1029 @@
""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
""" The base class for various kinds of auditors. Specific audit implementations
should be built on top of this one. Audits can do all kinds of checks on the
system. The basic interface for callers is the `__call__` method, which
returns True if the audit passes and False if it fails.
"""
def __init__(self, cm):
""" Create a new ClusterAudit instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
raise NotImplementedError
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
""" Log a message """
self._cm.log("audit: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
""" Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that
we can read back that test message using logging tools.
"""
def __init__(self, cm):
""" Create a new LogAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
""" Restart logging on the given nodes, or all if none are given """
if not nodes:
nodes = self._cm.Env["nodes"]
self._cm.debug("Restarting logging on: %r" % nodes)
for node in nodes:
if self._cm.Env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"])
if rc != 0:
self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node))
def _create_watcher(self, patterns, kind):
""" Create a new LogWatcher instance for the given patterns """
watch = LogWatcher(self._cm.Env["LogFileName"], patterns,
self._cm.Env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
""" Perform the log audit """
patterns = []
prefix = "Test message from"
suffix = str(uuid.uuid4())
watch = {}
for node in self._cm.Env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
watch_pref = self._cm.Env["LogWatcher"]
if watch_pref == LogKind.ANY:
kinds = [ LogKind.FILE ]
if self._cm.Env["have_systemd"]:
kinds += [ LogKind.JOURNAL ]
kinds += [ LogKind.REMOTE_FILE ]
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log("Logging test message with identifier %s" % suffix)
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
for node in self._cm.Env["nodes"]:
cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in list(watch.keys()):
w = watch[k]
if watch_pref == LogKind.ANY:
self._cm.log("Checking for test message in %s logs" % k)
w.look_for_all(silent=True)
if w.unmatched:
for regex in w.unmatched:
self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
else:
if watch_pref == LogKind.ANY:
self._cm.log("Found test message in %s logs" % k)
self._cm.Env["LogWatcher"] = k
return 1
return False
def __call__(self):
max_attempts = 3
attempt = 0
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60*attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
return False
return True
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
if self._cm.Env["LogAuditDisabled"]:
return False
return True
class DiskAudit(ClusterAudit):
""" Audit disk usage on cluster nodes to verify that there is enough free
space left on whichever mounted file system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
""" Create a new DiskAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
result = True
# @TODO Use directory of PCMK_logfile if set on host
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
for node in self._cm.Env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
% (dfout, node, used, remain))
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
% (node, used_percent, remaining_mb))
result = False
if not should_continue(self._cm.Env):
raise ValueError("Disk full on %s" % node)
elif remaining_mb < 100 or used_percent > 90:
self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class FileAudit(ClusterAudit):
""" Audit the filesystem looking for various failure conditions:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
""" Create a new FileAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def __call__(self):
result = True
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
for node in self._cm.Env["nodes"]:
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
if self._cm.ShouldBeStatus.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
result = False
clean = True
self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug("ps[%s]: %s" % (node, line))
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug("Skipping %s" % node)
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class AuditResource:
""" A base class for storing information about a cluster resource """
def __init__(self, cm, line):
""" Create a new AuditResource instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
@property
def unique(self):
""" Is this resource unique? """
return self.flags & 0x20
@property
def orphan(self):
""" Is this resource an orphan? """
return self.flags & 0x01
@property
def managed(self):
""" Is this resource managed by the cluster? """
return self.flags & 0x02
class AuditConstraint:
""" A base class for storing information about a cluster constraint """
def __init__(self, cm, line):
""" Create a new AuditConstraint instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
""" Audit primitive resources to verify a variety of conditions, including that
they are active and managed only when expected; they are active on the
expected clusted node; and that they are not orphaned.
"""
def __init__(self, cm):
""" Create a new PrimitiveAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
""" Perform the audit of a single resource """
rc = True
- active = self._cm.ResourceLocation(resource.id)
+ active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %r" % (resource.id, active))
elif resource.needs_quorum == 1:
self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
rc = False
elif not resource.managed:
self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
rc = False
elif resource.orphan:
self.debug("Resource %s is an inactive orphan" % resource.id)
elif not self._inactive_nodes:
self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
rc = False
elif self._cm.Env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
return rc
def _setup(self):
""" Verify cluster nodes are active, and collect resource and colocation
information used for performing the audit.
"""
for node in self._cm.Env["nodes"]:
if self._cm.ShouldBeStatus[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
for node in self._cm.Env["nodes"]:
if self._target is None and self._cm.ShouldBeStatus[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name)
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log("Unknown entry: %s" % line)
return True
def __call__(self):
result = True
if not self._setup():
return result
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
result = False
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class GroupAudit(PrimitiveAudit):
""" Audit group resources to verify that each of its child primitive
resources is active on the expected cluster node.
"""
def __init__(self, cm):
""" Create a new GroupAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
result = True
if not self._setup():
return result
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
- nodes = self._cm.ResourceLocation(child.id)
+ nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
result = False
self._cm.log("Child %s of %s is active more than once: %r"
% (child.id, group.id, nodes))
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
result = False
self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
% (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return result
class CloneAudit(PrimitiveAudit):
""" Audit clone resources. NOTE: Currently, this class does not perform
any actual audit functions.
"""
def __init__(self, cm):
""" Create a new CloneAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
result = True
if not self._setup():
return result
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug("Checking child %s of %s..." % (child.id, clone.id))
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return result
class ColocationAudit(PrimitiveAudit):
""" Audit cluster resources to verify that those that should be colocated
with each other actually are.
"""
def __init__(self, cm):
""" Create a new ColocationAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
""" Return a list of cluster nodes where a given resource is running """
(rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
result = True
if not self._setup():
return result
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
else:
for node in source:
if not node in target:
result = False
self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
% (coloc.id, coloc.rsc, node, target))
else:
self.debug("Colocation audit (%s): %s running on %s (in %r)"
% (coloc.id, coloc.rsc, node, target))
return result
class ControllerStateAudit(ClusterAudit):
""" Audit cluster nodes to verify that those we expect to be active are
active, and those that are expected to be inactive are inactive.
"""
def __init__(self, cm):
""" Create a new ControllerStateAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
result = True
up_are_down = 0
down_are_up = 0
unstable_list = []
for node in self._cm.Env["nodes"]:
should_be = self._cm.ShouldBeStatus[node]
- rc = self._cm.test_node_CM(node)
+ rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
result = False
self._cm.log("Cluster is not stable: %d (of %d): %r"
% (len(unstable_list), self._cm.upcount(), unstable_list))
if up_are_down > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be up were down."
% (up_are_down, len(self._cm.Env["nodes"])))
if down_are_up > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be down were up."
% (down_are_up, len(self._cm.Env["nodes"])))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class CIBAudit(ClusterAudit):
""" Audit the CIB by verifying that it is identical across cluster nodes """
def __init__(self, cm):
""" Create a new CIBAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return result
for partition in ccm_partitions:
self.debug("\tAuditing CIB consistency for: %s" % partition)
if self._audit_cib_contents(partition) == 0:
result = False
return result
def _audit_cib_contents(self, hostlist):
""" Perform the CIB audit on the given hosts """
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node)
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node0)
passed = False
else:
(rc, result) = self._cm.rsh(
node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
if rc != 0:
self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
passed = False
for line in result:
if not re.search("", line):
passed = False
self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
else:
self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
return passed
def _store_remote_cib(self, node, target):
""" Store a copy of the given node's CIB on the given target node. If
no target is given, store the CIB on the given node.
"""
filename = "/tmp/ctsaudit.%s.xml" % node
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", "rm -f %s" % filename)
for line in lines:
self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class PartitionAudit(ClusterAudit):
""" Audit each partition in a cluster to verify a variety of conditions:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
""" Create a new PartitionAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return result
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
result = False
for partition in ccm_partitions:
self._cm.log("\t %s" % partition)
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
result = False
return result
def _trim_string(self, avalue):
""" Remove the last character from a multi-character string """
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
""" Remove the last character from a multi-character string and convert
the result to an int.
"""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
def _audit_partition(self, partition):
""" Perform the audit of a single partition """
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug("Auditing partition: %s" % partition)
for node in node_list:
if self._cm.ShouldBeStatus[node] != "up":
self._cm.log("Warn: Node %s appeared out of nowhere" % node)
self._cm.ShouldBeStatus[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
self._cm.ShouldBeStatus[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log("Lowest epoch not determined in %s" % partition)
passed = False
for node in node_list:
if self._cm.ShouldBeStatus[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug("%s: OK" % node)
elif not self._node_epoch[node]:
self.debug("Check on %s ignored: no node epoch" % node)
elif not lowest_epoch:
self.debug("Check on %s ignored: no lowest epoch" % node)
else:
self._cm.log("DC %s is not the oldest node (%d vs. %d)"
% (node, self._node_epoch[node], lowest_epoch))
passed = False
if not dc_found:
self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
% (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
self._cm.log("%d DCs (%s) found in cluster partition: %s"
% (len(dc_found), str(dc_found), str(node_list)))
passed = False
if not passed:
for node in node_list:
if self._cm.ShouldBeStatus[node] == "up":
self._cm.log("epoch %s : %s"
% (self._node_epoch[node], self._node_state[node]))
return passed
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
""" Return a list of instances of applicable audits that can be performed
for the given ClusterManager.
"""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py
index a74041b7cc..745470aa80 100644
--- a/python/pacemaker/_cts/clustermanager.py
+++ b/python/pacemaker/_cts/clustermanager.py
@@ -1,913 +1,913 @@
""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS)
"""
__all__ = ["ClusterManager"]
__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
Certain portions by Huang Zhen are copyright 2004
International Business Machines. The version control history for this file
may have further details."""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import time
from collections import UserDict
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.CTS import NodeStatus, Process
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.cib import ConfigFactory
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogWatcher
# Throughout this file, pylint has trouble understanding that EnvFactory
# and RemoteFactory are singleton instances that can be treated as callable
# and subscriptable objects. Various warnings are disabled because of this.
# See also a comment about self._rsh in environment.py.
# pylint: disable=unsubscriptable-object
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
# ClusterManager has a lot of methods.
# pylint: disable=too-many-public-methods
class ClusterManager(UserDict):
'''The Cluster Manager class.
This is an subclass of the Python dictionary class.
(this is because it contains lots of {name,value} pairs,
not because it's behavior is that terribly similar to a
dictionary in other ways.)
This is an abstract class which class implements high-level
operations on the cluster and/or its cluster managers.
Actual cluster managers classes are subclassed from this type.
One of the things we do is track the state we think every node should
be in.
'''
- def _finalConditions(self):
+ def _final_conditions(self):
for key in list(self.keys()):
if self[key] is None:
raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key)
def __init__(self):
# Eventually, ClusterManager should not be a UserDict subclass. Until
# that point...
# pylint: disable=super-init-not-called
self.Env = EnvFactory().getInstance()
self.templates = PatternSelector(self.Env["Name"])
self.logger = LogFactory()
self.data = {}
self.name = self.Env["Name"]
self.rsh = RemoteFactory().getInstance()
self.ShouldBeStatus={}
# pylint: disable=invalid-name
self.ns = NodeStatus(self.Env)
self.OurNode = os.uname()[1].lower()
self.__instance_errors_to_ignore = []
self.cib_installed = False
- self._finalConditions()
+ self._final_conditions()
self.CIBsync = {}
self.CibFactory = ConfigFactory(self)
self.cib = self.CibFactory.create_config(self.Env["Schema"])
def __getitem__(self, key):
if key == "Name":
return self.name
print("FIXME: Getting %s from %r" % (key, self))
if key in self.data:
return self.data[key]
return self.templates.get_patterns(key)
def __setitem__(self, key, value):
print("FIXME: Setting %s=%s on %r" % (key, value, self))
self.data[key] = value
def key_for_node(self, node):
return node
def clear_instance_errors_to_ignore(self):
""" Reset instance-specific errors to ignore on each iteration """
self.__instance_errors_to_ignore = []
@property
def instance_errors_to_ignore(self):
""" Return a list of known errors that should be ignored for a specific
test instance
"""
return self.__instance_errors_to_ignore
@property
def errors_to_ignore(self):
""" Return a list of known error messages that should be ignored """
return self.templates.get_patterns("BadNewsIgnore")
def log(self, args):
self.logger.log(args)
def debug(self, args):
self.logger.debug(args)
def upcount(self):
'''How many nodes are up?'''
count = 0
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == "up":
count = count + 1
return count
def install_support(self, command="install"):
for node in self.Env["nodes"]:
self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command))
def prepare_fencing_watcher(self):
# If we don't have quorum now but get it as a result of starting this node,
# then a bunch of nodes might get fenced
if self.has_quorum(None):
self.debug("Have quorum")
return None
if not self.templates["Pat:Fencing_start"]:
print("No start pattern")
return None
if not self.templates["Pat:Fencing_ok"]:
print("No ok pattern")
return None
stonith = None
stonithPats = []
for peer in self.Env["nodes"]:
if self.ShouldBeStatus[peer] == "up":
continue
stonithPats.extend([ self.templates["Pat:Fencing_ok"] % peer,
self.templates["Pat:Fencing_start"] % peer ])
stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0)
stonith.set_watch()
return stonith
def fencing_cleanup(self, node, stonith):
peer_list = []
peer_state = {}
self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
# If we just started a node, we may now have quorum (and permission to fence)
if not stonith:
self.debug("Nothing to do")
return peer_list
q = self.has_quorum(None)
if not q and len(self.Env["nodes"]) > 2:
# We didn't gain quorum - we shouldn't have shot anyone
self.debug("Quorum: %s Len: %d" % (q, len(self.Env["nodes"])))
return peer_list
for n in self.Env["nodes"]:
peer_state[n] = "unknown"
# Now see if any states need to be updated
self.debug("looking for: %r" % stonith.regexes)
shot = stonith.look(0)
while shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
# Extract node name
for n in self.Env["nodes"]:
if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
peer = n
peer_state[peer] = "complete"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer)
elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
# TODO: Correctly detect multiple fencing operations for the same host
peer = n
peer_state[peer] = "in-progress"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer)
if not peer:
self.logger.log("ERROR: Unknown stonith match: %r" % shot)
elif not peer in peer_list:
self.debug("Found peer: %s" % peer)
peer_list.append(peer)
# Get the next one
shot = stonith.look(60)
for peer in peer_list:
self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
if self.Env["at-boot"]:
self.ShouldBeStatus[peer] = "up"
else:
self.ShouldBeStatus[peer] = "down"
if peer_state[peer] == "in-progress":
# Wait for any in-progress operations to complete
shot = stonith.look(60)
while stonith.regexes and shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
shot = stonith.look(60)
# Now make sure the node is alive too
self.ns.wait_for_node(peer, self.Env["DeadTime"])
# Poll until it comes up
if self.Env["at-boot"]:
if not self.stat_cm(peer):
time.sleep(self.Env["StartTime"])
if not self.stat_cm(peer):
self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
return None
return peer_list
def start_cm(self, node, verbose=False):
""" Start up the cluster manager on a given node """
if verbose:
self.logger.log("Starting %s on node %s" % (self.templates["Name"], node))
else:
self.debug("Starting %s on node %s" % (self.templates["Name"], node))
if not node in self.ShouldBeStatus:
self.ShouldBeStatus[node] = "down"
if self.ShouldBeStatus[node] != "down":
return True
# Technically we should always be able to notice ourselves starting
patterns = [ self.templates["Pat:Local_started"] % node ]
if self.upcount() == 0:
patterns.append(self.templates["Pat:DC_started"] % node)
else:
patterns.append(self.templates["Pat:NonDC_started"] % node)
watch = LogWatcher(
self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10)
self.install_config(node)
self.ShouldBeStatus[node] = "any"
if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]):
self.logger.log ("%s was already started" % node)
return True
stonith = self.prepare_fencing_watcher()
watch.set_watch()
(rc, _) = self.rsh(node, self.templates["StartCmd"])
if rc != 0:
self.logger.log ("Warn: Start command failed on node %s" % node)
self.fencing_cleanup(node, stonith)
return False
self.ShouldBeStatus[node] = "up"
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % regex)
if watch_result and self.cluster_stable(self.Env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
self.logger.log ("Warn: Start failed for node %s" % node)
return False
def start_cm_async(self, node, verbose=False):
""" Start up the cluster manager on a given node without blocking """
if verbose:
self.logger.log("Starting %s on node %s" % (self["Name"], node))
else:
self.debug("Starting %s on node %s" % (self["Name"], node))
self.install_config(node)
self.rsh(node, self.templates["StartCmd"], synchronous=False)
self.ShouldBeStatus[node] = "up"
def stop_cm(self, node, verbose=False, force=False):
""" Stop the cluster manager on a given node """
if verbose:
self.logger.log("Stopping %s on node %s" % (self["Name"], node))
else:
self.debug("Stopping %s on node %s" % (self["Name"], node))
if self.ShouldBeStatus[node] != "up" and not force:
return True
(rc, _) = self.rsh(node, self.templates["StopCmd"])
if rc == 0:
# Make sure we can continue even if corosync leaks
self.ShouldBeStatus[node] = "down"
self.cluster_stable(self.Env["DeadTime"])
return True
self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
return False
def stop_cm_async(self, node):
""" Stop the cluster manager on a given node without blocking """
self.debug("Stopping %s on node %s" % (self["Name"], node))
self.rsh(node, self.templates["StopCmd"], synchronous=False)
self.ShouldBeStatus[node] = "down"
def startall(self, nodelist=None, verbose=False, quick=False):
""" Start the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
if not nodelist:
nodelist = self.Env["nodes"]
for node in nodelist:
if self.ShouldBeStatus[node] == "down":
self.ns.wait_for_all_nodes(nodelist, 300)
if not quick:
# This is used for "basic sanity checks", so only start one node ...
return self.start_cm(nodelist[0], verbose=verbose)
# Approximation of SimulStartList for --boot
watchpats = [ self.templates["Pat:DC_IDLE"] ]
for node in nodelist:
watchpats.extend([ self.templates["Pat:InfraUp"] % node,
self.templates["Pat:PacemakerUp"] % node,
self.templates["Pat:Local_started"] % node,
self.templates["Pat:They_up"] % (nodelist[0], node) ])
# Start all the nodes - at about the same time...
watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10)
watch.set_watch()
if not self.start_cm(nodelist[0], verbose=verbose):
return False
for node in nodelist:
self.start_cm_async(node, verbose=verbose)
watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % regex)
if not self.cluster_stable():
self.logger.log("Cluster did not stabilize")
return False
return True
def stopall(self, nodelist=None, verbose=False, force=False):
""" Stop the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
ret = True
if not nodelist:
nodelist = self.Env["nodes"]
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == "up" or force:
if not self.stop_cm(node, verbose=verbose, force=force):
ret = False
return ret
def statall(self, nodelist=None):
'''Return the status of the cluster managers in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
result = {}
if not nodelist:
nodelist = self.Env["nodes"]
for node in nodelist:
if self.stat_cm(node):
result[node] = "up"
else:
result[node] = "down"
return result
def isolate_node(self, target, nodes=None):
'''isolate the communication between the nodes'''
if not nodes:
nodes = self.Env["nodes"]
for node in nodes:
if node == target:
continue
(rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node))
if rc != 0:
self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
return False
self.debug("Communication cut between %s and %s" % (target, node))
return True
def unisolate_node(self, target, nodes=None):
'''fix the communication between the nodes'''
if not nodes:
nodes = self.Env["nodes"]
for node in nodes:
if node == target:
continue
# Limit the amount of time we have asynchronous connectivity for
# Restore both sides as simultaneously as possible
self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False)
self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False)
self.debug("Communication restored between %s and %s" % (target, node))
- def oprofileStart(self, node=None):
+ def oprofile_start(self, node=None):
if not node:
for n in self.Env["oprofile"]:
- self.oprofileStart(n)
+ self.oprofile_start(n)
elif node in self.Env["oprofile"]:
self.debug("Enabling oprofile on %s" % node)
self.rsh(node, "opcontrol --init")
self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
self.rsh(node, "opcontrol --start")
self.rsh(node, "opcontrol --reset")
- def oprofileSave(self, test, node=None):
+ def oprofile_save(self, test, node=None):
if not node:
for n in self.Env["oprofile"]:
- self.oprofileSave(test, n)
+ self.oprofile_save(test, n)
elif node in self.Env["oprofile"]:
self.rsh(node, "opcontrol --dump")
self.rsh(node, "opcontrol --save=cts.%d" % test)
# Read back with: opreport -l session:cts.0 image:/c*
- self.oprofileStop(node)
- self.oprofileStart(node)
+ self.oprofile_stop(node)
+ self.oprofile_start(node)
- def oprofileStop(self, node=None):
+ def oprofile_stop(self, node=None):
if not node:
for n in self.Env["oprofile"]:
- self.oprofileStop(n)
+ self.oprofile_stop(n)
elif node in self.Env["oprofile"]:
self.debug("Stopping oprofile on %s" % node)
self.rsh(node, "opcontrol --reset")
self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
def install_config(self, node):
if not self.ns.wait_for_node(node):
self.log("Node %s is not up." % node)
return
if node in self.CIBsync or not self.Env["ClobberCIB"]:
return
self.CIBsync[node] = True
self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
# Only install the CIB on the first node, all the other ones will pick it up from there
if self.cib_installed:
return
self.cib_installed = True
if self.Env["CIBfilename"] is None:
self.log("Installing Generated CIB on node %s" % node)
self.cib.install(node)
else:
self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node))
rc = self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
if rc != 0:
raise ValueError("Can not scp file to %s %d" % (node, rc))
self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR))
def prepare(self):
'''Finish the Initialization process. Prepare to test...'''
self.partitions_expected = 1
for node in self.Env["nodes"]:
self.ShouldBeStatus[node] = ""
if self.Env["experimental-tests"]:
self.unisolate_node(node)
self.stat_cm(node)
- def test_node_CM(self, node):
+ def test_node_cm(self, node):
'''Report the status of the cluster manager on a given node'''
watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)",
self.templates["Pat:NonDC_started"] % node,
self.templates["Pat:DC_started"] % node ]
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle")
idle_watch.set_watch()
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if not out:
out = ""
else:
out = out[0].strip()
self.debug("Node %s status: '%s'" % (node, out))
if out.find('ok') < 0:
if self.ShouldBeStatus[node] == "up":
self.log(
"Node status for %s is %s but we think it should be %s"
% (node, "down", self.ShouldBeStatus[node]))
self.ShouldBeStatus[node] = "down"
return 0
if self.ShouldBeStatus[node] == "down":
self.log(
"Node status for %s is %s but we think it should be %s: %s"
% (node, "up", self.ShouldBeStatus[node], out))
self.ShouldBeStatus[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" % (node, out))
return 1
# Up and stable
return 2
def stat_cm(self, node):
""" Report the status of the cluster manager on a given node """
- return self.test_node_CM(node) > 0
+ return self.test_node_cm(node) > 0
# Being up and being stable is not the same question...
def node_stable(self, node):
'''Report the status of the cluster manager on a given node'''
- if self.test_node_CM(node) == 2:
+ if self.test_node_cm(node) == 2:
return True
self.log("Warn: Node %s not stable" % node)
return False
def partition_stable(self, nodes, timeout=None):
watchpats = [ "Current ping state: S_IDLE",
self.templates["Pat:DC_IDLE"] ]
self.debug("Waiting for cluster stability...")
if timeout is None:
timeout = self.Env["DeadTime"]
if len(nodes) < 3:
self.debug("Cluster is inactive")
return True
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout)
idle_watch.set_watch()
for node in nodes.split():
# have each node dump its current state
self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
ret = idle_watch.look()
while ret:
self.debug(ret)
for node in nodes.split():
if re.search(node, ret):
return True
ret = idle_watch.look()
self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout))
return False
def cluster_stable(self, timeout=None, double_check=False):
partitions = self.find_partitions()
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
if not double_check:
return True
# Make sure we are really stable and that all resources,
# including those that depend on transient node attributes,
# are started if they were going to be
time.sleep(5)
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
return True
def is_node_dc(self, node, status_line=None):
if not status_line:
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if out:
status_line = out[0].strip()
if not status_line:
return False
if status_line.find('S_IDLE') != -1:
return True
if status_line.find('S_INTEGRATION') != -1:
return True
if status_line.find('S_FINALIZE_JOIN') != -1:
return True
if status_line.find('S_POLICY_ENGINE') != -1:
return True
if status_line.find('S_TRANSITION_ENGINE') != -1:
return True
return False
def active_resources(self, node):
(_, output) = self.rsh(node, "crm_resource -c", verbose=1)
resources = []
for line in output:
if not re.search("^Resource", line):
continue
tmp = AuditResource(self, line)
if tmp.type == "primitive" and tmp.host == node:
resources.append(tmp.id)
return resources
- def ResourceLocation(self, rid):
+ def resource_location(self, rid):
ResourceNodes = []
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] != "up":
continue
cmd = self.templates["RscRunning"] % rid
(rc, lines) = self.rsh(node, cmd)
if rc == 127:
self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
for line in lines:
self.log("Output: %s " % line)
elif rc == 0:
ResourceNodes.append(node)
return ResourceNodes
def find_partitions(self):
ccm_partitions = []
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] != "up":
self.debug("Node %s is down... skipping" % node)
continue
(_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
if not out:
self.log("no partition details for %s" % node)
continue
partition = out[0].strip()
if len(partition) <= 2:
self.log("bad partition details for %s" % node)
continue
nodes = partition.split()
nodes.sort()
partition = ' '.join(nodes)
found = 0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug("Adding partition from %s: %s" % (node, partition))
ccm_partitions.append(partition)
else:
self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
self.debug("Found partitions: %r" % ccm_partitions)
return ccm_partitions
def has_quorum(self, node_list):
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
node_list = self.Env["nodes"]
for node in node_list:
if self.ShouldBeStatus[node] != "up":
continue
(_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
quorum = quorum[0].strip()
if quorum.find("1") != -1:
return True
if quorum.find("0") != -1:
return False
self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum))
return False
def Components(self):
complist = []
common_ignore = [
"Pending action:",
"(ERROR|error): crm_log_message_adv:",
"(ERROR|error): MSG: No message to dump",
"pending LRM operations at shutdown",
"Lost connection to the CIB manager",
"Connection to the CIB terminated...",
"Sending message to the CIB manager FAILED",
"Action A_RECOVER .* not supported",
"(ERROR|error): stonithd_op_result_ready: not signed on",
"pingd.*(ERROR|error): send_update: Could not send update",
"send_ipc_message: IPC Channel to .* is not connected",
"unconfirmed_actions: Waiting on .* unconfirmed actions",
"cib_native_msgready: Message pending on command channel",
r": Performing A_EXIT_1 - forcefully exiting ",
r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.",
]
stonith_ignore = [
r"Updating failcount for child_DoFencing",
r"error.*: Fencer connection failed \(will retry\)",
"pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.",
]
stonith_ignore.extend(common_ignore)
ccm = Process(self, "ccm", pats = [
"State transition .* S_RECOVERY",
"pacemaker-controld.*Action A_RECOVER .* not supported",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
"pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
r"attrd.*exited with status 1",
r"cib.*exited with status 2",
"State transition S_STARTING -> S_PENDING",
], badnews_ignore = common_ignore)
based = Process(self, "pacemaker-based", pats = [
"State transition .* S_RECOVERY",
"Lost connection to the CIB manager",
"Connection to the CIB manager terminated",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
"pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
r"pacemaker-controld.*: Could not recover from internal error",
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
r"attrd.*exited with status 1",
], badnews_ignore = common_ignore)
execd = Process(self, "pacemaker-execd", pats = [
"State transition .* S_RECOVERY",
"LRM Connection failed",
"pacemaker-controld.*I_ERROR.*lrm_connection_destroy",
"State transition S_STARTING -> S_PENDING",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
# this status number is likely wrong now
r"pacemaker-controld.*exited with status 2",
], badnews_ignore = common_ignore)
controld = Process(self, "pacemaker-controld",
pats = [
"State transition .* S_IDLE",
"State transition S_STARTING -> S_PENDING",
], badnews_ignore = common_ignore)
schedulerd = Process(self, "pacemaker-schedulerd", pats = [
"State transition .* S_RECOVERY",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed",
"pacemaker-controld.*I_ERROR.*save_cib_contents",
# this status number is likely wrong now
r"pacemaker-controld.*exited with status 2",
], badnews_ignore = common_ignore, dc_only=True)
if self.Env["DoFencing"]:
complist.append(Process(self, "stoniths", dc_pats = [
r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed",
"Attempting connection to fencing daemon",
], badnews_ignore = stonith_ignore))
ccm.pats.extend([
# these status numbers are likely wrong now
r"attrd.*exited with status 1",
r"pacemaker-(based|controld).*exited with status 2",
])
based.pats.extend([
# these status numbers are likely wrong now
r"attrd.*exited with status 1",
r"pacemaker-controld.*exited with status 2",
])
execd.pats.extend([
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
])
complist.extend([ ccm, based, execd, controld, schedulerd ])
return complist
- def StandbyStatus(self, node):
+ def standby_status(self, node):
(_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
if not out:
return "off"
out = out[0].strip()
self.debug("Standby result: %s" % out)
return out
# status == "on" : Enter Standby mode
# status == "off": Enter Active mode
def set_standby_mode(self, node, status):
- current_status = self.StandbyStatus(node)
+ current_status = self.standby_status(node)
if current_status == status:
return True
cmd = self.templates["StandbyCmd"] % (node, status)
(rc, _) = self.rsh(node, cmd)
return rc == 0
- def AddDummyRsc(self, node, rid):
+ def add_dummy_rsc(self, node, rid):
rsc_xml = """ '
'""" % (rid, rid)
constraint_xml = """ '
'
""" % (rid, node, node, rid)
self.rsh(node, self.templates['CibAddXml'] % rsc_xml)
self.rsh(node, self.templates['CibAddXml'] % constraint_xml)
- def RemoveDummyRsc(self, node, rid):
+ def remove_dummy_rsc(self, node, rid):
constraint = "\"//rsc_location[@rsc='%s']\"" % rid
rsc = "\"//primitive[@id='%s']\"" % rid
self.rsh(node, self.templates['CibDelXpath'] % constraint)
self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py
index f5d70a2595..86466770f4 100644
--- a/python/pacemaker/_cts/scenarios.py
+++ b/python/pacemaker/_cts/scenarios.py
@@ -1,408 +1,408 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence" ]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker._cts.audits import ClusterAudit
from pacemaker._cts.input import should_continue
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.watcher import LogWatcher
class ScenarioComponent:
""" The base class for all scenario components. A scenario component is
one single step in a scenario. Each component is basically just a setup
and teardown method.
"""
def __init__(self, cm, env):
""" Create a new ScenarioComponent instance
Arguments:
cm -- A ClusterManager instance
env -- An Environment instance
"""
# pylint: disable=invalid-name
self._cm = cm
self._env = env
def is_applicable(self):
""" Return True if this component is applicable in the given Environment.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def setup(self):
""" Set up the component, returning True on success. This method must be
provided by all subclasses.
"""
raise NotImplementedError
def teardown(self):
""" Tear down the given component. This method must be provided by all
subclasses.
"""
raise NotImplementedError
class Scenario:
""" The base class for scenario. A scenario is an ordered list of
ScenarioComponent objects. A scenario proceeds by setting up all its
components in sequence, running a list of tests and audits, and then
tearing down its components in reverse.
"""
def __init__(self, cm, components, audits, tests):
""" Create a new Scenario instance
Arguments:
cm -- A ClusterManager instance
components -- A list of ScenarioComponents comprising this Scenario
audits -- A list of ClusterAudits that will be performed as
part of this Scenario
tests -- A list of CTSTests that will be run
"""
# pylint: disable=invalid-name
self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 }
self.tests = tests
self._audits = audits
self._bad_news = None
self._cm = cm
self._components = components
for comp in components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def is_applicable(self):
""" Return True if all ScenarioComponents are applicable """
for comp in self._components:
if not comp.is_applicable():
return False
return True
def setup(self):
""" Set up the scenario, returning True on success. If setup fails at
some point, tear down those components that did successfully set up.
"""
self._cm.prepare()
self.audit() # Also detects remote/local log config
self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
self.audit()
self._cm.install_support()
self._bad_news = LogWatcher(self._cm.Env["LogFileName"],
self._cm.templates.get_patterns("BadNews"),
self._cm.Env["nodes"],
self._cm.Env["LogWatcher"],
"BadNews", 0)
self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self._components):
if not self._components[j].setup():
# OOPS! We failed. Tear partial setups down.
self.audit()
self._cm.log("Tearing down partial setup")
self.teardown(j)
return False
j += 1
self.audit()
return True
def teardown(self, n_components=None):
""" Tear down the scenario in the reverse order it was set up. If
n_components is not None, only tear down that many components.
"""
if not n_components:
n_components = len(self._components)-1
j = n_components
while j >= 0:
self._components[j].teardown()
j -= 1
self.audit()
self._cm.install_support("uninstall")
def incr(self, name):
""" Increment the given stats key """
if not name in self.stats:
self.stats[name] = 0
self.stats[name] += 1
def run(self, iterations):
""" Run all tests in the scenario the given number of times """
- self._cm.oprofileStart()
+ self._cm.oprofile_start()
try:
self._run_loop(iterations)
- self._cm.oprofileStop()
+ self._cm.oprofile_stop()
except:
- self._cm.oprofileStop()
+ self._cm.oprofile_stop()
raise
def _run_loop(self, iterations):
""" Do the hard part of the run method - actually run all the tests the
given number of times.
"""
raise NotImplementedError
def run_test(self, test, testcount):
""" Run the given test. testcount is the number of tests (including
this one) that have been run across all iterations.
"""
nodechoice = self._cm.Env.random_node()
ret = True
did_run = False
self._cm.clear_instance_errors_to_ignore()
choice = "(%s)" % nodechoice
self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount))
starttime = test.set_timer()
if not test.setup(nodechoice):
self._cm.log("Setup failed")
ret = False
elif not test.can_run_now(nodechoice):
self._cm.log("Skipped")
test.skipped()
else:
did_run = True
ret = test(nodechoice)
if not test.teardown(nodechoice):
self._cm.log("Teardown failed")
if not should_continue(self._cm.Env):
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = False
stoptime = time.time()
- self._cm.oprofileSave(testcount)
+ self._cm.oprofile_save(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if "min_time" not in test.stats:
test.stats["elapsed_time"] = elapsed_time
test.stats["min_time"] = test_time
test.stats["max_time"] = test_time
else:
test.stats["elapsed_time"] += elapsed_time
if test_time < test.stats["min_time"]:
test.stats["min_time"] = test_time
if test_time > test.stats["max_time"]:
test.stats["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self._cm.statall()
did_run = True # Force the test count to be incremented anyway so test extraction works
self.audit(test.errors_to_ignore)
return did_run
def summarize(self):
""" Output scenario results """
self._cm.log("****************")
self._cm.log("Overall Results:%r" % self.stats)
self._cm.log("****************")
stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0 }
self._cm.log("Test Summary")
for test in self.tests:
for key in stat_filter:
stat_filter[key] = test.stats[key]
name = "Test %s:" % test.name
self._cm.log("{:<25} {!r}".format(name, stat_filter))
self._cm.debug("Detailed Results")
for test in self.tests:
name = "Test %s:" % test.name
self._cm.debug("{:<25} {!r}".format(name, stat_filter))
self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, local_ignore=None):
""" Perform all scenario audits and log results. If there are too many
failures, prompt the user to confirm that the scenario should continue
running.
"""
errcount = 0
ignorelist = ["CTS:"]
if local_ignore:
ignorelist.extend(local_ignore)
ignorelist.extend(self._cm.errors_to_ignore)
ignorelist.extend(self._cm.instance_errors_to_ignore)
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self._audits:
if not audit():
self._cm.log("Audit %s FAILED." % audit.name)
failed += 1
else:
self._cm.debug("Audit %s passed." % audit.name)
while errcount < 1000:
match = None
if self._bad_news:
match = self._bad_news.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._cm.log("BadNews: %s" % match)
self.incr("BadNews")
errcount += 1
else:
break
else:
print("Big problems")
if not should_continue(self._cm.Env):
self._cm.log("Shutting down.")
self.summarize()
self.teardown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self._bad_news:
self._bad_news.end()
return failed
class AllOnce(Scenario):
""" Every Test Once """
def _run_loop(self, iterations):
testcount = 1
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
""" Random Test Execution """
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
test = self._cm.Env.random_gen.choice(self.tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
""" Named Tests in Sequence """
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
""" Start the Cluster """
def _run_loop(self, iterations):
return
class BootCluster(ScenarioComponent):
""" The BootCluster component simply starts the cluster manager on all
nodes, waiting for each to come up before starting given that a node
might have been rebooted or crashed beforehand.
"""
def is_applicable(self):
""" BootCluster is always applicable """
return True
def setup(self):
""" Set up the component, returning True on success """
self._cm.prepare()
# Clear out the cobwebs ;-)
self._cm.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
self._cm.log("Starting Cluster Manager on all nodes.")
return self._cm.startall(verbose=True, quick=True)
def teardown(self):
""" Tear down the component """
self._cm.log("Stopping Cluster Manager on all nodes")
self._cm.stopall(verbose=True, force=False)
class LeaveBooted(BootCluster):
""" The LeaveBooted component leaves all nodes up when the scenario
is complete.
"""
def teardown(self):
""" Tear down the component """
self._cm.log("Leaving Cluster running on all nodes")
diff --git a/python/pacemaker/_cts/tests/maintenancemode.py b/python/pacemaker/_cts/tests/maintenancemode.py
index f9ed8678bd..d8adcb1a41 100644
--- a/python/pacemaker/_cts/tests/maintenancemode.py
+++ b/python/pacemaker/_cts/tests/maintenancemode.py
@@ -1,228 +1,228 @@
""" Toggle nodes in and out of maintenance mode """
__all__ = ["MaintenanceMode"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class MaintenanceMode(CTSTest):
""" A concrete test that toggles nodes in and out of maintenance mode """
def __init__(self, cm):
""" Create a new MaintenanceMode instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "MaintenanceMode"
self._action = "asyncmon"
self._rid = "maintenanceDummy"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def _toggle_maintenance_mode(self, node, enabled):
""" Toggle maintenance mode on the given node """
pats = [ self.templates["Pat:DC_IDLE"] ]
if enabled:
action = "On"
else:
action = "Off"
# fail the resource right after turning Maintenance mode on
# verify it is not recovered until maintenance mode is turned off
if enabled:
pats.append(self.templates["Pat:RscOpFail"] % (self._action, self._rid))
else:
pats.extend([ self.templates["Pat:RscOpOK"] % ("stop", self._rid),
self.templates["Pat:RscOpOK"] % ("start", self._rid) ])
watch = self.create_watch(pats, 60)
watch.set_watch()
self.debug("Turning maintenance mode %s" % action)
self._rsh(node, self.templates["MaintenanceMode%s" % action])
if enabled:
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
with Timer(self._logger, self.name, "recover%s" % action):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when turning maintenance mode %s" % action)
return repr(watch.unmatched)
return ""
def _insert_maintenance_dummy(self, node):
""" Create a dummy resource on the given node """
pats = [ ("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self._rid)) ]
watch = self.create_watch(pats, 60)
watch.set_watch()
- self._cm.AddDummyRsc(node, self._rid)
+ self._cm.add_dummy_rsc(node, self._rid)
with Timer(self._logger, self.name, "addDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when adding maintenance dummy resource")
return repr(watch.unmatched)
return ""
def _remove_maintenance_dummy(self, node):
""" Remove the previously created dummy resource on the given node """
pats = [ self.templates["Pat:RscOpOK"] % ("stop", self._rid) ]
watch = self.create_watch(pats, 60)
watch.set_watch()
- self._cm.RemoveDummyRsc(node, self._rid)
+ self._cm.remove_dummy_rsc(node, self._rid)
with Timer(self._logger, self.name, "removeDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when removing maintenance dummy resource")
return repr(watch.unmatched)
return ""
def _managed_rscs(self, node):
""" Return a list of all resources managed by the cluster """
rscs = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if tmp.managed:
rscs.append(tmp.id)
return rscs
def _verify_resources(self, node, rscs, managed):
""" Verify that all resources in rscList are managed if they are expected
to be, or unmanaged if they are expected to be.
"""
managed_rscs = rscs
managed_str = "managed"
if not managed:
managed_str = "unmanaged"
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if managed and not tmp.managed:
continue
if not managed and tmp.managed:
continue
if managed_rscs.count(tmp.id):
managed_rscs.remove(tmp.id)
if not managed_rscs:
self.debug("Found all %s resources on %s" % (managed_str, node))
return True
self._logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managed_rscs))
return False
def __call__(self, node):
""" Perform this test """
self.incr("calls")
verify_managed = False
verify_unmanaged = False
fail_pat = ""
if not self._startall(None):
return self.failure("Setup failed")
# get a list of all the managed resources. We use this list
# after enabling maintenance mode to verify all managed resources
# become un-managed. After maintenance mode is turned off, we use
# this list to verify all the resources become managed again.
managed_rscs = self._managed_rscs(node)
if not managed_rscs:
self._logger.log("No managed resources on %s" % node)
return self.skipped()
# insert a fake resource we can fail during maintenance mode
# so we can verify recovery does not take place until after maintenance
# mode is disabled.
fail_pat += self._insert_maintenance_dummy(node)
# toggle maintenance mode ON, then fail dummy resource.
fail_pat += self._toggle_maintenance_mode(node, True)
# verify all the resources are now unmanaged
if self._verify_resources(node, managed_rscs, False):
verify_unmanaged = True
# Toggle maintenance mode OFF, verify dummy is recovered.
fail_pat += self._toggle_maintenance_mode(node, False)
# verify all the resources are now managed again
if self._verify_resources(node, managed_rscs, True):
verify_managed = True
# Remove our maintenance dummy resource.
fail_pat += self._remove_maintenance_dummy(node)
self._cm.cluster_stable()
if fail_pat != "":
return self.failure("Unmatched patterns: %s" % fail_pat)
if not verify_unmanaged:
return self.failure("Failed to verify resources became unmanaged during maintenance mode")
if not verify_managed:
return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for %s" % self._rid,
r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self._rid,
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self._action, self._rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, 0) ]
diff --git a/python/pacemaker/_cts/tests/resourcerecover.py b/python/pacemaker/_cts/tests/resourcerecover.py
index f04abda463..a2ecfa8e31 100644
--- a/python/pacemaker/_cts/tests/resourcerecover.py
+++ b/python/pacemaker/_cts/tests/resourcerecover.py
@@ -1,171 +1,171 @@
""" Fail a random resource and verify its fail count increases """
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class ResourceRecover(CTSTest):
""" A concrete test that fails a random resource """
def __init__(self, cm):
""" Create a new ResourceRecover instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "ResourceRecover"
self._action = "asyncmon"
self._interval = 0
self._rid = None
self._rid_alt = None
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
if not self._startall(None):
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self._cm.active_resources(node)
if not resourcelist:
self._logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self._choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self._rid)
if rsc.id == rsc.clone_id:
self.debug("Failing %s" % rsc.id)
else:
self.debug("Failing %s (also known as %s)" % (rsc.id, rsc.clone_id))
# Log patterns to watch for (failure, plus restart if managed)
pats = [ self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id) ]
if rsc.managed:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._rid))
if rsc.unique:
pats.append(self.templates["Pat:RscOpOK"] % ("start", self._rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self._fail_resource(rsc, node, pats) is None:
# self.failure() already called
return None
return self.success()
def _choose_resource(self, node, resourcelist):
""" Choose a random resource to target """
self._rid = self._env.random_gen.choice(resourcelist)
self._rid_alt = self._rid
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self._cm, line)
if rsc.id == self._rid:
# Handle anonymous clones that get renamed
self._rid = rsc.clone_id
return rsc
return None
def _get_failcount(self, node):
""" Check the fail count of targeted resource on given node """
cmd = "crm_failcount --quiet --query --resource %s --operation %s --interval %d --node %s"
(rc, lines) = self._rsh(node, cmd % (self._rid, self._action, self._interval, node),
verbose=1)
if rc != 0 or len(lines) != 1:
lines = [l.strip() for l in lines]
self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(lines)))
return -1
try:
failcount = int(lines[0])
except (IndexError, ValueError):
self._logger.log("crm_failcount output on %s unparseable: %s" % (node, " ".join(lines)))
return -1
return failcount
def _fail_resource(self, rsc, node, pats):
""" Fail the targeted resource, and verify as expected """
orig_failcount = self._get_failcount(node)
watch = self.create_watch(pats, 60)
watch.set_watch()
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
with Timer(self._logger, self.name, "recover"):
watch.look_for_all()
self._cm.cluster_stable()
- recovered = self._cm.ResourceLocation(self._rid)
+ recovered = self._cm.resource_location(self._rid)
if watch.unmatched:
return self.failure("Patterns not found: %r" % watch.unmatched)
if rsc.unique and len(recovered) > 1:
return self.failure("%s is now active on more than one node: %r" % (self._rid, recovered))
if recovered:
self.debug("%s is running on: %r" % (self._rid, recovered))
elif rsc.managed:
return self.failure("%s was not recovered and is inactive" % self._rid)
new_failcount = self._get_failcount(node)
if new_failcount != orig_failcount + 1:
return self.failure("%s fail count is %d not %d" % (self._rid,
new_failcount, orig_failcount + 1))
return 0 # Anything but None is success
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for %s" % self._rid,
r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt),
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self._action, self._rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval) ]
diff --git a/python/pacemaker/_cts/tests/standbytest.py b/python/pacemaker/_cts/tests/standbytest.py
index 8df8a917c6..3320338f83 100644
--- a/python/pacemaker/_cts/tests/standbytest.py
+++ b/python/pacemaker/_cts/tests/standbytest.py
@@ -1,111 +1,111 @@
""" Put a node into standby mode and check that resources migrate """
__all__ = ["StandbyTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StandbyTest(CTSTest):
""" A concrete tests that puts a node into standby and checks that resources
migrate away from the node
"""
def __init__(self, cm):
""" Create a new StandbyTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "Standby"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
# make sure the node is active
# set the node to standby mode
# check resources, none resource should be running on the node
# set the node to active mode
# check resources, resources should have been migrated back (SHOULD THEY?)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Start all nodes failed")
self.debug("Make sure node %s is active" % node)
- if self._cm.StandbyStatus(node) != "off":
+ if self._cm.standby_status(node) != "off":
if not self._cm.set_standby_mode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self._cm.cluster_stable()
- status = self._cm.StandbyStatus(node)
+ status = self._cm.standby_status(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
watchpats = [ r"State transition .* -> S_POLICY_ENGINE" ]
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self.debug("Setting node %s to standby mode" % node)
if not self._cm.set_standby_mode(node, "on"):
return self.failure("can't set node %s to standby mode" % node)
self.set_timer("on")
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: %r" % watch.unmatched)
self._cm.set_standby_mode(node, "off")
return self.failure("cluster didn't react to standby change on %s" % node)
self._cm.cluster_stable()
- status = self._cm.StandbyStatus(node)
+ status = self._cm.standby_status(node)
if status != "on":
return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status))
self.log_timer("on")
self.debug("Checking resources")
rscs_on_node = self._cm.active_resources(node)
if rscs_on_node:
rc = self.failure("%s set to standby, %r is still running on it" % (node, rscs_on_node))
self.debug("Setting node %s to active mode" % node)
self._cm.set_standby_mode(node, "off")
return rc
self.debug("Setting node %s to active mode" % node)
if not self._cm.set_standby_mode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self.set_timer("off")
self._cm.cluster_stable()
- status = self._cm.StandbyStatus(node)
+ status = self._cm.standby_status(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.log_timer("off")
return self.success()