diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py
index acd94b9ae1..9e26d1797e 100644
--- a/cts/lab/CTSscenarios.py
+++ b/cts/lab/CTSscenarios.py
@@ -1,562 +1,562 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import sys
import time
from pacemaker._cts.audits import ClusterAudit
-from pacemaker._cts.tests.base import CTSTest
+from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.watcher import LogWatcher
class ScenarioComponent(object):
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
'''Return True if the current ScenarioComponent is applicable
in the given LabEnvironment given to the constructor.
'''
raise ValueError("Abstract Class member (IsApplicable)")
def SetUp(self, CM):
'''Set up the given ScenarioComponent'''
raise ValueError("Abstract Class member (Setup)")
def TearDown(self, CM):
'''Tear down (undo) the given ScenarioComponent'''
raise ValueError("Abstract Class member (Setup)")
class Scenario(object):
(
'''The basic idea of a scenario is that of an ordered list of
ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn,
and then after the tests have been run, they are torn down using TearDown()
(in reverse order).
A Scenario is applicable to a particular cluster manager iff each
ScenarioComponent is applicable.
A partially set up scenario is torn down if it fails during setup.
''')
def __init__(self, ClusterManager, Components, Audits, Tests):
"Initialize the Scenario from the list of ScenarioComponents"
self.ClusterManager = ClusterManager
self.Components = Components
self.Audits = Audits
self.Tests = Tests
self.BadNews = None
self.TestSets = []
self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0}
self.Sets = []
#self.ns=CTS.NodeStatus(self.Env)
for comp in Components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in Audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in Tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def IsApplicable(self):
(
'''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
'''
)
for comp in self.Components:
if not comp.IsApplicable():
return None
return True
def SetUp(self):
'''Set up the Scenario. Return TRUE on success.'''
self.ClusterManager.prepare()
self.audit() # Also detects remote/local log config
self.ClusterManager.ns.wait_for_all_nodes(self.ClusterManager.Env["nodes"])
self.audit()
self.ClusterManager.install_support()
self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
self.ClusterManager.templates.get_patterns("BadNews"),
self.ClusterManager.Env["nodes"],
self.ClusterManager.Env["LogWatcher"],
"BadNews", 0)
self.BadNews.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self.Components):
if not self.Components[j].SetUp(self.ClusterManager):
# OOPS! We failed. Tear partial setups down.
self.audit()
self.ClusterManager.log("Tearing down partial setup")
self.TearDown(j)
return None
j = j + 1
self.audit()
return 1
def TearDown(self, max=None):
'''Tear Down the Scenario - in reverse order.'''
if max == None:
max = len(self.Components)-1
j = max
while j >= 0:
self.Components[j].TearDown(self.ClusterManager)
j = j - 1
self.audit()
self.ClusterManager.install_support("uninstall")
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not name in self.Stats:
self.Stats[name] = 0
self.Stats[name] = self.Stats[name]+1
def run(self, Iterations):
self.ClusterManager.oprofileStart()
try:
self.run_loop(Iterations)
self.ClusterManager.oprofileStop()
except:
self.ClusterManager.oprofileStop()
raise
def run_loop(self, Iterations):
raise ValueError("Abstract Class member (run_loop)")
def run_test(self, test, testcount):
nodechoice = self.ClusterManager.Env.random_node()
ret = True
where = ""
did_run = 0
self.ClusterManager.instance_errorstoignore_clear()
self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]")
starttime = test.set_timer()
if not test.setup(nodechoice):
self.ClusterManager.log("Setup failed")
ret = False
elif not test.can_run_now(nodechoice):
self.ClusterManager.log("Skipped")
test.skipped()
else:
did_run = 1
ret = test(nodechoice)
if not test.teardown(nodechoice):
self.ClusterManager.log("Teardown failed")
if self.ClusterManager.Env["continue"]:
answer = "Y"
else:
try:
answer = input('Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = False
stoptime = time.time()
self.ClusterManager.oprofileSave(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if "min_time" not in test.stats:
test.stats["elapsed_time"] = elapsed_time
test.stats["min_time"] = test_time
test.stats["max_time"] = test_time
else:
test.stats["elapsed_time"] = test.stats["elapsed_time"] + elapsed_time
if test_time < test.stats["min_time"]:
test.stats["min_time"] = test_time
if test_time > test.stats["max_time"]:
test.stats["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self.ClusterManager.statall()
did_run = 1 # Force the test count to be incremented anyway so test extraction works
self.audit(test.errors_to_ignore)
return did_run
def summarize(self):
self.ClusterManager.log("****************")
self.ClusterManager.log("Overall Results:" + repr(self.Stats))
self.ClusterManager.log("****************")
stat_filter = {
"calls":0,
"failure":0,
"skipped":0,
"auditfail":0,
}
self.ClusterManager.log("Test Summary")
for test in self.Tests:
for key in list(stat_filter.keys()):
stat_filter[key] = test.stats[key]
self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter))
self.ClusterManager.debug("Detailed Results")
for test in self.Tests:
self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.stats))
self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, LocalIgnore=[]):
errcount = 0
ignorelist = []
ignorelist.append("CTS:")
ignorelist.extend(LocalIgnore)
ignorelist.extend(self.ClusterManager.errorstoignore())
ignorelist.extend(self.ClusterManager.instance_errorstoignore())
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self.Audits:
if not audit():
self.ClusterManager.log("Audit " + audit.name + " FAILED.")
failed += 1
else:
self.ClusterManager.debug("Audit " + audit.name + " passed.")
while errcount < 1000:
match = None
if self.BadNews:
match = self.BadNews.look(0)
if match:
add_err = 1
for ignore in ignorelist:
if add_err == 1 and re.search(ignore, match):
add_err = 0
if add_err == 1:
self.ClusterManager.log("BadNews: " + match)
self.incr("BadNews")
errcount = errcount + 1
else:
break
else:
if self.ClusterManager.Env["continue"]:
answer = "Y"
else:
try:
answer = input('Big problems. Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
self.ClusterManager.log("Shutting down.")
self.summarize()
self.TearDown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self.BadNews:
self.BadNews.end()
return failed
class AllOnce(Scenario):
'''Every Test Once''' # Accessable as __doc__
def run_loop(self, Iterations):
testcount = 1
for test in self.Tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
'''Random Test Execution'''
def run_loop(self, Iterations):
testcount = 1
while testcount <= Iterations:
test = self.ClusterManager.Env.random_gen.choice(self.Tests)
self.run_test(test, testcount)
testcount += 1
class BasicSanity(Scenario):
'''Basic Cluster Sanity'''
def run_loop(self, Iterations):
testcount = 1
while testcount <= Iterations:
test = self.Environment.random_gen.choice(self.Tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
'''Named Tests in Sequence'''
def run_loop(self, Iterations):
testcount = 1
while testcount <= Iterations:
for test in self.Tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
'''Start the Cluster'''
def run_loop(self, Iterations):
testcount = 0
class BootCluster(ScenarioComponent):
(
'''BootCluster is the most basic of ScenarioComponents.
This ScenarioComponent simply starts the cluster manager on all the nodes.
It is fairly robust as it waits for all nodes to come up before starting
as they might have been rebooted or crashed for some reason beforehand.
''')
def __init__(self, Env):
pass
def IsApplicable(self):
'''BootCluster is so generic it is always Applicable'''
return True
def SetUp(self, CM):
'''Basic Cluster Manager startup. Start everything'''
CM.prepare()
# Clear out the cobwebs ;-)
CM.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on all nodes.")
return CM.startall(verbose=True, quick=True)
def TearDown(self, CM, force=False):
'''Set up the given ScenarioComponent'''
# Stop the cluster manager everywhere
CM.log("Stopping Cluster Manager on all nodes")
return CM.stopall(verbose=True, force=force)
class LeaveBooted(BootCluster):
def TearDown(self, CM):
'''Set up the given ScenarioComponent'''
# Stop the cluster manager everywhere
CM.log("Leaving Cluster running on all nodes")
return 1
class PingFest(ScenarioComponent):
(
'''PingFest does a flood ping to each node in the cluster from the test machine.
If the LabEnvironment Parameter PingSize is set, it will be used as the size
of ping packet requested (via the -s option). If it is not set, it defaults
to 1024 bytes.
According to the manual page for ping:
Outputs packets as fast as they come back or one hundred times per
second, whichever is more. For every ECHO_REQUEST sent a period ``.''
is printed, while for every ECHO_REPLY received a backspace is printed.
This provides a rapid display of how many packets are being dropped.
Only the super-user may use this option. This can be very hard on a net-
work and should be used with caution.
''' )
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
'''PingFests are always applicable ;-)
'''
return True
def SetUp(self, CM):
'''Start the PingFest!'''
self.PingSize = 1024
if "PingSize" in list(CM.Env.keys()):
self.PingSize = CM.Env["PingSize"]
CM.log("Starting %d byte flood pings" % self.PingSize)
self.PingPids = []
for node in CM.Env["nodes"]:
self.PingPids.append(self._pingchild(node))
CM.log("Ping PIDs: " + repr(self.PingPids))
return 1
def TearDown(self, CM):
'''Stop it right now! My ears are pinging!!'''
for pid in self.PingPids:
if pid != None:
CM.log("Stopping ping process %d" % pid)
os.kill(pid, signal.SIGKILL)
def _pingchild(self, node):
Args = ["ping", "-qfn", "-s", str(self.PingSize), node]
sys.stdin.flush()
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid < 0:
self.Env.log("Cannot fork ping child")
return None
if pid > 0:
return pid
# Otherwise, we're the child process.
os.execvp("ping", Args)
self.Env.log("Cannot execvp ping: " + repr(Args))
sys.exit(1)
class BasicSanityCheck(ScenarioComponent):
(
'''
''')
def IsApplicable(self):
return self.Env["DoBSC"]
def SetUp(self, CM):
CM.prepare()
# Clear out the cobwebs
self.TearDown(CM)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on BSC node(s).")
return CM.startall()
def TearDown(self, CM):
CM.log("Stopping Cluster Manager on BSC node(s).")
return CM.stopall()
class Benchmark(ScenarioComponent):
(
'''
''')
def IsApplicable(self):
return self.Env["benchmark"]
def SetUp(self, CM):
CM.prepare()
# Clear out the cobwebs
self.TearDown(CM, force=True)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on all node(s).")
return CM.startall()
def TearDown(self, CM):
CM.log("Stopping Cluster Manager on all node(s).")
return CM.stopall()
class RollingUpgrade(ScenarioComponent):
(
'''
Test a rolling upgrade between two versions of the stack
''')
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
if not self.Env["rpm-dir"]:
return None
if not self.Env["current-version"]:
return None
if not self.Env["previous-version"]:
return None
return True
def install(self, node, version):
target_dir = "/tmp/rpm-%s" % version
src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)
self.CM.rsh(node, "mkdir -p %s" % target_dir)
rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))
return self.success()
def upgrade(self, node):
return self.install(node, self.CM.Env["current-version"])
def downgrade(self, node):
return self.install(node, self.CM.Env["previous-version"])
def SetUp(self, CM):
print(repr(self)+"prepare")
CM.prepare()
# Clear out the cobwebs
CM.stopall(force=True)
CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])
for node in self.Env["nodes"]:
if not self.downgrade(node):
CM.log("Couldn't downgrade %s" % node)
return None
return 1
def TearDown(self, CM):
# Stop everything
CM.log("Stopping Cluster Manager on Upgrade nodes.")
CM.stopall()
CM.log("Upgrading all nodes to %s." % self.Env["current-version"])
for node in self.Env["nodes"]:
if not self.upgrade(node):
CM.log("Couldn't upgrade %s" % node)
return None
return 1
diff --git a/cts/lab/CTStests.py b/cts/lab/CTStests.py
index e995252065..5105abe1ce 100644
--- a/cts/lab/CTStests.py
+++ b/cts/lab/CTStests.py
@@ -1,2290 +1,2290 @@
""" Test-specific classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
#
# SPECIAL NOTE:
#
# Tests may NOT implement any cluster-manager-specific code in them.
# EXTEND the ClusterManager object to provide the base capabilities
# the test needs if you need to do something that the current CM classes
# do not. Otherwise you screw up the whole point of the object structure
# in CTS.
#
# Thank you.
#
import os
import re
import time
import tempfile
from stat import *
from pacemaker import BuildOptions
from pacemaker._cts.CTS import NodeStatus
from pacemaker._cts.audits import AuditResource
-from pacemaker._cts.tests.base import CTSTest, RemoteDriver, SimulStartLite, SimulStopLite, StartTest, StopTest
+from pacemaker._cts.tests import CTSTest, RemoteDriver, SimulStartLite, SimulStopLite, StartTest, StopTest
from pacemaker._cts.timer import Timer
AllTestClasses = [ ]
class FlipTest(CTSTest):
'''If it's running, stop it. If it's stopped start it.
Overthrow the status quo...
'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Flip"
self._start = StartTest(cm)
self._stop = StopTest(cm)
def __call__(self, node):
'''Perform the 'Flip' test. '''
self.incr("calls")
if self._cm.ShouldBeStatus[node] == "up":
self.incr("stopped")
ret = self._stop(node)
type = "up->down"
# Give the cluster time to recognize it's gone...
time.sleep(self._env["StableTime"])
elif self._cm.ShouldBeStatus[node] == "down":
self.incr("started")
ret = self._start(node)
type = "down->up"
else:
return self.skipped()
self.incr(type)
if ret:
return self.success()
else:
return self.failure("%s failure" % type)
# Register FlipTest as a good test to run
AllTestClasses.append(FlipTest)
class RestartTest(CTSTest):
'''Stop and restart a node'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Restart"
self._start = StartTest(cm)
self._stop = StopTest(cm)
self.benchmark = True
def __call__(self, node):
'''Perform the 'restart' test. '''
self.incr("calls")
self.incr("node:" + node)
ret1 = 1
if self._cm.StataCM(node):
self.incr("WasStopped")
if not self._start(node):
return self.failure("start (setup) failure: "+node)
self.set_timer()
if not self._stop(node):
return self.failure("stop failure: "+node)
if not self._start(node):
return self.failure("start failure: "+node)
return self.success()
# Register RestartTest as a good test to run
AllTestClasses.append(RestartTest)
class StonithdTest(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name = "Stonithd"
self._startall = SimulStartLite(cm)
self.benchmark = True
def __call__(self, node):
self.incr("calls")
if len(self._env["nodes"]) < 2:
return self.skipped()
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
is_dc = self._cm.is_node_dc(node)
watchpats = []
watchpats.append(self.templates["Pat:Fencing_ok"] % node)
watchpats.append(self.templates["Pat:NodeFenced"] % node)
if not self._env["at-boot"]:
self.debug("Expecting %s to stay down" % node)
self._cm.ShouldBeStatus[node] = "down"
else:
self.debug("Expecting %s to come up again %d" % (node, self._env["at-boot"]))
watchpats.append("%s.* S_STARTING -> S_PENDING" % node)
watchpats.append("%s.* S_PENDING -> S_NOT_DC" % node)
watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
origin = self._env.random_gen.choice(self._env["nodes"])
(rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)
if rc == 124: # CRM_EX_TIMEOUT
# Look for the patterns, usually this means the required
# device was running on the node to be fenced - or that
# the required devices were in the process of being loaded
# and/or moved
#
# Effectively the node committed suicide so there will be
# no confirmation, but pacemaker should be watching and
# fence the node again
self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node))
elif origin != node and rc != 0:
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))
elif origin == node and rc != 255:
# 255 == broken pipe, ie. the node was fenced as expected
self._logger.log("Locally originated fencing returned %d" % rc)
with Timer(self._logger, self.name, "fence"):
matched = watch.look_for_all()
self.set_timer("reform")
if watch.unmatched:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected patterns")
elif not is_stable:
return self.failure("Cluster did not become stable")
self.log_timer("reform")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ self.templates["Pat:Fencing_start"] % ".*",
self.templates["Pat:Fencing_ok"] % ".*",
self.templates["Pat:Fencing_active"],
r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ]
def is_applicable(self):
if not CTSTest.is_applicable(self):
return False
if "DoFencing" in list(self._env.keys()):
return self._env["DoFencing"]
return True
AllTestClasses.append(StonithdTest)
class StartOnebyOne(CTSTest):
'''Start all the nodes ~ one by one'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "StartOnebyOne"
self.stopall = SimulStopLite(cm)
self._start = StartTest(cm)
self.ns = NodeStatus(cm.Env)
def __call__(self, dummy):
'''Perform the 'StartOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Test setup failed")
failed = []
self.set_timer()
for node in self._env["nodes"]:
if not self._start(node):
failed.append(node)
if len(failed) > 0:
return self.failure("Some node failed to start: " + repr(failed))
return self.success()
# Register StartOnebyOne as a good test to run
AllTestClasses.append(StartOnebyOne)
class SimulStart(CTSTest):
'''Start all the nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "SimulStart"
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
def __call__(self, dummy):
'''Perform the 'SimulStart' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Setup failed")
if not self._startall(None):
return self.failure("Startall failed")
return self.success()
# Register SimulStart as a good test to run
AllTestClasses.append(SimulStart)
class SimulStop(CTSTest):
'''Stop all the nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "SimulStop"
self._startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, dummy):
'''Perform the 'SimulStop' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
if not self.stopall(None):
return self.failure("Stopall failed")
return self.success()
# Register SimulStop as a good test to run
AllTestClasses.append(SimulStop)
class StopOnebyOne(CTSTest):
'''Stop all the nodes in order'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "StopOnebyOne"
self._startall = SimulStartLite(cm)
self._stop = StopTest(cm)
def __call__(self, dummy):
'''Perform the 'StopOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
failed = []
self.set_timer()
for node in self._env["nodes"]:
if not self._stop(node):
failed.append(node)
if len(failed) > 0:
return self.failure("Some node failed to stop: " + repr(failed))
return self.success()
# Register StopOnebyOne as a good test to run
AllTestClasses.append(StopOnebyOne)
class RestartOnebyOne(CTSTest):
'''Restart all the nodes in order'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "RestartOnebyOne"
self._startall = SimulStartLite(cm)
def __call__(self, dummy):
'''Perform the 'RestartOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
did_fail = []
self.set_timer()
self.restart = RestartTest(self._cm)
for node in self._env["nodes"]:
if not self.restart(node):
did_fail.append(node)
if did_fail:
return self.failure("Could not restart %d nodes: %s"
% (len(did_fail), repr(did_fail)))
return self.success()
# Register StopOnebyOne as a good test to run
AllTestClasses.append(RestartOnebyOne)
class PartialStart(CTSTest):
'''Start a node - but tell it to stop before it finishes starting up'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "PartialStart"
self._startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
self._stop = StopTest(cm)
def __call__(self, node):
'''Perform the 'PartialStart' test. '''
self.incr("calls")
ret = self.stopall(None)
if not ret:
return self.failure("Setup failed")
watchpats = []
watchpats.append("pacemaker-controld.*Connecting to .* cluster infrastructure")
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self._cm.StartaCMnoBlock(node)
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
return self.failure("Setup of %s failed" % node)
ret = self._stop(node)
if not ret:
return self.failure("%s did not stop in time" % node)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# We might do some fencing in the 2-node case if we make it up far enough
return [ r"Executing reboot fencing operation",
r"Requesting fencing \([^)]+\) targeting node " ]
# Register StopOnebyOne as a good test to run
AllTestClasses.append(PartialStart)
class StandbyTest(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Standby"
self.benchmark = True
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
# make sure the node is active
# set the node to standby mode
# check resources, none resource should be running on the node
# set the node to active mode
# check resouces, resources should have been migrated back (SHOULD THEY?)
def __call__(self, node):
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Start all nodes failed")
self.debug("Make sure node %s is active" % node)
if self._cm.StandbyStatus(node) != "off":
if not self._cm.SetStandbyMode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self._cm.cluster_stable()
status = self._cm.StandbyStatus(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.debug("Getting resources running on node %s" % node)
rsc_on_node = self._cm.active_resources(node)
watchpats = []
watchpats.append(r"State transition .* -> S_POLICY_ENGINE")
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self.debug("Setting node %s to standby mode" % node)
if not self._cm.SetStandbyMode(node, "on"):
return self.failure("can't set node %s to standby mode" % node)
self.set_timer("on")
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
self._cm.SetStandbyMode(node, "off")
return self.failure("cluster didn't react to standby change on %s" % node)
self._cm.cluster_stable()
status = self._cm.StandbyStatus(node)
if status != "on":
return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status))
self.log_timer("on")
self.debug("Checking resources")
bad_run = self._cm.active_resources(node)
if len(bad_run) > 0:
rc = self.failure("%s set to standby, %s is still running on it" % (node, repr(bad_run)))
self.debug("Setting node %s to active mode" % node)
self._cm.SetStandbyMode(node, "off")
return rc
self.debug("Setting node %s to active mode" % node)
if not self._cm.SetStandbyMode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self.set_timer("off")
self._cm.cluster_stable()
status = self._cm.StandbyStatus(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.log_timer("off")
return self.success()
AllTestClasses.append(StandbyTest)
class ValgrindTest(CTSTest):
'''Check for memory leaks'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Valgrind"
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
self.is_valgrind = True
self.is_loop = True
def setup(self, node):
self.incr("calls")
ret = self.stopall(None)
if not ret:
return self.failure("Stop all nodes failed")
# @TODO Edit /etc/sysconfig/pacemaker on all nodes to enable valgrind,
# and clear any valgrind logs from previous runs. For now, we rely on
# the user to do this manually.
ret = self._startall(None)
if not ret:
return self.failure("Start all nodes failed")
return self.success()
def teardown(self, node):
# Return all nodes to normal
# @TODO Edit /etc/sysconfig/pacemaker on all nodes to disable valgrind
ret = self.stopall(None)
if not ret:
return self.failure("Stop all nodes failed")
return self.success()
def find_leaks(self):
# Check for leaks
# (no longer used but kept in case feature is restored)
leaked = []
self._stop = StopTest(self._cm)
for node in self._env["nodes"]:
rc = self._stop(node)
if not rc:
self.failure("Couldn't shut down %s" % node)
(rc, _) = self._rsh(node, "grep -e indirectly.*lost:.*[1-9] -e definitely.*lost:.*[1-9] -e (ERROR|error).*SUMMARY:.*[1-9].*errors %s" % self._logger.logPat)
if rc != 1:
leaked.append(node)
self.failure("Valgrind errors detected on %s" % node)
(_, output) = self._rsh(node, "grep -e lost: -e SUMMARY: %s" % self._logger.logPat, verbose=1)
for line in output:
self._logger.log(line)
(_, output) = self._rsh(node, "cat %s" % self._logger.logPat, verbose=1)
for line in output:
self.debug(line)
self._rsh(node, "rm -f %s" % self._logger.logPat, verbose=1)
return leaked
def __call__(self, node):
#leaked = self.find_leaks()
#if len(leaked) > 0:
# return self.failure("Nodes %s leaked" % repr(leaked))
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"pacemaker-based.*: \*\*\*\*\*\*\*\*\*\*\*\*\*",
r"pacemaker-based.*: .* avoid confusing Valgrind",
r"HA_VALGRIND_ENABLED" ]
class StandbyLoopTest(ValgrindTest):
'''Check for memory leaks by putting a node in and out of standby for an hour'''
# @TODO This is not a useful test for memory leaks
def __init__(self, cm):
ValgrindTest.__init__(self,cm)
self.name = "StandbyLoop"
def __call__(self, node):
lpc = 0
delay = 2
failed = 0
done = time.time() + self._env["loop-minutes"] * 60
while time.time() <= done and not failed:
lpc = lpc + 1
time.sleep(delay)
if not self._cm.SetStandbyMode(node, "on"):
self.failure("can't set node %s to standby mode" % node)
failed = lpc
time.sleep(delay)
if not self._cm.SetStandbyMode(node, "off"):
self.failure("can't set node %s to active mode" % node)
failed = lpc
leaked = self.find_leaks()
if failed:
return self.failure("Iteration %d failed" % failed)
elif len(leaked) > 0:
return self.failure("Nodes %s leaked" % repr(leaked))
return self.success()
#AllTestClasses.append(StandbyLoopTest)
class BandwidthTest(CTSTest):
# Tests should not be cluster-manager-specific
# If you need to find out cluster manager configuration to do this, then
# it should be added to the generic cluster manager API.
'''Test the bandwidth which the cluster uses'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.stats["min"] = 0
self.stats["max"] = 0
self.stats["totalbandwidth"] = 0
self.name = "Bandwidth"
self._start = StartTest(cm)
(handle, self.tempfile) = tempfile.mkstemp(".cts")
os.close(handle)
self._startall = SimulStartLite(cm)
def __call__(self, node):
'''Perform the Bandwidth test'''
self.incr("calls")
if self._cm.upcount() < 1:
return self.skipped()
Path = self._cm.InternalCommConfig()
if "ip" not in Path["mediatype"]:
return self.skipped()
port = Path["port"][0]
port = int(port)
ret = self._startall(None)
if not ret:
return self.failure("Test setup failed")
time.sleep(5) # We get extra messages right after startup.
fstmpfile = "/var/run/band_estimate"
dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
% (port, fstmpfile)
(rc, _) = self._rsh(node, dumpcmd)
if rc == 0:
farfile = "root@%s:%s" % (node, fstmpfile)
self._rsh.copy(farfile, self.tempfile)
Bandwidth = self.countbandwidth(self.tempfile)
if not Bandwidth:
self._logger.log("Could not compute bandwidth.")
return self.success()
intband = int(Bandwidth + 0.5)
self._logger.log("...bandwidth: %d bits/sec" % intband)
self.stats["totalbandwidth"] += Bandwidth
if self.stats["min"] == 0:
self.stats["min"] = Bandwidth
if Bandwidth > self.stats["max"]:
self.stats["max"] = Bandwidth
if Bandwidth < self.stats["min"]:
self.stats["min"] = Bandwidth
self._rsh(node, "rm -f %s" % fstmpfile)
os.unlink(self.tempfile)
return self.success()
else:
return self.failure("no response from tcpdump command [%d]!" % rc)
def countbandwidth(self, file):
fp = open(file, "r")
fp.seek(0)
count = 0
sum = 0
while 1:
line = fp.readline()
if not line:
return None
if re.search("udp",line) or re.search("UDP,", line):
count = count + 1
linesplit = line.split(" ")
for j in range(len(linesplit)-1):
if linesplit[j] == "udp": break
if linesplit[j] == "length:": break
try:
sum = sum + int(linesplit[j+1])
except ValueError:
self._logger.log("Invalid tcpdump line: %s" % line)
return None
T1 = linesplit[0]
timesplit = T1.split(":")
time2split = timesplit[2].split(".")
time1 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001
break
while count < 100:
line = fp.readline()
if not line:
return None
if re.search("udp",line) or re.search("UDP,", line):
count = count+1
linessplit = line.split(" ")
for j in range(len(linessplit)-1):
if linessplit[j] == "udp": break
if linessplit[j] == "length:": break
try:
sum = int(linessplit[j+1]) + sum
except ValueError:
self._logger.log("Invalid tcpdump line: %s" % line)
return None
T2 = linessplit[0]
timesplit = T2.split(":")
time2split = timesplit[2].split(".")
time2 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001
time = time2-time1
if (time <= 0):
return 0
return int((sum*8)/time)
def is_applicable(self):
'''BandwidthTest never applicable'''
return False
AllTestClasses.append(BandwidthTest)
###################################################################
class MaintenanceMode(CTSTest):
###################################################################
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "MaintenanceMode"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.max = 30
self.benchmark = True
self.action = "asyncmon"
self.interval = 0
self.rid = "maintenanceDummy"
def toggleMaintenanceMode(self, node, action):
pats = []
pats.append(self.templates["Pat:DC_IDLE"])
# fail the resource right after turning Maintenance mode on
# verify it is not recovered until maintenance mode is turned off
if action == "On":
pats.append(self.templates["Pat:RscOpFail"] % (self.action, self.rid))
else:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
watch = self.create_watch(pats, 60)
watch.set_watch()
self.debug("Turning maintenance mode %s" % action)
self._rsh(node, self.templates["MaintenanceMode%s" % (action)])
if (action == "On"):
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))
with Timer(self._logger, self.name, "recover%s" % action):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when turning maintenance mode %s" % action)
return repr(watch.unmatched)
return ""
def insertMaintenanceDummy(self, node):
pats = []
pats.append(("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self.rid)))
watch = self.create_watch(pats, 60)
watch.set_watch()
self._cm.AddDummyRsc(node, self.rid)
with Timer(self._logger, self.name, "addDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when adding maintenance dummy resource")
return repr(watch.unmatched)
return ""
def removeMaintenanceDummy(self, node):
pats = []
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
watch = self.create_watch(pats, 60)
watch.set_watch()
self._cm.RemoveDummyRsc(node, self.rid)
with Timer(self._logger, self.name, "removeDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when removing maintenance dummy resource")
return repr(watch.unmatched)
return ""
def managedRscList(self, node):
rscList = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if tmp.managed:
rscList.append(tmp.id)
return rscList
def verifyResources(self, node, rscList, managed):
managedList = list(rscList)
managed_str = "managed"
if not managed:
managed_str = "unmanaged"
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if managed and not tmp.managed:
continue
elif not managed and tmp.managed:
continue
elif managedList.count(tmp.id):
managedList.remove(tmp.id)
if len(managedList) == 0:
self.debug("Found all %s resources on %s" % (managed_str, node))
return True
self._logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managedList))
return False
def __call__(self, node):
'''Perform the 'MaintenanceMode' test. '''
self.incr("calls")
verify_managed = False
verify_unmanaged = False
failPat = ""
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
# get a list of all the managed resources. We use this list
# after enabling maintenance mode to verify all managed resources
# become un-managed. After maintenance mode is turned off, we use
# this list to verify all the resources become managed again.
managedResources = self.managedRscList(node)
if len(managedResources) == 0:
self._logger.log("No managed resources on %s" % node)
return self.skipped()
# insert a fake resource we can fail during maintenance mode
# so we can verify recovery does not take place until after maintenance
# mode is disabled.
failPat = failPat + self.insertMaintenanceDummy(node)
# toggle maintenance mode ON, then fail dummy resource.
failPat = failPat + self.toggleMaintenanceMode(node, "On")
# verify all the resources are now unmanaged
if self.verifyResources(node, managedResources, False):
verify_unmanaged = True
# Toggle maintenance mode OFF, verify dummy is recovered.
failPat = failPat + self.toggleMaintenanceMode(node, "Off")
# verify all the resources are now managed again
if self.verifyResources(node, managedResources, True):
verify_managed = True
# Remove our maintenance dummy resource.
failPat = failPat + self.removeMaintenanceDummy(node)
self._cm.cluster_stable()
if failPat != "":
return self.failure("Unmatched patterns: %s" % (failPat))
elif verify_unmanaged is False:
return self.failure("Failed to verify resources became unmanaged during maintenance mode")
elif verify_managed is False:
return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for %s" % self.rid,
r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self.rid,
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self.action, self.rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval) ]
AllTestClasses.append(MaintenanceMode)
class ResourceRecover(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "ResourceRecover"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.max = 30
self.rid = None
self.rid_alt = None
self.benchmark = True
# these are the values used for the new LRM API call
self.action = "asyncmon"
self.interval = 0
def __call__(self, node):
'''Perform the 'ResourceRecover' test. '''
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self._cm.active_resources(node)
if len(resourcelist) == 0:
self._logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self.choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self.rid)
if rsc.id == rsc.clone_id:
self.debug("Failing " + rsc.id)
else:
self.debug("Failing " + rsc.id + " (also known as " + rsc.clone_id + ")")
# Log patterns to watch for (failure, plus restart if managed)
pats = []
pats.append(self.templates["Pat:CloneOpFail"] % (self.action, rsc.id, rsc.clone_id))
if rsc.managed:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
if rsc.unique:
pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self.fail_resource(rsc, node, pats) is None:
return None # self.failure() already called
return self.success()
def choose_resource(self, node, resourcelist):
""" Choose a random resource to target """
self.rid = self._env.random_gen.choice(resourcelist)
self.rid_alt = self.rid
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self._cm, line)
if rsc.id == self.rid:
# Handle anonymous clones that get renamed
self.rid = rsc.clone_id
return rsc
return None
def get_failcount(self, node):
""" Check the fail count of targeted resource on given node """
(rc, lines) = self._rsh(node,
"crm_failcount --quiet --query --resource %s "
"--operation %s --interval %d "
"--node %s" % (self.rid, self.action,
self.interval, node), verbose=1)
if rc != 0 or len(lines) != 1:
self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc,
" // ".join(map(str.strip, lines))))
return -1
try:
failcount = int(lines[0])
except (IndexError, ValueError):
self._logger.log("crm_failcount output on %s unparseable: %s" % (node,
' '.join(lines)))
return -1
return failcount
def fail_resource(self, rsc, node, pats):
""" Fail the targeted resource, and verify as expected """
orig_failcount = self.get_failcount(node)
watch = self.create_watch(pats, 60)
watch.set_watch()
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))
with Timer(self._logger, self.name, "recover"):
watch.look_for_all()
self._cm.cluster_stable()
recovered = self._cm.ResourceLocation(self.rid)
if watch.unmatched:
return self.failure("Patterns not found: %s" % repr(watch.unmatched))
elif rsc.unique and len(recovered) > 1:
return self.failure("%s is now active on more than one node: %s"%(self.rid, repr(recovered)))
elif len(recovered) > 0:
self.debug("%s is running on: %s" % (self.rid, repr(recovered)))
elif rsc.managed:
return self.failure("%s was not recovered and is inactive" % self.rid)
new_failcount = self.get_failcount(node)
if new_failcount != (orig_failcount + 1):
return self.failure("%s fail count is %d not %d" % (self.rid,
new_failcount, orig_failcount + 1))
return 0 # Anything but None is success
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for %s" % self.rid,
r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self.rid, self.rid_alt),
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self.action, self.rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval) ]
AllTestClasses.append(ResourceRecover)
class ComponentFail(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "ComponentFail"
self._startall = SimulStartLite(cm)
self.complist = cm.Components()
self.patterns = []
self.okerrpatterns = []
self.is_unsafe = True
def __call__(self, node):
'''Perform the 'ComponentFail' test. '''
self.incr("calls")
self.patterns = []
self.okerrpatterns = []
# start all nodes
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
if not self._cm.cluster_stable(self._env["StableTime"]):
return self.failure("Setup failed - unstable")
node_is_dc = self._cm.is_node_dc(node, None)
# select a component to kill
chosen = self._env.random_gen.choice(self.complist)
while chosen.dc_only and node_is_dc == 0:
chosen = self._env.random_gen.choice(self.complist)
self.debug("...component %s (dc=%d)" % (chosen.name, node_is_dc))
self.incr(chosen.name)
if chosen.name != "corosync":
self.patterns.append(self.templates["Pat:ChildKilled"] %(node, chosen.name))
self.patterns.append(self.templates["Pat:ChildRespawn"] %(node, chosen.name))
self.patterns.extend(chosen.pats)
if node_is_dc:
self.patterns.extend(chosen.dc_pats)
# @TODO this should be a flag in the Component
if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]:
# Ignore actions for fence devices if fencer will respawn
# (their registration will be lost, and probes will fail)
self.okerrpatterns = [ self.templates["Pat:Fencing_active"] ]
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.okerrpatterns.append(self.templates["Pat:Fencing_recover"] % r.id)
self.okerrpatterns.append(self.templates["Pat:Fencing_probe"] % r.id)
# supply a copy so self.patterns doesn't end up empty
tmpPats = []
tmpPats.extend(self.patterns)
self.patterns.extend(chosen.badnews_ignore)
# Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
stonithPats = []
stonithPats.append(self.templates["Pat:Fencing_ok"] % node)
stonith = self.create_watch(stonithPats, 0)
stonith.set_watch()
# set the watch for stable
watch = self.create_watch(
tmpPats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
# kill the component
chosen.kill(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for any fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
self._cm.cluster_stable(self._env["StartTime"])
self.debug("Checking if %s was shot" % node)
shot = stonith.look(60)
if shot:
self.debug("Found: " + repr(shot))
self.okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)
if not self._env["at-boot"]:
self._cm.ShouldBeStatus[node] = "down"
# If fencing occurred, chances are many (if not all) the expected logs
# will not be sent - or will be lost when the node reboots
return self.success()
# check for logs indicating a graceful recovery
matched = watch.look_for_all(allow_multiple_matches=True)
if watch.unmatched:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected %s patterns" % chosen.name)
elif not is_stable:
return self.failure("Cluster did not become stable after killing %s" % chosen.name)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Note that okerrpatterns refers to the last time we ran this test
# The good news is that this works fine for us...
self.okerrpatterns.extend(self.patterns)
return self.okerrpatterns
AllTestClasses.append(ComponentFail)
class SplitBrainTest(CTSTest):
'''It is used to test split-brain. when the path between the two nodes break
check the two nodes both take over the resource'''
def __init__(self,cm):
CTSTest.__init__(self,cm)
self.name = "SplitBrain"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.is_experimental = True
def isolate_partition(self, partition):
other_nodes = []
other_nodes.extend(self._env["nodes"])
for node in partition:
try:
other_nodes.remove(node)
except ValueError:
self._logger.log("Node "+node+" not in " + repr(self._env["nodes"]) + " from " +repr(partition))
if len(other_nodes) == 0:
return 1
self.debug("Creating partition: " + repr(partition))
self.debug("Everyone else: " + repr(other_nodes))
for node in partition:
if not self._cm.isolate_node(node, other_nodes):
self._logger.log("Could not isolate %s" % node)
return 0
return 1
def heal_partition(self, partition):
other_nodes = []
other_nodes.extend(self._env["nodes"])
for node in partition:
try:
other_nodes.remove(node)
except ValueError:
self._logger.log("Node "+node+" not in " + repr(self._env["nodes"]))
if len(other_nodes) == 0:
return 1
self.debug("Healing partition: " + repr(partition))
self.debug("Everyone else: " + repr(other_nodes))
for node in partition:
self._cm.unisolate_node(node, other_nodes)
def __call__(self, node):
'''Perform split-brain test'''
self.incr("calls")
self.passed = True
partitions = {}
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
while 1:
# Retry until we get multiple partitions
partitions = {}
p_max = len(self._env["nodes"])
for node in self._env["nodes"]:
p = self._env.random_gen.randint(1, p_max)
if not p in partitions:
partitions[p] = []
partitions[p].append(node)
p_max = len(list(partitions.keys()))
if p_max > 1:
break
# else, try again
self.debug("Created %d partitions" % p_max)
for key in list(partitions.keys()):
self.debug("Partition["+str(key)+"]:\t"+repr(partitions[key]))
# Disabling STONITH to reduce test complexity for now
self._rsh(node, "crm_attribute -V -n stonith-enabled -v false")
for key in list(partitions.keys()):
self.isolate_partition(partitions[key])
count = 30
while count > 0:
if len(self._cm.find_partitions()) != p_max:
time.sleep(10)
else:
break
else:
self.failure("Expected partitions were not created")
# Target number of partitions formed - wait for stability
if not self._cm.cluster_stable():
self.failure("Partitioned cluster not stable")
# Now audit the cluster state
self._cm.partitions_expected = p_max
if not self.audit():
self.failure("Audits failed")
self._cm.partitions_expected = 1
# And heal them again
for key in list(partitions.keys()):
self.heal_partition(partitions[key])
# Wait for a single partition to form
count = 30
while count > 0:
if len(self._cm.find_partitions()) != 1:
time.sleep(10)
count -= 1
else:
break
else:
self.failure("Cluster did not reform")
# Wait for it to have the right number of members
count = 30
while count > 0:
members = []
partitions = self._cm.find_partitions()
if len(partitions) > 0:
members = partitions[0].split()
if len(members) != len(self._env["nodes"]):
time.sleep(10)
count -= 1
else:
break
else:
self.failure("Cluster did not completely reform")
# Wait up to 20 minutes - the delay is more preferable than
# trying to continue with in a messed up state
if not self._cm.cluster_stable(1200):
self.failure("Reformed cluster not stable")
if self._env["continue"]:
answer = "Y"
else:
try:
answer = input('Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
raise ValueError("Reformed cluster not stable")
# Turn fencing back on
if self._env["DoFencing"]:
self._rsh(node, "crm_attribute -V -D -n stonith-enabled")
self._cm.cluster_stable()
if self.passed:
return self.success()
return self.failure("See previous errors")
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Another DC detected:",
r"(ERROR|error).*: .*Application of an update diff failed",
r"pacemaker-controld.*:.*not in our membership list",
r"CRIT:.*node.*returning after partition" ]
def is_applicable(self):
if not CTSTest.is_applicable(self):
return False
return len(self._env["nodes"]) > 2
AllTestClasses.append(SplitBrainTest)
class Reattach(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Reattach"
self._startall = SimulStartLite(cm)
self.restart1 = RestartTest(cm)
self.stopall = SimulStopLite(cm)
self.is_unsafe = False
def _is_managed(self, node):
(_, is_managed) = self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
is_managed = is_managed[0].strip()
return is_managed == "true"
def _set_unmanaged(self, node):
self.debug("Disable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
def _set_managed(self, node):
self.debug("Re-enable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
def setup(self, node):
attempt = 0
if not self._startall(None):
return None
# Make sure we are really _really_ stable and that all
# resources, including those that depend on transient node
# attributes, are started
while not self._cm.cluster_stable(double_check=True):
if attempt < 5:
attempt += 1
self.debug("Not stable yet, re-testing")
else:
self._logger.log("Cluster is not stable")
return None
return 1
def teardown(self, node):
# Make sure 'node' is up
start = StartTest(self._cm)
start(node)
if not self._is_managed(node):
self._logger.log("Attempting to re-enable resource management on %s" % node)
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
self._logger.log("Could not re-enable resource management")
return 0
return 1
def can_run_now(self, node):
""" Return True if we can meaningfully run right now"""
if self._find_ocfs2_resources(node):
self._logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present")
return False
return True
def __call__(self, node):
self.incr("calls")
pats = []
# Conveniently, the scheduler will display this message when disabling
# management, even if fencing is not enabled, so we can rely on it.
managed = self.create_watch(["No fencing will be done"], 60)
managed.set_watch()
self._set_unmanaged(node)
if not managed.look_for_all():
self._logger.log("Patterns not found: " + repr(managed.unmatched))
return self.failure("Resource management not disabled")
pats = []
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("stop", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("promote", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("demote", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("migrate", ".*"))
watch = self.create_watch(pats, 60, "ShutdownActivity")
watch.set_watch()
self.debug("Shutting down the cluster")
ret = self.stopall(None)
if not ret:
self._set_managed(node)
return self.failure("Couldn't shut down the cluster")
self.debug("Bringing the cluster back up")
ret = self._startall(None)
time.sleep(5) # allow ping to update the CIB
if not ret:
self._set_managed(node)
return self.failure("Couldn't restart the cluster")
if self.local_badnews("ResourceActivity:", watch):
self._set_managed(node)
return self.failure("Resources stopped or started during cluster restart")
watch = self.create_watch(pats, 60, "StartupActivity")
watch.set_watch()
# Re-enable resource management (and verify it happened).
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
# Ignore actions for STONITH resources
ignore = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.debug("Ignoring start actions for %s" % r.id)
ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
if self.local_badnews("ResourceActivity:", watch, ignore):
return self.failure("Resources stopped or started after resource management was re-enabled")
return ret
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"resource( was|s were) active at shutdown" ]
def is_applicable(self):
return True
AllTestClasses.append(Reattach)
class SpecialTest1(CTSTest):
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "SpecialTest1"
self._startall = SimulStartLite(cm)
self.restart1 = RestartTest(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, node):
'''Perform the 'SpecialTest1' test for Andrew. '''
self.incr("calls")
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Could not stop all nodes")
# Test config recovery when the other nodes come up
self._rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*")
# Start the selected node
ret = self.restart1(node)
if not ret:
return self.failure("Could not start "+node)
# Start all remaining nodes
ret = self._startall(None)
if not ret:
return self.failure("Could not start the remaining nodes")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Errors that occur as a result of the CIB being wiped
return [ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
r"error.*: Resource start-up disabled since no STONITH resources have been defined",
r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity" ]
AllTestClasses.append(SpecialTest1)
class HAETest(CTSTest):
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "HAETest"
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
self.is_loop = True
def setup(self, node):
# Start all remaining nodes
ret = self._startall(None)
if not ret:
return self.failure("Couldn't start all nodes")
return self.success()
def teardown(self, node):
# Stop everything
ret = self.stopall(None)
if not ret:
return self.failure("Couldn't stop all nodes")
return self.success()
def wait_on_state(self, node, resource, expected_clones, attempts=240):
while attempts > 0:
active = 0
(rc, lines) = self._rsh(node, "crm_resource -r %s -W -Q" % resource, verbose=1)
# Hack until crm_resource does the right thing
if rc == 0 and lines:
active = len(lines)
if len(lines) == expected_clones:
return 1
elif rc == 1:
self.debug("Resource %s is still inactive" % resource)
elif rc == 234:
self._logger.log("Unknown resource %s" % resource)
return 0
elif rc == 246:
self._logger.log("Cluster is inactive")
return 0
elif rc != 0:
self._logger.log("Call to crm_resource failed, rc=%d" % rc)
return 0
else:
self.debug("Resource %s is active on %d times instead of %d" % (resource, active, expected_clones))
attempts -= 1
time.sleep(1)
return 0
def find_dlm(self, node):
self.r_dlm = None
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rtype == "controld" and r.parent != "NA":
self.debug("Found dlm: %s" % self.r_dlm)
self.r_dlm = r.parent
return 1
return 0
def find_hae_resources(self, node):
self.r_dlm = None
self._r_o2cb = None
self._r_ocfs2 = []
if self.find_dlm(node):
self._find_ocfs2_resources(node)
def is_applicable(self):
if not CTSTest.is_applicable(self):
return False
if self._env["Schema"] == "hae":
return True
return None
class HAERoleTest(HAETest):
def __init__(self, cm):
'''Lars' mount/unmount test for the HA extension. '''
HAETest.__init__(self,cm)
self.name = "HAERoleTest"
def change_state(self, node, resource, target):
(rc, _) = self._rsh(node, "crm_resource -V -r %s -p target-role -v %s --meta" % (resource, target))
return rc
def __call__(self, node):
self.incr("calls")
lpc = 0
failed = 0
delay = 2
done = time.time() + self._env["loop-minutes"]*60
self.find_hae_resources(node)
clone_max = len(self._env["nodes"])
while time.time() <= done and not failed:
lpc = lpc + 1
self.change_state(node, self.r_dlm, "Stopped")
if not self.wait_on_state(node, self.r_dlm, 0):
self.failure("%s did not go down correctly" % self.r_dlm)
failed = lpc
self.change_state(node, self.r_dlm, "Started")
if not self.wait_on_state(node, self.r_dlm, clone_max):
self.failure("%s did not come up correctly" % self.r_dlm)
failed = lpc
if not self.wait_on_state(node, self._r_o2cb, clone_max):
self.failure("%s did not come up correctly" % self._r_o2cb)
failed = lpc
for fs in self._r_ocfs2:
if not self.wait_on_state(node, fs, clone_max):
self.failure("%s did not come up correctly" % fs)
failed = lpc
if failed:
return self.failure("iteration %d failed" % failed)
return self.success()
AllTestClasses.append(HAERoleTest)
class HAEStandbyTest(HAETest):
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
HAETest.__init__(self,cm)
self.name = "HAEStandbyTest"
def change_state(self, node, resource, target):
(rc, _) = self._rsh(node, "crm_standby -V -l reboot -v %s" % (target))
return rc
def __call__(self, node):
self.incr("calls")
lpc = 0
failed = 0
done = time.time() + self._env["loop-minutes"]*60
self.find_hae_resources(node)
clone_max = len(self._env["nodes"])
while time.time() <= done and not failed:
lpc = lpc + 1
self.change_state(node, self.r_dlm, "true")
if not self.wait_on_state(node, self.r_dlm, clone_max-1):
self.failure("%s did not go down correctly" % self.r_dlm)
failed = lpc
self.change_state(node, self.r_dlm, "false")
if not self.wait_on_state(node, self.r_dlm, clone_max):
self.failure("%s did not come up correctly" % self.r_dlm)
failed = lpc
if not self.wait_on_state(node, self._r_o2cb, clone_max):
self.failure("%s did not come up correctly" % self._r_o2cb)
failed = lpc
for fs in self._r_ocfs2:
if not self.wait_on_state(node, fs, clone_max):
self.failure("%s did not come up correctly" % fs)
failed = lpc
if failed:
return self.failure("iteration %d failed" % failed)
return self.success()
AllTestClasses.append(HAEStandbyTest)
class NearQuorumPointTest(CTSTest):
'''
This test brings larger clusters near the quorum point (50%).
In addition, it will test doing starts and stops at the same time.
Here is how I think it should work:
- loop over the nodes and decide randomly which will be up and which
will be down Use a 50% probability for each of up/down.
- figure out what to do to get into that state from the current state
- in parallel, bring up those going up and bring those going down.
'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "NearQuorumPoint"
def __call__(self, dummy):
'''Perform the 'NearQuorumPoint' test. '''
self.incr("calls")
startset = []
stopset = []
stonith = self._cm.prepare_fencing_watcher("NearQuorumPoint")
#decide what to do with each node
for node in self._env["nodes"]:
action = self._env.random_gen.choice(["start","stop"])
#action = self._env.random_gen.choice(["start","stop","no change"])
if action == "start" :
startset.append(node)
elif action == "stop" :
stopset.append(node)
self.debug("start nodes:" + repr(startset))
self.debug("stop nodes:" + repr(stopset))
#add search patterns
watchpats = [ ]
for node in stopset:
if self._cm.ShouldBeStatus[node] == "up":
watchpats.append(self.templates["Pat:We_stopped"] % node)
for node in startset:
if self._cm.ShouldBeStatus[node] == "down":
#watchpats.append(self.templates["Pat:NonDC_started"] % node)
watchpats.append(self.templates["Pat:Local_started"] % node)
else:
for stopping in stopset:
if self._cm.ShouldBeStatus[stopping] == "up":
watchpats.append(self.templates["Pat:They_stopped"] % (node, self._cm.key_for_node(stopping)))
if len(watchpats) == 0:
return self.skipped()
if len(startset) != 0:
watchpats.append(self.templates["Pat:DC_IDLE"])
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
#begin actions
for node in stopset:
if self._cm.ShouldBeStatus[node] == "up":
self._cm.StopaCMnoBlock(node)
for node in startset:
if self._cm.ShouldBeStatus[node] == "down":
self._cm.StartaCMnoBlock(node)
#get the result
if watch.look_for_all():
self._cm.cluster_stable()
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
return self.success()
self._logger.log("Warn: Patterns not found: " + repr(watch.unmatched))
#get the "bad" nodes
upnodes = []
for node in stopset:
if self._cm.StataCM(node) == 1:
upnodes.append(node)
downnodes = []
for node in startset:
if self._cm.StataCM(node) == 0:
downnodes.append(node)
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
if upnodes == [] and downnodes == []:
self._cm.cluster_stable()
# Make sure they're completely down with no residule
for node in stopset:
self._rsh(node, self.templates["StopCmd"])
return self.success()
if len(upnodes) > 0:
self._logger.log("Warn: Unstoppable nodes: " + repr(upnodes))
if len(downnodes) > 0:
self._logger.log("Warn: Unstartable nodes: " + repr(downnodes))
return self.failure()
def is_applicable(self):
return True
AllTestClasses.append(NearQuorumPointTest)
class RollingUpgradeTest(CTSTest):
'''Perform a rolling upgrade of the cluster'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "RollingUpgrade"
self._start = StartTest(cm)
self._stop = StopTest(cm)
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
def setup(self, node):
# Start all remaining nodes
ret = self.stopall(None)
if not ret:
return self.failure("Couldn't stop all nodes")
for node in self._env["nodes"]:
if not self.downgrade(node, None):
return self.failure("Couldn't downgrade %s" % node)
ret = self._startall(None)
if not ret:
return self.failure("Couldn't start all nodes")
return self.success()
def teardown(self, node):
# Stop everything
ret = self.stopall(None)
if not ret:
return self.failure("Couldn't stop all nodes")
for node in self._env["nodes"]:
if not self.upgrade(node, None):
return self.failure("Couldn't upgrade %s" % node)
return self.success()
def install(self, node, version, start=1, flags="--force"):
target_dir = "/tmp/rpm-%s" % version
src_dir = "%s/%s" % (self._env["rpm-dir"], version)
self._logger.log("Installing %s on %s with %s" % (version, node, flags))
if not self._stop(node):
return self.failure("stop failure: "+node)
self._rsh(node, "mkdir -p %s" % target_dir)
self._rsh(node, "rm -f %s/*.rpm" % target_dir)
(_, lines) = self._rsh(node, "ls -1 %s/*.rpm" % src_dir, verbose=1)
for line in lines:
line = line[:-1]
rc = self._rsh.copy("%s" % (line), "%s:%s/" % (node, target_dir))
self._rsh(node, "rpm -Uvh %s %s/*.rpm" % (flags, target_dir))
if start and not self._start(node):
return self.failure("start failure: "+node)
return self.success()
def upgrade(self, node, start=1):
return self.install(node, self._env["current-version"], start)
def downgrade(self, node, start=1):
return self.install(node, self._env["previous-version"], start, "--force --nodeps")
def __call__(self, node):
'''Perform the 'Rolling Upgrade' test. '''
self.incr("calls")
for node in self._env["nodes"]:
if self.upgrade(node):
return self.failure("Couldn't upgrade %s" % node)
self._cm.cluster_stable()
return self.success()
def is_applicable(self):
if not CTSTest.is_applicable(self):
return None
if not "rpm-dir" in list(self._env.keys()):
return None
if not "current-version" in list(self._env.keys()):
return None
if not "previous-version" in list(self._env.keys()):
return None
return 1
# Register RestartTest as a good test to run
AllTestClasses.append(RollingUpgradeTest)
class BSC_AddResource(CTSTest):
'''Add a resource to the cluster'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name = "AddResource"
self.resource_offset = 0
self.cib_cmd = """cibadmin -C -o %s -X '%s' """
def __call__(self, node):
self.incr("calls")
self.resource_offset = self.resource_offset + 1
r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset)
start_pat = "pacemaker-controld.*%s_start_0.*confirmed.*ok"
patterns = []
patterns.append(start_pat % r_id)
watch = self.create_watch(patterns, self._env["DeadTime"])
watch.set_watch()
ip = self.NextIP()
if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip):
return self.failure("Make resource %s failed" % r_id)
failed = 0
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self._logger.log ("Warn: Pattern not found: %s" % (regex))
failed = 1
if failed:
return self.failure("Resource pattern(s) not found")
if not self._cm.cluster_stable(self._env["DeadTime"]):
return self.failure("Unstable cluster")
return self.success()
def NextIP(self):
ip = self._env["IPBase"]
if ":" in ip:
fields = ip.rpartition(":")
fields[2] = str(hex(int(fields[2], 16)+1))
print(str(hex(int(f[2], 16)+1)))
else:
fields = ip.rpartition('.')
fields[2] = str(int(fields[2])+1)
ip = fields[0] + fields[1] + fields[3];
self._env["IPBase"] = ip
return ip.strip()
def make_ip_resource(self, node, id, rclass, type, ip):
self._logger.log("Creating %s:%s:%s (%s) on %s" % (rclass,type,id,ip,node))
rsc_xml="""
""" % (id, rclass, type, id, id, ip)
node_constraint = """
""" % (id, id, id, id, node)
rc = 0
(rc, _) = self._rsh(node, self.cib_cmd % ("constraints", node_constraint), verbose=1)
if rc != 0:
self._logger.log("Constraint creation failed: %d" % rc)
return None
(rc, _) = self._rsh(node, self.cib_cmd % ("resources", rsc_xml), verbose=1)
if rc != 0:
self._logger.log("Resource creation failed: %d" % rc)
return None
return 1
def is_applicable(self):
if self._env["DoBSC"]:
return True
return None
AllTestClasses.append(BSC_AddResource)
def TestList(cm, audits):
result = []
for testclass in AllTestClasses:
bound_test = testclass(cm)
if bound_test.is_applicable():
bound_test.audits = audits
result.append(bound_test)
return result
class RemoteLXC(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "RemoteLXC"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.num_containers = 2
self.is_container = True
self.fail_string = ""
def start_lxc_simple(self, node):
# restore any artifacts laying around from a previous test.
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
# generate the containers, put them in the config, add some resources to them
pats = [ ]
watch = self.create_watch(pats, 120)
watch.set_watch()
pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc1"))
pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc2"))
pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc-ms"))
pats.append(self.templates["Pat:RscOpOK"] % ("promote", "lxc-ms"))
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -g -a -m -s -c %d &>/dev/null" % self.num_containers)
with Timer(self._logger, self.name, "remoteSimpleInit"):
watch.look_for_all()
if watch.unmatched:
self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
self.failed = True
def cleanup_lxc_simple(self, node):
pats = [ ]
# if the test failed, attempt to clean up the cib and libvirt environment
# as best as possible
if self.failed:
# restore libvirt and cib
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
return
watch = self.create_watch(pats, 120)
watch.set_watch()
pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container1"))
pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container2"))
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -p &>/dev/null")
with Timer(self._logger, self.name, "remoteSimpleCleanup"):
watch.look_for_all()
if watch.unmatched:
self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
self.failed = True
# cleanup libvirt
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
def __call__(self, node):
'''Perform the 'RemoteLXC' test. '''
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Setup failed, start all nodes failed.")
(rc, _) = self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -v &>/dev/null")
if rc == 1:
self.log("Environment test for lxc support failed.")
return self.skipped()
self.start_lxc_simple(node)
self.cleanup_lxc_simple(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for ping",
r"schedulerd.*: Recover\s+(ping|lxc-ms|container)\s+\(.*\)",
# The orphaned lxc-ms resource causes an expected transition error
# that is a result of the scheduler not having knowledge that the
# promotable resource used to be a clone. As a result, it looks like that
# resource is running in multiple locations when it shouldn't... But in
# this instance we know why this error is occurring and that it is expected.
r"Calculated [Tt]ransition .*pe-error",
r"Resource lxc-ms .* is active on 2 nodes attempting recovery",
r"Unknown operation: fail",
r"VirtualDomain.*ERROR: Unable to determine emulator" ]
AllTestClasses.append(RemoteLXC)
class RemoteBasic(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteBasic"
def __call__(self, node):
'''Perform the 'RemoteBaremetal' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.test_attributes(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
AllTestClasses.append(RemoteBasic)
class RemoteStonithd(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteStonithd"
def __call__(self, node):
'''Perform the 'RemoteStonithd' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.fail_connection(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
def is_applicable(self):
if not RemoteDriver.is_applicable(self):
return False
if "DoFencing" in list(self._env.keys()):
return self._env["DoFencing"]
return True
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Lost connection to Pacemaker Remote node",
r"Software caused connection abort",
r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
r"error: Result of monitor operation for .* on remote-.*: Internal communication failure" ] + super().errors_to_ignore
AllTestClasses.append(RemoteStonithd)
class RemoteMigrate(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteMigrate"
def __call__(self, node):
'''Perform the 'RemoteMigrate' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.migrate_connection(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
def is_applicable(self):
if not RemoteDriver.is_applicable(self):
return 0
# This test requires at least three nodes: one to convert to a
# remote node, one to host the connection originally, and one
# to migrate the connection to.
if len(self._env["nodes"]) < 3:
return 0
return 1
AllTestClasses.append(RemoteMigrate)
class RemoteRscFailure(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteRscFailure"
def __call__(self, node):
'''Perform the 'RemoteRscFailure' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
# This is an important step. We are migrating the connection
# before failing the resource. This verifies that the migration
# has properly maintained control over the remote-node.
self.migrate_connection(node)
self.fail_rsc(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
r"Dummy.*: No process state file found" ] + super().errors_to_ignore
def is_applicable(self):
if not RemoteDriver.is_applicable(self):
return 0
# This test requires at least three nodes: one to convert to a
# remote node, one to host the connection originally, and one
# to migrate the connection to.
if len(self._env["nodes"]) < 3:
return 0
return 1
AllTestClasses.append(RemoteRscFailure)
# vim:ts=4:sw=4:et:
diff --git a/python/pacemaker/_cts/tests/Makefile.am b/python/pacemaker/_cts/tests/Makefile.am
index f48f1f4756..2a9bd7d134 100644
--- a/python/pacemaker/_cts/tests/Makefile.am
+++ b/python/pacemaker/_cts/tests/Makefile.am
@@ -1,15 +1,20 @@
#
# Copyright 2023 the Pacemaker project contributors
#
# The version control history for this file may have further details.
#
# This source code is licensed under the GNU General Public License version 2
# or later (GPLv2+) WITHOUT ANY WARRANTY.
#
MAINTAINERCLEANFILES = Makefile.in
pkgpythondir = $(pythondir)/$(PACKAGE)/_cts/tests
pkgpython_PYTHON = __init__.py \
- base.py
+ ctstest.py \
+ remotedriver.py \
+ simulstartlite.py \
+ simulstoplite.py \
+ starttest.py \
+ stoptest.py
diff --git a/python/pacemaker/_cts/tests/__init__.py b/python/pacemaker/_cts/tests/__init__.py
index 3f5f34ac68..82fa9ba362 100644
--- a/python/pacemaker/_cts/tests/__init__.py
+++ b/python/pacemaker/_cts/tests/__init__.py
@@ -1,6 +1,13 @@
"""
Test classes for the `pacemaker._cts` package.
"""
__copyright__ = "Copyright 2023 the Pacemaker project contributors"
__license__ = "GNU Lesser General Public License version 2.1 or later (LGPLv2.1+)"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.remotedriver import RemoteDriver
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.tests.stoptest import StopTest
diff --git a/python/pacemaker/_cts/tests/ctstest.py b/python/pacemaker/_cts/tests/ctstest.py
new file mode 100644
index 0000000000..f2fe8bf710
--- /dev/null
+++ b/python/pacemaker/_cts/tests/ctstest.py
@@ -0,0 +1,290 @@
+""" Base classes for CTS tests """
+
+__all__ = ["CTSTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+
+from pacemaker._cts.audits import AuditConstraint, AuditResource
+from pacemaker._cts.environment import EnvFactory
+from pacemaker._cts.logging import LogFactory
+from pacemaker._cts.patterns import PatternSelector
+from pacemaker._cts.remote import RemoteFactory
+from pacemaker._cts.timer import Timer
+from pacemaker._cts.watcher import LogWatcher
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class CTSTest:
+ """ The base class for all cluster tests. This implements a basic set of
+ properties and behaviors like setup, tear down, time keeping, and
+ statistics tracking. It is up to specific tests to implement their own
+ specialized behavior on top of this class.
+ """
+
+ def __init__(self, cm):
+ """ Create a new CTSTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ # pylint: disable=invalid-name
+
+ self.audits = []
+ self.name = None
+ self.templates = PatternSelector(cm["Name"])
+
+ self.stats = { "auditfail": 0,
+ "calls": 0,
+ "failure": 0,
+ "skipped": 0,
+ "success": 0 }
+
+ self._cm = cm
+ self._env = EnvFactory().getInstance()
+ self._r_o2cb = None
+ self._r_ocfs2 = []
+ self._rsh = RemoteFactory().getInstance()
+ self._logger = LogFactory()
+ self._timers = {}
+
+ self.benchmark = True # which tests to benchmark
+ self.failed = False
+ self.is_container = False
+ self.is_experimental = False
+ self.is_loop = False
+ self.is_unsafe = False
+ self.is_valgrind = False
+ self.passed = True
+
+ def log(self, args):
+ """ Log a message """
+
+ self._logger.log(args)
+
+ def debug(self, args):
+ """ Log a debug message """
+
+ self._logger.debug(args)
+
+ def get_timer(self, key="test"):
+ """ Get the start time of the given timer """
+
+ try:
+ return self._timers[key].start_time
+ except KeyError:
+ return 0
+
+ def set_timer(self, key="test"):
+ """ Set the start time of the given timer to now, and return
+ that time
+ """
+
+ if key not in self._timers:
+ self._timers[key] = Timer(self._logger, self.name, key)
+
+ self._timers[key].start()
+ return self._timers[key].start_time
+
+ def log_timer(self, key="test"):
+ """ Log the elapsed time of the given timer """
+
+ if key not in self._timers:
+ return
+
+ elapsed = self._timers[key].elapsed
+ self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed))
+ del self._timers[key]
+
+ def incr(self, name):
+ """ Increment the given stats key """
+
+ if name not in self.stats:
+ self.stats[name] = 0
+
+ self.stats[name] += 1
+
+ # Reset the test passed boolean
+ if name == "calls":
+ self.passed = True
+
+ def failure(self, reason="none"):
+ """ Increment the failure count, with an optional failure reason """
+
+ self.passed = False
+ self.incr("failure")
+ self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
+
+ return False
+
+ def success(self):
+ """ Increment the success count """
+
+ self.incr("success")
+ return True
+
+ def skipped(self):
+ """ Increment the skipped count """
+
+ self.incr("skipped")
+ return True
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ raise NotImplementedError
+
+ def audit(self):
+ """ Perform all the relevant audits (see ClusterAudit), returning
+ whether or not they all passed.
+ """
+
+ passed = True
+
+ for audit in self.audits:
+ if not audit():
+ self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name))
+ self.incr("auditfail")
+ passed = False
+
+ return passed
+
+ def setup(self, node):
+ """ Setup this test """
+
+ # node is used in subclasses
+ # pylint: disable=unused-argument
+
+ return self.success()
+
+ def teardown(self, node):
+ """ Tear down this test """
+
+ # node is used in subclasses
+ # pylint: disable=unused-argument
+
+ return self.success()
+
+ def create_watch(self, patterns, timeout, name=None):
+ """ Create a new LogWatcher object with the given patterns, timeout,
+ and optional name. This object can be used to search log files
+ for matching patterns during this test's run.
+ """
+ if not name:
+ name = self.name
+
+ return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"], self._env["LogWatcher"], name, timeout)
+
+ def local_badnews(self, prefix, watch, local_ignore=None):
+ """ Use the given watch object to search through log files for messages
+ starting with the given prefix. If no prefix is given, use
+ "LocalBadNews:" by default. The optional local_ignore list should
+ be a list of regexes that, if found in a line, will cause that line
+ to be ignored.
+
+ Return the number of matches found.
+ """
+ errcount = 0
+ if not prefix:
+ prefix = "LocalBadNews:"
+
+ ignorelist = [" CTS: ", prefix]
+
+ if local_ignore:
+ ignorelist += local_ignore
+
+ while errcount < 100:
+ match = watch.look(0)
+ if match:
+ add_err = True
+
+ for ignore in ignorelist:
+ if add_err and re.search(ignore, match):
+ add_err = False
+
+ if add_err:
+ self._logger.log("%s %s" % (prefix, match))
+ errcount += 1
+ else:
+ break
+ else:
+ self._logger.log("Too many errors!")
+
+ watch.end()
+ return errcount
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration.
+ This method must be implemented by all subclasses.
+ """
+
+ if self.is_loop and not self._env["loop-tests"]:
+ return False
+
+ if self.is_unsafe and not self._env["unsafe-tests"]:
+ return False
+
+ if self.is_valgrind and not self._env["valgrind-tests"]:
+ return False
+
+ if self.is_experimental and not self._env["experimental-tests"]:
+ return False
+
+ if self.is_container and not self._env["container-tests"]:
+ return False
+
+ if self._env["benchmark"] and not self.benchmark:
+ return False
+
+ return True
+
+ def _find_ocfs2_resources(self, node):
+ """ Find any OCFS2 filesystems mounted on the given cluster node,
+ populating the internal self._r_ocfs2 list with them and returning
+ the number of OCFS2 filesystems.
+ """
+
+ self._r_o2cb = None
+ self._r_ocfs2 = []
+
+ (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self._cm, line)
+
+ if r.rtype == "o2cb" and r.parent != "NA":
+ self.debug("Found o2cb: %s" % self._r_o2cb)
+ self._r_o2cb = r.parent
+
+ if re.search("^Constraint", line):
+ c = AuditConstraint(self._cm, line)
+
+ if c.type == "rsc_colocation" and c.target == self._r_o2cb:
+ self._r_ocfs2.append(c.rsc)
+
+ self.debug("Found ocfs2 filesystems: %s" % self._r_ocfs2)
+ return len(self._r_ocfs2)
+
+ def can_run_now(self, node):
+ """ Return True if we can meaningfully run right now """
+
+ # node is used in subclasses
+ # pylint: disable=unused-argument
+
+ return True
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return []
diff --git a/python/pacemaker/_cts/tests/base.py b/python/pacemaker/_cts/tests/remotedriver.py
similarity index 51%
rename from python/pacemaker/_cts/tests/base.py
rename to python/pacemaker/_cts/tests/remotedriver.py
index 5f6e8ac96e..852976113d 100644
--- a/python/pacemaker/_cts/tests/base.py
+++ b/python/pacemaker/_cts/tests/remotedriver.py
@@ -1,1114 +1,533 @@
""" Base classes for CTS tests """
-__all__ = ["CTSTest", "RemoteDriver", "SimulStartLite", "SimulStopLite", "StartTest", "StopTest"]
+__all__ = ["RemoteDriver"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
-import re
import time
import subprocess
import tempfile
-from pacemaker._cts.audits import AuditConstraint, AuditResource
-from pacemaker._cts.environment import EnvFactory
-from pacemaker._cts.logging import LogFactory
-from pacemaker._cts.patterns import PatternSelector
-from pacemaker._cts.remote import RemoteFactory
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.tests.stoptest import StopTest
from pacemaker._cts.timer import Timer
-from pacemaker._cts.watcher import LogWatcher
# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
-# pylint doesn't understand that self._env is subscriptable.
-# pylint: disable=unsubscriptable-object
-
-
-class CTSTest:
- """ The base class for all cluster tests. This implements a basic set of
- properties and behaviors like setup, tear down, time keeping, and
- statistics tracking. It is up to specific tests to implement their own
- specialized behavior on top of this class.
- """
-
- def __init__(self, cm):
- """ Create a new CTSTest instance
-
- Arguments:
-
- cm -- A ClusterManager instance
- """
-
- # pylint: disable=invalid-name
-
- self.audits = []
- self.name = None
- self.templates = PatternSelector(cm["Name"])
-
- self.stats = { "auditfail": 0,
- "calls": 0,
- "failure": 0,
- "skipped": 0,
- "success": 0 }
-
- self._cm = cm
- self._env = EnvFactory().getInstance()
- self._r_o2cb = None
- self._r_ocfs2 = []
- self._rsh = RemoteFactory().getInstance()
- self._logger = LogFactory()
- self._timers = {}
-
- self.benchmark = True # which tests to benchmark
- self.failed = False
- self.is_container = False
- self.is_experimental = False
- self.is_loop = False
- self.is_unsafe = False
- self.is_valgrind = False
- self.passed = True
-
- def log(self, args):
- """ Log a message """
-
- self._logger.log(args)
-
- def debug(self, args):
- """ Log a debug message """
-
- self._logger.debug(args)
-
- def get_timer(self, key="test"):
- """ Get the start time of the given timer """
-
- try:
- return self._timers[key].start_time
- except KeyError:
- return 0
-
- def set_timer(self, key="test"):
- """ Set the start time of the given timer to now, and return
- that time
- """
-
- if key not in self._timers:
- self._timers[key] = Timer(self._logger, self.name, key)
-
- self._timers[key].start()
- return self._timers[key].start_time
-
- def log_timer(self, key="test"):
- """ Log the elapsed time of the given timer """
-
- if key not in self._timers:
- return
-
- elapsed = self._timers[key].elapsed
- self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed))
- del self._timers[key]
-
- def incr(self, name):
- """ Increment the given stats key """
-
- if name not in self.stats:
- self.stats[name] = 0
-
- self.stats[name] += 1
-
- # Reset the test passed boolean
- if name == "calls":
- self.passed = True
-
- def failure(self, reason="none"):
- """ Increment the failure count, with an optional failure reason """
-
- self.passed = False
- self.incr("failure")
- self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
-
- return False
-
- def success(self):
- """ Increment the success count """
-
- self.incr("success")
- return True
-
- def skipped(self):
- """ Increment the skipped count """
-
- self.incr("skipped")
- return True
-
- def __call__(self, node):
- """ Perform this test """
-
- raise NotImplementedError
-
- def audit(self):
- """ Perform all the relevant audits (see ClusterAudit), returning
- whether or not they all passed.
- """
-
- passed = True
-
- for audit in self.audits:
- if not audit():
- self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name))
- self.incr("auditfail")
- passed = False
-
- return passed
-
- def setup(self, node):
- """ Setup this test """
-
- # node is used in subclasses
- # pylint: disable=unused-argument
-
- return self.success()
-
- def teardown(self, node):
- """ Tear down this test """
-
- # node is used in subclasses
- # pylint: disable=unused-argument
-
- return self.success()
-
- def create_watch(self, patterns, timeout, name=None):
- """ Create a new LogWatcher object with the given patterns, timeout,
- and optional name. This object can be used to search log files
- for matching patterns during this test's run.
- """
- if not name:
- name = self.name
-
- return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"], self._env["LogWatcher"], name, timeout)
-
- def local_badnews(self, prefix, watch, local_ignore=None):
- """ Use the given watch object to search through log files for messages
- starting with the given prefix. If no prefix is given, use
- "LocalBadNews:" by default. The optional local_ignore list should
- be a list of regexes that, if found in a line, will cause that line
- to be ignored.
-
- Return the number of matches found.
- """
- errcount = 0
- if not prefix:
- prefix = "LocalBadNews:"
-
- ignorelist = [" CTS: ", prefix]
-
- if local_ignore:
- ignorelist += local_ignore
-
- while errcount < 100:
- match = watch.look(0)
- if match:
- add_err = True
-
- for ignore in ignorelist:
- if add_err and re.search(ignore, match):
- add_err = False
-
- if add_err:
- self._logger.log("%s %s" % (prefix, match))
- errcount += 1
- else:
- break
- else:
- self._logger.log("Too many errors!")
-
- watch.end()
- return errcount
-
- def is_applicable(self):
- """ Return True if this test is applicable in the current test configuration.
- This method must be implemented by all subclasses.
- """
-
- if self.is_loop and not self._env["loop-tests"]:
- return False
-
- if self.is_unsafe and not self._env["unsafe-tests"]:
- return False
-
- if self.is_valgrind and not self._env["valgrind-tests"]:
- return False
-
- if self.is_experimental and not self._env["experimental-tests"]:
- return False
-
- if self.is_container and not self._env["container-tests"]:
- return False
-
- if self._env["benchmark"] and not self.benchmark:
- return False
-
- return True
-
- def _find_ocfs2_resources(self, node):
- """ Find any OCFS2 filesystems mounted on the given cluster node,
- populating the internal self._r_ocfs2 list with them and returning
- the number of OCFS2 filesystems.
- """
-
- self._r_o2cb = None
- self._r_ocfs2 = []
-
- (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
- for line in lines:
- if re.search("^Resource", line):
- r = AuditResource(self._cm, line)
-
- if r.rtype == "o2cb" and r.parent != "NA":
- self.debug("Found o2cb: %s" % self._r_o2cb)
- self._r_o2cb = r.parent
-
- if re.search("^Constraint", line):
- c = AuditConstraint(self._cm, line)
-
- if c.type == "rsc_colocation" and c.target == self._r_o2cb:
- self._r_ocfs2.append(c.rsc)
-
- self.debug("Found ocfs2 filesystems: %s" % self._r_ocfs2)
- return len(self._r_ocfs2)
-
- def can_run_now(self, node):
- """ Return True if we can meaningfully run right now """
-
- # node is used in subclasses
- # pylint: disable=unused-argument
-
- return True
-
- @property
- def errors_to_ignore(self):
- """ Return list of errors which should be ignored """
-
- return []
class RemoteDriver(CTSTest):
""" A specialized base class for cluster tests that run on Pacemaker
Remote nodes. This builds on top of CTSTest to provide methods
for starting and stopping services and resources, and managing
remote nodes. This is still just an abstract class -- specific
tests need to implement their own specialized behavior.
"""
def __init__(self, cm):
""" Create a new RemoteDriver instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self,cm)
self.name = "RemoteDriver"
self._corosync_enabled = False
self._pacemaker_enabled = False
self._remote_node = None
self._remote_rsc = "remote-rsc"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self._stop = StopTest(cm)
self.reset()
def reset(self):
""" Reset the state of this test back to what it was before the test
was run
"""
self.failed = False
self.fail_string = ""
self._pcmk_started = False
self._remote_node_added = False
self._remote_rsc_added = False
self._remote_use_reconnect_interval = self._env.random_gen.choice([True,False])
def fail(self, msg):
""" Mark test as failed """
self.failed = True
# Always log the failure.
self._logger.log(msg)
# Use first failure as test status, as it's likely to be most useful.
if not self.fail_string:
self.fail_string = msg
def _get_other_node(self, node):
""" Get the first cluster node out of the environment that is not the
given node. Typically, this is used to find some node that will
still be active that we can run cluster commands on.
"""
for othernode in self._env["nodes"]:
if othernode == node:
# we don't want to try and use the cib that we just shutdown.
# find a cluster node that is not our soon to be remote-node.
continue
return othernode
def _del_rsc(self, node, rsc):
""" Delete the given named resource from the cluster. The given `node`
is the cluster node on which we should *not* run the delete command.
"""
othernode = self._get_other_node(node)
(rc, _) = self._rsh(othernode, "crm_resource -D -r %s -t primitive" % rsc)
if rc != 0:
self.fail("Removal of resource '%s' failed" % rsc)
def _add_rsc(self, node, rsc_xml):
""" Add a resource given in XML format to the cluster. The given `node`
is the cluster node on which we should *not* run the add command.
"""
othernode = self._get_other_node(node)
(rc, _) = self._rsh(othernode, "cibadmin -C -o resources -X '%s'" % rsc_xml)
if rc != 0:
self.fail("resource creation failed")
def _add_primitive_rsc(self, node):
""" Add a primitive heartbeat resource for the remote node to the
cluster. The given `node` is the cluster node on which we should
*not* run the add command.
"""
rsc_xml = """
""" % { "node": self._remote_rsc }
self._add_rsc(node, rsc_xml)
if not self.failed:
self._remote_rsc_added = True
def _add_connection_rsc(self, node):
""" Add a primitive connection resource for the remote node to the
cluster. The given `node` is teh cluster node on which we should
*not* run the add command.
"""
rsc_xml = """
""" % { "node": self._remote_node, "server": node }
if self._remote_use_reconnect_interval:
# Set reconnect interval on resource
rsc_xml += """
""" % self._remote_node
rsc_xml += """
""" % { "node": self._remote_node }
self._add_rsc(node, rsc_xml)
if not self.failed:
self._remote_node_added = True
def _disable_services(self, node):
""" Disable the corosync and pacemaker services on the given node """
self._corosync_enabled = self._env.service_is_enabled(node, "corosync")
if self._corosync_enabled:
self._env.disable_service(node, "corosync")
self._pacemaker_enabled = self._env.service_is_enabled(node, "pacemaker")
if self._pacemaker_enabled:
self._env.disable_service(node, "pacemaker")
def _enable_services(self, node):
""" Enable the corosync and pacemaker services on the given node """
if self._corosync_enabled:
self._env.enable_service(node, "corosync")
if self._pacemaker_enabled:
self._env.enable_service(node, "pacemaker")
def _stop_pcmk_remote(self, node):
""" Stop the Pacemaker Remote service on the given node """
for _ in range(10):
(rc, _) = self._rsh(node, "service pacemaker_remote stop")
if rc != 0:
time.sleep(6)
else:
break
def _start_pcmk_remote(self, node):
""" Start the Pacemaker Remote service on the given node """
for _ in range(10):
(rc, _) = self._rsh(node, "service pacemaker_remote start")
if rc != 0:
time.sleep(6)
else:
self._pcmk_started = True
break
def _freeze_pcmk_remote(self, node):
""" Simulate a Pacemaker Remote daemon failure """
self._rsh(node, "killall -STOP pacemaker-remoted")
def _resume_pcmk_remote(self, node):
""" Simulate the Pacemaker Remote daemon recovering """
self._rsh(node, "killall -CONT pacemaker-remoted")
def _start_metal(self, node):
""" Setup a Pacemaker Remote configuration. Remove any existing
connection resources or nodes. Start the pacemaker_remote service.
Create a connection resource.
"""
# Cluster nodes are reused as remote nodes in remote tests. If cluster
# services were enabled at boot, in case the remote node got fenced, the
# cluster node would join instead of the expected remote one. Meanwhile
# pacemaker_remote would not be able to start. Depending on the chances,
# the situations might not be able to be orchestrated gracefully any more.
#
# Temporarily disable any enabled cluster serivces.
self._disable_services(node)
# make sure the resource doesn't already exist for some reason
self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_rsc)
self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_node)
if not self._stop(node):
self.fail("Failed to shutdown cluster node %s" % node)
return
self._start_pcmk_remote(node)
if not self._pcmk_started:
self.fail("Failed to start pacemaker_remote on node %s" % node)
return
# Convert node to baremetal now that it has shutdown the cluster stack
pats = [ self.templates["Pat:RscOpOK"] % ("start", self._remote_node),
self.templates["Pat:DC_IDLE"] ]
watch = self.create_watch(pats, 120)
watch.set_watch()
self._add_connection_rsc(node)
with Timer(self._logger, self.name, "remoteMetalInit"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def migrate_connection(self, node):
""" Move the remote connection resource from the node it's currently
running on to any other available node
"""
if self.failed:
return
pats = [ self.templates["Pat:RscOpOK"] % ("migrate_to", self._remote_node),
self.templates["Pat:RscOpOK"] % ("migrate_from", self._remote_node),
self.templates["Pat:DC_IDLE"] ]
watch = self.create_watch(pats, 120)
watch.set_watch()
(rc, _) = self._rsh(node, "crm_resource -M -r %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("failed to move remote node connection resource")
return
with Timer(self._logger, self.name, "remoteMetalMigrate"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def fail_rsc(self, node):
""" Cause the dummy resource running on a Pacemaker Remote node to fail
and verify that the failure is logged correctly
"""
if self.failed:
return
watchpats = [ self.templates["Pat:RscRemoteOpOK"] % ("stop", self._remote_rsc, self._remote_node),
self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
self.templates["Pat:DC_IDLE"] ]
watch = self.create_watch(watchpats, 120)
watch.set_watch()
self.debug("causing dummy rsc to fail.")
self._rsh(node, "rm -f /var/run/resource-agents/Dummy*")
with Timer(self._logger, self.name, "remoteRscFail"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched)
def fail_connection(self, node):
""" Cause the remote connection resource to fail and verify that the
node is fenced and the connection resource is restarted on another
node.
"""
if self.failed:
return
watchpats = [ self.templates["Pat:Fencing_ok"] % self._remote_node,
self.templates["Pat:NodeFenced"] % self._remote_node ]
watch = self.create_watch(watchpats, 120)
watch.set_watch()
# freeze the pcmk remote daemon. this will result in fencing
self.debug("Force stopped active remote node")
self._freeze_pcmk_remote(node)
self.debug("Waiting for remote node to be fenced.")
with Timer(self._logger, self.name, "remoteMetalFence"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
return
self.debug("Waiting for the remote node to come back up")
self._cm.ns.wait_for_node(node, 120)
pats = [ self.templates["Pat:RscOpOK"] % ("start", self._remote_node) ]
if self._remote_rsc_added:
pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node))
watch = self.create_watch([], 240)
watch.set_watch()
# start the remote node again watch it integrate back into cluster.
self._start_pcmk_remote(node)
if not self._pcmk_started:
self.fail("Failed to start pacemaker_remote on node %s" % node)
return
self.debug("Waiting for remote node to rejoin cluster after being fenced.")
with Timer(self._logger, self.name, "remoteMetalRestart"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def _add_dummy_rsc(self, node):
""" Add a dummy resource that runs on the Pacemaker Remote node """
if self.failed:
return
# verify we can put a resource on the remote node
pats = [ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
self.templates["Pat:DC_IDLE"] ]
watch = self.create_watch(pats, 120)
watch.set_watch()
# Add a resource that must live on remote-node
self._add_primitive_rsc(node)
# force that rsc to prefer the remote node.
(rc, _) = self._cm.rsh(node, "crm_resource -M -r %s -N %s -f" % (self._remote_rsc, self._remote_node), verbose=1)
if rc != 0:
self.fail("Failed to place remote resource on remote node.")
return
with Timer(self._logger, self.name, "remoteMetalRsc"):
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
def test_attributes(self, node):
""" Verify that attributes can be set on the Pacemaker Remote node """
if self.failed:
return
# This verifies permanent attributes can be set on a remote-node. It also
# verifies the remote-node can edit its own cib node section remotely.
(rc, line) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line))
return
(rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("Failed to get remote-node attribute")
return
(rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % self._remote_node, verbose=1)
if rc != 0:
self.fail("Failed to delete remote-node attribute")
def cleanup_metal(self, node):
""" Clean up the Pacemaker Remote node configuration previously created by
_setup_metal. Stop and remove dummy resources and connection resources.
Stop the pacemaker_remote service. Remove the remote node itself.
"""
self._enable_services(node)
if not self._pcmk_started:
return
pats = [ ]
watch = self.create_watch(pats, 120)
watch.set_watch()
if self._remote_rsc_added:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_rsc))
if self._remote_node_added:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_node))
with Timer(self._logger, self.name, "remoteMetalCleanup"):
self._resume_pcmk_remote(node)
if self._remote_rsc_added:
# Remove dummy resource added for remote node tests
self.debug("Cleaning up dummy rsc put on remote node")
self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_rsc)
self._del_rsc(node, self._remote_rsc)
if self._remote_node_added:
# Remove remote node's connection resource
self.debug("Cleaning up remote node connection resource")
self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_node)
self._del_rsc(node, self._remote_node)
watch.look_for_all()
if watch.unmatched:
self.fail("Unmatched patterns: %s" % watch.unmatched)
self._stop_pcmk_remote(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self._remote_node_added:
# Remove remote node itself
self.debug("Cleaning up node entry for remote node")
self._rsh(self._get_other_node(node), "crm_node --force --remove %s" % self._remote_node)
def _setup_env(self, node):
""" Setup the environment to allow Pacemaker Remote to function. This
involves generating a key and copying it to all nodes in the cluster.
"""
self._remote_node = "remote-%s" % node
# we are assuming if all nodes have a key, that it is
# the right key... If any node doesn't have a remote
# key, we regenerate it everywhere.
if self._rsh.exists_on_all("/etc/pacemaker/authkey", self._env["nodes"]):
return
# create key locally
(handle, keyfile) = tempfile.mkstemp(".cts")
os.close(handle)
subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# sync key throughout the cluster
for n in self._env["nodes"]:
self._rsh(n, "mkdir -p --mode=0750 /etc/pacemaker")
self._rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % n)
self._rsh(n, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey")
self._rsh(n, "chmod 0640 /etc/pacemaker/authkey")
os.unlink(keyfile)
def is_applicable(self):
""" Return True if this test is applicable in the current test configuration. """
if not CTSTest.is_applicable(self):
return False
for node in self._env["nodes"]:
(rc, _) = self._rsh(node, "which pacemaker-remoted >/dev/null 2>&1")
if rc != 0:
return False
return True
def start_new_test(self, node):
""" Prepare a remote test for running by setting up its environment
and resources
"""
self.incr("calls")
self.reset()
ret = self._startall(None)
if not ret:
return self.failure("setup failed: could not start all nodes")
self._setup_env(node)
self._start_metal(node)
self._add_dummy_rsc(node)
return True
def __call__(self, node):
""" Perform this test """
raise NotImplementedError
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"""is running on remote.*which isn't allowed""",
r"""Connection terminated""",
r"""Could not send remote""" ]
-
-
-class SimulStartLite(CTSTest):
- """ A pseudo-test that is only used to set up conditions before running
- some other test. This class starts any stopped nodes more or less
- simultaneously.
-
- Other test classes should not use this one as a superclass.
- """
-
- def __init__(self, cm):
- """ Create a new SimulStartLite instance
-
- Arguments:
-
- cm -- A ClusterManager instance
- """
-
- CTSTest.__init__(self,cm)
- self.name = "SimulStartLite"
-
- def __call__(self, dummy):
- """ Start all stopped nodes more or less simultaneously, returning
- whether this succeeded or not.
- """
-
- self.incr("calls")
- self.debug("Setup: %s" % self.name)
-
- # We ignore the "node" parameter...
- node_list = []
- for node in self._env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "down":
- self.incr("WasStopped")
- node_list.append(node)
-
- self.set_timer()
- while len(node_list) > 0:
- # Repeat until all nodes come up
- uppat = self.templates["Pat:NonDC_started"]
- if self._cm.upcount() == 0:
- uppat = self.templates["Pat:Local_started"]
-
- watchpats = [ self.templates["Pat:DC_IDLE"] ]
- for node in node_list:
- watchpats.extend([uppat % node,
- self.templates["Pat:InfraUp"] % node,
- self.templates["Pat:PacemakerUp"] % node])
-
- # Start all the nodes - at about the same time...
- watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
- watch.set_watch()
-
- stonith = self._cm.prepare_fencing_watcher(self.name)
-
- for node in node_list:
- self._cm.StartaCMnoBlock(node)
-
- watch.look_for_all()
-
- node_list = self._cm.fencing_cleanup(self.name, stonith)
-
- if node_list is None:
- return self.failure("Cluster did not stabilize")
-
- # Remove node_list messages from watch.unmatched
- for node in node_list:
- self._logger.debug("Dealing with stonith operations for %s" % node_list)
- if watch.unmatched:
- try:
- watch.unmatched.remove(uppat % node)
- except ValueError:
- self.debug("Already matched: %s" % (uppat % node))
-
- try:
- watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
- except ValueError:
- self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))
-
- try:
- watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
- except ValueError:
- self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))
-
- if watch.unmatched:
- for regex in watch.unmatched:
- self._logger.log ("Warn: Startup pattern not found: %s" % regex)
-
- if not self._cm.cluster_stable():
- return self.failure("Cluster did not stabilize")
-
- did_fail = False
- unstable = []
- for node in self._env["nodes"]:
- if self._cm.StataCM(node) == 0:
- did_fail = True
- unstable.append(node)
-
- if did_fail:
- return self.failure("Unstarted nodes exist: %s" % unstable)
-
- unstable = []
- for node in self._env["nodes"]:
- if not self._cm.node_stable(node):
- did_fail = True
- unstable.append(node)
-
- if did_fail:
- return self.failure("Unstable cluster nodes exist: %s" % unstable)
-
- return self.success()
-
- def is_applicable(self):
- """ SimulStartLite is a setup test and never applicable """
-
- return False
-
-
-class SimulStopLite(CTSTest):
- """ A pseudo-test that is only used to set up conditions before running
- some other test. This class stops any running nodes more or less
- simultaneously. It can be used both to set up a test or to clean up
- a test.
-
- Other test classes should not use this one as a superclass.
- """
-
- def __init__(self, cm):
- """ Create a new SimulStopLite instance
-
- Arguments:
-
- cm -- A ClusterManager instance
- """
-
- CTSTest.__init__(self,cm)
- self.name = "SimulStopLite"
-
- def __call__(self, dummy):
- """ Stop all running nodes more or less simultaneously, returning
- whether this succeeded or not.
- """
-
- self.incr("calls")
- self.debug("Setup: %s" % self.name)
-
- # We ignore the "node" parameter...
- watchpats = []
-
- for node in self._env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "up":
- self.incr("WasStarted")
- watchpats.append(self.templates["Pat:We_stopped"] % node)
-
- if len(watchpats) == 0:
- return self.success()
-
- # Stop all the nodes - at about the same time...
- watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
-
- watch.set_watch()
- self.set_timer()
- for node in self._env["nodes"]:
- if self._cm.ShouldBeStatus[node] == "up":
- self._cm.StopaCMnoBlock(node)
-
- if watch.look_for_all():
- # Make sure they're completely down with no residule
- for node in self._env["nodes"]:
- self._rsh(node, self.templates["StopCmd"])
-
- return self.success()
-
- did_fail = False
- up_nodes = []
- for node in self._env["nodes"]:
- if self._cm.StataCM(node) == 1:
- did_fail = True
- up_nodes.append(node)
-
- if did_fail:
- return self.failure("Active nodes exist: %s" % up_nodes)
-
- self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched)
- return self.failure("Missing log message: %s " % watch.unmatched)
-
- def is_applicable(self):
- """ SimulStopLite is a setup test and never applicable """
-
- return False
-
-
-class StartTest(CTSTest):
- """ A pseudo-test that is only used to set up conditions before running
- some other test. This class starts the cluster manager on a given
- node.
-
- Other test classes should not use this one as a superclass.
- """
-
- def __init__(self, cm):
- """ Create a new StartTest instance
-
- Arguments:
-
- cm -- A ClusterManager instance
- """
-
- CTSTest.__init__(self,cm)
- self.name = "Start"
-
- def __call__(self, node):
- """ Start the given node, returning whether this succeeded or not """
-
- self.incr("calls")
-
- if self._cm.upcount() == 0:
- self.incr("us")
- else:
- self.incr("them")
-
- if self._cm.ShouldBeStatus[node] != "down":
- return self.skipped()
-
- if self._cm.StartaCM(node):
- return self.success()
-
- return self.failure("Startup %s on node %s failed"
- % (self._env["Name"], node))
-
-
-class StopTest(CTSTest):
- """ A pseudo-test that is only used to set up conditions before running
- some other test. This class stops the cluster manager on a given
- node.
-
- Other test classes should not use this one as a superclass.
- """
-
- def __init__(self, cm):
- """ Create a new StopTest instance
-
- Arguments:
-
- cm -- A ClusterManager instance
- """
-
- CTSTest.__init__(self, cm)
- self.name = "Stop"
-
- def __call__(self, node):
- """ Stop the given node, returning whether this succeeded or not """
-
- self.incr("calls")
- if self._cm.ShouldBeStatus[node] != "up":
- return self.skipped()
-
- # Technically we should always be able to notice ourselves stopping
- patterns = [ self.templates["Pat:We_stopped"] % node ]
-
- # Any active node needs to notice this one left
- # (note that this won't work if we have multiple partitions)
- for other in self._env["nodes"]:
- if self._cm.ShouldBeStatus[other] == "up" and other != node:
- patterns.append(self.templates["Pat:They_stopped"] %(other, self._cm.key_for_node(node)))
-
- watch = self.create_watch(patterns, self._env["DeadTime"])
- watch.set_watch()
-
- if node == self._cm.OurNode:
- self.incr("us")
- else:
- if self._cm.upcount() <= 1:
- self.incr("all")
- else:
- self.incr("them")
-
- self._cm.StopaCM(node)
- watch.look_for_all()
-
- failreason = None
- unmatched_str = "||"
-
- if watch.unmatched:
- (_, output) = self._rsh(node, "/bin/ps axf", verbose=1)
- for line in output:
- self.debug(line)
-
- (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
- for line in output:
- self.debug(line)
-
- for regex in watch.unmatched:
- self._logger.log ("ERROR: Shutdown pattern not found: %s" % regex)
- unmatched_str += "%s||" % regex
- failreason = "Missing shutdown pattern"
-
- self._cm.cluster_stable(self._env["DeadTime"])
-
- if not watch.unmatched or self._cm.upcount() == 0:
- return self.success()
-
- if len(watch.unmatched) >= self._cm.upcount():
- return self.failure("no match against (%s)" % unmatched_str)
-
- if failreason is None:
- return self.success()
-
- return self.failure(failreason)
diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py
new file mode 100644
index 0000000000..66f4d5eaf1
--- /dev/null
+++ b/python/pacemaker/_cts/tests/simulstartlite.py
@@ -0,0 +1,131 @@
+""" Simultaneously start stopped nodes """
+
+__all__ = ["SimulStartLite"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class SimulStartLite(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class starts any stopped nodes more or less
+ simultaneously.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new SimulStartLite instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self,cm)
+ self.name = "SimulStartLite"
+
+ def __call__(self, dummy):
+ """ Start all stopped nodes more or less simultaneously, returning
+ whether this succeeded or not.
+ """
+
+ self.incr("calls")
+ self.debug("Setup: %s" % self.name)
+
+ # We ignore the "node" parameter...
+ node_list = []
+ for node in self._env["nodes"]:
+ if self._cm.ShouldBeStatus[node] == "down":
+ self.incr("WasStopped")
+ node_list.append(node)
+
+ self.set_timer()
+ while len(node_list) > 0:
+ # Repeat until all nodes come up
+ uppat = self.templates["Pat:NonDC_started"]
+ if self._cm.upcount() == 0:
+ uppat = self.templates["Pat:Local_started"]
+
+ watchpats = [ self.templates["Pat:DC_IDLE"] ]
+ for node in node_list:
+ watchpats.extend([uppat % node,
+ self.templates["Pat:InfraUp"] % node,
+ self.templates["Pat:PacemakerUp"] % node])
+
+ # Start all the nodes - at about the same time...
+ watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
+ watch.set_watch()
+
+ stonith = self._cm.prepare_fencing_watcher(self.name)
+
+ for node in node_list:
+ self._cm.StartaCMnoBlock(node)
+
+ watch.look_for_all()
+
+ node_list = self._cm.fencing_cleanup(self.name, stonith)
+
+ if node_list is None:
+ return self.failure("Cluster did not stabilize")
+
+ # Remove node_list messages from watch.unmatched
+ for node in node_list:
+ self._logger.debug("Dealing with stonith operations for %s" % node_list)
+ if watch.unmatched:
+ try:
+ watch.unmatched.remove(uppat % node)
+ except ValueError:
+ self.debug("Already matched: %s" % (uppat % node))
+
+ try:
+ watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
+ except ValueError:
+ self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))
+
+ try:
+ watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
+ except ValueError:
+ self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))
+
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self._logger.log ("Warn: Startup pattern not found: %s" % regex)
+
+ if not self._cm.cluster_stable():
+ return self.failure("Cluster did not stabilize")
+
+ did_fail = False
+ unstable = []
+ for node in self._env["nodes"]:
+ if self._cm.StataCM(node) == 0:
+ did_fail = True
+ unstable.append(node)
+
+ if did_fail:
+ return self.failure("Unstarted nodes exist: %s" % unstable)
+
+ unstable = []
+ for node in self._env["nodes"]:
+ if not self._cm.node_stable(node):
+ did_fail = True
+ unstable.append(node)
+
+ if did_fail:
+ return self.failure("Unstable cluster nodes exist: %s" % unstable)
+
+ return self.success()
+
+ def is_applicable(self):
+ """ SimulStartLite is a setup test and never applicable """
+
+ return False
diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py
new file mode 100644
index 0000000000..a5b965e798
--- /dev/null
+++ b/python/pacemaker/_cts/tests/simulstoplite.py
@@ -0,0 +1,91 @@
+""" Simultaneously stop running nodes """
+
+__all__ = ["SimulStopLite"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class SimulStopLite(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class stops any running nodes more or less
+ simultaneously. It can be used both to set up a test or to clean up
+ a test.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new SimulStopLite instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self,cm)
+ self.name = "SimulStopLite"
+
+ def __call__(self, dummy):
+ """ Stop all running nodes more or less simultaneously, returning
+ whether this succeeded or not.
+ """
+
+ self.incr("calls")
+ self.debug("Setup: %s" % self.name)
+
+ # We ignore the "node" parameter...
+ watchpats = []
+
+ for node in self._env["nodes"]:
+ if self._cm.ShouldBeStatus[node] == "up":
+ self.incr("WasStarted")
+ watchpats.append(self.templates["Pat:We_stopped"] % node)
+
+ if len(watchpats) == 0:
+ return self.success()
+
+ # Stop all the nodes - at about the same time...
+ watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
+
+ watch.set_watch()
+ self.set_timer()
+ for node in self._env["nodes"]:
+ if self._cm.ShouldBeStatus[node] == "up":
+ self._cm.StopaCMnoBlock(node)
+
+ if watch.look_for_all():
+ # Make sure they're completely down with no residule
+ for node in self._env["nodes"]:
+ self._rsh(node, self.templates["StopCmd"])
+
+ return self.success()
+
+ did_fail = False
+ up_nodes = []
+ for node in self._env["nodes"]:
+ if self._cm.StataCM(node) == 1:
+ did_fail = True
+ up_nodes.append(node)
+
+ if did_fail:
+ return self.failure("Active nodes exist: %s" % up_nodes)
+
+ self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched)
+ return self.failure("Missing log message: %s " % watch.unmatched)
+
+ def is_applicable(self):
+ """ SimulStopLite is a setup test and never applicable """
+
+ return False
diff --git a/python/pacemaker/_cts/tests/starttest.py b/python/pacemaker/_cts/tests/starttest.py
new file mode 100644
index 0000000000..aba2899fd2
--- /dev/null
+++ b/python/pacemaker/_cts/tests/starttest.py
@@ -0,0 +1,54 @@
+""" Start the cluster manager on a given node """
+
+__all__ = ["StartTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StartTest(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class starts the cluster manager on a given
+ node.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new StartTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self,cm)
+ self.name = "Start"
+
+ def __call__(self, node):
+ """ Start the given node, returning whether this succeeded or not """
+
+ self.incr("calls")
+
+ if self._cm.upcount() == 0:
+ self.incr("us")
+ else:
+ self.incr("them")
+
+ if self._cm.ShouldBeStatus[node] != "down":
+ return self.skipped()
+
+ if self._cm.StartaCM(node):
+ return self.success()
+
+ return self.failure("Startup %s on node %s failed"
+ % (self._env["Name"], node))
diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py
new file mode 100644
index 0000000000..a068b4d828
--- /dev/null
+++ b/python/pacemaker/_cts/tests/stoptest.py
@@ -0,0 +1,97 @@
+""" Stop the cluster manager on a given node """
+
+__all__ = ["StopTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StopTest(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class stops the cluster manager on a given
+ node.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new StopTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "Stop"
+
+ def __call__(self, node):
+ """ Stop the given node, returning whether this succeeded or not """
+
+ self.incr("calls")
+ if self._cm.ShouldBeStatus[node] != "up":
+ return self.skipped()
+
+ # Technically we should always be able to notice ourselves stopping
+ patterns = [ self.templates["Pat:We_stopped"] % node ]
+
+ # Any active node needs to notice this one left
+ # (note that this won't work if we have multiple partitions)
+ for other in self._env["nodes"]:
+ if self._cm.ShouldBeStatus[other] == "up" and other != node:
+ patterns.append(self.templates["Pat:They_stopped"] %(other, self._cm.key_for_node(node)))
+
+ watch = self.create_watch(patterns, self._env["DeadTime"])
+ watch.set_watch()
+
+ if node == self._cm.OurNode:
+ self.incr("us")
+ else:
+ if self._cm.upcount() <= 1:
+ self.incr("all")
+ else:
+ self.incr("them")
+
+ self._cm.StopaCM(node)
+ watch.look_for_all()
+
+ failreason = None
+ unmatched_str = "||"
+
+ if watch.unmatched:
+ (_, output) = self._rsh(node, "/bin/ps axf", verbose=1)
+ for line in output:
+ self.debug(line)
+
+ (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
+ for line in output:
+ self.debug(line)
+
+ for regex in watch.unmatched:
+ self._logger.log ("ERROR: Shutdown pattern not found: %s" % regex)
+ unmatched_str += "%s||" % regex
+ failreason = "Missing shutdown pattern"
+
+ self._cm.cluster_stable(self._env["DeadTime"])
+
+ if not watch.unmatched or self._cm.upcount() == 0:
+ return self.success()
+
+ if len(watch.unmatched) >= self._cm.upcount():
+ return self.failure("no match against (%s)" % unmatched_str)
+
+ if failreason is None:
+ return self.success()
+
+ return self.failure(failreason)