diff --git a/python/pacemaker/_cts/CTS.py b/python/pacemaker/_cts/CTS.py
index 520b777575..166ea1098d 100644
--- a/python/pacemaker/_cts/CTS.py
+++ b/python/pacemaker/_cts/CTS.py
@@ -1,239 +1,239 @@
""" Main classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["CtsLab", "NodeStatus", "Process"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys
import time
import traceback
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.input import should_continue
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
class CtsLab:
""" A class that defines the Lab Environment for the Cluster Test System.
It defines those things which are expected to change from test
environment to test environment for the same cluster manager.
This is where you define the set of nodes that are in your test lab,
what kind of reset mechanism you use, etc. All this data is stored
as key/value pairs in an Environment instance constructed from arguments
passed to this class.
The CTS code ignores names it doesn't know about or need. Individual
tests have access to this information, and it is perfectly acceptable
to provide hints, tweaks, fine-tuning directions, or other information
to the tests through this mechanism.
"""
def __init__(self, args=None):
""" Create a new CtsLab instance. This class can be treated kind
of like a dictionary due to the presence of typical dict functions
like __contains__, __getitem__, and __setitem__. However, it is not a
dictionary so do not rely on standard dictionary behavior.
Arguments:
args -- A list of command line parameters, minus the program name.
"""
self._env = EnvFactory().getInstance(args)
self._logger = LogFactory()
def dump(self):
""" Print the current environment """
self._env.dump()
def __contains__(self, key):
""" Does the given environment key exist? """
# pylint gets confused because of EnvFactory here.
# pylint: disable=unsupported-membership-test
return key in self._env
def __getitem__(self, key):
""" Return the given environment key, or raise KeyError if it does
not exist
"""
# Throughout this file, pylint has trouble understanding that EnvFactory
# and RemoteFactory are singleton instances that can be treated as callable
# and subscriptable objects. Various warnings are disabled because of this.
# See also a comment about self._rsh in environment.py.
# pylint: disable=unsubscriptable-object
return self._env[key]
def __setitem__(self, key, value):
""" Set the given environment key to the given value, overriding any
previous value
"""
# pylint: disable=unsupported-assignment-operation
self._env[key] = value
def run(self, scenario, iterations):
""" Run the given scenario the given number of times.
Returns:
ExitStatus.OK on success, or ExitStatus.ERROR on error
"""
if not scenario:
self._logger.log("No scenario was defined")
return ExitStatus.ERROR
self._logger.log("Cluster nodes: ")
# pylint: disable=unsubscriptable-object
for node in self._env["nodes"]:
self._logger.log(" * %s" % (node))
if not scenario.setup():
return ExitStatus.ERROR
# We want to alert on any exceptions caused by running a scenario, so
# here it's okay to disable the pylint warning.
# pylint: disable=bare-except
try:
scenario.run(iterations)
except:
self._logger.log("Exception by %s" % sys.exc_info()[0])
self._logger.traceback(traceback)
scenario.summarize()
scenario.teardown()
return ExitStatus.ERROR
scenario.teardown()
scenario.summarize()
if scenario.stats["failure"] > 0:
return ExitStatus.ERROR
if scenario.stats["success"] != iterations:
self._logger.log("No failure count but success != requested iterations")
return ExitStatus.ERROR
return ExitStatus.OK
class NodeStatus:
""" A class for querying the status of cluster nodes - are nodes up? Do
they respond to SSH connections?
"""
def __init__(self, env):
""" Create a new NodeStatus instance
Arguments:
env -- An Environment instance
"""
self._env = env
def _node_booted(self, node):
""" Return True if the given node is booted (responds to pings) """
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, verbose=0)
return rc == 0
def _sshd_up(self, node):
""" Return true if sshd responds on the given node """
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()(node, "true", verbose=0)
return rc == 0
def wait_for_node(self, node, timeout=300):
""" Wait for a node to become available. Should the timeout be reached,
the user will be given a choice whether to continue or not. If not,
ValueError will be raised.
Returns:
True when the node is available, or False if the timeout is reached.
"""
initial_timeout = timeout
anytimeouts = False
while timeout > 0:
if self._node_booted(node) and self._sshd_up(node):
if anytimeouts:
# Fudge to wait for the system to finish coming up
time.sleep(30)
LogFactory().debug("Node %s now up" % node)
return True
time.sleep(30)
if not anytimeouts:
LogFactory().debug("Waiting for node %s to come up" % node)
anytimeouts = True
timeout -= 1
LogFactory().log("%s did not come up within %d tries" % (node, initial_timeout))
if not should_continue(self._env["continue"]):
raise ValueError("%s did not come up within %d tries" % (node, initial_timeout))
return False
def wait_for_all_nodes(self, nodes, timeout=300):
""" Return True when all nodes come up, or False if the timeout is reached """
for node in nodes:
if not self.wait_for_node(node, timeout):
return False
return True
class Process:
""" A class for managing a Pacemaker daemon """
# pylint: disable=invalid-name
def __init__(self, cm, name, dc_only=False, pats=None, dc_pats=None,
badnews_ignore=None):
""" Create a new Process instance.
Arguments:
cm -- A ClusterManager instance
name -- The command being run
dc_only -- Should this daemon be killed only on the DC?
pats -- Regexes we expect to find in log files
dc_pats -- Additional DC-specific regexes we expect to find
in log files
badnews_ignore -- Regexes for lines in the log that can be ignored
"""
self._cm = cm
self.badnews_ignore = badnews_ignore
self.dc_only = dc_only
self.dc_pats = dc_pats
self.name = name
self.pats = pats
if self.badnews_ignore is None:
self.badnews_ignore = []
if self.dc_pats is None:
self.dc_pats = []
if self.pats is None:
self.pats = []
def kill(self, node):
""" Kill the instance of this process running on the given node """
(rc, _) = self._cm.rsh(node, "killall -9 %s" % self.name)
if rc != 0:
- self._cm.log ("ERROR: Kill %s failed on node %s" % (self.name, node))
+ self._cm.log("ERROR: Kill %s failed on node %s" % (self.name, node))
diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index 4584867e01..c3eb402c51 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1029 +1,1029 @@
""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
""" The base class for various kinds of auditors. Specific audit implementations
should be built on top of this one. Audits can do all kinds of checks on the
system. The basic interface for callers is the `__call__` method, which
returns True if the audit passes and False if it fails.
"""
def __init__(self, cm):
""" Create a new ClusterAudit instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
raise NotImplementedError
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
""" Log a message """
self._cm.log("audit: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
""" Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that
we can read back that test message using logging tools.
"""
def __init__(self, cm):
""" Create a new LogAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
""" Restart logging on the given nodes, or all if none are given """
if not nodes:
nodes = self._cm.env["nodes"]
self._cm.debug("Restarting logging on: %r" % nodes)
for node in nodes:
if self._cm.env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
- self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node)
+ self._cm.log("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
- self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node)
+ self._cm.log("ERROR: Cannot start 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"])
if rc != 0:
- self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node))
+ self._cm.log("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node))
def _create_watcher(self, patterns, kind):
""" Create a new LogWatcher instance for the given patterns """
watch = LogWatcher(self._cm.env["LogFileName"], patterns,
self._cm.env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
""" Perform the log audit """
patterns = []
- prefix = "Test message from"
- suffix = str(uuid.uuid4())
- watch = {}
+ prefix = "Test message from"
+ suffix = str(uuid.uuid4())
+ watch = {}
for node in self._cm.env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
watch_pref = self._cm.env["LogWatcher"]
if watch_pref == LogKind.ANY:
- kinds = [ LogKind.FILE ]
+ kinds = [LogKind.FILE]
if self._cm.env["have_systemd"]:
- kinds += [ LogKind.JOURNAL ]
- kinds += [ LogKind.REMOTE_FILE ]
+ kinds += [LogKind.JOURNAL]
+ kinds += [LogKind.REMOTE_FILE]
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log("Logging test message with identifier %s" % suffix)
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
for node in self._cm.env["nodes"]:
cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
- self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
+ self._cm.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in list(watch.keys()):
w = watch[k]
if watch_pref == LogKind.ANY:
self._cm.log("Checking for test message in %s logs" % k)
w.look_for_all(silent=True)
if w.unmatched:
for regex in w.unmatched:
self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
else:
if watch_pref == LogKind.ANY:
self._cm.log("Found test message in %s logs" % k)
self._cm.env["LogWatcher"] = k
return 1
return False
def __call__(self):
max_attempts = 3
attempt = 0
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60*attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
return False
return True
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
if self._cm.env["LogAuditDisabled"]:
return False
return True
class DiskAudit(ClusterAudit):
""" Audit disk usage on cluster nodes to verify that there is enough free
space left on whichever mounted file system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
""" Create a new DiskAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
result = True
# @TODO Use directory of PCMK_logfile if set on host
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
- self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
+ self._cm.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
% (dfout, node, used, remain))
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
% (node, used_percent, remaining_mb))
result = False
if not should_continue(self._cm.env):
raise ValueError("Disk full on %s" % node)
elif remaining_mb < 100 or used_percent > 90:
self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class FileAudit(ClusterAudit):
""" Audit the filesystem looking for various failure conditions:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
""" Create a new FileAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def __call__(self):
result = True
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
result = False
clean = True
self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug("ps[%s]: %s" % (node, line))
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug("Skipping %s" % node)
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class AuditResource:
""" A base class for storing information about a cluster resource """
def __init__(self, cm, line):
""" Create a new AuditResource instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
@property
def unique(self):
""" Is this resource unique? """
return self.flags & 0x20
@property
def orphan(self):
""" Is this resource an orphan? """
return self.flags & 0x01
@property
def managed(self):
""" Is this resource managed by the cluster? """
return self.flags & 0x02
class AuditConstraint:
""" A base class for storing information about a cluster constraint """
def __init__(self, cm, line):
""" Create a new AuditConstraint instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
""" Audit primitive resources to verify a variety of conditions, including that
they are active and managed only when expected; they are active on the
expected clusted node; and that they are not orphaned.
"""
def __init__(self, cm):
""" Create a new PrimitiveAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
""" Perform the audit of a single resource """
rc = True
active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %r" % (resource.id, active))
elif resource.needs_quorum == 1:
self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
rc = False
elif not resource.managed:
self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
rc = False
elif resource.orphan:
self.debug("Resource %s is an inactive orphan" % resource.id)
elif not self._inactive_nodes:
self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
rc = False
elif self._cm.env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
return rc
def _setup(self):
""" Verify cluster nodes are active, and collect resource and colocation
information used for performing the audit.
"""
for node in self._cm.env["nodes"]:
if self._cm.expected_status[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
for node in self._cm.env["nodes"]:
if self._target is None and self._cm.expected_status[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name)
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log("Unknown entry: %s" % line)
return True
def __call__(self):
result = True
if not self._setup():
return result
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
result = False
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class GroupAudit(PrimitiveAudit):
""" Audit group resources to verify that each of its child primitive
resources is active on the expected cluster node.
"""
def __init__(self, cm):
""" Create a new GroupAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
result = True
if not self._setup():
return result
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
result = False
self._cm.log("Child %s of %s is active more than once: %r"
% (child.id, group.id, nodes))
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
result = False
self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
% (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return result
class CloneAudit(PrimitiveAudit):
""" Audit clone resources. NOTE: Currently, this class does not perform
any actual audit functions.
"""
def __init__(self, cm):
""" Create a new CloneAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
result = True
if not self._setup():
return result
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug("Checking child %s of %s..." % (child.id, clone.id))
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return result
class ColocationAudit(PrimitiveAudit):
""" Audit cluster resources to verify that those that should be colocated
with each other actually are.
"""
def __init__(self, cm):
""" Create a new ColocationAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
""" Return a list of cluster nodes where a given resource is running """
(rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
result = True
if not self._setup():
return result
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
else:
for node in source:
if not node in target:
result = False
self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
% (coloc.id, coloc.rsc, node, target))
else:
self.debug("Colocation audit (%s): %s running on %s (in %r)"
% (coloc.id, coloc.rsc, node, target))
return result
class ControllerStateAudit(ClusterAudit):
""" Audit cluster nodes to verify that those we expect to be active are
active, and those that are expected to be inactive are inactive.
"""
def __init__(self, cm):
""" Create a new ControllerStateAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
result = True
up_are_down = 0
down_are_up = 0
unstable_list = []
for node in self._cm.env["nodes"]:
should_be = self._cm.expected_status[node]
rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
result = False
self._cm.log("Cluster is not stable: %d (of %d): %r"
% (len(unstable_list), self._cm.upcount(), unstable_list))
if up_are_down > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be up were down."
% (up_are_down, len(self._cm.env["nodes"])))
if down_are_up > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be down were up."
% (down_are_up, len(self._cm.env["nodes"])))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class CIBAudit(ClusterAudit):
""" Audit the CIB by verifying that it is identical across cluster nodes """
def __init__(self, cm):
""" Create a new CIBAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return result
for partition in ccm_partitions:
self.debug("\tAuditing CIB consistency for: %s" % partition)
if self._audit_cib_contents(partition) == 0:
result = False
return result
def _audit_cib_contents(self, hostlist):
""" Perform the CIB audit on the given hosts """
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node)
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node0)
passed = False
else:
(rc, result) = self._cm.rsh(
node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
if rc != 0:
self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
passed = False
for line in result:
if not re.search("", line):
passed = False
self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
else:
self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
return passed
def _store_remote_cib(self, node, target):
""" Store a copy of the given node's CIB on the given target node. If
no target is given, store the CIB on the given node.
"""
filename = "/tmp/ctsaudit.%s.xml" % node
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", "rm -f %s" % filename)
for line in lines:
self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class PartitionAudit(ClusterAudit):
""" Audit each partition in a cluster to verify a variety of conditions:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
""" Create a new PartitionAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return result
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
result = False
for partition in ccm_partitions:
self._cm.log("\t %s" % partition)
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
result = False
return result
def _trim_string(self, avalue):
""" Remove the last character from a multi-character string """
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
""" Remove the last character from a multi-character string and convert
the result to an int.
"""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
def _audit_partition(self, partition):
""" Perform the audit of a single partition """
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug("Auditing partition: %s" % partition)
for node in node_list:
if self._cm.expected_status[node] != "up":
self._cm.log("Warn: Node %s appeared out of nowhere" % node)
self._cm.expected_status[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
- self._node_state[node] = self._trim_string(self._node_state[node])
+ self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
self._cm.expected_status[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log("Lowest epoch not determined in %s" % partition)
passed = False
for node in node_list:
if self._cm.expected_status[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug("%s: OK" % node)
elif not self._node_epoch[node]:
self.debug("Check on %s ignored: no node epoch" % node)
elif not lowest_epoch:
self.debug("Check on %s ignored: no lowest epoch" % node)
else:
self._cm.log("DC %s is not the oldest node (%d vs. %d)"
% (node, self._node_epoch[node], lowest_epoch))
passed = False
if not dc_found:
self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
% (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
self._cm.log("%d DCs (%s) found in cluster partition: %s"
% (len(dc_found), str(dc_found), str(node_list)))
passed = False
if not passed:
for node in node_list:
if self._cm.expected_status[node] == "up":
self._cm.log("epoch %s : %s"
% (self._node_epoch[node], self._node_state[node]))
return passed
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
""" Return a list of instances of applicable audits that can be performed
for the given ClusterManager.
"""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
diff --git a/python/pacemaker/_cts/cib.py b/python/pacemaker/_cts/cib.py
index 63295ac0c2..fba76bfbb3 100644
--- a/python/pacemaker/_cts/cib.py
+++ b/python/pacemaker/_cts/cib.py
@@ -1,424 +1,424 @@
""" CIB generator for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["ConfigFactory"]
__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import warnings
import tempfile
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.cibxml import Alerts, Clone, Expression, FencingTopology, Group, Nodes, OpDefaults, Option, Resource, Rule
from pacemaker._cts.network import next_ip
class CIB:
""" A class for generating, representing, and installing a CIB file onto
cluster nodes
"""
def __init__(self, cm, version, factory, tmpfile=None):
""" Create a new CIB instance
Arguments:
cm -- A ClusterManager instance
version -- The schema syntax version
factory -- A ConfigFactory instance
tmpfile -- Where to store the CIB, or None to use a new tempfile
"""
# pylint: disable=invalid-name
self._cib = None
self._cm = cm
self._counter = 1
self._factory = factory
self._num_nodes = 0
self.version = version
if not tmpfile:
warnings.filterwarnings("ignore")
# pylint: disable=consider-using-with
f = tempfile.NamedTemporaryFile(delete=True)
f.close()
tmpfile = f.name
warnings.resetwarnings()
self._factory.tmpfile = tmpfile
def _show(self):
""" Query a cluster node for its generated CIB; log and return the result """
output = ""
(_, result) = self._factory.rsh(self._factory.target, "HOME=/root CIB_file=%s cibadmin -Ql" % self._factory.tmpfile, verbose=1)
for line in result:
output += line
self._factory.debug("Generated Config: %s" % line)
return output
def new_ip(self, name=None):
""" Generate an IP resource for the next available IP address, optionally
specifying the resource's name.
"""
if self._cm.env["IPagent"] == "IPaddr2":
ip = next_ip(self._cm.env["IPBase"])
if not name:
if ":" in ip:
(_, _, suffix) = ip.rpartition(":")
name = "r%s" % suffix
else:
name = "r%s" % ip
r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
r["ip"] = ip
if ":" in ip:
r["cidr_netmask"] = "64"
r["nic"] = "eth0"
else:
r["cidr_netmask"] = "32"
else:
if not name:
name = "r%s%d" % (self._cm.env["IPagent"], self._counter)
self._counter += 1
r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
r.add_op("monitor", "5s")
return r
def get_node_id(self, node_name):
""" Check the cluster configuration for the node ID for the given node_name """
# We can't account for every possible configuration,
# so we only return a node ID if:
# * The node is specified in /etc/corosync/corosync.conf
# with "ring0_addr:" equal to node_name and "nodeid:"
# explicitly specified.
# In all other cases, we return 0.
node_id = 0
# awkward command: use } as record separator
# so each corosync.conf "object" is one record;
# match the "node {" record that has "ring0_addr: node_name";
# then print the substring of that record after "nodeid:"
(rc, output) = self._factory.rsh(self._factory.target,
r"""awk -v RS="}" """
r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/"""
r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s"""
% (node_name, BuildOptions.COROSYNC_CONFIG_FILE), verbose=1)
if rc == 0 and len(output) == 1:
try:
node_id = int(output[0])
except ValueError:
node_id = 0
return node_id
def install(self, target):
""" Generate a CIB file and install it to the given cluster node """
old = self._factory.tmpfile
# Force a rebuild
self._cib = None
self._factory.tmpfile = "%s/cib.xml" % BuildOptions.CIB_DIR
self.contents(target)
self._factory.rsh(self._factory.target, "chown %s %s" % (BuildOptions.DAEMON_USER, self._factory.tmpfile))
self._factory.tmpfile = old
def contents(self, target):
""" Generate a complete CIB file """
# fencing resource
if self._cib:
return self._cib
if target:
self._factory.target = target
self._factory.rsh(self._factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self._factory.tmpfile))
self._num_nodes = len(self._cm.env["nodes"])
no_quorum = "stop"
if self._num_nodes < 3:
no_quorum = "ignore"
self._factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self._num_nodes)
# We don't need a nodes section unless we add attributes
stn = None
# Fencing resource
# Define first so that the shell doesn't reject every update
if self._cm.env["DoFencing"]:
# Define the "real" fencing device
st = Resource(self._factory, "Fencing", self._cm.env["stonith-type"], "stonith")
# Set a threshold for unreliable stonith devices such as the vmware one
st.add_meta("migration-threshold", "5")
st.add_op("monitor", "120s", timeout="120s")
st.add_op("stop", "0", timeout="60s")
st.add_op("start", "0", timeout="60s")
# For remote node tests, a cluster node is stopped and brought back up
# as a remote node with the name "remote-OLDNAME". To allow fencing
# devices to fence these nodes, create a list of all possible node names.
- all_node_names = [ prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-') ]
+ all_node_names = [prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-')]
# Add all parameters specified by user
entries = self._cm.env["stonith-params"].split(',')
for entry in entries:
try:
(name, value) = entry.split('=', 1)
except ValueError:
print("Warning: skipping invalid fencing parameter: %s" % entry)
continue
# Allow user to specify "all" as the node list, and expand it here
- if name in [ "hostlist", "pcmk_host_list" ] and value == "all":
+ if name in ["hostlist", "pcmk_host_list"] and value == "all":
value = ' '.join(all_node_names)
st[name] = value
st.commit()
# Test advanced fencing logic
stf_nodes = []
stt_nodes = []
attr_nodes = {}
# Create the levels
stl = FencingTopology(self._factory)
for node in self._cm.env["nodes"]:
# Remote node tests will rename the node
remote_node = "remote-%s" % node
# Randomly assign node to a fencing method
ftype = self._cm.env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
# For levels-and, randomly choose targeting by node name or attribute
by = ""
if ftype == "levels-and":
node_id = self.get_node_id(node)
if node_id == 0 or self._cm.env.random_gen.choice([True, False]):
by = " (by name)"
else:
attr_nodes[node] = node_id
by = " (by attribute)"
self._cm.log(" - Using %s fencing for node: %s%s" % (ftype, node, by))
if ftype == "levels-and":
# If targeting by name, add a topology level for this node
if node not in attr_nodes:
stl.level(1, node, "FencingPass,Fencing")
# Always target remote nodes by name, otherwise we would need to add
# an attribute to the remote node only during remote tests (we don't
# want nonexistent remote nodes showing up in the non-remote tests).
# That complexity is not worth the effort.
stl.level(1, remote_node, "FencingPass,Fencing")
# Add the node (and its remote equivalent) to the list of levels-and nodes.
stt_nodes.extend([node, remote_node])
elif ftype == "levels-or ":
- for n in [ node, remote_node ]:
+ for n in [node, remote_node]:
stl.level(1, n, "FencingFail")
stl.level(2, n, "Fencing")
stf_nodes.extend([node, remote_node])
# If any levels-and nodes were targeted by attribute,
# create the attributes and a level for the attribute.
if attr_nodes:
stn = Nodes(self._factory)
for (node_name, node_id) in attr_nodes.items():
- stn.add_node(node_name, node_id, { "cts-fencing" : "levels-and" })
+ stn.add_node(node_name, node_id, {"cts-fencing": "levels-and"})
stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and")
# Create a Dummy agent that always passes for levels-and
if stt_nodes:
stt = Resource(self._factory, "FencingPass", "fence_dummy", "stonith")
stt["pcmk_host_list"] = " ".join(stt_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stt["random_sleep_range"] = "30"
stt["mode"] = "pass"
stt.commit()
# Create a Dummy agent that always fails for levels-or
if stf_nodes:
stf = Resource(self._factory, "FencingFail", "fence_dummy", "stonith")
stf["pcmk_host_list"] = " ".join(stf_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stf["random_sleep_range"] = "30"
stf["mode"] = "fail"
stf.commit()
# Now commit the levels themselves
stl.commit()
o = Option(self._factory)
o["stonith-enabled"] = self._cm.env["DoFencing"]
o["start-failure-is-fatal"] = "false"
o["pe-input-series-max"] = "5000"
o["shutdown-escalation"] = "5min"
o["batch-limit"] = "10"
o["dc-deadtime"] = "5s"
o["no-quorum-policy"] = no_quorum
o.commit()
o = OpDefaults(self._factory)
o["timeout"] = "90s"
o.commit()
# Commit the nodes section if we defined one
if stn is not None:
stn.commit()
# Add an alerts section if possible
if self._factory.rsh.exists_on_all(self._cm.env["notification-agent"], self._cm.env["nodes"]):
alerts = Alerts(self._factory)
alerts.add_alert(self._cm.env["notification-agent"],
self._cm.env["notification-recipient"])
alerts.commit()
# Add resources?
if self._cm.env["CIBResource"]:
self.add_resources()
# generate cib
self._cib = self._show()
if self._factory.tmpfile != "%s/cib.xml" % BuildOptions.CIB_DIR:
self._factory.rsh(self._factory.target, "rm -f %s" % self._factory.tmpfile)
return self._cib
def add_resources(self):
""" Add various resources and their constraints to the CIB """
# Per-node resources
for node in self._cm.env["nodes"]:
name = "rsc_%s" % node
r = self.new_ip(name)
r.prefer(node, "100")
r.commit()
# Migrator
# Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach
- m = Resource(self._factory, "migrator","Dummy", "ocf", "pacemaker")
+ m = Resource(self._factory, "migrator", "Dummy", "ocf", "pacemaker")
m["passwd"] = "whatever"
- m.add_meta("resource-stickiness","1")
+ m.add_meta("resource-stickiness", "1")
m.add_meta("allow-migrate", "1")
m.add_op("monitor", "P10S")
m.commit()
# Ping the test exerciser
- p = Resource(self._factory, "ping-1","ping", "ocf", "pacemaker")
+ p = Resource(self._factory, "ping-1", "ping", "ocf", "pacemaker")
p.add_op("monitor", "60s")
p["host_list"] = self._cm.env["cts-exerciser"]
p["name"] = "connected"
p["debug"] = "true"
c = Clone(self._factory, "Connectivity", p)
c["globally-unique"] = "false"
c.commit()
# promotable clone resource
s = Resource(self._factory, "stateful-1", "Stateful", "ocf", "pacemaker")
s.add_op("monitor", "15s", timeout="60s")
s.add_op("monitor", "16s", timeout="60s", role="Promoted")
ms = Clone(self._factory, "promotable-1", s)
ms["promotable"] = "true"
ms["clone-max"] = self._num_nodes
ms["clone-node-max"] = 1
ms["promoted-max"] = 1
ms["promoted-node-max"] = 1
# Require connectivity to run the promotable clone
r = Rule(self._factory, "connected", "-INFINITY", op="or")
r.add_child(Expression(self._factory, "m1-connected-1", "connected", "lt", "1"))
r.add_child(Expression(self._factory, "m1-connected-2", "connected", "not_defined", None))
ms.prefer("connected", rule=r)
ms.commit()
# Group Resource
g = Group(self._factory, "group-1")
g.add_child(self.new_ip())
if self._cm.env["have_systemd"]:
sysd = Resource(self._factory, "petulant", "pacemaker-cts-dummyd@10", "service")
sysd.add_op("monitor", "P10S")
g.add_child(sysd)
else:
g.add_child(self.new_ip())
g.add_child(self.new_ip())
# Make group depend on the promotable clone
g.after("promotable-1", first="promote", then="start")
g.colocate("promotable-1", "INFINITY", withrole="Promoted")
g.commit()
# LSB resource
lsb = Resource(self._factory, "lsb-dummy", "LSBDummy", "lsb")
lsb.add_op("monitor", "5s")
# LSB with group
lsb.after("group-1")
lsb.colocate("group-1")
lsb.commit()
class ConfigFactory:
""" Singleton to generate a CIB file for the environment's schema version """
def __init__(self, cm):
""" Create a new ConfigFactory instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.rsh = self._cm.rsh
if not self._cm.env["ListTests"]:
self.target = self._cm.env["nodes"][0]
self.tmpfile = None
def log(self, args):
""" Log a message """
self._cm.log("cib: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("cib: %s" % args)
def create_config(self, name="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION):
""" Return a CIB object for the given schema version """
return CIB(self._cm, name, self)
diff --git a/python/pacemaker/_cts/cibxml.py b/python/pacemaker/_cts/cibxml.py
index 88df7bf7aa..52e37216cc 100644
--- a/python/pacemaker/_cts/cibxml.py
+++ b/python/pacemaker/_cts/cibxml.py
@@ -1,723 +1,734 @@
""" CIB XML generator for Pacemaker's Cluster Test Suite (CTS) """
-__all__ = [ "Alerts", "Clone", "Expression", "FencingTopology", "Group", "Nodes", "OpDefaults", "Option", "Resource", "Rule" ]
+__all__ = [
+ "Alerts",
+ "Clone",
+ "Expression",
+ "FencingTopology",
+ "Group",
+ "Nodes",
+ "OpDefaults",
+ "Option",
+ "Resource",
+ "Rule",
+]
__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
def key_val_string(**kwargs):
""" Given keyword arguments as key=value pairs, construct a single string
containing all those pairs separated by spaces. This is suitable for
using in an XML element as a list of its attributes.
Any pairs that have value=None will be skipped.
Note that a dictionary can be passed to this function instead of kwargs
by using a construction like:
key_val_string(**{"a": 1, "b": 2})
"""
retval = ""
for (k, v) in kwargs.items():
if v is None:
continue
retval += ' %s="%s"' % (k, v)
return retval
def element(element_name, **kwargs):
""" Create an XML element string with the given element_name and attributes.
This element does not support having any children, so it will be closed
on the same line. The attributes are processed by key_val_string.
"""
return "<%s %s/>" % (element_name, key_val_string(**kwargs))
def containing_element(element_name, inner, **kwargs):
""" Like element, but surrounds some child text passed by the inner
parameter.
"""
attrs = key_val_string(**kwargs)
return "<%s %s>%s%s>" % (element_name, attrs, inner, element_name)
class XmlBase:
""" A base class for deriving all kinds of XML sections in the CIB. This
class contains only the most basic operations common to all sections.
It is up to subclasses to provide most behavior.
Note that subclasses of this base class often have different sets of
arguments to their __init__ methods. In general this is not a great
practice, however it is so thoroughly used in these classes that trying
to straighten it out is likely to cause more bugs than just leaving it
alone for now.
"""
def __init__(self, factory, tag, _id, **kwargs):
""" Create a new XmlBase instance
Arguments:
factory -- A ConfigFactory instance
tag -- The XML element's start and end tag
_id -- A unique name for the element
kwargs -- Any additional key/value pairs that should be added to
this element as attributes
"""
self._children = []
self._factory = factory
self._kwargs = kwargs
self._tag = tag
self.name = _id
def __repr__(self):
""" Return a short string description of this XML section """
return "%s-%s" % (self._tag, self.name)
def add_child(self, child):
""" Add an XML section as a child of this one """
self._children.append(child)
def __setitem__(self, key, value):
""" Add a key/value pair to this element, resulting in it becoming an
XML attribute. If value is None, remove the key.
"""
if value:
self._kwargs[key] = value
else:
self._kwargs.pop(key, None)
def show(self):
""" Return a string representation of this XML section, including all
of its children
"""
text = '''<%s''' % self._tag
if self.name:
text += ''' id="%s"''' % self.name
text += key_val_string(**self._kwargs)
if not self._children:
text += '''/>'''
return text
text += '''>'''
for c in self._children:
text += c.show()
text += '''%s>''' % self._tag
return text
def _run(self, operation, xml, section, options=""):
""" Update the CIB on the cluster to include this XML section, including
all of its children
Arguments:
operation -- Whether this update is a "create" or "modify" operation
xml -- The XML to update the CIB with, typically the result
of calling show
section -- Which section of the CIB this update applies to (see
the --scope argument to cibadmin for allowed values)
options -- Extra options to pass to cibadmin
"""
if self.name:
label = self.name
else:
label = "<%s>" % self._tag
self._factory.debug("Writing out %s" % label)
- fixed = "HOME=/root CIB_file=%s" % self._factory.tmpfile
+ fixed = "HOME=/root CIB_file=%s" % self._factory.tmpfile
fixed += " cibadmin --%s --scope %s %s --xml-text '%s'" % (operation, section, options, xml)
(rc, _) = self._factory.rsh(self._factory.target, fixed)
if rc != 0:
raise RuntimeError("Configure call failed: %s" % fixed)
class InstanceAttributes(XmlBase):
""" A class that creates an XML section with
key/value pairs
"""
def __init__(self, factory, _id, attrs):
""" Create a new InstanceAttributes instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
attrs -- Key/value pairs to add as nvpair child elements
"""
XmlBase.__init__(self, factory, "instance_attributes", _id)
# Create an for each attribute
for (attr, value) in attrs.items():
self.add_child(XmlBase(factory, "nvpair", "%s-%s" % (_id, attr),
name=attr, value=value))
class Node(XmlBase):
""" A class that creates a XML section for a single node, complete
with node attributes
"""
def __init__(self, factory, node_name, node_id, node_attrs):
""" Create a new Node instance
Arguments:
factory -- A ConfigFactory instance
node_name -- The value of the uname attribute for this node
node_id -- A unique name for the element
node_attrs -- Additional key/value pairs to set as instance
attributes for this node
"""
XmlBase.__init__(self, factory, "node", node_id, uname=node_name)
self.add_child(InstanceAttributes(factory, "%s-1" % node_name, node_attrs))
class Nodes(XmlBase):
""" A class that creates a XML section containing multiple Node
instances as children
"""
def __init__(self, factory):
""" Create a new Nodes instance
Arguments:
factory -- A ConfigFactory instance
"""
XmlBase.__init__(self, factory, "nodes", None)
def add_node(self, node_name, node_id, node_attrs):
""" Add a child node element
Arguments:
node_name -- The value of the uname attribute for this node
node_id -- A unique name for the element
node_attrs -- Additional key/value pairs to set as instance
attributes for this node
"""
self.add_child(Node(self._factory, node_name, node_id, node_attrs))
def commit(self):
""" Modify the CIB on the cluster to include this XML section """
self._run("modify", self.show(), "configuration", "--allow-create")
class FencingTopology(XmlBase):
""" A class that creates a XML section describing how
fencing is configured in the cluster
"""
def __init__(self, factory):
""" Create a new FencingTopology instance
Arguments:
factory -- A ConfigFactory instance
"""
XmlBase.__init__(self, factory, "fencing-topology", None)
def level(self, index, target, devices, target_attr=None, target_value=None):
""" Generate a XML element
index -- The order in which to attempt fencing-levels
(1 through 9). Levels are attempted in ascending
order until one succeeds.
target -- The name of a single node to which this level applies
devices -- A list of devices that must all be tried for this
level
target_attr -- The name of a node attribute that is set for nodes
to which this level applies
target_value -- The value of a node attribute that is set for nodes
to which this level applies
"""
if target:
xml_id = "cts-%s.%d" % (target, index)
self.add_child(XmlBase(self._factory, "fencing-level", xml_id, target=target, index=index, devices=devices))
else:
xml_id = "%s-%s.%d" % (target_attr, target_value, index)
child = XmlBase(self._factory, "fencing-level", xml_id, index=index, devices=devices)
- child["target-attribute"]=target_attr
- child["target-value"]=target_value
+ child["target-attribute"] = target_attr
+ child["target-value"] = target_value
self.add_child(child)
def commit(self):
""" Create this XML section in the CIB """
self._run("create", self.show(), "configuration", "--allow-create")
class Option(XmlBase):
""" A class that creates a XML section of key/value
pairs for cluster-wide configuration settings
"""
def __init__(self, factory, _id="cib-bootstrap-options"):
""" Create a new Option instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
"""
XmlBase.__init__(self, factory, "cluster_property_set", _id)
def __setitem__(self, key, value):
""" Add a child nvpair element containing the given key/value pair """
self.add_child(XmlBase(self._factory, "nvpair", "cts-%s" % key, name=key, value=value))
def commit(self):
""" Modify the CIB on the cluster to include this XML section """
self._run("modify", self.show(), "crm_config", "--allow-create")
class OpDefaults(XmlBase):
""" A class that creates a XML section of key/value
pairs for operation default settings
"""
def __init__(self, factory):
""" Create a new OpDefaults instance
Arguments:
factory -- A ConfigFactory instance
"""
XmlBase.__init__(self, factory, "op_defaults", None)
self.meta = XmlBase(self._factory, "meta_attributes", "cts-op_defaults-meta")
self.add_child(self.meta)
def __setitem__(self, key, value):
""" Add a child nvpair meta_attribute element containing the given
key/value pair
"""
self.meta.add_child(XmlBase(self._factory, "nvpair", "cts-op_defaults-%s" % key, name=key, value=value))
def commit(self):
""" Modify the CIB on the cluster to include this XML section """
self._run("modify", self.show(), "configuration", "--allow-create")
class Alerts(XmlBase):
""" A class that creates an XML section """
def __init__(self, factory):
""" Create a new Alerts instance
Arguments:
factory -- A ConfigFactory instance
"""
XmlBase.__init__(self, factory, "alerts", None)
self._alert_count = 0
def add_alert(self, path, recipient):
""" Create a new alert as a child of this XML section
Arguments:
path -- The path to a script to be called when a cluster
event occurs
recipient -- An environment variable to be passed to the script
"""
self._alert_count += 1
alert = XmlBase(self._factory, "alert", "alert-%d" % self._alert_count,
path=path)
recipient1 = XmlBase(self._factory, "recipient",
"alert-%d-recipient-1" % self._alert_count,
value=recipient)
alert.add_child(recipient1)
self.add_child(alert)
def commit(self):
""" Modify the CIB on the cluster to include this XML section """
self._run("modify", self.show(), "configuration", "--allow-create")
class Expression(XmlBase):
""" A class that creates an XML element as part of some
constraint rule
"""
def __init__(self, factory, _id, attr, op, value=None):
""" Create a new Expression instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
attr -- The attribute to be tested
op -- The comparison to perform ("lt", "eq", "defined", etc.)
value -- Value for comparison (can be None for "defined" and
"not_defined" operations)
"""
XmlBase.__init__(self, factory, "expression", _id, attribute=attr, operation=op)
if value:
self["value"] = value
class Rule(XmlBase):
""" A class that creates a XML section consisting of one or more
expressions, as part of some constraint
"""
def __init__(self, factory, _id, score, op="and", expr=None):
""" Create a new Rule instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
score -- If this rule is used in a location constraint and
evaluates to true, apply this score to the constraint
op -- If this rule contains more than one expression, use this
boolean op when evaluating
expr -- An Expression instance that can be added to this Rule
when it is created
"""
XmlBase.__init__(self, factory, "rule", _id)
self["boolean-op"] = op
self["score"] = score
if expr:
self.add_child(expr)
class Resource(XmlBase):
""" A base class that creates all kinds of XML sections fully
describing a single cluster resource. This defaults to primitive
resources, but subclasses can create other types.
"""
def __init__(self, factory, _id, rtype, standard, provider=None):
""" Create a new Resource instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
rtype -- The name of the resource agent
standard -- The standard the resource agent follows ("ocf",
"systemd", etc.)
provider -- The vendor providing the resource agent
"""
XmlBase.__init__(self, factory, "native", _id)
self._provider = provider
self._rtype = rtype
self._standard = standard
self._meta = {}
self._op = []
self._param = {}
self._coloc = {}
self._needs = {}
self._scores = {}
if self._standard == "ocf" and not provider:
self._provider = "heartbeat"
elif self._standard == "lsb":
self._provider = None
def __setitem__(self, key, value):
""" Add a child nvpair element containing the given key/value pair as
an instance attribute
"""
self._add_param(key, value)
def add_op(self, _id, interval, **kwargs):
""" Add an operation child XML element to this resource
Arguments:
_id -- A unique name for the element. Also, the action to
perform ("monitor", "start", "stop", etc.)
interval -- How frequently (in seconds) to perform the operation
kwargs -- Any additional key/value pairs that should be added to
this element as attributes
"""
self._op.append(XmlBase(self._factory, "op", "%s-%s" % (_id, interval),
name=_id, interval=interval, **kwargs))
def _add_param(self, name, value):
""" Add a child nvpair element containing the given key/value pair as
an instance attribute
"""
self._param[name] = value
def add_meta(self, name, value):
""" Add a child nvpair element containing the given key/value pair as
a meta attribute
"""
self._meta[name] = value
def prefer(self, node, score="INFINITY", rule=None):
""" Add a location constraint where this resource prefers some node
Arguments:
node -- The name of the node to prefer
score -- Apply this score to the location constraint
rule -- A Rule instance to use in creating this constraint, instead
of creating a new rule
"""
if not rule:
rule = Rule(self._factory, "prefer-%s-r" % node, score,
expr=Expression(self._factory, "prefer-%s-e" % node, "#uname", "eq", node))
self._scores[node] = rule
def after(self, resource, kind="Mandatory", first="start", then="start", **kwargs):
""" Create an ordering constraint between this resource and some other
Arguments:
resource -- The name of the dependent resource
kind -- How to enforce the constraint ("mandatory", "optional",
"serialize")
first -- The action that this resource must complete before the
then-action can be initiated for the dependent resource
("start", "stop", "promote", "demote")
then -- The action that the dependent resource can execute only
after the first-action has completed (same values as
first)
kwargs -- Any additional key/value pairs that should be added to
this element as attributes
"""
kargs = kwargs.copy()
kargs["kind"] = kind
if then:
kargs["first-action"] = "start"
kargs["then-action"] = then
if first:
kargs["first-action"] = first
self._needs[resource] = kargs
def colocate(self, resource, score="INFINITY", role=None, withrole=None, **kwargs):
""" Create a colocation constraint between this resource and some other
Arguments:
resource -- The name of the resource that should be located relative
this one
score -- Apply this score to the colocation constraint
role -- Apply this colocation constraint only to promotable clones
in this role ("started", "promoted", "unpromoted")
withrole -- Apply this colocation constraint only to with-rsc promotable
clones in this role
kwargs -- Any additional key/value pairs that should be added to
this element as attributes
"""
kargs = kwargs.copy()
kargs["score"] = score
if role:
kargs["rsc-role"] = role
if withrole:
kargs["with-rsc-role"] = withrole
self._coloc[resource] = kargs
def _constraints(self):
""" Generate a XML section containing all previously added
ordering and colocation constraints
"""
text = ""
for (k, v) in self._scores.items():
attrs = {"id": "prefer-%s" % k, "rsc": self.name}
text += containing_element("rsc_location", v.show(), **attrs)
for (k, kargs) in self._needs.items():
attrs = {"id": "%s-after-%s" % (self.name, k), "first": k, "then": self.name}
text += element("rsc_order", **attrs, **kargs)
for (k, kargs) in self._coloc.items():
attrs = {"id": "%s-with-%s" % (self.name, k), "rsc": self.name, "with-rsc": k}
text += element("rsc_colocation", **attrs)
text += ""
return text
def show(self):
""" Return a string representation of this XML section, including all
of its children
"""
text = ''''''
if self._meta:
nvpairs = ""
for (p, v) in self._meta.items():
attrs = {"id": "%s-%s" % (self.name, p), "name": p, "value": v}
nvpairs += element("nvpair", **attrs)
text += containing_element("meta_attributes", nvpairs,
id="%s-meta" % self.name)
if self._param:
nvpairs = ""
for (p, v) in self._param.items():
attrs = {"id": "%s-%s" % (self.name, p), "name": p, "value": v}
nvpairs += element("nvpair", **attrs)
text += containing_element("instance_attributes", nvpairs,
id="%s-params" % self.name)
if self._op:
text += ''''''
for o in self._op:
key = o.name
o.name = "%s-%s" % (self.name, key)
text += o.show()
o.name = key
text += ''''''
text += ''''''
return text
def commit(self):
""" Modify the CIB on the cluster to include this XML section """
self._run("create", self.show(), "resources")
self._run("modify", self._constraints(), "constraints")
class Group(Resource):
""" A specialized Resource subclass that creates a XML section
describing a single group resource consisting of multiple child
primitive resources
"""
def __init__(self, factory, _id):
""" Create a new Group instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
"""
Resource.__init__(self, factory, _id, None, None)
self.tag = "group"
def __setitem__(self, key, value):
self.add_meta(key, value)
def show(self):
""" Return a string representation of this XML section, including all
of its children
"""
text = '''<%s id="%s">''' % (self.tag, self.name)
if len(self._meta) > 0:
nvpairs = ""
for (p, v) in self._meta.items():
attrs = {"id": "%s-%s" % (self.name, p), "name": p, "value": v}
nvpairs += element("nvpair", **attrs)
text += containing_element("meta_attributes", nvpairs,
id="%s-meta" % self.name)
for c in self._children:
text += c.show()
text += '''%s>''' % self.tag
return text
class Clone(Group):
""" A specialized Group subclass that creates a XML section
describing a clone resource containing multiple instances of a
single primitive resource
"""
def __init__(self, factory, _id, child=None):
""" Create a new Clone instance
Arguments:
factory -- A ConfigFactory instance
_id -- A unique name for the element
child -- A Resource instance that can be added to this Clone
when it is created. Alternately, use add_child later.
Note that a Clone may only have one child.
"""
Group.__init__(self, factory, _id)
self.tag = "clone"
if child:
self.add_child(child)
def add_child(self, child):
""" Add the given resource as a child of this Clone. Note that a
Clone resource only supports one child at a time.
"""
if not self._children:
self._children.append(child)
else:
self._factory.log("Clones can only have a single child. Ignoring %s" % child.name)
diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py
index 9f4f39fa40..652108f3fe 100644
--- a/python/pacemaker/_cts/clustermanager.py
+++ b/python/pacemaker/_cts/clustermanager.py
@@ -1,904 +1,916 @@
""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["ClusterManager"]
__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
Certain portions by Huang Zhen are copyright 2004
International Business Machines. The version control history for this file
may have further details."""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import time
from collections import UserDict
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.CTS import NodeStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.cib import ConfigFactory
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogWatcher
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
# ClusterManager has a lot of methods.
# pylint: disable=too-many-public-methods
class ClusterManager(UserDict):
""" An abstract base class for managing the cluster. This class implements
high-level operations on the cluster and/or its cluster managers.
Actual cluster-specific management classes should be subclassed from this
one.
Among other things, this class tracks the state every node is expected to
be in.
"""
def _final_conditions(self):
""" Check all keys to make sure they have a non-None value """
for (key, val) in self._data.items():
if val is None:
raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key)
def __init__(self):
""" Create a new ClusterManager instance. This class can be treated
kind of like a dictionary due to the process of certain dict
functions like __getitem__ and __setitem__. This is because it
contains a lot of name/value pairs. However, it is not actually
a dictionary so do not rely on standard dictionary behavior.
"""
# Eventually, ClusterManager should not be a UserDict subclass. Until
# that point...
# pylint: disable=super-init-not-called
self.__instance_errors_to_ignore = []
self._cib_installed = False
self._data = {}
self._logger = LogFactory()
self.env = EnvFactory().getInstance()
self.expected_status = {}
self.name = self.env["Name"]
# pylint: disable=invalid-name
self.ns = NodeStatus(self.env)
self.our_node = os.uname()[1].lower()
self.partitions_expected = 1
self.rsh = RemoteFactory().getInstance()
self.templates = PatternSelector(self.env["Name"])
self._final_conditions()
self._cib_factory = ConfigFactory(self)
self._cib = self._cib_factory.create_config(self.env["Schema"])
self._cib_sync = {}
def __getitem__(self, key):
if key == "Name":
return self.name
print("FIXME: Getting %s from %r" % (key, self))
if key in self._data:
return self._data[key]
return self.templates.get_patterns(key)
def __setitem__(self, key, value):
print("FIXME: Setting %s=%s on %r" % (key, value, self))
self._data[key] = value
def clear_instance_errors_to_ignore(self):
""" Reset instance-specific errors to ignore on each iteration """
self.__instance_errors_to_ignore = []
@property
def instance_errors_to_ignore(self):
""" Return a list of known errors that should be ignored for a specific
test instance
"""
return self.__instance_errors_to_ignore
@property
def errors_to_ignore(self):
""" Return a list of known error messages that should be ignored """
return self.templates.get_patterns("BadNewsIgnore")
def log(self, args):
""" Log a message """
self._logger.log(args)
def debug(self, args):
""" Log a debug message """
self._logger.debug(args)
def upcount(self):
""" How many nodes are up? """
count = 0
for node in self.env["nodes"]:
if self.expected_status[node] == "up":
count += 1
return count
def install_support(self, command="install"):
""" Install or uninstall the CTS support files - various init scripts and data,
daemons, fencing agents, etc.
"""
for node in self.env["nodes"]:
self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command))
def prepare_fencing_watcher(self):
""" Return a LogWatcher object that watches for fencing log messages """
# If we don't have quorum now but get it as a result of starting this node,
# then a bunch of nodes might get fenced
if self.has_quorum(None):
self.debug("Have quorum")
return None
if not self.templates["Pat:Fencing_start"]:
print("No start pattern")
return None
if not self.templates["Pat:Fencing_ok"]:
print("No ok pattern")
return None
stonith = None
stonith_pats = []
for peer in self.env["nodes"]:
if self.expected_status[peer] == "up":
continue
- stonith_pats.extend([ self.templates["Pat:Fencing_ok"] % peer,
- self.templates["Pat:Fencing_start"] % peer ])
+ stonith_pats.extend([
+ self.templates["Pat:Fencing_ok"] % peer,
+ self.templates["Pat:Fencing_start"] % peer,
+ ])
stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"],
self.env["LogWatcher"], "StartupFencing", 0)
stonith.set_watch()
return stonith
def fencing_cleanup(self, node, stonith):
""" Wait for a previously fenced node to return to the cluster """
peer_list = []
peer_state = {}
self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
# If we just started a node, we may now have quorum (and permission to fence)
if not stonith:
self.debug("Nothing to do")
return peer_list
q = self.has_quorum(None)
if not q and len(self.env["nodes"]) > 2:
# We didn't gain quorum - we shouldn't have shot anyone
self.debug("Quorum: %s Len: %d" % (q, len(self.env["nodes"])))
return peer_list
for n in self.env["nodes"]:
peer_state[n] = "unknown"
# Now see if any states need to be updated
self.debug("looking for: %r" % stonith.regexes)
shot = stonith.look(0)
while shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
# Extract node name
for n in self.env["nodes"]:
if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
peer = n
peer_state[peer] = "complete"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer)
elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
# TODO: Correctly detect multiple fencing operations for the same host
peer = n
peer_state[peer] = "in-progress"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer)
if not peer:
self._logger.log("ERROR: Unknown stonith match: %r" % shot)
elif not peer in peer_list:
self.debug("Found peer: %s" % peer)
peer_list.append(peer)
# Get the next one
shot = stonith.look(60)
for peer in peer_list:
self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
if self.env["at-boot"]:
self.expected_status[peer] = "up"
else:
self.expected_status[peer] = "down"
if peer_state[peer] == "in-progress":
# Wait for any in-progress operations to complete
shot = stonith.look(60)
while stonith.regexes and shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
shot = stonith.look(60)
# Now make sure the node is alive too
self.ns.wait_for_node(peer, self.env["DeadTime"])
# Poll until it comes up
if self.env["at-boot"]:
if not self.stat_cm(peer):
time.sleep(self.env["StartTime"])
if not self.stat_cm(peer):
self._logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
return None
return peer_list
def start_cm(self, node, verbose=False):
""" Start up the cluster manager on a given node """
if verbose:
self._logger.log("Starting %s on node %s" % (self.templates["Name"], node))
else:
self.debug("Starting %s on node %s" % (self.templates["Name"], node))
if not node in self.expected_status:
self.expected_status[node] = "down"
if self.expected_status[node] != "down":
return True
# Technically we should always be able to notice ourselves starting
- patterns = [ self.templates["Pat:Local_started"] % node ]
+ patterns = [
+ self.templates["Pat:Local_started"] % node,
+ ]
if self.upcount() == 0:
patterns.append(self.templates["Pat:DC_started"] % node)
else:
patterns.append(self.templates["Pat:NonDC_started"] % node)
watch = LogWatcher(self.env["LogFileName"], patterns, self.env["nodes"], self.env["LogWatcher"],
"StartaCM", self.env["StartTime"] + 10)
self.install_config(node)
self.expected_status[node] = "any"
if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
- self._logger.log ("%s was already started" % node)
+ self._logger.log("%s was already started" % node)
return True
stonith = self.prepare_fencing_watcher()
watch.set_watch()
(rc, _) = self.rsh(node, self.templates["StartCmd"])
if rc != 0:
- self._logger.log ("Warn: Start command failed on node %s" % node)
+ self._logger.log("Warn: Start command failed on node %s" % node)
self.fencing_cleanup(node, stonith)
return False
self.expected_status[node] = "up"
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
- self._logger.log ("Warn: Startup pattern not found: %s" % regex)
+ self._logger.log("Warn: Startup pattern not found: %s" % regex)
if watch_result and self.cluster_stable(self.env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
- self._logger.log ("Warn: Start failed for node %s" % node)
+ self._logger.log("Warn: Start failed for node %s" % node)
return False
def start_cm_async(self, node, verbose=False):
""" Start up the cluster manager on a given node without blocking """
if verbose:
self._logger.log("Starting %s on node %s" % (self["Name"], node))
else:
self.debug("Starting %s on node %s" % (self["Name"], node))
self.install_config(node)
self.rsh(node, self.templates["StartCmd"], synchronous=False)
self.expected_status[node] = "up"
def stop_cm(self, node, verbose=False, force=False):
""" Stop the cluster manager on a given node """
if verbose:
self._logger.log("Stopping %s on node %s" % (self["Name"], node))
else:
self.debug("Stopping %s on node %s" % (self["Name"], node))
if self.expected_status[node] != "up" and not force:
return True
(rc, _) = self.rsh(node, self.templates["StopCmd"])
if rc == 0:
# Make sure we can continue even if corosync leaks
self.expected_status[node] = "down"
self.cluster_stable(self.env["DeadTime"])
return True
- self._logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
+ self._logger.log("ERROR: Could not stop %s on node %s" % (self["Name"], node))
return False
def stop_cm_async(self, node):
""" Stop the cluster manager on a given node without blocking """
self.debug("Stopping %s on node %s" % (self["Name"], node))
self.rsh(node, self.templates["StopCmd"], synchronous=False)
self.expected_status[node] = "down"
def startall(self, nodelist=None, verbose=False, quick=False):
""" Start the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
if not nodelist:
nodelist = self.env["nodes"]
for node in nodelist:
if self.expected_status[node] == "down":
self.ns.wait_for_all_nodes(nodelist, 300)
if not quick:
# This is used for "basic sanity checks", so only start one node ...
return self.start_cm(nodelist[0], verbose=verbose)
# Approximation of SimulStartList for --boot
- watchpats = [ self.templates["Pat:DC_IDLE"] ]
+ watchpats = [
+ self.templates["Pat:DC_IDLE"],
+ ]
for node in nodelist:
- watchpats.extend([ self.templates["Pat:InfraUp"] % node,
- self.templates["Pat:PacemakerUp"] % node,
- self.templates["Pat:Local_started"] % node,
- self.templates["Pat:They_up"] % (nodelist[0], node) ])
+ watchpats.extend([
+ self.templates["Pat:InfraUp"] % node,
+ self.templates["Pat:PacemakerUp"] % node,
+ self.templates["Pat:Local_started"] % node,
+ self.templates["Pat:They_up"] % (nodelist[0], node),
+ ])
# Start all the nodes - at about the same time...
watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"],
self.env["LogWatcher"], "fast-start", self.env["DeadTime"] + 10)
watch.set_watch()
if not self.start_cm(nodelist[0], verbose=verbose):
return False
for node in nodelist:
self.start_cm_async(node, verbose=verbose)
watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
- self._logger.log ("Warn: Startup pattern not found: %s" % regex)
+ self._logger.log("Warn: Startup pattern not found: %s" % regex)
if not self.cluster_stable():
self._logger.log("Cluster did not stabilize")
return False
return True
def stopall(self, nodelist=None, verbose=False, force=False):
""" Stop the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
ret = True
if not nodelist:
nodelist = self.env["nodes"]
for node in self.env["nodes"]:
if self.expected_status[node] == "up" or force:
if not self.stop_cm(node, verbose=verbose, force=force):
ret = False
return ret
def statall(self, nodelist=None):
""" Return the status of the cluster manager on every node in the cluster,
or on every node in nodelist if not None
"""
result = {}
if not nodelist:
nodelist = self.env["nodes"]
for node in nodelist:
if self.stat_cm(node):
result[node] = "up"
else:
result[node] = "down"
return result
def isolate_node(self, target, nodes=None):
""" Break communication between the target node and all other nodes in the
cluster, or nodes if not None
"""
if not nodes:
nodes = self.env["nodes"]
for node in nodes:
if node == target:
continue
(rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node)
if rc != 0:
self._logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
return False
self.debug("Communication cut between %s and %s" % (target, node))
return True
def unisolate_node(self, target, nodes=None):
""" Re-establish communication between the target node and all other nodes
in the cluster, or nodes if not None
"""
if not nodes:
nodes = self.env["nodes"]
for node in nodes:
if node == target:
continue
# Limit the amount of time we have asynchronous connectivity for
# Restore both sides as simultaneously as possible
self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False)
self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False)
self.debug("Communication restored between %s and %s" % (target, node))
def oprofile_start(self, node=None):
""" Start profiling on the given node, or all nodes in the cluster """
if not node:
for n in self.env["oprofile"]:
self.oprofile_start(n)
elif node in self.env["oprofile"]:
self.debug("Enabling oprofile on %s" % node)
self.rsh(node, "opcontrol --init")
self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
self.rsh(node, "opcontrol --start")
self.rsh(node, "opcontrol --reset")
def oprofile_save(self, test, node=None):
""" Save profiling data and restart profiling on the given node, or all
nodes in the cluster if None
"""
if not node:
for n in self.env["oprofile"]:
self.oprofile_save(test, n)
elif node in self.env["oprofile"]:
self.rsh(node, "opcontrol --dump")
self.rsh(node, "opcontrol --save=cts.%d" % test)
# Read back with: opreport -l session:cts.0 image:/c*
self.oprofile_stop(node)
self.oprofile_start(node)
def oprofile_stop(self, node=None):
""" Start profiling on the given node, or all nodes in the cluster. This
does not save profiling data, so call oprofile_save first if needed.
"""
if not node:
for n in self.env["oprofile"]:
self.oprofile_stop(n)
elif node in self.env["oprofile"]:
self.debug("Stopping oprofile on %s" % node)
self.rsh(node, "opcontrol --reset")
self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
def install_config(self, node):
""" Remove and re-install the CIB on the first node in the cluster """
if not self.ns.wait_for_node(node):
self.log("Node %s is not up." % node)
return
if node in self._cib_sync or not self.env["ClobberCIB"]:
return
self._cib_sync[node] = True
self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
# Only install the CIB on the first node, all the other ones will pick it up from there
if self._cib_installed:
return
self._cib_installed = True
if self.env["CIBfilename"] is None:
self.log("Installing Generated CIB on node %s" % node)
self._cib.install(node)
else:
self.log("Installing CIB (%s) on node %s" % (self.env["CIBfilename"], node))
rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
if rc != 0:
raise ValueError("Can not scp file to %s %d" % (node, rc))
self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR))
def prepare(self):
""" Finish initialization by clearing out the expected status and recording
the current status of every node in the cluster
"""
self.partitions_expected = 1
for node in self.env["nodes"]:
self.expected_status[node] = ""
if self.env["experimental-tests"]:
self.unisolate_node(node)
self.stat_cm(node)
def test_node_cm(self, node):
""" Check the status of a given node. Returns 0 if the node is
down, 1 if the node is up but unstable, and 2 if the node is
up and stable
"""
- watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)",
- self.templates["Pat:NonDC_started"] % node,
- self.templates["Pat:DC_started"] % node ]
+ watchpats = [
+ "Current ping state: (S_IDLE|S_NOT_DC)",
+ self.templates["Pat:NonDC_started"] % node,
+ self.templates["Pat:DC_started"] % node,
+ ]
idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node],
self.env["LogWatcher"], "ClusterIdle")
idle_watch.set_watch()
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if not out:
out = ""
else:
out = out[0].strip()
self.debug("Node %s status: '%s'" % (node, out))
if out.find('ok') < 0:
if self.expected_status[node] == "up":
self.log("Node status for %s is %s but we think it should be %s"
% (node, "down", self.expected_status[node]))
self.expected_status[node] = "down"
return 0
if self.expected_status[node] == "down":
self.log("Node status for %s is %s but we think it should be %s: %s"
% (node, "up", self.expected_status[node], out))
self.expected_status[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" % (node, out))
return 1
# Up and stable
return 2
def stat_cm(self, node):
""" Report the status of the cluster manager on a given node """
return self.test_node_cm(node) > 0
# Being up and being stable is not the same question...
def node_stable(self, node):
""" Return whether or not the given node is stable """
if self.test_node_cm(node) == 2:
return True
self.log("Warn: Node %s not stable" % node)
return False
def partition_stable(self, nodes, timeout=None):
""" Return whether or not all nodes in the given partition are stable """
- watchpats = [ "Current ping state: S_IDLE",
- self.templates["Pat:DC_IDLE"] ]
+ watchpats = [
+ "Current ping state: S_IDLE",
+ self.templates["Pat:DC_IDLE"],
+ ]
self.debug("Waiting for cluster stability...")
if timeout is None:
timeout = self.env["DeadTime"]
if len(nodes) < 3:
self.debug("Cluster is inactive")
return True
idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(),
self.env["LogWatcher"], "ClusterStable", timeout)
idle_watch.set_watch()
for node in nodes.split():
# have each node dump its current state
self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
ret = idle_watch.look()
while ret:
self.debug(ret)
for node in nodes.split():
if re.search(node, ret):
return True
ret = idle_watch.look()
self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout))
return False
def cluster_stable(self, timeout=None, double_check=False):
""" Return whether or not all nodes in the cluster are stable """
partitions = self.find_partitions()
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
if not double_check:
return True
# Make sure we are really stable and that all resources,
# including those that depend on transient node attributes,
# are started if they were going to be
time.sleep(5)
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
return True
def is_node_dc(self, node, status_line=None):
""" Return whether or not the given node is the cluster DC by checking
the given status_line, or by querying the cluster if None
"""
if not status_line:
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if out:
status_line = out[0].strip()
if not status_line:
return False
if status_line.find('S_IDLE') != -1:
return True
if status_line.find('S_INTEGRATION') != -1:
return True
if status_line.find('S_FINALIZE_JOIN') != -1:
return True
if status_line.find('S_POLICY_ENGINE') != -1:
return True
if status_line.find('S_TRANSITION_ENGINE') != -1:
return True
return False
def active_resources(self, node):
""" Return a list of primitive resources active on the given node """
(_, output) = self.rsh(node, "crm_resource -c", verbose=1)
resources = []
for line in output:
if not re.search("^Resource", line):
continue
tmp = AuditResource(self, line)
if tmp.type == "primitive" and tmp.host == node:
resources.append(tmp.id)
return resources
def resource_location(self, rid):
""" Return a list of nodes on which the given resource is running """
resource_nodes = []
for node in self.env["nodes"]:
if self.expected_status[node] != "up":
continue
cmd = self.templates["RscRunning"] % rid
(rc, lines) = self.rsh(node, cmd)
if rc == 127:
self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
for line in lines:
self.log("Output: %s " % line)
elif rc == 0:
resource_nodes.append(node)
return resource_nodes
def find_partitions(self):
""" Return a list of all partitions in the cluster. Each element of the
list is itself a list of all active nodes in that partition.
"""
ccm_partitions = []
for node in self.env["nodes"]:
if self.expected_status[node] != "up":
self.debug("Node %s is down... skipping" % node)
continue
(_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
if not out:
self.log("no partition details for %s" % node)
continue
partition = out[0].strip()
if len(partition) <= 2:
self.log("bad partition details for %s" % node)
continue
nodes = partition.split()
nodes.sort()
partition = ' '.join(nodes)
found = 0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug("Adding partition from %s: %s" % (node, partition))
ccm_partitions.append(partition)
else:
self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
self.debug("Found partitions: %r" % ccm_partitions)
return ccm_partitions
def has_quorum(self, node_list):
""" Return whether or not the cluster has quorum """
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
node_list = self.env["nodes"]
for node in node_list:
if self.expected_status[node] != "up":
continue
(_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
quorum = quorum[0].strip()
if quorum.find("1") != -1:
return True
if quorum.find("0") != -1:
return False
self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum))
return False
@property
def components(self):
""" A list of all patterns that should be ignored for the cluster's
components. This must be provided by all subclasses.
"""
raise NotImplementedError
def in_standby_mode(self, node):
""" Return whether or not the node is in Standby """
(_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
if not out:
return False
out = out[0].strip()
self.debug("Standby result: %s" % out)
return out == "on"
def set_standby_mode(self, node, status):
""" Set node to Standby if status is True, or Active if status is False.
Return whether the node is now in the requested status.
"""
current_status = self.in_standby_mode(node)
if current_status == status:
return True
if status:
cmd = self.templates["StandbyCmd"] % (node, "on")
else:
cmd = self.templates["StandbyCmd"] % (node, "off")
(rc, _) = self.rsh(node, cmd)
return rc == 0
def add_dummy_rsc(self, node, rid):
""" Add a dummy resource with the given ID to the given node """
rsc_xml = """ '
'""" % (rid, rid)
constraint_xml = """ '
'
""" % (rid, node, node, rid)
self.rsh(node, self.templates['CibAddXml'] % rsc_xml)
self.rsh(node, self.templates['CibAddXml'] % constraint_xml)
def remove_dummy_rsc(self, node, rid):
""" Remove the previously added dummy resource given by rid on the
given node
"""
constraint = "\"//rsc_location[@rsc='%s']\"" % rid
rsc = "\"//primitive[@id='%s']\"" % rid
self.rsh(node, self.templates['CibDelXpath'] % constraint)
self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/python/pacemaker/_cts/cmcorosync.py b/python/pacemaker/_cts/cmcorosync.py
index 868ad2c67d..cac059b972 100644
--- a/python/pacemaker/_cts/cmcorosync.py
+++ b/python/pacemaker/_cts/cmcorosync.py
@@ -1,73 +1,80 @@
""" Corosync-specific class for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["Corosync2"]
__copyright__ = "Copyright 2007-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.CTS import Process
from pacemaker._cts.clustermanager import ClusterManager
from pacemaker._cts.patterns import PatternSelector
# Throughout this file, pylint has trouble understanding that EnvFactory
# is a singleton instance that can be treated as a subscriptable object.
# Various warnings are disabled because of this. See also a comment about
# self._rsh in environment.py.
# pylint: disable=unsubscriptable-object
class Corosync2(ClusterManager):
""" A subclass of ClusterManager specialized to handle corosync2 and later
based clusters
"""
def __init__(self):
""" Create a new Corosync2 instance """
ClusterManager.__init__(self)
self._fullcomplist = {}
self.templates = PatternSelector(self.name)
@property
def components(self):
""" A list of all patterns that should be ignored for the cluster's
components.
"""
complist = []
if not self._fullcomplist:
common_ignore = self.templates.get_component("common-ignore")
- for c in [ "pacemaker-based", "pacemaker-controld", "pacemaker-attrd", "pacemaker-execd", "pacemaker-fenced" ]:
+ daemons = [
+ "pacemaker-based",
+ "pacemaker-controld",
+ "pacemaker-attrd",
+ "pacemaker-execd",
+ "pacemaker-fenced"
+ ]
+ for c in daemons:
badnews = self.templates.get_component("%s-ignore" % c) + common_ignore
proc = Process(self, c, pats=self.templates.get_component(c),
badnews_ignore=badnews)
self._fullcomplist[c] = proc
# the scheduler uses dc_pats instead of pats
badnews = self.templates.get_component("pacemaker-schedulerd-ignore") + common_ignore
proc = Process(self, "pacemaker-schedulerd",
dc_pats=self.templates.get_component("pacemaker-schedulerd"),
badnews_ignore=badnews)
self._fullcomplist["pacemaker-schedulerd"] = proc
# add (or replace) extra components
badnews = self.templates.get_component("corosync-ignore") + common_ignore
proc = Process(self, "corosync", pats=self.templates.get_component("corosync"),
badnews_ignore=badnews)
self._fullcomplist["corosync"] = proc
# Processes running under valgrind can't be shot with "killall -9 processname",
# so don't include them in the returned list
vgrind = self.env["valgrind-procs"].split()
for (key, val) in self._fullcomplist.items():
if self.env["valgrind-tests"] and key in vgrind:
self.log("Filtering %s from the component list as it is being profiled by valgrind" % key)
continue
if key == "pacemaker-fenced" and not self.env["DoFencing"]:
continue
complist.append(val)
return complist
diff --git a/python/pacemaker/_cts/remote.py b/python/pacemaker/_cts/remote.py
index d372cb7faa..4b6b8f6d26 100644
--- a/python/pacemaker/_cts/remote.py
+++ b/python/pacemaker/_cts/remote.py
@@ -1,288 +1,288 @@
""" Remote command runner for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["RemoteExec", "RemoteFactory"]
__copyright__ = "Copyright 2014-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import os
-from subprocess import Popen,PIPE
+from subprocess import Popen, PIPE
from threading import Thread
from pacemaker._cts.logging import LogFactory
def convert2string(lines):
""" Convert a byte string to a UTF-8 string, and a list of byte strings to
a list of UTF-8 strings. All other text formats are passed through.
"""
if isinstance(lines, bytes):
return lines.decode("utf-8")
if isinstance(lines, list):
lst = []
for line in lines:
if isinstance(line, bytes):
line = line.decode("utf-8")
lst.append(line)
return lst
return lines
class AsyncCmd(Thread):
""" A class for doing the hard work of running a command on another machine """
def __init__(self, node, command, proc=None, delegate=None):
""" Create a new AsyncCmd instance
Arguments:
node -- The remote machine to run on
command -- The ssh command string to use for remote execution
proc -- If not None, a process object previously created with Popen.
Instead of spawning a new process, we will then wait on
this process to finish and handle its output.
delegate -- When the command completes, call the async_complete method
on this object
"""
self._command = command
self._delegate = delegate
self._logger = LogFactory()
self._node = node
self._proc = proc
Thread.__init__(self)
def run(self):
""" Run the previously instantiated AsyncCmd object """
out = None
err = None
if not self._proc:
# pylint: disable=consider-using-with
self._proc = Popen(self._command, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
self._logger.debug("cmd: async: target=%s, pid=%d: %s" % (self._node, self._proc.pid, self._command))
self._proc.wait()
if self._delegate:
self._logger.debug("cmd: pid %d returned %d to %r" % (self._proc.pid, self._proc.returncode, self._delegate))
else:
self._logger.debug("cmd: pid %d returned %d" % (self._proc.pid, self._proc.returncode))
if self._proc.stderr:
err = self._proc.stderr.readlines()
self._proc.stderr.close()
for line in err:
self._logger.debug("cmd: stderr[%d]: %s" % (self._proc.pid, line))
err = convert2string(err)
if self._proc.stdout:
out = self._proc.stdout.readlines()
self._proc.stdout.close()
out = convert2string(out)
if self._delegate:
self._delegate.async_complete(self._proc.pid, self._proc.returncode, out, err)
class RemoteExec:
""" An abstract class for remote execution. It runs a command on another
machine using ssh and scp.
"""
def __init__(self, command, cp_command, silent=False):
""" Create a new RemoteExec instance
Arguments:
command -- The ssh command string to use for remote execution
cp_command -- The scp command string to use for copying files
silent -- Should we log command status?
"""
self._command = command
self._cp_command = cp_command
self._logger = LogFactory()
self._silent = silent
self._our_node = os.uname()[1].lower()
def _fixcmd(self, cmd):
""" Perform shell escapes on certain characters in the input cmd string """
return re.sub("\'", "'\\''", cmd)
def _cmd(self, args):
""" Given a list of arguments, return the string that will be run on the
remote system
"""
sysname = args[0]
command = args[1]
if sysname is None or sysname.lower() in [self._our_node, "localhost"]:
ret = command
else:
ret = "%s %s '%s'" % (self._command, sysname, self._fixcmd(command))
return ret
def _log(self, args):
""" Log a message """
if not self._silent:
self._logger.log(args)
def _debug(self, args):
""" Log a message at the debug level """
if not self._silent:
self._logger.debug(args)
def call_async(self, node, command, delegate=None):
""" Run the given command on the given remote system and do not wait for
it to complete.
Arguments:
node -- The remote machine to run on
command -- The command to run, as a string
delegate -- When the command completes, call the async_complete method
on this object
Returns:
The running process object
"""
aproc = AsyncCmd(node, self._cmd([node, command]), delegate=delegate)
aproc.start()
return aproc
def __call__(self, node, command, synchronous=True, verbose=2):
""" Run the given command on the given remote system. If you call this class
like a function, this is what gets called. It's approximately the same
as a system() call on the remote machine.
Arguments:
node -- The remote machine to run on
command -- The command to run, as a string
synchronous -- Should we wait for the command to complete?
verbose -- If 0, do not lo:g anything. If 1, log the command and its
return code but not its output. If 2, additionally log
command output.
Returns:
A tuple of (return code, command output)
"""
rc = 0
result = None
# pylint: disable=consider-using-with
proc = Popen(self._cmd([node, command]),
- stdout = PIPE, stderr = PIPE, close_fds = True, shell = True)
+ stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
if not synchronous and proc.pid > 0 and not self._silent:
aproc = AsyncCmd(node, command, proc=proc)
aproc.start()
return (rc, result)
if proc.stdout:
result = proc.stdout.readlines()
proc.stdout.close()
else:
self._log("No stdout stream")
rc = proc.wait()
if verbose > 0:
self._debug("cmd: target=%s, rc=%d: %s" % (node, rc, command))
result = convert2string(result)
if proc.stderr:
errors = proc.stderr.readlines()
proc.stderr.close()
for err in errors:
self._debug("cmd: stderr: %s" % err)
if verbose == 2:
for line in result:
self._debug("cmd: stdout: %s" % line)
return (rc, result)
def copy(self, source, target, silent=False):
""" Perform a copy of the source file to the remote target, using the
cp_command provided when the RemoteExec object was created.
Returns:
The return code of the cp_command
"""
cmd = "%s '%s' '%s'" % (self._cp_command, source, target)
rc = os.system(cmd)
if not silent:
self._debug("cmd: rc=%d: %s" % (rc, cmd))
return rc
def exists_on_all(self, filename, hosts):
""" Return True if specified file exists on all specified hosts. """
for host in hosts:
rc = self(host, "test -r %s" % filename)
if rc != 0:
return False
return True
class RemoteFactory:
""" A class for constructing a singleton instance of a RemoteExec object """
# Class variables
# -n: no stdin, -x: no X11,
# -o ServerAliveInterval=5: disconnect after 3*5s if the server
# stops responding
command = ("ssh -l root -n -x -o ServerAliveInterval=5 "
"-o ConnectTimeout=10 -o TCPKeepAlive=yes "
"-o ServerAliveCountMax=3 ")
# -B: batch mode, -q: no stats (quiet)
cp_command = "scp -B -q"
instance = None
# pylint: disable=invalid-name
def getInstance(self):
""" Returns the previously created instance of RemoteExec, or creates a
new instance if one does not already exist.
"""
if not RemoteFactory.instance:
RemoteFactory.instance = RemoteExec(RemoteFactory.command,
RemoteFactory.cp_command,
False)
return RemoteFactory.instance
def enable_qarsh(self):
""" Enable the QA remote shell """
# http://nstraz.wordpress.com/2008/12/03/introducing-qarsh/
print("Using QARSH for connections to cluster nodes")
RemoteFactory.command = "qarsh -t 300 -l root"
RemoteFactory.cp_command = "qacp -q"
diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py
index 6c3de3ab40..bfc2839304 100644
--- a/python/pacemaker/_cts/scenarios.py
+++ b/python/pacemaker/_cts/scenarios.py
@@ -1,408 +1,425 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """
-__all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence" ]
+__all__ = [
+ "AllOnce",
+ "Boot",
+ "BootCluster",
+ "LeaveBooted",
+ "RandomTests",
+ "Sequence",
+]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker._cts.audits import ClusterAudit
from pacemaker._cts.input import should_continue
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.watcher import LogWatcher
class ScenarioComponent:
""" The base class for all scenario components. A scenario component is
one single step in a scenario. Each component is basically just a setup
and teardown method.
"""
def __init__(self, cm, env):
""" Create a new ScenarioComponent instance
Arguments:
cm -- A ClusterManager instance
env -- An Environment instance
"""
# pylint: disable=invalid-name
self._cm = cm
self._env = env
def is_applicable(self):
""" Return True if this component is applicable in the given Environment.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def setup(self):
""" Set up the component, returning True on success. This method must be
provided by all subclasses.
"""
raise NotImplementedError
def teardown(self):
""" Tear down the given component. This method must be provided by all
subclasses.
"""
raise NotImplementedError
class Scenario:
""" The base class for scenario. A scenario is an ordered list of
ScenarioComponent objects. A scenario proceeds by setting up all its
components in sequence, running a list of tests and audits, and then
tearing down its components in reverse.
"""
def __init__(self, cm, components, audits, tests):
""" Create a new Scenario instance
Arguments:
cm -- A ClusterManager instance
components -- A list of ScenarioComponents comprising this Scenario
audits -- A list of ClusterAudits that will be performed as
part of this Scenario
tests -- A list of CTSTests that will be run
"""
# pylint: disable=invalid-name
- self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 }
+ self.stats = {
+ "success": 0,
+ "failure": 0,
+ "BadNews": 0,
+ "skipped": 0
+ }
self.tests = tests
- self._audits = audits
+ self._audits = audits
self._bad_news = None
self._cm = cm
self._components = components
for comp in components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def is_applicable(self):
""" Return True if all ScenarioComponents are applicable """
for comp in self._components:
if not comp.is_applicable():
return False
return True
def setup(self):
""" Set up the scenario, returning True on success. If setup fails at
some point, tear down those components that did successfully set up.
"""
self._cm.prepare()
self.audit() # Also detects remote/local log config
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
self.audit()
self._cm.install_support()
self._bad_news = LogWatcher(self._cm.env["LogFileName"],
self._cm.templates.get_patterns("BadNews"),
self._cm.env["nodes"],
self._cm.env["LogWatcher"],
"BadNews", 0)
self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self._components):
if not self._components[j].setup():
# OOPS! We failed. Tear partial setups down.
self.audit()
self._cm.log("Tearing down partial setup")
self.teardown(j)
return False
j += 1
self.audit()
return True
def teardown(self, n_components=None):
""" Tear down the scenario in the reverse order it was set up. If
n_components is not None, only tear down that many components.
"""
if not n_components:
n_components = len(self._components)-1
j = n_components
while j >= 0:
self._components[j].teardown()
j -= 1
self.audit()
self._cm.install_support("uninstall")
def incr(self, name):
""" Increment the given stats key """
if not name in self.stats:
self.stats[name] = 0
self.stats[name] += 1
def run(self, iterations):
""" Run all tests in the scenario the given number of times """
self._cm.oprofile_start()
try:
self._run_loop(iterations)
self._cm.oprofile_stop()
except:
self._cm.oprofile_stop()
raise
def _run_loop(self, iterations):
""" Do the hard part of the run method - actually run all the tests the
given number of times.
"""
raise NotImplementedError
def run_test(self, test, testcount):
""" Run the given test. testcount is the number of tests (including
this one) that have been run across all iterations.
"""
nodechoice = self._cm.env.random_node()
ret = True
did_run = False
self._cm.clear_instance_errors_to_ignore()
choice = "(%s)" % nodechoice
self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount))
starttime = test.set_timer()
if not test.setup(nodechoice):
self._cm.log("Setup failed")
ret = False
elif not test.can_run_now(nodechoice):
self._cm.log("Skipped")
test.skipped()
else:
did_run = True
ret = test(nodechoice)
if not test.teardown(nodechoice):
self._cm.log("Teardown failed")
if not should_continue(self._cm.env):
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = False
stoptime = time.time()
self._cm.oprofile_save(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if "min_time" not in test.stats:
test.stats["elapsed_time"] = elapsed_time
test.stats["min_time"] = test_time
test.stats["max_time"] = test_time
else:
test.stats["elapsed_time"] += elapsed_time
if test_time < test.stats["min_time"]:
test.stats["min_time"] = test_time
if test_time > test.stats["max_time"]:
test.stats["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self._cm.statall()
did_run = True # Force the test count to be incremented anyway so test extraction works
self.audit(test.errors_to_ignore)
return did_run
def summarize(self):
""" Output scenario results """
self._cm.log("****************")
self._cm.log("Overall Results:%r" % self.stats)
self._cm.log("****************")
- stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0 }
+ stat_filter = {
+ "calls": 0,
+ "failure": 0,
+ "skipped": 0,
+ "auditfail": 0,
+ }
self._cm.log("Test Summary")
for test in self.tests:
for key in stat_filter:
stat_filter[key] = test.stats[key]
name = "Test %s:" % test.name
self._cm.log("{:<25} {!r}".format(name, stat_filter))
self._cm.debug("Detailed Results")
for test in self.tests:
name = "Test %s:" % test.name
self._cm.debug("{:<25} {!r}".format(name, stat_filter))
self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, local_ignore=None):
""" Perform all scenario audits and log results. If there are too many
failures, prompt the user to confirm that the scenario should continue
running.
"""
errcount = 0
ignorelist = ["CTS:"]
if local_ignore:
ignorelist.extend(local_ignore)
ignorelist.extend(self._cm.errors_to_ignore)
ignorelist.extend(self._cm.instance_errors_to_ignore)
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self._audits:
if not audit():
self._cm.log("Audit %s FAILED." % audit.name)
failed += 1
else:
self._cm.debug("Audit %s passed." % audit.name)
while errcount < 1000:
match = None
if self._bad_news:
match = self._bad_news.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._cm.log("BadNews: %s" % match)
self.incr("BadNews")
errcount += 1
else:
break
else:
print("Big problems")
if not should_continue(self._cm.env):
self._cm.log("Shutting down.")
self.summarize()
self.teardown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self._bad_news:
self._bad_news.end()
return failed
class AllOnce(Scenario):
""" Every Test Once """
def _run_loop(self, iterations):
testcount = 1
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
""" Random Test Execution """
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
test = self._cm.env.random_gen.choice(self.tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
""" Named Tests in Sequence """
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
""" Start the Cluster """
def _run_loop(self, iterations):
return
class BootCluster(ScenarioComponent):
""" The BootCluster component simply starts the cluster manager on all
nodes, waiting for each to come up before starting given that a node
might have been rebooted or crashed beforehand.
"""
def is_applicable(self):
""" BootCluster is always applicable """
return True
def setup(self):
""" Set up the component, returning True on success """
self._cm.prepare()
# Clear out the cobwebs ;-)
self._cm.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
self._cm.log("Starting Cluster Manager on all nodes.")
return self._cm.startall(verbose=True, quick=True)
def teardown(self):
""" Tear down the component """
self._cm.log("Stopping Cluster Manager on all nodes")
self._cm.stopall(verbose=True, force=False)
class LeaveBooted(BootCluster):
""" The LeaveBooted component leaves all nodes up when the scenario
is complete.
"""
def teardown(self):
""" Tear down the component """
self._cm.log("Leaving Cluster running on all nodes")
diff --git a/python/pacemaker/_cts/test.py b/python/pacemaker/_cts/test.py
index b2a90a6e08..577ebb3eb0 100644
--- a/python/pacemaker/_cts/test.py
+++ b/python/pacemaker/_cts/test.py
@@ -1,601 +1,603 @@
""" A module providing base classes for defining regression tests and groups of
regression tests. Everything exported here should be considered an abstract
class that needs to be subclassed in order to do anything useful. Various
functions will raise NotImplementedError if not overridden by a subclass.
"""
__copyright__ = "Copyright 2009-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+)"
__all__ = ["Test", "Tests"]
import io
import os
import re
import shlex
import signal
import subprocess
import sys
import time
from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError
from pacemaker._cts.process import pipe_communicate
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
def find_validator(rng_file):
""" Return the command line used to validate XML output, or None if the validator
is not installed.
"""
if os.access("/usr/bin/xmllint", os.X_OK):
if rng_file is None:
return ["xmllint", "-"]
return ["xmllint", "--relaxng", rng_file, "-"]
return None
def rng_directory():
""" Which directory contains the RNG schema files? """
if "PCMK_schema_directory" in os.environ:
return os.environ["PCMK_schema_directory"]
if os.path.exists("%s/cts-fencing.in" % sys.path[0]):
return "xml"
return BuildOptions.SCHEMA_DIR
class Pattern:
""" A class for checking log files for a given pattern """
def __init__(self, pat, negative=False, regex=False):
""" Create a new Pattern instance
Arguments:
pat -- The string to search for
negative -- If True, pat must not be found in any input
regex -- If True, pat is a regex and not a substring
"""
self._pat = pat
self.negative = negative
self.regex = regex
def __str__(self):
return self._pat
def match(self, line):
""" Is this pattern found in the given line? """
if self.regex:
return re.search(self._pat, line) is not None
return self._pat in line
class Test:
""" The base class for a single regression test. A single regression test
may still run multiple commands as part of its execution.
"""
def __init__(self, name, description, **kwargs):
""" Create a new Test instance. This method must be provided by all
subclasses, which must call Test.__init__ first.
Arguments:
description -- A user-readable description of the test, helpful in
identifying what test is running or has failed.
name -- The name of the test. Command line tools use this
attribute to allow running only tests with the exact
name, or tests whose name matches a given pattern.
This should be unique among all tests.
Keyword arguments:
force_wait --
logdir -- The base directory under which to create a directory
to store output and temporary data.
timeout -- How long to wait for the test to complete.
verbose -- Whether to print additional information, including
verbose command output and daemon log files.
"""
self.description = description
self.executed = False
self.name = name
self.force_wait = kwargs.get("force_wait", False)
self.logdir = kwargs.get("logdir", "/tmp")
self.timeout = kwargs.get("timeout", 2)
self.verbose = kwargs.get("verbose", False)
self._cmds = []
self._patterns = []
self._daemon_location = None
self._daemon_output = ""
self._daemon_process = None
self._result_exitcode = ExitStatus.OK
self._result_txt = ""
###
### PROPERTIES
###
@property
def exitcode(self):
""" The final exitcode of the Test. If all commands pass, this property
will be ExitStatus.OK. Otherwise, this property will be the exitcode
of the first command to fail.
"""
return self._result_exitcode
@exitcode.setter
def exitcode(self, value):
self._result_exitcode = value
@property
def logpath(self):
""" The path to the log for whatever daemon is being tested. Note that
this requires all subclasses to set self._daemon_location before
accessing this property or an exception will be raised.
"""
return os.path.join(self.logdir, "%s.log" % self._daemon_location)
###
### PRIVATE METHODS
###
def _kill_daemons(self):
""" Kill any running daemons in preparation for executing the test """
raise NotImplementedError("_kill_daemons not provided by subclass")
def _match_log_patterns(self):
""" Check test output for expected patterns, setting self.exitcode and
self._result_txt as appropriate. Not all subclass will need to do
this.
"""
if len(self._patterns) == 0:
return
n_failed_matches = 0
n_negative_matches = 0
output = self._daemon_output.split("\n")
for pat in self._patterns:
positive_match = False
for line in output:
if pat.match(line):
if pat.negative:
n_negative_matches += 1
if self.verbose:
print("This pattern should not have matched = '%s" % pat)
break
positive_match = True
break
if not pat.negative and not positive_match:
n_failed_matches += 1
print("Pattern Not Matched = '%s'" % pat)
if n_failed_matches > 0 or n_negative_matches > 0:
msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches)
self.exitcode = ExitStatus.ERROR
def _new_cmd(self, cmd, args, exitcode, **kwargs):
""" Add a command to be executed as part of this test.
Arguments:
cmd -- The program to run.
args -- Commands line arguments to pass to cmd, as a string.
exitcode -- The expected exit code of cmd. This can be used to
run a command that is expected to fail.
Keyword arguments:
stdout_match -- If not None, a string that is expected to be
present in the stdout of cmd. This can be a
regular expression.
no_wait -- Do not wait for cmd to complete.
stdout_negative_match -- If not None, a string that is expected to be
missing in the stdout of cmd. This can be a
regualr expression.
kill -- A command to be run after cmd, typically in
order to kill a failed process. This should be
the entire command line including arguments as
a single string.
validate -- If True, the output of cmd will be passed to
xmllint for validation. If validation fails,
XmlValidationError will be raised.
check_rng -- If True and validate is True, command output
will additionally be checked against the
api-result.rng file.
check_stderr -- If True, the stderr of cmd will be included in
output.
env -- If not None, variables to set in the environment
"""
self._cmds.append(
{
"args": args,
"check_rng": kwargs.get("check_rng", True),
"check_stderr": kwargs.get("check_stderr", True),
"cmd": cmd,
"expected_exitcode": exitcode,
"kill": kwargs.get("kill", None),
"no_wait": kwargs.get("no_wait", False),
"stdout_match": kwargs.get("stdout_match", None),
"stdout_negative_match": kwargs.get("stdout_negative_match", None),
"validate": kwargs.get("validate", True),
"env": kwargs.get("env", None),
}
)
def _start_daemons(self):
""" Start any necessary daemons in preparation for executing the test """
raise NotImplementedError("_start_daemons not provided by subclass")
###
### PUBLIC METHODS
###
def add_cmd(self, cmd, args, validate=True, check_rng=True, check_stderr=True,
env=None):
""" Add a simple command to be executed as part of this test """
self._new_cmd(cmd, args, ExitStatus.OK, validate=validate, check_rng=check_rng,
check_stderr=check_stderr, env=env)
def add_cmd_and_kill(self, cmd, args, kill_proc):
""" Add a command and system command to be executed as part of this test """
self._new_cmd(cmd, args, ExitStatus.OK, kill=kill_proc)
def add_cmd_check_stdout(self, cmd, args, match, no_match=None, env=None):
""" Add a simple command with expected output to be executed as part of this test """
self._new_cmd(cmd, args, ExitStatus.OK, stdout_match=match,
stdout_negative_match=no_match, env=env)
def add_cmd_expected_fail(self, cmd, args, exitcode=ExitStatus.ERROR):
""" Add a command that is expected to fail to be executed as part of this test """
self._new_cmd(cmd, args, exitcode)
def add_cmd_no_wait(self, cmd, args):
""" Add a simple command to be executed (without waiting) as part of this test """
self._new_cmd(cmd, args, ExitStatus.OK, no_wait=True)
def add_log_pattern(self, pattern, negative=False, regex=False):
""" Add a pattern that should appear in the test's logs """
self._patterns.append(Pattern(pattern, negative=negative, regex=regex))
def _signal_dict(self):
""" Return a dictionary mapping signal numbers to their names """
# FIXME: When we support python >= 3.5, this function can be replaced with:
# signal.Signals(self.daemon_process.returncode).name
- return { getattr(signal, _signame): _signame
- for _signame in dir(signal)
- if _signame.startswith("SIG") and not _signame.startswith("SIG_") }
+ return {
+ getattr(signal, _signame): _signame
+ for _signame in dir(signal)
+ if _signame.startswith("SIG") and not _signame.startswith("SIG_")
+ }
def clean_environment(self):
""" Clean up the host after executing a test """
if self._daemon_process:
if self._daemon_process.poll() is None:
self._daemon_process.terminate()
self._daemon_process.wait()
else:
rc = self._daemon_process.returncode
signame = self._signal_dict().get(-rc, "RET=%s" % rc)
msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)."
self._result_txt = msg % (self.name, self._daemon_location, signame)
self.exitcode = ExitStatus.ERROR
self._daemon_process = None
self._daemon_output = ""
# the default for utf-8 encoding would error out if e.g. memory corruption
# makes fenced output any kind of 8 bit value - while still interesting
# for debugging and we'd still like the regression-test to go over the
# full set of test-cases
- with open(self.logpath, 'rt', encoding = "ISO-8859-1") as logfile:
+ with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile:
for line in logfile.readlines():
self._daemon_output += line
if self.verbose:
print("Daemon Output Start")
print(self._daemon_output)
print("Daemon Output End")
def print_result(self, filler):
""" Print the result of the last test execution """
print("%s%s" % (filler, self._result_txt))
def run(self):
""" Execute this test """
i = 1
self.start_environment()
if self.verbose:
print("\n--- START TEST - %s" % self.name)
self._result_txt = "SUCCESS - '%s'" % (self.name)
self.exitcode = ExitStatus.OK
for cmd in self._cmds:
try:
self.run_cmd(cmd)
except ExitCodeError as e:
print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
self.set_error(i, cmd)
break
except OutputNotFoundError as e:
print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
self.set_error(i, cmd)
break
except OutputFoundError as e:
print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e))
self.set_error(i, cmd)
break
except XmlValidationError as e:
print("Step %d FAILED - xmllint failed: %s" % (i, e))
self.set_error(i, cmd)
break
if self.verbose:
print("Step %d SUCCESS" % (i))
i += 1
self.clean_environment()
if self.exitcode == ExitStatus.OK:
self._match_log_patterns()
print(self._result_txt)
if self.verbose:
print("--- END TEST - %s\n" % self.name)
self.executed = True
def run_cmd(self, args):
""" Execute a command as part of this test """
cmd = shlex.split(args['args'])
cmd.insert(0, args['cmd'])
if self.verbose:
print("\n\nRunning: %s" % " ".join(cmd))
# FIXME: Using "with" here breaks fencing merge tests.
# pylint: disable=consider-using-with
if args['env']:
new_env = os.environ.copy()
new_env.update(args['env'])
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=new_env)
else:
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if args['kill']:
if self.verbose:
print("Also running: %s" % args['kill'])
### Typically, the kill argument is used to detect some sort of
### failure. Without yielding for a few seconds here, the process
### launched earlier that is listening for the failure may not have
### time to connect to pacemaker-execd.
time.sleep(2)
subprocess.Popen(shlex.split(args['kill']))
if not args['no_wait']:
test.wait()
else:
return ExitStatus.OK
output = pipe_communicate(test, check_stderr=args['check_stderr'])
if self.verbose:
print(output)
if test.returncode != args['expected_exitcode']:
raise ExitCodeError(test.returncode)
if args['stdout_match'] is not None and \
re.search(args['stdout_match'], output) is None:
raise OutputNotFoundError(output)
if args['stdout_negative_match'] is not None and \
re.search(args['stdout_negative_match'], output) is not None:
raise OutputFoundError(output)
if args['validate']:
if args['check_rng']:
rng_file = "%s/api/api-result.rng" % rng_directory()
else:
rng_file = None
cmd = find_validator(rng_file)
if not cmd:
raise XmlValidationError("Could not find validator for %s" % rng_file)
if self.verbose:
print("\nRunning: %s" % " ".join(cmd))
with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator:
output = pipe_communicate(validator, check_stderr=True, stdin=output)
if self.verbose:
print(output)
if validator.returncode != 0:
raise XmlValidationError(output)
return ExitStatus.OK
def set_error(self, step, cmd):
""" Record failure of this test """
msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
self.exitcode = ExitStatus.ERROR
def start_environment(self):
""" Prepare the host for executing a test """
if os.path.exists(self.logpath):
os.remove(self.logpath)
self._kill_daemons()
self._start_daemons()
logfile = None
init_time = time.time()
update_time = init_time
while True:
# FIXME: Eventually use 'with' here, which seems complicated given
# everything happens in a loop.
# pylint: disable=consider-using-with
time.sleep(0.1)
if not self.force_wait and logfile is None \
and os.path.exists(self.logpath):
- logfile = io.open(self.logpath, 'rt', encoding = "ISO-8859-1")
+ logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1")
if not self.force_wait and logfile is not None:
for line in logfile.readlines():
if "successfully started" in line:
return
now = time.time()
if self.timeout > 0 and (now - init_time) >= self.timeout:
if not self.force_wait:
print("\tDaemon %s doesn't seem to have been initialized within %fs."
"\n\tConsider specifying a longer '--timeout' value."
%(self._daemon_location, self.timeout))
return
if self.verbose and (now - update_time) >= 5:
print("Waiting for %s to be initialized: %fs ..."
%(self._daemon_location, now - init_time))
update_time = now
class Tests:
""" The base class for a collection of regression tests """
def __init__(self, **kwargs):
""" Create a new Tests instance. This method must be provided by all
subclasses, which must call Tests.__init__ first.
Keywork arguments:
force_wait --
logdir -- The base directory under which to create a directory
to store output and temporary data.
timeout -- How long to wait for the test to complete.
verbose -- Whether to print additional information, including
verbose command output and daemon log files.
"""
self.force_wait = kwargs.get("force_wait", False)
self.logdir = kwargs.get("logdir", "/tmp")
self.timeout = kwargs.get("timeout", 2)
self.verbose = kwargs.get("verbose", False)
self._tests = []
def exit(self):
""" Exit (with error status code if any test failed) """
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
sys.exit(ExitStatus.ERROR)
sys.exit(ExitStatus.OK)
def print_list(self):
""" List all registered tests """
print("\n==== %d TESTS FOUND ====" % len(self._tests))
print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION"))
print("%35s - %s" % ("--------------------", "--------------------"))
for test in self._tests:
print("%35s - %s" % (test.name, test.description))
print("==== END OF LIST ====\n")
def print_results(self):
""" Print summary of results of executed tests """
failures = 0
success = 0
print("\n\n======= FINAL RESULTS ==========")
print("\n--- FAILURE RESULTS:")
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
failures += 1
test.print_result(" ")
else:
success += 1
if failures == 0:
print(" None")
print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures))
def run_single(self, name):
""" Run a single named test """
for test in self._tests:
if test.name == name:
test.run()
break
def run_tests(self):
""" Run all tests """
for test in self._tests:
test.run()
def run_tests_matching(self, pattern):
""" Run all tests whose name matches a pattern """
for test in self._tests:
if test.name.count(pattern) != 0:
test.run()
diff --git a/python/pacemaker/_cts/tests/__init__.py b/python/pacemaker/_cts/tests/__init__.py
index 9703401b94..63b34aad2e 100644
--- a/python/pacemaker/_cts/tests/__init__.py
+++ b/python/pacemaker/_cts/tests/__init__.py
@@ -1,86 +1,87 @@
"""
Test classes for the `pacemaker._cts` package.
"""
__copyright__ = "Copyright 2023 the Pacemaker project contributors"
__license__ = "GNU Lesser General Public License version 2.1 or later (LGPLv2.1+)"
from pacemaker._cts.tests.componentfail import ComponentFail
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.fliptest import FlipTest
from pacemaker._cts.tests.maintenancemode import MaintenanceMode
from pacemaker._cts.tests.nearquorumpointtest import NearQuorumPointTest
from pacemaker._cts.tests.partialstart import PartialStart
from pacemaker._cts.tests.reattach import Reattach
from pacemaker._cts.tests.restartonebyone import RestartOnebyOne
from pacemaker._cts.tests.resourcerecover import ResourceRecover
from pacemaker._cts.tests.restarttest import RestartTest
from pacemaker._cts.tests.resynccib import ResyncCIB
from pacemaker._cts.tests.remotebasic import RemoteBasic
from pacemaker._cts.tests.remotedriver import RemoteDriver
from pacemaker._cts.tests.remotemigrate import RemoteMigrate
from pacemaker._cts.tests.remoterscfailure import RemoteRscFailure
from pacemaker._cts.tests.remotestonithd import RemoteStonithd
from pacemaker._cts.tests.simulstart import SimulStart
from pacemaker._cts.tests.simulstop import SimulStop
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.simulstoplite import SimulStopLite
from pacemaker._cts.tests.splitbraintest import SplitBrainTest
from pacemaker._cts.tests.standbytest import StandbyTest
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.tests.startonebyone import StartOnebyOne
from pacemaker._cts.tests.stonithdtest import StonithdTest
from pacemaker._cts.tests.stoponebyone import StopOnebyOne
from pacemaker._cts.tests.stoptest import StopTest
def test_list(cm, audits):
""" Return a list of test class objects that are enabled and whose
is_applicable methods return True. These are the tests that
should be run.
"""
# cm is a reasonable name here.
# pylint: disable=invalid-name
# A list of all enabled test classes, in the order that they should
# be run (if we're doing --once). There are various other ways of
# specifying which tests should be run, in which case the order here
# will not matter.
#
# Note that just because a test is listed here doesn't mean it will
# definitely be run - is_applicable is still taken into consideration.
# Also note that there are other tests that are excluded from this
# list for various reasons.
- enabled_test_classes = [ FlipTest,
- RestartTest,
- StonithdTest,
- StartOnebyOne,
- SimulStart,
- SimulStop,
- StopOnebyOne,
- RestartOnebyOne,
- PartialStart,
- StandbyTest,
- MaintenanceMode,
- ResourceRecover,
- ComponentFail,
- SplitBrainTest,
- Reattach,
- ResyncCIB,
- NearQuorumPointTest,
- RemoteBasic,
- RemoteStonithd,
- RemoteMigrate,
- RemoteRscFailure,
- ]
+ enabled_test_classes = [
+ FlipTest,
+ RestartTest,
+ StonithdTest,
+ StartOnebyOne,
+ SimulStart,
+ SimulStop,
+ StopOnebyOne,
+ RestartOnebyOne,
+ PartialStart,
+ StandbyTest,
+ MaintenanceMode,
+ ResourceRecover,
+ ComponentFail,
+ SplitBrainTest,
+ Reattach,
+ ResyncCIB,
+ NearQuorumPointTest,
+ RemoteBasic,
+ RemoteStonithd,
+ RemoteMigrate,
+ RemoteRscFailure,
+ ]
result = []
for testclass in enabled_test_classes:
bound_test = testclass(cm)
if bound_test.is_applicable():
bound_test.audits = audits
result.append(bound_test)
return result
diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py
index fba883dc7a..f3d3622879 100644
--- a/python/pacemaker/_cts/tests/componentfail.py
+++ b/python/pacemaker/_cts/tests/componentfail.py
@@ -1,159 +1,167 @@
""" Kill a pacemaker daemon and test how the cluster recovers """
__all__ = ["ComponentFail"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class ComponentFail(CTSTest):
""" A concrete test that kills a random pacemaker daemon and waits for the
cluster to recover
"""
def __init__(self, cm):
""" Create a new ComponentFail instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.is_unsafe = True
self.name = "ComponentFail"
self._complist = cm.components
self._okerrpatterns = []
self._patterns = []
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
self._patterns = []
self._okerrpatterns = []
# start all nodes
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
if not self._cm.cluster_stable(self._env["StableTime"]):
return self.failure("Setup failed - unstable")
node_is_dc = self._cm.is_node_dc(node, None)
# select a component to kill
chosen = self._env.random_gen.choice(self._complist)
while chosen.dc_only and not node_is_dc:
chosen = self._env.random_gen.choice(self._complist)
self.debug("...component %s (dc=%s)" % (chosen.name, node_is_dc))
self.incr(chosen.name)
if chosen.name != "corosync":
- self._patterns.extend([ self.templates["Pat:ChildKilled"] % (node, chosen.name),
- self.templates["Pat:ChildRespawn"] % (node, chosen.name) ])
+ self._patterns.extend([
+ self.templates["Pat:ChildKilled"] % (node, chosen.name),
+ self.templates["Pat:ChildRespawn"] % (node, chosen.name),
+ ])
self._patterns.extend(chosen.pats)
if node_is_dc:
self._patterns.extend(chosen.dc_pats)
# @TODO this should be a flag in the Component
- if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]:
+ if chosen.name in ["corosync", "pacemaker-based", "pacemaker-fenced"]:
# Ignore actions for fence devices if fencer will respawn
# (their registration will be lost, and probes will fail)
- self._okerrpatterns = [ self.templates["Pat:Fencing_active"] ]
+ self._okerrpatterns = [
+ self.templates["Pat:Fencing_active"],
+ ]
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
- self._okerrpatterns.extend([ self.templates["Pat:Fencing_recover"] % r.id,
- self.templates["Pat:Fencing_probe"] % r.id ])
+ self._okerrpatterns.extend([
+ self.templates["Pat:Fencing_recover"] % r.id,
+ self.templates["Pat:Fencing_probe"] % r.id,
+ ])
# supply a copy so self.patterns doesn't end up empty
tmp_pats = self._patterns.copy()
self._patterns.extend(chosen.badnews_ignore)
# Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
- stonith_pats = [ self.templates["Pat:Fencing_ok"] % node ]
+ stonith_pats = [
+ self.templates["Pat:Fencing_ok"] % node
+ ]
stonith = self.create_watch(stonith_pats, 0)
stonith.set_watch()
# set the watch for stable
watch = self.create_watch(
tmp_pats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
# kill the component
chosen.kill(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for any fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
self._cm.cluster_stable(self._env["StartTime"])
self.debug("Checking if %s was shot" % node)
shot = stonith.look(60)
if shot:
self.debug("Found: %r" % shot)
self._okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)
if not self._env["at-boot"]:
self._cm.expected_status[node] = "down"
# If fencing occurred, chances are many (if not all) the expected logs
# will not be sent - or will be lost when the node reboots
return self.success()
# check for logs indicating a graceful recovery
matched = watch.look_for_all(allow_multiple_matches=True)
if watch.unmatched:
self._logger.log("Patterns not found: %r" % watch.unmatched)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected %s patterns" % chosen.name)
if not is_stable:
return self.failure("Cluster did not become stable after killing %s" % chosen.name)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Note that okerrpatterns refers to the last time we ran this test
# The good news is that this works fine for us...
self._okerrpatterns.extend(self._patterns)
return self._okerrpatterns
diff --git a/python/pacemaker/_cts/tests/ctstest.py b/python/pacemaker/_cts/tests/ctstest.py
index 54e07cc5ef..fd63b914a1 100644
--- a/python/pacemaker/_cts/tests/ctstest.py
+++ b/python/pacemaker/_cts/tests/ctstest.py
@@ -1,259 +1,261 @@
""" Base classes for CTS tests """
__all__ = ["CTSTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.audits import AuditConstraint, AuditResource
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.timer import Timer
from pacemaker._cts.watcher import LogWatcher
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class CTSTest:
""" The base class for all cluster tests. This implements a basic set of
properties and behaviors like setup, tear down, time keeping, and
statistics tracking. It is up to specific tests to implement their own
specialized behavior on top of this class.
"""
def __init__(self, cm):
""" Create a new CTSTest instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self.audits = []
self.name = None
self.templates = PatternSelector(cm["Name"])
- self.stats = { "auditfail": 0,
- "calls": 0,
- "failure": 0,
- "skipped": 0,
- "success": 0 }
+ self.stats = {
+ "auditfail": 0,
+ "calls": 0,
+ "failure": 0,
+ "skipped": 0,
+ "success": 0
+ }
self._cm = cm
self._env = EnvFactory().getInstance()
self._r_o2cb = None
self._r_ocfs2 = []
self._rsh = RemoteFactory().getInstance()
self._logger = LogFactory()
self._timers = {}
self.benchmark = True # which tests to benchmark
self.failed = False
self.is_experimental = False
self.is_loop = False
self.is_unsafe = False
self.is_valgrind = False
self.passed = True
def log(self, args):
""" Log a message """
self._logger.log(args)
def debug(self, args):
""" Log a debug message """
self._logger.debug(args)
def get_timer(self, key="test"):
""" Get the start time of the given timer """
try:
return self._timers[key].start_time
except KeyError:
return 0
def set_timer(self, key="test"):
""" Set the start time of the given timer to now, and return
that time
"""
if key not in self._timers:
self._timers[key] = Timer(self._logger, self.name, key)
self._timers[key].start()
return self._timers[key].start_time
def log_timer(self, key="test"):
""" Log the elapsed time of the given timer """
if key not in self._timers:
return
elapsed = self._timers[key].elapsed
self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed))
del self._timers[key]
def incr(self, name):
""" Increment the given stats key """
if name not in self.stats:
self.stats[name] = 0
self.stats[name] += 1
# Reset the test passed boolean
if name == "calls":
self.passed = True
def failure(self, reason="none"):
""" Increment the failure count, with an optional failure reason """
self.passed = False
self.incr("failure")
self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
return False
def success(self):
""" Increment the success count """
self.incr("success")
return True
def skipped(self):
""" Increment the skipped count """
self.incr("skipped")
return True
def __call__(self, node):
""" Perform this test """
raise NotImplementedError
def audit(self):
""" Perform all the relevant audits (see ClusterAudit), returning
whether or not they all passed.
"""
passed = True
for audit in self.audits:
if not audit():
self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name))
self.incr("auditfail")
passed = False
return passed
def setup(self, node):
""" Setup this test """
# node is used in subclasses
# pylint: disable=unused-argument
return self.success()
def teardown(self, node):
""" Tear down this test """
# node is used in subclasses
# pylint: disable=unused-argument
return self.success()
def create_watch(self, patterns, timeout, name=None):
""" Create a new LogWatcher object with the given patterns, timeout,
and optional name. This object can be used to search log files
for matching patterns during this test's run.
"""
if not name:
name = self.name
return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"], self._env["LogWatcher"], name, timeout)
def local_badnews(self, prefix, watch, local_ignore=None):
""" Use the given watch object to search through log files for messages
starting with the given prefix. If no prefix is given, use
"LocalBadNews:" by default. The optional local_ignore list should
be a list of regexes that, if found in a line, will cause that line
to be ignored.
Return the number of matches found.
"""
errcount = 0
if not prefix:
prefix = "LocalBadNews:"
ignorelist = [" CTS: ", prefix]
if local_ignore:
ignorelist += local_ignore
while errcount < 100:
match = watch.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._logger.log("%s %s" % (prefix, match))
errcount += 1
else:
break
else:
self._logger.log("Too many errors!")
watch.end()
return errcount
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
if self.is_loop and not self._env["loop-tests"]:
return False
if self.is_unsafe and not self._env["unsafe-tests"]:
return False
if self.is_valgrind and not self._env["valgrind-tests"]:
return False
if self.is_experimental and not self._env["experimental-tests"]:
return False
if self._env["benchmark"] and not self.benchmark:
return False
return True
def can_run_now(self, node):
""" Return True if we can meaningfully run right now """
# node is used in subclasses
# pylint: disable=unused-argument
return True
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return []
diff --git a/python/pacemaker/_cts/tests/maintenancemode.py b/python/pacemaker/_cts/tests/maintenancemode.py
index d8adcb1a41..3c57c074fc 100644
--- a/python/pacemaker/_cts/tests/maintenancemode.py
+++ b/python/pacemaker/_cts/tests/maintenancemode.py
@@ -1,228 +1,238 @@
""" Toggle nodes in and out of maintenance mode """
__all__ = ["MaintenanceMode"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class MaintenanceMode(CTSTest):
""" A concrete test that toggles nodes in and out of maintenance mode """
def __init__(self, cm):
""" Create a new MaintenanceMode instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "MaintenanceMode"
self._action = "asyncmon"
self._rid = "maintenanceDummy"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def _toggle_maintenance_mode(self, node, enabled):
""" Toggle maintenance mode on the given node """
- pats = [ self.templates["Pat:DC_IDLE"] ]
+ pats = [
+ self.templates["Pat:DC_IDLE"]
+ ]
if enabled:
action = "On"
else:
action = "Off"
# fail the resource right after turning Maintenance mode on
# verify it is not recovered until maintenance mode is turned off
if enabled:
pats.append(self.templates["Pat:RscOpFail"] % (self._action, self._rid))
else:
- pats.extend([ self.templates["Pat:RscOpOK"] % ("stop", self._rid),
- self.templates["Pat:RscOpOK"] % ("start", self._rid) ])
+ pats.extend([
+ self.templates["Pat:RscOpOK"] % ("stop", self._rid),
+ self.templates["Pat:RscOpOK"] % ("start", self._rid)
+ ])
watch = self.create_watch(pats, 60)
watch.set_watch()
self.debug("Turning maintenance mode %s" % action)
self._rsh(node, self.templates["MaintenanceMode%s" % action])
if enabled:
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
with Timer(self._logger, self.name, "recover%s" % action):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when turning maintenance mode %s" % action)
return repr(watch.unmatched)
return ""
def _insert_maintenance_dummy(self, node):
""" Create a dummy resource on the given node """
- pats = [ ("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self._rid)) ]
+ pats = [
+ ("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self._rid))
+ ]
watch = self.create_watch(pats, 60)
watch.set_watch()
self._cm.add_dummy_rsc(node, self._rid)
with Timer(self._logger, self.name, "addDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when adding maintenance dummy resource")
return repr(watch.unmatched)
return ""
def _remove_maintenance_dummy(self, node):
""" Remove the previously created dummy resource on the given node """
- pats = [ self.templates["Pat:RscOpOK"] % ("stop", self._rid) ]
+ pats = [
+ self.templates["Pat:RscOpOK"] % ("stop", self._rid)
+ ]
watch = self.create_watch(pats, 60)
watch.set_watch()
self._cm.remove_dummy_rsc(node, self._rid)
with Timer(self._logger, self.name, "removeDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when removing maintenance dummy resource")
return repr(watch.unmatched)
return ""
def _managed_rscs(self, node):
""" Return a list of all resources managed by the cluster """
rscs = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if tmp.managed:
rscs.append(tmp.id)
return rscs
def _verify_resources(self, node, rscs, managed):
""" Verify that all resources in rscList are managed if they are expected
to be, or unmanaged if they are expected to be.
"""
managed_rscs = rscs
managed_str = "managed"
if not managed:
managed_str = "unmanaged"
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if managed and not tmp.managed:
continue
if not managed and tmp.managed:
continue
if managed_rscs.count(tmp.id):
managed_rscs.remove(tmp.id)
if not managed_rscs:
self.debug("Found all %s resources on %s" % (managed_str, node))
return True
self._logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managed_rscs))
return False
def __call__(self, node):
""" Perform this test """
self.incr("calls")
verify_managed = False
verify_unmanaged = False
fail_pat = ""
if not self._startall(None):
return self.failure("Setup failed")
# get a list of all the managed resources. We use this list
# after enabling maintenance mode to verify all managed resources
# become un-managed. After maintenance mode is turned off, we use
# this list to verify all the resources become managed again.
managed_rscs = self._managed_rscs(node)
if not managed_rscs:
self._logger.log("No managed resources on %s" % node)
return self.skipped()
# insert a fake resource we can fail during maintenance mode
# so we can verify recovery does not take place until after maintenance
# mode is disabled.
fail_pat += self._insert_maintenance_dummy(node)
# toggle maintenance mode ON, then fail dummy resource.
fail_pat += self._toggle_maintenance_mode(node, True)
# verify all the resources are now unmanaged
if self._verify_resources(node, managed_rscs, False):
verify_unmanaged = True
# Toggle maintenance mode OFF, verify dummy is recovered.
fail_pat += self._toggle_maintenance_mode(node, False)
# verify all the resources are now managed again
if self._verify_resources(node, managed_rscs, True):
verify_managed = True
# Remove our maintenance dummy resource.
fail_pat += self._remove_maintenance_dummy(node)
self._cm.cluster_stable()
if fail_pat != "":
return self.failure("Unmatched patterns: %s" % fail_pat)
if not verify_unmanaged:
return self.failure("Failed to verify resources became unmanaged during maintenance mode")
if not verify_managed:
return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"Updating failcount for %s" % self._rid,
- r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self._rid,
- r"Unknown operation: fail",
- self.templates["Pat:RscOpOK"] % (self._action, self._rid),
- r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, 0) ]
+ return [
+ r"Updating failcount for %s" % self._rid,
+ r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self._rid,
+ r"Unknown operation: fail",
+ self.templates["Pat:RscOpOK"] % (self._action, self._rid),
+ r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, 0)
+ ]
diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py
index a0a65b2f75..c5b70b7103 100644
--- a/python/pacemaker/_cts/tests/nearquorumpointtest.py
+++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py
@@ -1,125 +1,125 @@
""" Randomly start and stop nodes to bring the cluster close to the quorum point """
__all__ = ["NearQuorumPointTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class NearQuorumPointTest(CTSTest):
""" A concrete test that randomly starts and stops nodes to bring the
cluster close to the quorum point
"""
def __init__(self, cm):
""" Create a new NearQuorumPointTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "NearQuorumPoint"
def __call__(self, dummy):
""" Perform this test """
self.incr("calls")
startset = []
stopset = []
stonith = self._cm.prepare_fencing_watcher()
#decide what to do with each node
for node in self._env["nodes"]:
action = self._env.random_gen.choice(["start", "stop"])
- if action == "start" :
+ if action == "start":
startset.append(node)
- elif action == "stop" :
+ elif action == "stop":
stopset.append(node)
self.debug("start nodes:%r" % startset)
self.debug("stop nodes:%r" % stopset)
#add search patterns
- watchpats = [ ]
+ watchpats = []
for node in stopset:
if self._cm.expected_status[node] == "up":
watchpats.append(self.templates["Pat:We_stopped"] % node)
for node in startset:
if self._cm.expected_status[node] == "down":
watchpats.append(self.templates["Pat:Local_started"] % node)
else:
for stopping in stopset:
if self._cm.expected_status[stopping] == "up":
watchpats.append(self.templates["Pat:They_stopped"] % (node, stopping))
if not watchpats:
return self.skipped()
if startset:
watchpats.append(self.templates["Pat:DC_IDLE"])
watch = self.create_watch(watchpats, self._env["DeadTime"] + 10)
watch.set_watch()
#begin actions
for node in stopset:
if self._cm.expected_status[node] == "up":
self._cm.stop_cm_async(node)
for node in startset:
if self._cm.expected_status[node] == "down":
self._cm.start_cm_async(node)
#get the result
if watch.look_for_all():
self._cm.cluster_stable()
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
return self.success()
self._logger.log("Warn: Patterns not found: %r" % watch.unmatched)
#get the "bad" nodes
upnodes = []
for node in stopset:
if self._cm.stat_cm(node):
upnodes.append(node)
downnodes = []
for node in startset:
if not self._cm.stat_cm(node):
downnodes.append(node)
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
if not upnodes and not downnodes:
self._cm.cluster_stable()
# Make sure they're completely down with no residule
for node in stopset:
self._rsh(node, self.templates["StopCmd"])
return self.success()
if upnodes:
self._logger.log("Warn: Unstoppable nodes: %r" % upnodes)
if downnodes:
self._logger.log("Warn: Unstartable nodes: %r" % downnodes)
return self.failure()
diff --git a/python/pacemaker/_cts/tests/partialstart.py b/python/pacemaker/_cts/tests/partialstart.py
index 745baf2394..1b074e6de0 100644
--- a/python/pacemaker/_cts/tests/partialstart.py
+++ b/python/pacemaker/_cts/tests/partialstart.py
@@ -1,71 +1,75 @@
""" Start a node and then tell it to stop before it is fully running """
__all__ = ["PartialStart"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.simulstoplite import SimulStopLite
from pacemaker._cts.tests.stoptest import StopTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class PartialStart(CTSTest):
""" A concrete test that interrupts a node before it's finished starting up """
def __init__(self, cm):
""" Create a new PartialStart instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "PartialStart"
self._startall = SimulStartLite(cm)
self._stop = StopTest(cm)
self._stopall = SimulStopLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
ret = self._stopall(None)
if not ret:
return self.failure("Setup failed")
- watchpats = [ "pacemaker-controld.*Connecting to .* cluster infrastructure" ]
+ watchpats = [
+ "pacemaker-controld.*Connecting to .* cluster infrastructure"
+ ]
watch = self.create_watch(watchpats, self._env["DeadTime"] + 10)
watch.set_watch()
self._cm.start_cm_async(node)
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: %r" % watch.unmatched)
return self.failure("Setup of %s failed" % node)
ret = self._stop(node)
if not ret:
return self.failure("%s did not stop in time" % node)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# We might do some fencing in the 2-node case if we make it up far enough
- return [ r"Executing reboot fencing operation",
- r"Requesting fencing \([^)]+\) targeting node " ]
+ return [
+ r"Executing reboot fencing operation",
+ r"Requesting fencing \([^)]+\) targeting node "
+ ]
diff --git a/python/pacemaker/_cts/tests/reattach.py b/python/pacemaker/_cts/tests/reattach.py
index a652e9c34f..dbf23e5fca 100644
--- a/python/pacemaker/_cts/tests/reattach.py
+++ b/python/pacemaker/_cts/tests/reattach.py
@@ -1,222 +1,226 @@
""" Restart the cluster and verify resources remain running """
__all__ = ["Reattach"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.simulstoplite import SimulStopLite
from pacemaker._cts.tests.starttest import StartTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class Reattach(CTSTest):
""" A concrete test that restarts the cluster and verifies that resources
remain running throughout
"""
def __init__(self, cm):
""" Create a new Reattach instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "Reattach"
self._startall = SimulStartLite(cm)
self._stopall = SimulStopLite(cm)
def _is_managed(self, node):
""" Are resources managed by the cluster? """
(_, is_managed) = self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
is_managed = is_managed[0].strip()
return is_managed == "true"
def _set_unmanaged(self, node):
""" Disable resource management """
self.debug("Disable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
def _set_managed(self, node):
""" Enable resource management """
self.debug("Re-enable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
def _disable_incompatible_rscs(self, node):
""" Disable resources that are incompatible with this test
Starts and stops of stonith-class resources are implemented internally
by Pacemaker, which means that they must stop when Pacemaker is
stopped, even if unmanaged. Disable them before running the Reattach
test so they don't affect resource placement.
OCFS2 resources must be disabled too for some reason.
Set target-role to "Stopped" for any of these resources in the CIB.
"""
self.debug("Disable incompatible (stonith/OCFS2) resources")
xml = """'
' --scope rsc_defaults"""
return self._rsh(node, self._cm.templates['CibAddXml'] % xml)
def _enable_incompatible_rscs(self, node):
""" Re-enable resources that were incompatible with this test """
self.debug("Re-enable incompatible (stonith/OCFS2) resources")
xml = """"""
return self._rsh(node, """cibadmin --delete --xml-text '%s'""" % xml)
def _reprobe(self, node):
""" Reprobe all resources
The placement of some resources (such as promotable-1 in the
lab-generated CIB) is affected by constraints using node-attribute-based
rules. An earlier test may have erased the relevant node attribute, so
do a reprobe, which should add the attribute back.
"""
return self._rsh(node, """crm_resource --refresh""")
def setup(self, node):
""" Setup this test """
if not self._startall(None):
return self.failure("Startall failed")
(rc, _) = self._disable_incompatible_rscs(node)
if rc != ExitStatus.OK:
return self.failure("Couldn't modify CIB to stop incompatible resources")
(rc, _) = self._reprobe(node)
if rc != ExitStatus.OK:
return self.failure("Couldn't reprobe resources")
if not self._cm.cluster_stable(double_check=True):
return self.failure("Cluster did not stabilize after setup")
return self.success()
def teardown(self, node):
""" Tear down this test """
# Make sure 'node' is up
start = StartTest(self._cm)
start(node)
if not self._is_managed(node):
self._set_managed(node)
(rc, _) = self._enable_incompatible_rscs(node)
if rc != ExitStatus.OK:
return self.failure("Couldn't modify CIB to re-enable incompatible resources")
if not self._cm.cluster_stable():
return self.failure("Cluster did not stabilize after teardown")
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
return self.success()
def can_run_now(self, node):
""" Return True if we can meaningfully run right now """
return True
def __call__(self, node):
""" Perform this test """
self.incr("calls")
# Conveniently, the scheduler will display this message when disabling
# management, even if fencing is not enabled, so we can rely on it.
managed = self.create_watch(["No fencing will be done"], 60)
managed.set_watch()
self._set_unmanaged(node)
if not managed.look_for_all():
self._logger.log("Patterns not found: %r" % managed.unmatched)
return self.failure("Resource management not disabled")
- pats = [ self.templates["Pat:RscOpOK"] % ("start", ".*"),
- self.templates["Pat:RscOpOK"] % ("stop", ".*"),
- self.templates["Pat:RscOpOK"] % ("promote", ".*"),
- self.templates["Pat:RscOpOK"] % ("demote", ".*"),
- self.templates["Pat:RscOpOK"] % ("migrate", ".*") ]
+ pats = [
+ self.templates["Pat:RscOpOK"] % ("start", ".*"),
+ self.templates["Pat:RscOpOK"] % ("stop", ".*"),
+ self.templates["Pat:RscOpOK"] % ("promote", ".*"),
+ self.templates["Pat:RscOpOK"] % ("demote", ".*"),
+ self.templates["Pat:RscOpOK"] % ("migrate", ".*")
+ ]
watch = self.create_watch(pats, 60, "ShutdownActivity")
watch.set_watch()
self.debug("Shutting down the cluster")
ret = self._stopall(None)
if not ret:
self._set_managed(node)
return self.failure("Couldn't shut down the cluster")
self.debug("Bringing the cluster back up")
ret = self._startall(None)
time.sleep(5) # allow ping to update the CIB
if not ret:
self._set_managed(node)
return self.failure("Couldn't restart the cluster")
if self.local_badnews("ResourceActivity:", watch):
self._set_managed(node)
return self.failure("Resources stopped or started during cluster restart")
watch = self.create_watch(pats, 60, "StartupActivity")
watch.set_watch()
# Re-enable resource management (and verify it happened).
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
# Ignore actions for STONITH resources
ignore = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.debug("Ignoring start actions for %s" % r.id)
ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
if self.local_badnews("ResourceActivity:", watch, ignore):
return self.failure("Resources stopped or started after resource management was re-enabled")
return ret
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"resource( was|s were) active at shutdown" ]
+ return [
+ r"resource( was|s were) active at shutdown"
+ ]
diff --git a/python/pacemaker/_cts/tests/remotedriver.py b/python/pacemaker/_cts/tests/remotedriver.py
index d6e2ca2fde..c5b0292977 100644
--- a/python/pacemaker/_cts/tests/remotedriver.py
+++ b/python/pacemaker/_cts/tests/remotedriver.py
@@ -1,538 +1,556 @@
""" Base classes for CTS tests """
__all__ = ["RemoteDriver"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import time
import subprocess
import tempfile
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.tests.stoptest import StopTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class RemoteDriver(CTSTest):
""" A specialized base class for cluster tests that run on Pacemaker
Remote nodes. This builds on top of CTSTest to provide methods
for starting and stopping services and resources, and managing
remote nodes. This is still just an abstract class -- specific
tests need to implement their own specialized behavior.
"""
def __init__(self, cm):
""" Create a new RemoteDriver instance
Arguments:
cm -- A ClusterManager instance
"""
- CTSTest.__init__(self,cm)
+ CTSTest.__init__(self, cm)
self.name = "RemoteDriver"
self._corosync_enabled = False
self._pacemaker_enabled = False
self._remote_node = None
self._remote_rsc = "remote-rsc"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self._stop = StopTest(cm)
self.reset()
def reset(self):
""" Reset the state of this test back to what it was before the test
was run
"""
self.failed = False
self.fail_string = ""
self._pcmk_started = False
self._remote_node_added = False
self._remote_rsc_added = False
- self._remote_use_reconnect_interval = self._env.random_gen.choice([True,False])
+ self._remote_use_reconnect_interval = self._env.random_gen.choice([True, False])
def fail(self, msg):
""" Mark test as failed """
self.failed = True
# Always log the failure.
self._logger.log(msg)
# Use first failure as test status, as it's likely to be most useful.
if not self.fail_string:
self.fail_string = msg
def _get_other_node(self, node):
""" Get the first cluster node out of the environment that is not the
given node. Typically, this is used to find some node that will
still be active that we can run cluster commands on.
"""
for othernode in self._env["nodes"]:
if othernode == node:
# we don't want to try and use the cib that we just shutdown.
# find a cluster node that is not our soon to be remote-node.
continue
return othernode
def _del_rsc(self, node, rsc):
""" Delete the given named resource from the cluster. The given `node`
is the cluster node on which we should *not* run the delete command.
"""
othernode = self._get_other_node(node)
(rc, _) = self._rsh(othernode, "crm_resource -D -r %s -t primitive" % rsc)
if rc != 0:
self.fail("Removal of resource '%s' failed" % rsc)
def _add_rsc(self, node, rsc_xml):
""" Add a resource given in XML format to the cluster. The given `node`
is the cluster node on which we should *not* run the add command.
"""
othernode = self._get_other_node(node)
(rc, _) = self._rsh(othernode, "cibadmin -C -o resources -X '%s'" % rsc_xml)
if rc != 0:
self.fail("resource creation failed")
def _add_primitive_rsc(self, node):
""" Add a primitive heartbeat resource for the remote node to the
cluster. The given `node` is the cluster node on which we should
*not* run the add command.
"""
rsc_xml = """
-""" % { "node": self._remote_rsc }
+""" % {
+ "node": self._remote_rsc
+}
self._add_rsc(node, rsc_xml)
if not self.failed:
self._remote_rsc_added = True
def _add_connection_rsc(self, node):
""" Add a primitive connection resource for the remote node to the
cluster. The given `node` is teh cluster node on which we should
*not* run the add command.
"""
rsc_xml = """
-""" % { "node": self._remote_node, "server": node }
+""" % {
+ "node": self._remote_node, "server": node
+}
if self._remote_use_reconnect_interval:
# Set reconnect interval on resource
rsc_xml += """
""" % self._remote_node
rsc_xml += """
-""" % { "node": self._remote_node }
+""" % {
+ "node": self._remote_node
+}
self._add_rsc(node, rsc_xml)
if not self.failed:
self._remote_node_added = True
def _disable_services(self, node):
""" Disable the corosync and pacemaker services on the given node """
self._corosync_enabled = self._env.service_is_enabled(node, "corosync")
if self._corosync_enabled:
self._env.disable_service(node, "corosync")
self._pacemaker_enabled = self._env.service_is_enabled(node, "pacemaker")
if self._pacemaker_enabled:
self._env.disable_service(node, "pacemaker")
def _enable_services(self, node):
""" Enable the corosync and pacemaker services on the given node """
if self._corosync_enabled:
self._env.enable_service(node, "corosync")
if self._pacemaker_enabled:
self._env.enable_service(node, "pacemaker")
def _stop_pcmk_remote(self, node):
""" Stop the Pacemaker Remote service on the given node """
for _ in range(10):
(rc, _) = self._rsh(node, "service pacemaker_remote stop")
if rc != 0:
time.sleep(6)
else:
break
def _start_pcmk_remote(self, node):
""" Start the Pacemaker Remote service on the given node """
for _ in range(10):
(rc, _) = self._rsh(node, "service pacemaker_remote start")
if rc != 0:
time.sleep(6)
else:
self._pcmk_started = True
break
def _freeze_pcmk_remote(self, node):
""" Simulate a Pacemaker Remote daemon failure """
self._rsh(node, "killall -STOP pacemaker-remoted")
def _resume_pcmk_remote(self, node):
""" Simulate the Pacemaker Remote daemon recovering """
self._rsh(node, "killall -CONT pacemaker-remoted")
def _start_metal(self, node):
""" Setup a Pacemaker Remote configuration. Remove any existing
connection resources or nodes. Start the pacemaker_remote service.
Create a connection resource.
"""
# Cluster nodes are reused as remote nodes in remote tests. If cluster
# services were enabled at boot, in case the remote node got fenced, the
# cluster node would join instead of the expected remote one. Meanwhile
# pacemaker_remote would not be able to start. Depending on the chances,
# the situations might not be able to be orchestrated gracefully any more.
#
# Temporarily disable any enabled cluster serivces.
self._disable_services(node)
# make sure the resource doesn't already exist for some reason
self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_rsc)
self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_node)
if not self._stop(node):
self.fail("Failed to shutdown cluster node %s" % node)
return
self._start_pcmk_remote(node)
if not self._pcmk_started:
self.fail("Failed to start pacemaker_remote on node %s" % node)
return
# Convert node to baremetal now that it has shutdown the cluster stack
- pats = [ ]
+ pats = []
watch = self.create_watch(pats, 120)
watch.set_watch()
- pats.extend([ self.templates["Pat:RscOpOK"] % ("start", self._remote_node),
- self.templates["Pat:DC_IDLE"] ])
+ pats.extend([
+ self.templates["Pat:RscOpOK"] % ("start", self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ])
self._add_connection_rsc(node)
with Timer(self._logger, self.name, "remoteMetalInit"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def migrate_connection(self, node):
""" Move the remote connection resource from the node it's currently
running on to any other available node
"""
if self.failed:
return
- pats = [ self.templates["Pat:RscOpOK"] % ("migrate_to", self._remote_node),
- self.templates["Pat:RscOpOK"] % ("migrate_from", self._remote_node),
- self.templates["Pat:DC_IDLE"] ]
+ pats = [
+ self.templates["Pat:RscOpOK"] % ("migrate_to", self._remote_node),
+ self.templates["Pat:RscOpOK"] % ("migrate_from", self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ]
watch = self.create_watch(pats, 120)
watch.set_watch()
(rc, _) = self._rsh(node, "crm_resource -M -r %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("failed to move remote node connection resource")
return
with Timer(self._logger, self.name, "remoteMetalMigrate"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def fail_rsc(self, node):
""" Cause the dummy resource running on a Pacemaker Remote node to fail
and verify that the failure is logged correctly
"""
if self.failed:
return
- watchpats = [ self.templates["Pat:RscRemoteOpOK"] % ("stop", self._remote_rsc, self._remote_node),
- self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
- self.templates["Pat:DC_IDLE"] ]
+ watchpats = [
+ self.templates["Pat:RscRemoteOpOK"] % ("stop", self._remote_rsc, self._remote_node),
+ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ]
watch = self.create_watch(watchpats, 120)
watch.set_watch()
self.debug("causing dummy rsc to fail.")
self._rsh(node, "rm -f /var/run/resource-agents/Dummy*")
with Timer(self._logger, self.name, "remoteRscFail"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched)
def fail_connection(self, node):
""" Cause the remote connection resource to fail and verify that the
node is fenced and the connection resource is restarted on another
node.
"""
if self.failed:
return
- watchpats = [ self.templates["Pat:Fencing_ok"] % self._remote_node,
- self.templates["Pat:NodeFenced"] % self._remote_node ]
+ watchpats = [
+ self.templates["Pat:Fencing_ok"] % self._remote_node,
+ self.templates["Pat:NodeFenced"] % self._remote_node
+ ]
watch = self.create_watch(watchpats, 120)
watch.set_watch()
# freeze the pcmk remote daemon. this will result in fencing
self.debug("Force stopped active remote node")
self._freeze_pcmk_remote(node)
self.debug("Waiting for remote node to be fenced.")
with Timer(self._logger, self.name, "remoteMetalFence"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
return
self.debug("Waiting for the remote node to come back up")
self._cm.ns.wait_for_node(node, 120)
- pats = [ ]
+ pats = []
watch = self.create_watch(pats, 240)
watch.set_watch()
pats.append(self.templates["Pat:RscOpOK"] % ("start", self._remote_node))
if self._remote_rsc_added:
pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node))
# start the remote node again watch it integrate back into cluster.
self._start_pcmk_remote(node)
if not self._pcmk_started:
self.fail("Failed to start pacemaker_remote on node %s" % node)
return
self.debug("Waiting for remote node to rejoin cluster after being fenced.")
with Timer(self._logger, self.name, "remoteMetalRestart"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def _add_dummy_rsc(self, node):
""" Add a dummy resource that runs on the Pacemaker Remote node """
if self.failed:
return
# verify we can put a resource on the remote node
- pats = [ ]
+ pats = []
watch = self.create_watch(pats, 120)
watch.set_watch()
- pats.extend([ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
- self.templates["Pat:DC_IDLE"] ])
+ pats.extend([
+ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ])
# Add a resource that must live on remote-node
self._add_primitive_rsc(node)
# force that rsc to prefer the remote node.
(rc, _) = self._cm.rsh(node, "crm_resource -M -r %s -N %s -f" % (self._remote_rsc, self._remote_node), verbose=1)
if rc != 0:
self.fail("Failed to place remote resource on remote node.")
return
with Timer(self._logger, self.name, "remoteMetalRsc"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def test_attributes(self, node):
""" Verify that attributes can be set on the Pacemaker Remote node """
if self.failed:
return
# This verifies permanent attributes can be set on a remote-node. It also
# verifies the remote-node can edit its own cib node section remotely.
(rc, line) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line))
return
(rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("Failed to get remote-node attribute")
return
(rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("Failed to delete remote-node attribute")
def cleanup_metal(self, node):
""" Clean up the Pacemaker Remote node configuration previously created by
_setup_metal. Stop and remove dummy resources and connection resources.
Stop the pacemaker_remote service. Remove the remote node itself.
"""
self._enable_services(node)
if not self._pcmk_started:
return
- pats = [ ]
+ pats = []
watch = self.create_watch(pats, 120)
watch.set_watch()
if self._remote_rsc_added:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_rsc))
if self._remote_node_added:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_node))
with Timer(self._logger, self.name, "remoteMetalCleanup"):
self._resume_pcmk_remote(node)
if self._remote_rsc_added:
# Remove dummy resource added for remote node tests
self.debug("Cleaning up dummy rsc put on remote node")
self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_rsc)
self._del_rsc(node, self._remote_rsc)
if self._remote_node_added:
# Remove remote node's connection resource
self.debug("Cleaning up remote node connection resource")
self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_node)
self._del_rsc(node, self._remote_node)
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
self._stop_pcmk_remote(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self._remote_node_added:
# Remove remote node itself
self.debug("Cleaning up node entry for remote node")
self._rsh(self._get_other_node(node), "crm_node --force --remove %s" % self._remote_node)
def _setup_env(self, node):
""" Setup the environment to allow Pacemaker Remote to function. This
involves generating a key and copying it to all nodes in the cluster.
"""
self._remote_node = "remote-%s" % node
# we are assuming if all nodes have a key, that it is
# the right key... If any node doesn't have a remote
# key, we regenerate it everywhere.
if self._rsh.exists_on_all("/etc/pacemaker/authkey", self._env["nodes"]):
return
# create key locally
(handle, keyfile) = tempfile.mkstemp(".cts")
os.close(handle)
subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# sync key throughout the cluster
for n in self._env["nodes"]:
self._rsh(n, "mkdir -p --mode=0750 /etc/pacemaker")
self._rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % n)
self._rsh(n, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey")
self._rsh(n, "chmod 0640 /etc/pacemaker/authkey")
os.unlink(keyfile)
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not CTSTest.is_applicable(self):
return False
for node in self._env["nodes"]:
(rc, _) = self._rsh(node, "which pacemaker-remoted >/dev/null 2>&1")
if rc != 0:
return False
return True
def start_new_test(self, node):
""" Prepare a remote test for running by setting up its environment
and resources
"""
self.incr("calls")
self.reset()
ret = self._startall(None)
if not ret:
return self.failure("setup failed: could not start all nodes")
self._setup_env(node)
self._start_metal(node)
self._add_dummy_rsc(node)
return True
def __call__(self, node):
""" Perform this test """
raise NotImplementedError
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"""is running on remote.*which isn't allowed""",
- r"""Connection terminated""",
- r"""Could not send remote""" ]
+ return [
+ r"""is running on remote.*which isn't allowed""",
+ r"""Connection terminated""",
+ r"""Could not send remote"""
+ ]
diff --git a/python/pacemaker/_cts/tests/remoterscfailure.py b/python/pacemaker/_cts/tests/remoterscfailure.py
index 33c1d3739a..6f221ded88 100644
--- a/python/pacemaker/_cts/tests/remoterscfailure.py
+++ b/python/pacemaker/_cts/tests/remoterscfailure.py
@@ -1,71 +1,73 @@
""" Cause the Pacemaker Remote connection resource to fail """
__all__ = ["RemoteRscFailure"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.remotedriver import RemoteDriver
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class RemoteRscFailure(RemoteDriver):
""" A concrete test that causes the Pacemaker Remote connection resource
to fail
"""
def __init__(self, cm):
""" Create a new RemoteRscFailure instance
Arguments:
cm -- A ClusterManager instance
"""
RemoteDriver.__init__(self, cm)
self.name = "RemoteRscFailure"
def __call__(self, node):
""" Perform this test """
if not self.start_new_test(node):
return self.failure(self.fail_string)
# This is an important step. We are migrating the connection
# before failing the resource. This verifies that the migration
# has properly maintained control over the remote-node.
self.migrate_connection(node)
self.fail_rsc(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
- r"Dummy.*: No process state file found" ] + super().errors_to_ignore
+ return [
+ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
+ r"Dummy.*: No process state file found"
+ ] + super().errors_to_ignore
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not RemoteDriver.is_applicable(self):
return False
# This test requires at least three nodes: one to convert to a
# remote node, one to host the connection originally, and one
# to migrate the connection to.
return len(self._env["nodes"]) >= 3
diff --git a/python/pacemaker/_cts/tests/remotestonithd.py b/python/pacemaker/_cts/tests/remotestonithd.py
index fd5144e2f6..f684992cae 100644
--- a/python/pacemaker/_cts/tests/remotestonithd.py
+++ b/python/pacemaker/_cts/tests/remotestonithd.py
@@ -1,60 +1,62 @@
""" Fail the connection resource and fence the remote node """
__all__ = ["RemoteStonithd"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.remotedriver import RemoteDriver
class RemoteStonithd(RemoteDriver):
""" A concrete test that fails the connection resource and fences the
remote node
"""
def __init__(self, cm):
""" Create a new RemoteStonithd instance
Arguments:
cm -- A ClusterManager instance
"""
RemoteDriver.__init__(self, cm)
self.name = "RemoteStonithd"
def __call__(self, node):
""" Perform this test """
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.fail_connection(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not RemoteDriver.is_applicable(self):
return False
return self._env.get("DoFencing", True)
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"Lost connection to Pacemaker Remote node",
- r"Software caused connection abort",
- r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
- r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
- r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
- r"error: Result of monitor operation for .* on remote-.*: Internal communication failure" ] + super().errors_to_ignore
+ return [
+ r"Lost connection to Pacemaker Remote node",
+ r"Software caused connection abort",
+ r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
+ r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
+ r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
+ r"error: Result of monitor operation for .* on remote-.*: Internal communication failure"
+ ] + super().errors_to_ignore
diff --git a/python/pacemaker/_cts/tests/resourcerecover.py b/python/pacemaker/_cts/tests/resourcerecover.py
index a2ecfa8e31..9fd07e2b4c 100644
--- a/python/pacemaker/_cts/tests/resourcerecover.py
+++ b/python/pacemaker/_cts/tests/resourcerecover.py
@@ -1,171 +1,175 @@
""" Fail a random resource and verify its fail count increases """
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class ResourceRecover(CTSTest):
""" A concrete test that fails a random resource """
def __init__(self, cm):
""" Create a new ResourceRecover instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "ResourceRecover"
self._action = "asyncmon"
self._interval = 0
self._rid = None
self._rid_alt = None
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
if not self._startall(None):
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self._cm.active_resources(node)
if not resourcelist:
self._logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self._choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self._rid)
if rsc.id == rsc.clone_id:
self.debug("Failing %s" % rsc.id)
else:
self.debug("Failing %s (also known as %s)" % (rsc.id, rsc.clone_id))
# Log patterns to watch for (failure, plus restart if managed)
- pats = [ self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id) ]
+ pats = [
+ self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id)
+ ]
if rsc.managed:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._rid))
if rsc.unique:
pats.append(self.templates["Pat:RscOpOK"] % ("start", self._rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self._fail_resource(rsc, node, pats) is None:
# self.failure() already called
return None
return self.success()
def _choose_resource(self, node, resourcelist):
""" Choose a random resource to target """
self._rid = self._env.random_gen.choice(resourcelist)
self._rid_alt = self._rid
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self._cm, line)
if rsc.id == self._rid:
# Handle anonymous clones that get renamed
self._rid = rsc.clone_id
return rsc
return None
def _get_failcount(self, node):
""" Check the fail count of targeted resource on given node """
cmd = "crm_failcount --quiet --query --resource %s --operation %s --interval %d --node %s"
(rc, lines) = self._rsh(node, cmd % (self._rid, self._action, self._interval, node),
verbose=1)
if rc != 0 or len(lines) != 1:
lines = [l.strip() for l in lines]
self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(lines)))
return -1
try:
failcount = int(lines[0])
except (IndexError, ValueError):
self._logger.log("crm_failcount output on %s unparseable: %s" % (node, " ".join(lines)))
return -1
return failcount
def _fail_resource(self, rsc, node, pats):
""" Fail the targeted resource, and verify as expected """
orig_failcount = self._get_failcount(node)
watch = self.create_watch(pats, 60)
watch.set_watch()
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
with Timer(self._logger, self.name, "recover"):
watch.look_for_all()
self._cm.cluster_stable()
recovered = self._cm.resource_location(self._rid)
if watch.unmatched:
return self.failure("Patterns not found: %r" % watch.unmatched)
if rsc.unique and len(recovered) > 1:
return self.failure("%s is now active on more than one node: %r" % (self._rid, recovered))
if recovered:
self.debug("%s is running on: %r" % (self._rid, recovered))
elif rsc.managed:
return self.failure("%s was not recovered and is inactive" % self._rid)
new_failcount = self._get_failcount(node)
if new_failcount != orig_failcount + 1:
return self.failure("%s fail count is %d not %d" % (self._rid,
new_failcount, orig_failcount + 1))
return 0 # Anything but None is success
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"Updating failcount for %s" % self._rid,
- r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt),
- r"Unknown operation: fail",
- self.templates["Pat:RscOpOK"] % (self._action, self._rid),
- r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval) ]
+ return [
+ r"Updating failcount for %s" % self._rid,
+ r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt),
+ r"Unknown operation: fail",
+ self.templates["Pat:RscOpOK"] % (self._action, self._rid),
+ r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval)
+ ]
diff --git a/python/pacemaker/_cts/tests/resynccib.py b/python/pacemaker/_cts/tests/resynccib.py
index 3e7179072e..fe634d65e1 100644
--- a/python/pacemaker/_cts/tests/resynccib.py
+++ b/python/pacemaker/_cts/tests/resynccib.py
@@ -1,73 +1,75 @@
""" Start the cluster without a CIB and verify it gets copied from another node """
__all__ = ["ResyncCIB"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker import BuildOptions
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.restarttest import RestartTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.simulstoplite import SimulStopLite
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class ResyncCIB(CTSTest):
""" A concrete test that starts the cluster on one node without a CIB and
verifies the CIB is copied over when the remaining nodes join
"""
def __init__(self, cm):
""" Create a new ResyncCIB instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "ResyncCIB"
self._restart1 = RestartTest(cm)
self._startall = SimulStartLite(cm)
self._stopall = SimulStopLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
# Shut down all the nodes...
if not self._stopall(None):
return self.failure("Could not stop all nodes")
# Test config recovery when the other nodes come up
self._rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
# Start the selected node
if not self._restart1(node):
return self.failure("Could not start %s" % node)
# Start all remaining nodes
if not self._startall(None):
return self.failure("Could not start the remaining nodes")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Errors that occur as a result of the CIB being wiped
- return [ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
- r"error.*: Resource start-up disabled since no STONITH resources have been defined",
- r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
- r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity" ]
+ return [
+ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
+ r"error.*: Resource start-up disabled since no STONITH resources have been defined",
+ r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
+ r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity"
+ ]
diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py
index 16cf67472c..c5c51e189f 100644
--- a/python/pacemaker/_cts/tests/simulstartlite.py
+++ b/python/pacemaker/_cts/tests/simulstartlite.py
@@ -1,131 +1,133 @@
""" Simultaneously start stopped nodes """
__all__ = ["SimulStartLite"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class SimulStartLite(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class starts any stopped nodes more or less
simultaneously.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new SimulStartLite instance
Arguments:
cm -- A ClusterManager instance
"""
- CTSTest.__init__(self,cm)
+ CTSTest.__init__(self, cm)
self.name = "SimulStartLite"
def __call__(self, dummy):
""" Start all stopped nodes more or less simultaneously, returning
whether this succeeded or not.
"""
self.incr("calls")
self.debug("Setup: %s" % self.name)
# We ignore the "node" parameter...
node_list = []
for node in self._env["nodes"]:
if self._cm.expected_status[node] == "down":
self.incr("WasStopped")
node_list.append(node)
self.set_timer()
while len(node_list) > 0:
# Repeat until all nodes come up
uppat = self.templates["Pat:NonDC_started"]
if self._cm.upcount() == 0:
uppat = self.templates["Pat:Local_started"]
- watchpats = [ self.templates["Pat:DC_IDLE"] ]
+ watchpats = [
+ self.templates["Pat:DC_IDLE"]
+ ]
for node in node_list:
watchpats.extend([uppat % node,
self.templates["Pat:InfraUp"] % node,
self.templates["Pat:PacemakerUp"] % node])
# Start all the nodes - at about the same time...
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
stonith = self._cm.prepare_fencing_watcher()
for node in node_list:
self._cm.start_cm_async(node)
watch.look_for_all()
node_list = self._cm.fencing_cleanup(self.name, stonith)
if node_list is None:
return self.failure("Cluster did not stabilize")
# Remove node_list messages from watch.unmatched
for node in node_list:
self._logger.debug("Dealing with stonith operations for %s" % node_list)
if watch.unmatched:
try:
watch.unmatched.remove(uppat % node)
except ValueError:
self.debug("Already matched: %s" % (uppat % node))
try:
watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
except ValueError:
self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))
try:
watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
except ValueError:
self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))
if watch.unmatched:
for regex in watch.unmatched:
- self._logger.log ("Warn: Startup pattern not found: %s" % regex)
+ self._logger.log("Warn: Startup pattern not found: %s" % regex)
if not self._cm.cluster_stable():
return self.failure("Cluster did not stabilize")
did_fail = False
unstable = []
for node in self._env["nodes"]:
if not self._cm.stat_cm(node):
did_fail = True
unstable.append(node)
if did_fail:
return self.failure("Unstarted nodes exist: %s" % unstable)
unstable = []
for node in self._env["nodes"]:
if not self._cm.node_stable(node):
did_fail = True
unstable.append(node)
if did_fail:
return self.failure("Unstable cluster nodes exist: %s" % unstable)
return self.success()
def is_applicable(self):
""" SimulStartLite is a setup test and never applicable """
return False
diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py
index 9dbc1f2d2d..d2e687e8fa 100644
--- a/python/pacemaker/_cts/tests/simulstoplite.py
+++ b/python/pacemaker/_cts/tests/simulstoplite.py
@@ -1,91 +1,91 @@
""" Simultaneously stop running nodes """
__all__ = ["SimulStopLite"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class SimulStopLite(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class stops any running nodes more or less
simultaneously. It can be used both to set up a test or to clean up
a test.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new SimulStopLite instance
Arguments:
cm -- A ClusterManager instance
"""
- CTSTest.__init__(self,cm)
+ CTSTest.__init__(self, cm)
self.name = "SimulStopLite"
def __call__(self, dummy):
""" Stop all running nodes more or less simultaneously, returning
whether this succeeded or not.
"""
self.incr("calls")
self.debug("Setup: %s" % self.name)
# We ignore the "node" parameter...
watchpats = []
for node in self._env["nodes"]:
if self._cm.expected_status[node] == "up":
self.incr("WasStarted")
watchpats.append(self.templates["Pat:We_stopped"] % node)
if len(watchpats) == 0:
return self.success()
# Stop all the nodes - at about the same time...
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self.set_timer()
for node in self._env["nodes"]:
if self._cm.expected_status[node] == "up":
self._cm.stop_cm_async(node)
if watch.look_for_all():
# Make sure they're completely down with no residule
for node in self._env["nodes"]:
self._rsh(node, self.templates["StopCmd"])
return self.success()
did_fail = False
up_nodes = []
for node in self._env["nodes"]:
if self._cm.stat_cm(node):
did_fail = True
up_nodes.append(node)
if did_fail:
return self.failure("Active nodes exist: %s" % up_nodes)
self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched)
return self.failure("Missing log message: %s " % watch.unmatched)
def is_applicable(self):
""" SimulStopLite is a setup test and never applicable """
return False
diff --git a/python/pacemaker/_cts/tests/splitbraintest.py b/python/pacemaker/_cts/tests/splitbraintest.py
index 6664281d7e..09d5f5550c 100644
--- a/python/pacemaker/_cts/tests/splitbraintest.py
+++ b/python/pacemaker/_cts/tests/splitbraintest.py
@@ -1,213 +1,215 @@
""" Create a split brain cluster and verify a resource is multiply managed """
__all__ = ["SplitBrainTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import time
from pacemaker._cts.input import should_continue
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class SplitBrainTest(CTSTest):
""" A concrete test that creates a split brain cluster and verifies that
one node in each partition takes over the resource, resulting in two
nodes running the same resource.
"""
def __init__(self, cm):
""" Create a new SplitBrainTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.is_experimental = True
self.name = "SplitBrain"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def _isolate_partition(self, partition):
""" Create a new partition containing the given nodes """
other_nodes = self._env["nodes"].copy()
for node in partition:
try:
other_nodes.remove(node)
except ValueError:
- self._logger.log("Node %s not in %r from %r" % (node,self._env["nodes"], partition))
+ self._logger.log("Node %s not in %r from %r" % (node, self._env["nodes"], partition))
if not other_nodes:
return
self.debug("Creating partition: %r" % partition)
self.debug("Everyone else: %r" % other_nodes)
for node in partition:
if not self._cm.isolate_node(node, other_nodes):
self._logger.log("Could not isolate %s" % node)
return
def _heal_partition(self, partition):
""" Move the given nodes out of their own partition back into the cluster """
other_nodes = self._env["nodes"].copy()
for node in partition:
try:
other_nodes.remove(node)
except ValueError:
self._logger.log("Node %s not in %r" % (node, self._env["nodes"]))
if len(other_nodes) == 0:
return
self.debug("Healing partition: %r" % partition)
self.debug("Everyone else: %r" % other_nodes)
for node in partition:
self._cm.unisolate_node(node, other_nodes)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
self.passed = True
partitions = {}
if not self._startall(None):
return self.failure("Setup failed")
while True:
# Retry until we get multiple partitions
partitions = {}
p_max = len(self._env["nodes"])
for n in self._env["nodes"]:
p = self._env.random_gen.randint(1, p_max)
if p not in partitions:
partitions[p] = []
partitions[p].append(n)
p_max = len(partitions)
if p_max > 1:
break
# else, try again
self.debug("Created %d partitions" % p_max)
for (key, val) in partitions.items():
self.debug("Partition[%s]:\t%r" % (key, val))
# Disabling STONITH to reduce test complexity for now
self._rsh(node, "crm_attribute -V -n stonith-enabled -v false")
for val in partitions.values():
self._isolate_partition(val)
count = 30
while count > 0:
if len(self._cm.find_partitions()) != p_max:
time.sleep(10)
else:
break
else:
self.failure("Expected partitions were not created")
# Target number of partitions formed - wait for stability
if not self._cm.cluster_stable():
self.failure("Partitioned cluster not stable")
# Now audit the cluster state
self._cm.partitions_expected = p_max
if not self.audit():
self.failure("Audits failed")
self._cm.partitions_expected = 1
# And heal them again
for val in partitions.values():
self._heal_partition(val)
# Wait for a single partition to form
count = 30
while count > 0:
if len(self._cm.find_partitions()) != 1:
time.sleep(10)
count -= 1
else:
break
else:
self.failure("Cluster did not reform")
# Wait for it to have the right number of members
count = 30
while count > 0:
members = []
partitions = self._cm.find_partitions()
if partitions:
members = partitions[0].split()
if len(members) != len(self._env["nodes"]):
time.sleep(10)
count -= 1
else:
break
else:
self.failure("Cluster did not completely reform")
# Wait up to 20 minutes - the delay is more preferable than
# trying to continue with in a messed up state
if not self._cm.cluster_stable(1200):
self.failure("Reformed cluster not stable")
if not should_continue(self._env):
raise ValueError("Reformed cluster not stable")
# Turn fencing back on
if self._env["DoFencing"]:
self._rsh(node, "crm_attribute -V -D -n stonith-enabled")
self._cm.cluster_stable()
if self.passed:
return self.success()
return self.failure("See previous errors")
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ r"Another DC detected:",
- r"(ERROR|error).*: .*Application of an update diff failed",
- r"pacemaker-controld.*:.*not in our membership list",
- r"CRIT:.*node.*returning after partition" ]
+ return [
+ r"Another DC detected:",
+ r"(ERROR|error).*: .*Application of an update diff failed",
+ r"pacemaker-controld.*:.*not in our membership list",
+ r"CRIT:.*node.*returning after partition"
+ ]
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not CTSTest.is_applicable(self):
return False
return len(self._env["nodes"]) > 2
diff --git a/python/pacemaker/_cts/tests/standbytest.py b/python/pacemaker/_cts/tests/standbytest.py
index 64046b9e22..a9ce8ec3be 100644
--- a/python/pacemaker/_cts/tests/standbytest.py
+++ b/python/pacemaker/_cts/tests/standbytest.py
@@ -1,108 +1,110 @@
""" Put a node into standby mode and check that resources migrate """
__all__ = ["StandbyTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StandbyTest(CTSTest):
""" A concrete tests that puts a node into standby and checks that resources
migrate away from the node
"""
def __init__(self, cm):
""" Create a new StandbyTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "Standby"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
# make sure the node is active
# set the node to standby mode
# check resources, none resource should be running on the node
# set the node to active mode
# check resources, resources should have been migrated back (SHOULD THEY?)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Start all nodes failed")
self.debug("Make sure node %s is active" % node)
if self._cm.in_standby_mode(node):
if not self._cm.set_standby_mode(node, False):
return self.failure("can't set node %s to active mode" % node)
self._cm.cluster_stable()
if self._cm.in_standby_mode(node):
return self.failure("standby status of %s is [on] but we expect [off]" % node)
- watchpats = [ r"State transition .* -> S_POLICY_ENGINE" ]
+ watchpats = [
+ r"State transition .* -> S_POLICY_ENGINE",
+ ]
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self.debug("Setting node %s to standby mode" % node)
if not self._cm.set_standby_mode(node, True):
return self.failure("can't set node %s to standby mode" % node)
self.set_timer("on")
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: %r" % watch.unmatched)
self._cm.set_standby_mode(node, False)
return self.failure("cluster didn't react to standby change on %s" % node)
self._cm.cluster_stable()
if not self._cm.in_standby_mode(node):
return self.failure("standby status of %s is [off] but we expect [on]" % node)
self.log_timer("on")
self.debug("Checking resources")
rscs_on_node = self._cm.active_resources(node)
if rscs_on_node:
rc = self.failure("%s set to standby, %r is still running on it" % (node, rscs_on_node))
self.debug("Setting node %s to active mode" % node)
self._cm.set_standby_mode(node, False)
return rc
self.debug("Setting node %s to active mode" % node)
if not self._cm.set_standby_mode(node, False):
return self.failure("can't set node %s to active mode" % node)
self.set_timer("off")
self._cm.cluster_stable()
if self._cm.in_standby_mode(node):
return self.failure("standby status of %s is [on] but we expect [off]" % node)
self.log_timer("off")
return self.success()
diff --git a/python/pacemaker/_cts/tests/starttest.py b/python/pacemaker/_cts/tests/starttest.py
index 53a347a392..6387511951 100644
--- a/python/pacemaker/_cts/tests/starttest.py
+++ b/python/pacemaker/_cts/tests/starttest.py
@@ -1,54 +1,54 @@
""" Start the cluster manager on a given node """
__all__ = ["StartTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StartTest(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class starts the cluster manager on a given
node.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new StartTest instance
Arguments:
cm -- A ClusterManager instance
"""
- CTSTest.__init__(self,cm)
+ CTSTest.__init__(self, cm)
self.name = "Start"
def __call__(self, node):
""" Start the given node, returning whether this succeeded or not """
self.incr("calls")
if self._cm.upcount() == 0:
self.incr("us")
else:
self.incr("them")
if self._cm.expected_status[node] != "down":
return self.skipped()
if self._cm.start_cm(node):
return self.success()
return self.failure("Startup %s on node %s failed"
% (self._env["Name"], node))
diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py
index e2fa341227..0dce291a8a 100644
--- a/python/pacemaker/_cts/tests/stonithdtest.py
+++ b/python/pacemaker/_cts/tests/stonithdtest.py
@@ -1,139 +1,145 @@
""" Fence a running node and wait for it to restart """
__all__ = ["StonithdTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StonithdTest(CTSTest):
""" A concrete test that fences a running node and waits for it to restart """
def __init__(self, cm):
""" Create a new StonithdTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "Stonithd"
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
if len(self._env["nodes"]) < 2:
return self.skipped()
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
- watchpats = [ self.templates["Pat:Fencing_ok"] % node,
- self.templates["Pat:NodeFenced"] % node ]
+ watchpats = [
+ self.templates["Pat:Fencing_ok"] % node,
+ self.templates["Pat:NodeFenced"] % node,
+ ]
if not self._env["at-boot"]:
self.debug("Expecting %s to stay down" % node)
self._cm.expected_status[node] = "down"
else:
self.debug("Expecting %s to come up again %d" % (node, self._env["at-boot"]))
- watchpats.extend([ "%s.* S_STARTING -> S_PENDING" % node,
- "%s.* S_PENDING -> S_NOT_DC" % node ])
+ watchpats.extend([
+ "%s.* S_STARTING -> S_PENDING" % node,
+ "%s.* S_PENDING -> S_NOT_DC" % node,
+ ])
watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
origin = self._env.random_gen.choice(self._env["nodes"])
(rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)
if rc == ExitStatus.TIMEOUT:
# Look for the patterns, usually this means the required
# device was running on the node to be fenced - or that
# the required devices were in the process of being loaded
# and/or moved
#
# Effectively the node committed suicide so there will be
# no confirmation, but pacemaker should be watching and
# fence the node again
self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node))
elif origin != node and rc != 0:
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))
elif origin == node and rc != 255:
# 255 == broken pipe, ie. the node was fenced as expected
self._logger.log("Locally originated fencing returned %d" % rc)
with Timer(self._logger, self.name, "fence"):
matched = watch.look_for_all()
self.set_timer("reform")
if watch.unmatched:
self._logger.log("Patterns not found: %r" % watch.unmatched)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected patterns")
if not is_stable:
return self.failure("Cluster did not become stable")
self.log_timer("reform")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
- return [ self.templates["Pat:Fencing_start"] % ".*",
- self.templates["Pat:Fencing_ok"] % ".*",
- self.templates["Pat:Fencing_active"],
- r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ]
+ return [
+ self.templates["Pat:Fencing_start"] % ".*",
+ self.templates["Pat:Fencing_ok"] % ".*",
+ self.templates["Pat:Fencing_active"],
+ r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired"
+ ]
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not CTSTest.is_applicable(self):
return False
# pylint gets confused because of EnvFactory here.
# pylint: disable=unsupported-membership-test
if "DoFencing" in self._env:
return self._env["DoFencing"]
return True
diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py
index 1caa7cd56c..8f496d3263 100644
--- a/python/pacemaker/_cts/tests/stoptest.py
+++ b/python/pacemaker/_cts/tests/stoptest.py
@@ -1,97 +1,99 @@
""" Stop the cluster manager on a given node """
__all__ = ["StopTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.tests.ctstest import CTSTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object
class StopTest(CTSTest):
""" A pseudo-test that is only used to set up conditions before running
some other test. This class stops the cluster manager on a given
node.
Other test classes should not use this one as a superclass.
"""
def __init__(self, cm):
""" Create a new StopTest instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "Stop"
def __call__(self, node):
""" Stop the given node, returning whether this succeeded or not """
self.incr("calls")
if self._cm.expected_status[node] != "up":
return self.skipped()
# Technically we should always be able to notice ourselves stopping
- patterns = [ self.templates["Pat:We_stopped"] % node ]
+ patterns = [
+ self.templates["Pat:We_stopped"] % node,
+ ]
# Any active node needs to notice this one left
# (note that this won't work if we have multiple partitions)
for other in self._env["nodes"]:
if self._cm.expected_status[other] == "up" and other != node:
patterns.append(self.templates["Pat:They_stopped"] %(other, node))
watch = self.create_watch(patterns, self._env["DeadTime"])
watch.set_watch()
if node == self._cm.our_node:
self.incr("us")
else:
if self._cm.upcount() <= 1:
self.incr("all")
else:
self.incr("them")
self._cm.stop_cm(node)
watch.look_for_all()
failreason = None
unmatched_str = "||"
if watch.unmatched:
(_, output) = self._rsh(node, "/bin/ps axf", verbose=1)
for line in output:
self.debug(line)
(_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
for line in output:
self.debug(line)
for regex in watch.unmatched:
- self._logger.log ("ERROR: Shutdown pattern not found: %s" % regex)
- unmatched_str += "%s||" % regex
+ self._logger.log("ERROR: Shutdown pattern not found: %s" % regex)
+ unmatched_str += "%s||" % regex
failreason = "Missing shutdown pattern"
self._cm.cluster_stable(self._env["DeadTime"])
if not watch.unmatched or self._cm.upcount() == 0:
return self.success()
if len(watch.unmatched) >= self._cm.upcount():
return self.failure("no match against (%s)" % unmatched_str)
if failreason is None:
return self.success()
return self.failure(failreason)