diff --git a/python/pacemaker/_cts/tests/ctstest.py b/python/pacemaker/_cts/tests/ctstest.py
index 6f98b50629..54e07cc5ef 100644
--- a/python/pacemaker/_cts/tests/ctstest.py
+++ b/python/pacemaker/_cts/tests/ctstest.py
@@ -1,286 +1,259 @@
""" Base classes for CTS tests """
__all__ = ["CTSTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
from pacemaker._cts.audits import AuditConstraint, AuditResource
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.timer import Timer
from pacemaker._cts.watcher import LogWatcher
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class CTSTest:
""" The base class for all cluster tests. This implements a basic set of
properties and behaviors like setup, tear down, time keeping, and
statistics tracking. It is up to specific tests to implement their own
specialized behavior on top of this class.
"""
def __init__(self, cm):
""" Create a new CTSTest instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self.audits = []
self.name = None
self.templates = PatternSelector(cm["Name"])
self.stats = { "auditfail": 0,
"calls": 0,
"failure": 0,
"skipped": 0,
"success": 0 }
self._cm = cm
self._env = EnvFactory().getInstance()
self._r_o2cb = None
self._r_ocfs2 = []
self._rsh = RemoteFactory().getInstance()
self._logger = LogFactory()
self._timers = {}
self.benchmark = True # which tests to benchmark
self.failed = False
self.is_experimental = False
self.is_loop = False
self.is_unsafe = False
self.is_valgrind = False
self.passed = True
def log(self, args):
""" Log a message """
self._logger.log(args)
def debug(self, args):
""" Log a debug message """
self._logger.debug(args)
def get_timer(self, key="test"):
""" Get the start time of the given timer """
try:
return self._timers[key].start_time
except KeyError:
return 0
def set_timer(self, key="test"):
""" Set the start time of the given timer to now, and return
that time
"""
if key not in self._timers:
self._timers[key] = Timer(self._logger, self.name, key)
self._timers[key].start()
return self._timers[key].start_time
def log_timer(self, key="test"):
""" Log the elapsed time of the given timer """
if key not in self._timers:
return
elapsed = self._timers[key].elapsed
self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed))
del self._timers[key]
def incr(self, name):
""" Increment the given stats key """
if name not in self.stats:
self.stats[name] = 0
self.stats[name] += 1
# Reset the test passed boolean
if name == "calls":
self.passed = True
def failure(self, reason="none"):
""" Increment the failure count, with an optional failure reason """
self.passed = False
self.incr("failure")
self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
return False
def success(self):
""" Increment the success count """
self.incr("success")
return True
def skipped(self):
""" Increment the skipped count """
self.incr("skipped")
return True
def __call__(self, node):
""" Perform this test """
raise NotImplementedError
def audit(self):
""" Perform all the relevant audits (see ClusterAudit), returning
whether or not they all passed.
"""
passed = True
for audit in self.audits:
if not audit():
self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name))
self.incr("auditfail")
passed = False
return passed
def setup(self, node):
""" Setup this test """
# node is used in subclasses
# pylint: disable=unused-argument
return self.success()
def teardown(self, node):
""" Tear down this test """
# node is used in subclasses
# pylint: disable=unused-argument
return self.success()
def create_watch(self, patterns, timeout, name=None):
""" Create a new LogWatcher object with the given patterns, timeout,
and optional name. This object can be used to search log files
for matching patterns during this test's run.
"""
if not name:
name = self.name
return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"], self._env["LogWatcher"], name, timeout)
def local_badnews(self, prefix, watch, local_ignore=None):
""" Use the given watch object to search through log files for messages
starting with the given prefix. If no prefix is given, use
"LocalBadNews:" by default. The optional local_ignore list should
be a list of regexes that, if found in a line, will cause that line
to be ignored.
Return the number of matches found.
"""
errcount = 0
if not prefix:
prefix = "LocalBadNews:"
ignorelist = [" CTS: ", prefix]
if local_ignore:
ignorelist += local_ignore
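# Report at most 100 matching lines so a flood of errors cannot stall the test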
while errcount < 100:
match = watch.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._logger.log("%s %s" % (prefix, match))
errcount += 1
else:
break
else:
self._logger.log("Too many errors!")
watch.end()
return errcount
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
if self.is_loop and not self._env["loop-tests"]:
return False
if self.is_unsafe and not self._env["unsafe-tests"]:
return False
if self.is_valgrind and not self._env["valgrind-tests"]:
return False
if self.is_experimental and not self._env["experimental-tests"]:
return False
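# In benchmark mode, only run tests that are flagged as benchmarks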
if self._env["benchmark"] and not self.benchmark:
return False
return True
- def _find_ocfs2_resources(self, node):
- """ Find any OCFS2 filesystems mounted on the given cluster node,
- populating the internal self._r_ocfs2 list with them and returning
- the number of OCFS2 filesystems.
- """
-
- self._r_o2cb = None
- self._r_ocfs2 = []
-
- (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
- for line in lines:
- if re.search("^Resource", line):
- r = AuditResource(self._cm, line)
-
- if r.rtype == "o2cb" and r.parent != "NA":
- self.debug("Found o2cb: %s" % self._r_o2cb)
- self._r_o2cb = r.parent
-
- if re.search("^Constraint", line):
- c = AuditConstraint(self._cm, line)
-
- if c.type == "rsc_colocation" and c.target == self._r_o2cb:
- self._r_ocfs2.append(c.rsc)
-
- self.debug("Found ocfs2 filesystems: %s" % self._r_ocfs2)
- return len(self._r_ocfs2)
-
def can_run_now(self, node):
""" Return True if we can meaningfully run right now """
# node is used in subclasses
# pylint: disable=unused-argument
return True
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return []
diff --git a/python/pacemaker/_cts/tests/reattach.py b/python/pacemaker/_cts/tests/reattach.py
index d008acdeda..a652e9c34f 100644
--- a/python/pacemaker/_cts/tests/reattach.py
+++ b/python/pacemaker/_cts/tests/reattach.py
@@ -1,182 +1,222 @@
""" Restart the cluster and verify resources remain running """
__all__ = ["Reattach"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
+from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.simulstoplite import SimulStopLite
from pacemaker._cts.tests.starttest import StartTest
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class Reattach(CTSTest):
""" A concrete test that restarts the cluster and verifies that resources
remain running throughout
"""
def __init__(self, cm):
""" Create a new Reattach instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.name = "Reattach"
self._startall = SimulStartLite(cm)
self._stopall = SimulStopLite(cm)
def _is_managed(self, node):
""" Are resources managed by the cluster? """
(_, is_managed) = self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
is_managed = is_managed[0].strip()
return is_managed == "true"
def _set_unmanaged(self, node):
""" Disable resource management """
self.debug("Disable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
def _set_managed(self, node):
""" Enable resource management """
self.debug("Re-enable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
+ def _disable_incompatible_rscs(self, node):
+ """ Disable resources that are incompatible with this test
+
+ Starts and stops of stonith-class resources are implemented internally
+ by Pacemaker, which means that they must stop when Pacemaker is
+ stopped, even if unmanaged. Disable them before running the Reattach
+ test so they don't affect resource placement.
+
+ OCFS2 resources must be disabled too for some reason.
+
+ Set target-role to "Stopped" for any of these resources in the CIB.
+ """
+
+ self.debug("Disable incompatible (stonith/OCFS2) resources")
+ xml = """'
+
+
+
+
+
+ ' --scope rsc_defaults"""
+ return self._rsh(node, self._cm.templates['CibAddXml'] % xml)
+
+ def _enable_incompatible_rscs(self, node):
+ """ Re-enable resources that were incompatible with this test """
+
+ self.debug("Re-enable incompatible (stonith/OCFS2) resources")
+ xml = """"""
+ return self._rsh(node, """cibadmin --delete --xml-text '%s'""" % xml)
+
+ def _reprobe(self, node):
+ """ Reprobe all resources
+
+ The placement of some resources (such as promotable-1 in the
+ lab-generated CIB) is affected by constraints using node-attribute-based
+ rules. An earlier test may have erased the relevant node attribute, so
+ do a reprobe, which should add the attribute back.
+ """
+
+ return self._rsh(node, """crm_resource --refresh""")
+
def setup(self, node):
""" Setup this test """
- attempt = 0
if not self._startall(None):
return self.failure("Startall failed")
- # Make sure we are really _really_ stable and that all
- # resources, including those that depend on transient node
- # attributes, are started
- while not self._cm.cluster_stable(double_check=True):
- if attempt < 5:
- attempt += 1
- self.debug("Not stable yet, re-testing")
- else:
- self._logger.log("Cluster is not stable")
- return self.failure("Cluster is not stable")
+ (rc, _) = self._disable_incompatible_rscs(node)
+ if rc != ExitStatus.OK:
+ return self.failure("Couldn't modify CIB to stop incompatible resources")
+
+ (rc, _) = self._reprobe(node)
+ if rc != ExitStatus.OK:
+ return self.failure("Couldn't reprobe resources")
+
+ if not self._cm.cluster_stable(double_check=True):
+ return self.failure("Cluster did not stabilize after setup")
return self.success()
def teardown(self, node):
""" Tear down this test """
# Make sure 'node' is up
start = StartTest(self._cm)
start(node)
if not self._is_managed(node):
- self._logger.log("Attempting to re-enable resource management on %s" % node)
self._set_managed(node)
- self._cm.cluster_stable()
- if not self._is_managed(node):
- self._logger.log("Could not re-enable resource management")
- return self.failure("Could not re-establish resource management")
+ (rc, _) = self._enable_incompatible_rscs(node)
+ if rc != ExitStatus.OK:
+ return self.failure("Couldn't modify CIB to re-enable incompatible resources")
+
+ if not self._cm.cluster_stable():
+ return self.failure("Cluster did not stabilize after teardown")
+ if not self._is_managed(node):
+ return self.failure("Could not re-enable resource management")
return self.success()
def can_run_now(self, node):
""" Return True if we can meaningfully run right now """
- if self._find_ocfs2_resources(node):
- self._logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present")
- return False
-
return True
def __call__(self, node):
""" Perform this test """
self.incr("calls")
# Conveniently, the scheduler will display this message when disabling
# management, even if fencing is not enabled, so we can rely on it.
managed = self.create_watch(["No fencing will be done"], 60)
managed.set_watch()
self._set_unmanaged(node)
if not managed.look_for_all():
self._logger.log("Patterns not found: %r" % managed.unmatched)
return self.failure("Resource management not disabled")
pats = [ self.templates["Pat:RscOpOK"] % ("start", ".*"),
self.templates["Pat:RscOpOK"] % ("stop", ".*"),
self.templates["Pat:RscOpOK"] % ("promote", ".*"),
self.templates["Pat:RscOpOK"] % ("demote", ".*"),
self.templates["Pat:RscOpOK"] % ("migrate", ".*") ]
watch = self.create_watch(pats, 60, "ShutdownActivity")
watch.set_watch()
self.debug("Shutting down the cluster")
ret = self._stopall(None)
if not ret:
self._set_managed(node)
return self.failure("Couldn't shut down the cluster")
self.debug("Bringing the cluster back up")
ret = self._startall(None)
time.sleep(5) # allow ping to update the CIB
if not ret:
self._set_managed(node)
return self.failure("Couldn't restart the cluster")
if self.local_badnews("ResourceActivity:", watch):
self._set_managed(node)
return self.failure("Resources stopped or started during cluster restart")
watch = self.create_watch(pats, 60, "StartupActivity")
watch.set_watch()
# Re-enable resource management (and verify it happened).
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
# Ignore actions for STONITH resources
ignore = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.debug("Ignoring start actions for %s" % r.id)
ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
if self.local_badnews("ResourceActivity:", watch, ignore):
return self.failure("Resources stopped or started after resource management was re-enabled")
return ret
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"resource( was|s were) active at shutdown" ]