diff --git a/cts/lab/CTSlab.py.in b/cts/lab/CTSlab.py.in
index 048f41ae52..2815535ec2 100644
--- a/cts/lab/CTSlab.py.in
+++ b/cts/lab/CTSlab.py.in
@@ -1,135 +1,135 @@
#!@PYTHON@
""" Command-line interface to Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2001-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys, signal, os
pdir = os.path.dirname(sys.path[0])
sys.path.insert(0, pdir) # So that things work from the source directory
try:
from cts.CM_corosync import *
from cts.CTStests import TestList
from cts.CTSscenarios import *
from pacemaker._cts.CTS import CtsLab
from pacemaker._cts.audits import audit_list
from pacemaker._cts.logging import LogFactory
except ImportError as e:
sys.stderr.write("abort: %s\n" % e)
sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" %
' '.join(sys.path))
sys.exit(1)
# These are globals so they can be used by the signal handler.
scenario = None
LogFactory().add_stderr()
def sig_handler(signum, frame) :
LogFactory().log("Interrupted by signal %d"%signum)
if scenario: scenario.summarize()
if signum == 15 :
if scenario: scenario.TearDown()
sys.exit(1)
def plural_s(n, uppercase=False):
if n == 1:
return ""
elif uppercase:
return "S"
else:
return "s"
if __name__ == '__main__':
Environment = CtsLab(sys.argv[1:])
NumIter = Environment["iterations"]
Tests = []
# Set the signal handler
signal.signal(15, sig_handler)
signal.signal(10, sig_handler)
# Create the Cluster Manager object
cm = None
if Environment["Stack"] == "corosync 2+":
cm = crm_corosync()
else:
LogFactory().log("Unknown stack: "+Environment["stack"])
sys.exit(1)
if Environment["TruncateLog"]:
if Environment["OutputFile"] is None:
LogFactory().log("Ignoring truncate request because no output file specified")
else:
LogFactory().log("Truncating %s" % Environment["OutputFile"])
with open(Environment["OutputFile"], "w") as outputfile:
outputfile.truncate(0)
Audits = audit_list(cm)
if Environment["ListTests"]:
Tests = TestList(cm, Audits)
LogFactory().log("Total %d tests"%len(Tests))
for test in Tests :
LogFactory().log(str(test.name));
sys.exit(0)
elif len(Environment["tests"]) == 0:
Tests = TestList(cm, Audits)
else:
Chosen = Environment["tests"]
for TestCase in Chosen:
match = None
for test in TestList(cm, Audits):
if test.name == TestCase:
match = test
if not match:
LogFactory().log("--choose: No applicable/valid tests chosen")
sys.exit(1)
else:
Tests.append(match)
# Scenario selection
if Environment["scenario"] == "basic-sanity":
scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests)
elif Environment["scenario"] == "all-once":
NumIter = len(Tests)
scenario = AllOnce(
cm, [ BootCluster(Environment) ], Audits, Tests)
elif Environment["scenario"] == "sequence":
scenario = Sequence(
cm, [ BootCluster(Environment) ], Audits, Tests)
elif Environment["scenario"] == "boot":
scenario = Boot(cm, [ LeaveBooted(Environment)], Audits, [])
else:
scenario = RandomTests(
cm, [ BootCluster(Environment) ], Audits, Tests)
LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TEST" + plural_s(NumIter, True) + " ")
LogFactory().log("Stack: %s (%s)" % (Environment["Stack"], Environment["Name"]))
LogFactory().log("Schema: %s" % Environment["Schema"])
LogFactory().log("Scenario: %s" % scenario.__doc__)
LogFactory().log("CTS Exerciser: %s" % Environment["cts-exerciser"])
LogFactory().log("CTS Logfile: %s" % Environment["OutputFile"])
LogFactory().log("Random Seed: %s" % Environment["RandSeed"])
LogFactory().log("Syslog variant: %s" % Environment["syslogd"].strip())
LogFactory().log("System log files: %s" % Environment["LogFileName"])
- if Environment.has_key("IPBase"):
+ if "IPBase" in Environment:
LogFactory().log("Base IP for resources: %s" % Environment["IPBase"])
LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"])
Environment.dump()
rc = Environment.run(scenario, NumIter)
sys.exit(rc)
diff --git a/cts/lab/CTStests.py b/cts/lab/CTStests.py
index 5105abe1ce..03075ce6b4 100644
--- a/cts/lab/CTStests.py
+++ b/cts/lab/CTStests.py
@@ -1,2290 +1,2290 @@
""" Test-specific classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
#
# SPECIAL NOTE:
#
# Tests may NOT implement any cluster-manager-specific code in them.
# EXTEND the ClusterManager object to provide the base capabilities
# the test needs if you need to do something that the current CM classes
# do not. Otherwise you screw up the whole point of the object structure
# in CTS.
#
# Thank you.
#
import os
import re
import time
import tempfile
from stat import *
from pacemaker import BuildOptions
from pacemaker._cts.CTS import NodeStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests import CTSTest, RemoteDriver, SimulStartLite, SimulStopLite, StartTest, StopTest
from pacemaker._cts.timer import Timer
AllTestClasses = [ ]
class FlipTest(CTSTest):
'''If it's running, stop it. If it's stopped start it.
Overthrow the status quo...
'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Flip"
self._start = StartTest(cm)
self._stop = StopTest(cm)
def __call__(self, node):
'''Perform the 'Flip' test. '''
self.incr("calls")
if self._cm.ShouldBeStatus[node] == "up":
self.incr("stopped")
ret = self._stop(node)
type = "up->down"
# Give the cluster time to recognize it's gone...
time.sleep(self._env["StableTime"])
elif self._cm.ShouldBeStatus[node] == "down":
self.incr("started")
ret = self._start(node)
type = "down->up"
else:
return self.skipped()
self.incr(type)
if ret:
return self.success()
else:
return self.failure("%s failure" % type)
# Register FlipTest as a good test to run
AllTestClasses.append(FlipTest)
class RestartTest(CTSTest):
'''Stop and restart a node'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Restart"
self._start = StartTest(cm)
self._stop = StopTest(cm)
self.benchmark = True
def __call__(self, node):
'''Perform the 'restart' test. '''
self.incr("calls")
self.incr("node:" + node)
ret1 = 1
if self._cm.StataCM(node):
self.incr("WasStopped")
if not self._start(node):
return self.failure("start (setup) failure: "+node)
self.set_timer()
if not self._stop(node):
return self.failure("stop failure: "+node)
if not self._start(node):
return self.failure("start failure: "+node)
return self.success()
# Register RestartTest as a good test to run
AllTestClasses.append(RestartTest)
class StonithdTest(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name = "Stonithd"
self._startall = SimulStartLite(cm)
self.benchmark = True
def __call__(self, node):
self.incr("calls")
if len(self._env["nodes"]) < 2:
return self.skipped()
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
is_dc = self._cm.is_node_dc(node)
watchpats = []
watchpats.append(self.templates["Pat:Fencing_ok"] % node)
watchpats.append(self.templates["Pat:NodeFenced"] % node)
if not self._env["at-boot"]:
self.debug("Expecting %s to stay down" % node)
self._cm.ShouldBeStatus[node] = "down"
else:
self.debug("Expecting %s to come up again %d" % (node, self._env["at-boot"]))
watchpats.append("%s.* S_STARTING -> S_PENDING" % node)
watchpats.append("%s.* S_PENDING -> S_NOT_DC" % node)
watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
origin = self._env.random_gen.choice(self._env["nodes"])
(rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)
if rc == 124: # CRM_EX_TIMEOUT
# Look for the patterns, usually this means the required
# device was running on the node to be fenced - or that
# the required devices were in the process of being loaded
# and/or moved
#
# Effectively the node committed suicide so there will be
# no confirmation, but pacemaker should be watching and
# fence the node again
self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node))
elif origin != node and rc != 0:
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))
elif origin == node and rc != 255:
# 255 == broken pipe, ie. the node was fenced as expected
self._logger.log("Locally originated fencing returned %d" % rc)
with Timer(self._logger, self.name, "fence"):
matched = watch.look_for_all()
self.set_timer("reform")
if watch.unmatched:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected patterns")
elif not is_stable:
return self.failure("Cluster did not become stable")
self.log_timer("reform")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ self.templates["Pat:Fencing_start"] % ".*",
self.templates["Pat:Fencing_ok"] % ".*",
self.templates["Pat:Fencing_active"],
r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ]
def is_applicable(self):
if not CTSTest.is_applicable(self):
return False
- if "DoFencing" in list(self._env.keys()):
+ if "DoFencing" in self._env:
return self._env["DoFencing"]
return True
AllTestClasses.append(StonithdTest)
class StartOnebyOne(CTSTest):
'''Start all the nodes ~ one by one'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "StartOnebyOne"
self.stopall = SimulStopLite(cm)
self._start = StartTest(cm)
self.ns = NodeStatus(cm.Env)
def __call__(self, dummy):
'''Perform the 'StartOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Test setup failed")
failed = []
self.set_timer()
for node in self._env["nodes"]:
if not self._start(node):
failed.append(node)
if len(failed) > 0:
return self.failure("Some node failed to start: " + repr(failed))
return self.success()
# Register StartOnebyOne as a good test to run
AllTestClasses.append(StartOnebyOne)
class SimulStart(CTSTest):
'''Start all the nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "SimulStart"
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
def __call__(self, dummy):
'''Perform the 'SimulStart' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Setup failed")
if not self._startall(None):
return self.failure("Startall failed")
return self.success()
# Register SimulStart as a good test to run
AllTestClasses.append(SimulStart)
class SimulStop(CTSTest):
'''Stop all the nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "SimulStop"
self._startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, dummy):
'''Perform the 'SimulStop' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
if not self.stopall(None):
return self.failure("Stopall failed")
return self.success()
# Register SimulStop as a good test to run
AllTestClasses.append(SimulStop)
class StopOnebyOne(CTSTest):
'''Stop all the nodes in order'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "StopOnebyOne"
self._startall = SimulStartLite(cm)
self._stop = StopTest(cm)
def __call__(self, dummy):
'''Perform the 'StopOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
failed = []
self.set_timer()
for node in self._env["nodes"]:
if not self._stop(node):
failed.append(node)
if len(failed) > 0:
return self.failure("Some node failed to stop: " + repr(failed))
return self.success()
# Register StopOnebyOne as a good test to run
AllTestClasses.append(StopOnebyOne)
class RestartOnebyOne(CTSTest):
'''Restart all the nodes in order'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "RestartOnebyOne"
self._startall = SimulStartLite(cm)
def __call__(self, dummy):
'''Perform the 'RestartOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
did_fail = []
self.set_timer()
self.restart = RestartTest(self._cm)
for node in self._env["nodes"]:
if not self.restart(node):
did_fail.append(node)
if did_fail:
return self.failure("Could not restart %d nodes: %s"
% (len(did_fail), repr(did_fail)))
return self.success()
# Register StopOnebyOne as a good test to run
AllTestClasses.append(RestartOnebyOne)
class PartialStart(CTSTest):
'''Start a node - but tell it to stop before it finishes starting up'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "PartialStart"
self._startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
self._stop = StopTest(cm)
def __call__(self, node):
'''Perform the 'PartialStart' test. '''
self.incr("calls")
ret = self.stopall(None)
if not ret:
return self.failure("Setup failed")
watchpats = []
watchpats.append("pacemaker-controld.*Connecting to .* cluster infrastructure")
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self._cm.StartaCMnoBlock(node)
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
return self.failure("Setup of %s failed" % node)
ret = self._stop(node)
if not ret:
return self.failure("%s did not stop in time" % node)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# We might do some fencing in the 2-node case if we make it up far enough
return [ r"Executing reboot fencing operation",
r"Requesting fencing \([^)]+\) targeting node " ]
# Register StopOnebyOne as a good test to run
AllTestClasses.append(PartialStart)
class StandbyTest(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Standby"
self.benchmark = True
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
# make sure the node is active
# set the node to standby mode
# check resources, none resource should be running on the node
# set the node to active mode
# check resouces, resources should have been migrated back (SHOULD THEY?)
def __call__(self, node):
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Start all nodes failed")
self.debug("Make sure node %s is active" % node)
if self._cm.StandbyStatus(node) != "off":
if not self._cm.SetStandbyMode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self._cm.cluster_stable()
status = self._cm.StandbyStatus(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.debug("Getting resources running on node %s" % node)
rsc_on_node = self._cm.active_resources(node)
watchpats = []
watchpats.append(r"State transition .* -> S_POLICY_ENGINE")
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
self.debug("Setting node %s to standby mode" % node)
if not self._cm.SetStandbyMode(node, "on"):
return self.failure("can't set node %s to standby mode" % node)
self.set_timer("on")
ret = watch.look_for_all()
if not ret:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
self._cm.SetStandbyMode(node, "off")
return self.failure("cluster didn't react to standby change on %s" % node)
self._cm.cluster_stable()
status = self._cm.StandbyStatus(node)
if status != "on":
return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status))
self.log_timer("on")
self.debug("Checking resources")
bad_run = self._cm.active_resources(node)
if len(bad_run) > 0:
rc = self.failure("%s set to standby, %s is still running on it" % (node, repr(bad_run)))
self.debug("Setting node %s to active mode" % node)
self._cm.SetStandbyMode(node, "off")
return rc
self.debug("Setting node %s to active mode" % node)
if not self._cm.SetStandbyMode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self.set_timer("off")
self._cm.cluster_stable()
status = self._cm.StandbyStatus(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.log_timer("off")
return self.success()
AllTestClasses.append(StandbyTest)
class ValgrindTest(CTSTest):
'''Check for memory leaks'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Valgrind"
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
self.is_valgrind = True
self.is_loop = True
def setup(self, node):
self.incr("calls")
ret = self.stopall(None)
if not ret:
return self.failure("Stop all nodes failed")
# @TODO Edit /etc/sysconfig/pacemaker on all nodes to enable valgrind,
# and clear any valgrind logs from previous runs. For now, we rely on
# the user to do this manually.
ret = self._startall(None)
if not ret:
return self.failure("Start all nodes failed")
return self.success()
def teardown(self, node):
# Return all nodes to normal
# @TODO Edit /etc/sysconfig/pacemaker on all nodes to disable valgrind
ret = self.stopall(None)
if not ret:
return self.failure("Stop all nodes failed")
return self.success()
def find_leaks(self):
# Check for leaks
# (no longer used but kept in case feature is restored)
leaked = []
self._stop = StopTest(self._cm)
for node in self._env["nodes"]:
rc = self._stop(node)
if not rc:
self.failure("Couldn't shut down %s" % node)
(rc, _) = self._rsh(node, "grep -e indirectly.*lost:.*[1-9] -e definitely.*lost:.*[1-9] -e (ERROR|error).*SUMMARY:.*[1-9].*errors %s" % self._logger.logPat)
if rc != 1:
leaked.append(node)
self.failure("Valgrind errors detected on %s" % node)
(_, output) = self._rsh(node, "grep -e lost: -e SUMMARY: %s" % self._logger.logPat, verbose=1)
for line in output:
self._logger.log(line)
(_, output) = self._rsh(node, "cat %s" % self._logger.logPat, verbose=1)
for line in output:
self.debug(line)
self._rsh(node, "rm -f %s" % self._logger.logPat, verbose=1)
return leaked
def __call__(self, node):
#leaked = self.find_leaks()
#if len(leaked) > 0:
# return self.failure("Nodes %s leaked" % repr(leaked))
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"pacemaker-based.*: \*\*\*\*\*\*\*\*\*\*\*\*\*",
r"pacemaker-based.*: .* avoid confusing Valgrind",
r"HA_VALGRIND_ENABLED" ]
class StandbyLoopTest(ValgrindTest):
'''Check for memory leaks by putting a node in and out of standby for an hour'''
# @TODO This is not a useful test for memory leaks
def __init__(self, cm):
ValgrindTest.__init__(self,cm)
self.name = "StandbyLoop"
def __call__(self, node):
lpc = 0
delay = 2
failed = 0
done = time.time() + self._env["loop-minutes"] * 60
while time.time() <= done and not failed:
lpc = lpc + 1
time.sleep(delay)
if not self._cm.SetStandbyMode(node, "on"):
self.failure("can't set node %s to standby mode" % node)
failed = lpc
time.sleep(delay)
if not self._cm.SetStandbyMode(node, "off"):
self.failure("can't set node %s to active mode" % node)
failed = lpc
leaked = self.find_leaks()
if failed:
return self.failure("Iteration %d failed" % failed)
elif len(leaked) > 0:
return self.failure("Nodes %s leaked" % repr(leaked))
return self.success()
#AllTestClasses.append(StandbyLoopTest)
class BandwidthTest(CTSTest):
# Tests should not be cluster-manager-specific
# If you need to find out cluster manager configuration to do this, then
# it should be added to the generic cluster manager API.
'''Test the bandwidth which the cluster uses'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.stats["min"] = 0
self.stats["max"] = 0
self.stats["totalbandwidth"] = 0
self.name = "Bandwidth"
self._start = StartTest(cm)
(handle, self.tempfile) = tempfile.mkstemp(".cts")
os.close(handle)
self._startall = SimulStartLite(cm)
def __call__(self, node):
'''Perform the Bandwidth test'''
self.incr("calls")
if self._cm.upcount() < 1:
return self.skipped()
Path = self._cm.InternalCommConfig()
if "ip" not in Path["mediatype"]:
return self.skipped()
port = Path["port"][0]
port = int(port)
ret = self._startall(None)
if not ret:
return self.failure("Test setup failed")
time.sleep(5) # We get extra messages right after startup.
fstmpfile = "/var/run/band_estimate"
dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
% (port, fstmpfile)
(rc, _) = self._rsh(node, dumpcmd)
if rc == 0:
farfile = "root@%s:%s" % (node, fstmpfile)
self._rsh.copy(farfile, self.tempfile)
Bandwidth = self.countbandwidth(self.tempfile)
if not Bandwidth:
self._logger.log("Could not compute bandwidth.")
return self.success()
intband = int(Bandwidth + 0.5)
self._logger.log("...bandwidth: %d bits/sec" % intband)
self.stats["totalbandwidth"] += Bandwidth
if self.stats["min"] == 0:
self.stats["min"] = Bandwidth
if Bandwidth > self.stats["max"]:
self.stats["max"] = Bandwidth
if Bandwidth < self.stats["min"]:
self.stats["min"] = Bandwidth
self._rsh(node, "rm -f %s" % fstmpfile)
os.unlink(self.tempfile)
return self.success()
else:
return self.failure("no response from tcpdump command [%d]!" % rc)
def countbandwidth(self, file):
fp = open(file, "r")
fp.seek(0)
count = 0
sum = 0
while 1:
line = fp.readline()
if not line:
return None
if re.search("udp",line) or re.search("UDP,", line):
count = count + 1
linesplit = line.split(" ")
for j in range(len(linesplit)-1):
if linesplit[j] == "udp": break
if linesplit[j] == "length:": break
try:
sum = sum + int(linesplit[j+1])
except ValueError:
self._logger.log("Invalid tcpdump line: %s" % line)
return None
T1 = linesplit[0]
timesplit = T1.split(":")
time2split = timesplit[2].split(".")
time1 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001
break
while count < 100:
line = fp.readline()
if not line:
return None
if re.search("udp",line) or re.search("UDP,", line):
count = count+1
linessplit = line.split(" ")
for j in range(len(linessplit)-1):
if linessplit[j] == "udp": break
if linessplit[j] == "length:": break
try:
sum = int(linessplit[j+1]) + sum
except ValueError:
self._logger.log("Invalid tcpdump line: %s" % line)
return None
T2 = linessplit[0]
timesplit = T2.split(":")
time2split = timesplit[2].split(".")
time2 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001
time = time2-time1
if (time <= 0):
return 0
return int((sum*8)/time)
def is_applicable(self):
'''BandwidthTest never applicable'''
return False
AllTestClasses.append(BandwidthTest)
###################################################################
class MaintenanceMode(CTSTest):
###################################################################
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "MaintenanceMode"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.max = 30
self.benchmark = True
self.action = "asyncmon"
self.interval = 0
self.rid = "maintenanceDummy"
def toggleMaintenanceMode(self, node, action):
pats = []
pats.append(self.templates["Pat:DC_IDLE"])
# fail the resource right after turning Maintenance mode on
# verify it is not recovered until maintenance mode is turned off
if action == "On":
pats.append(self.templates["Pat:RscOpFail"] % (self.action, self.rid))
else:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
watch = self.create_watch(pats, 60)
watch.set_watch()
self.debug("Turning maintenance mode %s" % action)
self._rsh(node, self.templates["MaintenanceMode%s" % (action)])
if (action == "On"):
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))
with Timer(self._logger, self.name, "recover%s" % action):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when turning maintenance mode %s" % action)
return repr(watch.unmatched)
return ""
def insertMaintenanceDummy(self, node):
pats = []
pats.append(("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self.rid)))
watch = self.create_watch(pats, 60)
watch.set_watch()
self._cm.AddDummyRsc(node, self.rid)
with Timer(self._logger, self.name, "addDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when adding maintenance dummy resource")
return repr(watch.unmatched)
return ""
def removeMaintenanceDummy(self, node):
pats = []
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
watch = self.create_watch(pats, 60)
watch.set_watch()
self._cm.RemoveDummyRsc(node, self.rid)
with Timer(self._logger, self.name, "removeDummy"):
watch.look_for_all()
if watch.unmatched:
self.debug("Failed to find patterns when removing maintenance dummy resource")
return repr(watch.unmatched)
return ""
def managedRscList(self, node):
rscList = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if tmp.managed:
rscList.append(tmp.id)
return rscList
def verifyResources(self, node, rscList, managed):
managedList = list(rscList)
managed_str = "managed"
if not managed:
managed_str = "unmanaged"
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
tmp = AuditResource(self._cm, line)
if managed and not tmp.managed:
continue
elif not managed and tmp.managed:
continue
elif managedList.count(tmp.id):
managedList.remove(tmp.id)
if len(managedList) == 0:
self.debug("Found all %s resources on %s" % (managed_str, node))
return True
self._logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managedList))
return False
def __call__(self, node):
'''Perform the 'MaintenanceMode' test. '''
self.incr("calls")
verify_managed = False
verify_unmanaged = False
failPat = ""
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
# get a list of all the managed resources. We use this list
# after enabling maintenance mode to verify all managed resources
# become un-managed. After maintenance mode is turned off, we use
# this list to verify all the resources become managed again.
managedResources = self.managedRscList(node)
if len(managedResources) == 0:
self._logger.log("No managed resources on %s" % node)
return self.skipped()
# insert a fake resource we can fail during maintenance mode
# so we can verify recovery does not take place until after maintenance
# mode is disabled.
failPat = failPat + self.insertMaintenanceDummy(node)
# toggle maintenance mode ON, then fail dummy resource.
failPat = failPat + self.toggleMaintenanceMode(node, "On")
# verify all the resources are now unmanaged
if self.verifyResources(node, managedResources, False):
verify_unmanaged = True
# Toggle maintenance mode OFF, verify dummy is recovered.
failPat = failPat + self.toggleMaintenanceMode(node, "Off")
# verify all the resources are now managed again
if self.verifyResources(node, managedResources, True):
verify_managed = True
# Remove our maintenance dummy resource.
failPat = failPat + self.removeMaintenanceDummy(node)
self._cm.cluster_stable()
if failPat != "":
return self.failure("Unmatched patterns: %s" % (failPat))
elif verify_unmanaged is False:
return self.failure("Failed to verify resources became unmanaged during maintenance mode")
elif verify_managed is False:
return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for %s" % self.rid,
r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self.rid,
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self.action, self.rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval) ]
AllTestClasses.append(MaintenanceMode)
class ResourceRecover(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "ResourceRecover"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.max = 30
self.rid = None
self.rid_alt = None
self.benchmark = True
# these are the values used for the new LRM API call
self.action = "asyncmon"
self.interval = 0
def __call__(self, node):
'''Perform the 'ResourceRecover' test. '''
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self._cm.active_resources(node)
if len(resourcelist) == 0:
self._logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self.choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self.rid)
if rsc.id == rsc.clone_id:
self.debug("Failing " + rsc.id)
else:
self.debug("Failing " + rsc.id + " (also known as " + rsc.clone_id + ")")
# Log patterns to watch for (failure, plus restart if managed)
pats = []
pats.append(self.templates["Pat:CloneOpFail"] % (self.action, rsc.id, rsc.clone_id))
if rsc.managed:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
if rsc.unique:
pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self.fail_resource(rsc, node, pats) is None:
return None # self.failure() already called
return self.success()
def choose_resource(self, node, resourcelist):
""" Choose a random resource to target """
self.rid = self._env.random_gen.choice(resourcelist)
self.rid_alt = self.rid
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self._cm, line)
if rsc.id == self.rid:
# Handle anonymous clones that get renamed
self.rid = rsc.clone_id
return rsc
return None
def get_failcount(self, node):
""" Check the fail count of targeted resource on given node """
(rc, lines) = self._rsh(node,
"crm_failcount --quiet --query --resource %s "
"--operation %s --interval %d "
"--node %s" % (self.rid, self.action,
self.interval, node), verbose=1)
if rc != 0 or len(lines) != 1:
self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc,
" // ".join(map(str.strip, lines))))
return -1
try:
failcount = int(lines[0])
except (IndexError, ValueError):
self._logger.log("crm_failcount output on %s unparseable: %s" % (node,
' '.join(lines)))
return -1
return failcount
def fail_resource(self, rsc, node, pats):
""" Fail the targeted resource, and verify as expected """
orig_failcount = self.get_failcount(node)
watch = self.create_watch(pats, 60)
watch.set_watch()
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))
with Timer(self._logger, self.name, "recover"):
watch.look_for_all()
self._cm.cluster_stable()
recovered = self._cm.ResourceLocation(self.rid)
if watch.unmatched:
return self.failure("Patterns not found: %s" % repr(watch.unmatched))
elif rsc.unique and len(recovered) > 1:
return self.failure("%s is now active on more than one node: %s"%(self.rid, repr(recovered)))
elif len(recovered) > 0:
self.debug("%s is running on: %s" % (self.rid, repr(recovered)))
elif rsc.managed:
return self.failure("%s was not recovered and is inactive" % self.rid)
new_failcount = self.get_failcount(node)
if new_failcount != (orig_failcount + 1):
return self.failure("%s fail count is %d not %d" % (self.rid,
new_failcount, orig_failcount + 1))
return 0 # Anything but None is success
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for %s" % self.rid,
r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self.rid, self.rid_alt),
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self.action, self.rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval) ]
AllTestClasses.append(ResourceRecover)
class ComponentFail(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "ComponentFail"
self._startall = SimulStartLite(cm)
self.complist = cm.Components()
self.patterns = []
self.okerrpatterns = []
self.is_unsafe = True
def __call__(self, node):
'''Perform the 'ComponentFail' test. '''
self.incr("calls")
self.patterns = []
self.okerrpatterns = []
# start all nodes
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
if not self._cm.cluster_stable(self._env["StableTime"]):
return self.failure("Setup failed - unstable")
node_is_dc = self._cm.is_node_dc(node, None)
# select a component to kill
chosen = self._env.random_gen.choice(self.complist)
while chosen.dc_only and node_is_dc == 0:
chosen = self._env.random_gen.choice(self.complist)
self.debug("...component %s (dc=%d)" % (chosen.name, node_is_dc))
self.incr(chosen.name)
if chosen.name != "corosync":
self.patterns.append(self.templates["Pat:ChildKilled"] %(node, chosen.name))
self.patterns.append(self.templates["Pat:ChildRespawn"] %(node, chosen.name))
self.patterns.extend(chosen.pats)
if node_is_dc:
self.patterns.extend(chosen.dc_pats)
# @TODO this should be a flag in the Component
if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]:
# Ignore actions for fence devices if fencer will respawn
# (their registration will be lost, and probes will fail)
self.okerrpatterns = [ self.templates["Pat:Fencing_active"] ]
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.okerrpatterns.append(self.templates["Pat:Fencing_recover"] % r.id)
self.okerrpatterns.append(self.templates["Pat:Fencing_probe"] % r.id)
# supply a copy so self.patterns doesn't end up empty
tmpPats = []
tmpPats.extend(self.patterns)
self.patterns.extend(chosen.badnews_ignore)
# Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
stonithPats = []
stonithPats.append(self.templates["Pat:Fencing_ok"] % node)
stonith = self.create_watch(stonithPats, 0)
stonith.set_watch()
# set the watch for stable
watch = self.create_watch(
tmpPats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
watch.set_watch()
# kill the component
chosen.kill(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
self.debug("Waiting for any fenced node to come back up")
self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
self.debug("Waiting for the cluster to re-stabilize with all nodes")
self._cm.cluster_stable(self._env["StartTime"])
self.debug("Checking if %s was shot" % node)
shot = stonith.look(60)
if shot:
self.debug("Found: " + repr(shot))
self.okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)
if not self._env["at-boot"]:
self._cm.ShouldBeStatus[node] = "down"
# If fencing occurred, chances are many (if not all) the expected logs
# will not be sent - or will be lost when the node reboots
return self.success()
# check for logs indicating a graceful recovery
matched = watch.look_for_all(allow_multiple_matches=True)
if watch.unmatched:
self._logger.log("Patterns not found: " + repr(watch.unmatched))
self.debug("Waiting for the cluster to re-stabilize with all nodes")
is_stable = self._cm.cluster_stable(self._env["StartTime"])
if not matched:
return self.failure("Didn't find all expected %s patterns" % chosen.name)
elif not is_stable:
return self.failure("Cluster did not become stable after killing %s" % chosen.name)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Note that okerrpatterns refers to the last time we ran this test
# The good news is that this works fine for us...
self.okerrpatterns.extend(self.patterns)
return self.okerrpatterns
AllTestClasses.append(ComponentFail)
class SplitBrainTest(CTSTest):
'''It is used to test split-brain. when the path between the two nodes break
check the two nodes both take over the resource'''
def __init__(self,cm):
CTSTest.__init__(self,cm)
self.name = "SplitBrain"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.is_experimental = True
def isolate_partition(self, partition):
other_nodes = []
other_nodes.extend(self._env["nodes"])
for node in partition:
try:
other_nodes.remove(node)
except ValueError:
self._logger.log("Node "+node+" not in " + repr(self._env["nodes"]) + " from " +repr(partition))
if len(other_nodes) == 0:
return 1
self.debug("Creating partition: " + repr(partition))
self.debug("Everyone else: " + repr(other_nodes))
for node in partition:
if not self._cm.isolate_node(node, other_nodes):
self._logger.log("Could not isolate %s" % node)
return 0
return 1
def heal_partition(self, partition):
other_nodes = []
other_nodes.extend(self._env["nodes"])
for node in partition:
try:
other_nodes.remove(node)
except ValueError:
self._logger.log("Node "+node+" not in " + repr(self._env["nodes"]))
if len(other_nodes) == 0:
return 1
self.debug("Healing partition: " + repr(partition))
self.debug("Everyone else: " + repr(other_nodes))
for node in partition:
self._cm.unisolate_node(node, other_nodes)
def __call__(self, node):
'''Perform split-brain test'''
self.incr("calls")
self.passed = True
partitions = {}
ret = self._startall(None)
if not ret:
return self.failure("Setup failed")
while 1:
# Retry until we get multiple partitions
partitions = {}
p_max = len(self._env["nodes"])
for node in self._env["nodes"]:
p = self._env.random_gen.randint(1, p_max)
if not p in partitions:
partitions[p] = []
partitions[p].append(node)
p_max = len(list(partitions.keys()))
if p_max > 1:
break
# else, try again
self.debug("Created %d partitions" % p_max)
for key in list(partitions.keys()):
self.debug("Partition["+str(key)+"]:\t"+repr(partitions[key]))
# Disabling STONITH to reduce test complexity for now
self._rsh(node, "crm_attribute -V -n stonith-enabled -v false")
for key in list(partitions.keys()):
self.isolate_partition(partitions[key])
count = 30
while count > 0:
if len(self._cm.find_partitions()) != p_max:
time.sleep(10)
else:
break
else:
self.failure("Expected partitions were not created")
# Target number of partitions formed - wait for stability
if not self._cm.cluster_stable():
self.failure("Partitioned cluster not stable")
# Now audit the cluster state
self._cm.partitions_expected = p_max
if not self.audit():
self.failure("Audits failed")
self._cm.partitions_expected = 1
# And heal them again
for key in list(partitions.keys()):
self.heal_partition(partitions[key])
# Wait for a single partition to form
count = 30
while count > 0:
if len(self._cm.find_partitions()) != 1:
time.sleep(10)
count -= 1
else:
break
else:
self.failure("Cluster did not reform")
# Wait for it to have the right number of members
count = 30
while count > 0:
members = []
partitions = self._cm.find_partitions()
if len(partitions) > 0:
members = partitions[0].split()
if len(members) != len(self._env["nodes"]):
time.sleep(10)
count -= 1
else:
break
else:
self.failure("Cluster did not completely reform")
# Wait up to 20 minutes - the delay is more preferable than
# trying to continue with in a messed up state
if not self._cm.cluster_stable(1200):
self.failure("Reformed cluster not stable")
if self._env["continue"]:
answer = "Y"
else:
try:
answer = input('Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
raise ValueError("Reformed cluster not stable")
# Turn fencing back on
if self._env["DoFencing"]:
self._rsh(node, "crm_attribute -V -D -n stonith-enabled")
self._cm.cluster_stable()
if self.passed:
return self.success()
return self.failure("See previous errors")
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Another DC detected:",
r"(ERROR|error).*: .*Application of an update diff failed",
r"pacemaker-controld.*:.*not in our membership list",
r"CRIT:.*node.*returning after partition" ]
def is_applicable(self):
if not CTSTest.is_applicable(self):
return False
return len(self._env["nodes"]) > 2
AllTestClasses.append(SplitBrainTest)
class Reattach(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "Reattach"
self._startall = SimulStartLite(cm)
self.restart1 = RestartTest(cm)
self.stopall = SimulStopLite(cm)
self.is_unsafe = False
def _is_managed(self, node):
(_, is_managed) = self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
is_managed = is_managed[0].strip()
return is_managed == "true"
def _set_unmanaged(self, node):
self.debug("Disable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
def _set_managed(self, node):
self.debug("Re-enable resource management")
self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
def setup(self, node):
attempt = 0
if not self._startall(None):
return None
# Make sure we are really _really_ stable and that all
# resources, including those that depend on transient node
# attributes, are started
while not self._cm.cluster_stable(double_check=True):
if attempt < 5:
attempt += 1
self.debug("Not stable yet, re-testing")
else:
self._logger.log("Cluster is not stable")
return None
return 1
def teardown(self, node):
# Make sure 'node' is up
start = StartTest(self._cm)
start(node)
if not self._is_managed(node):
self._logger.log("Attempting to re-enable resource management on %s" % node)
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
self._logger.log("Could not re-enable resource management")
return 0
return 1
def can_run_now(self, node):
""" Return True if we can meaningfully run right now"""
if self._find_ocfs2_resources(node):
self._logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present")
return False
return True
def __call__(self, node):
self.incr("calls")
pats = []
# Conveniently, the scheduler will display this message when disabling
# management, even if fencing is not enabled, so we can rely on it.
managed = self.create_watch(["No fencing will be done"], 60)
managed.set_watch()
self._set_unmanaged(node)
if not managed.look_for_all():
self._logger.log("Patterns not found: " + repr(managed.unmatched))
return self.failure("Resource management not disabled")
pats = []
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("stop", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("promote", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("demote", ".*"))
pats.append(self.templates["Pat:RscOpOK"] % ("migrate", ".*"))
watch = self.create_watch(pats, 60, "ShutdownActivity")
watch.set_watch()
self.debug("Shutting down the cluster")
ret = self.stopall(None)
if not ret:
self._set_managed(node)
return self.failure("Couldn't shut down the cluster")
self.debug("Bringing the cluster back up")
ret = self._startall(None)
time.sleep(5) # allow ping to update the CIB
if not ret:
self._set_managed(node)
return self.failure("Couldn't restart the cluster")
if self.local_badnews("ResourceActivity:", watch):
self._set_managed(node)
return self.failure("Resources stopped or started during cluster restart")
watch = self.create_watch(pats, 60, "StartupActivity")
watch.set_watch()
# Re-enable resource management (and verify it happened).
self._set_managed(node)
self._cm.cluster_stable()
if not self._is_managed(node):
return self.failure("Could not re-enable resource management")
# Ignore actions for STONITH resources
ignore = []
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rclass == "stonith":
self.debug("Ignoring start actions for %s" % r.id)
ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
if self.local_badnews("ResourceActivity:", watch, ignore):
return self.failure("Resources stopped or started after resource management was re-enabled")
return ret
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"resource( was|s were) active at shutdown" ]
def is_applicable(self):
return True
AllTestClasses.append(Reattach)
class SpecialTest1(CTSTest):
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "SpecialTest1"
self._startall = SimulStartLite(cm)
self.restart1 = RestartTest(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, node):
'''Perform the 'SpecialTest1' test for Andrew. '''
self.incr("calls")
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Could not stop all nodes")
# Test config recovery when the other nodes come up
self._rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*")
# Start the selected node
ret = self.restart1(node)
if not ret:
return self.failure("Could not start "+node)
# Start all remaining nodes
ret = self._startall(None)
if not ret:
return self.failure("Could not start the remaining nodes")
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
# Errors that occur as a result of the CIB being wiped
return [ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
r"error.*: Resource start-up disabled since no STONITH resources have been defined",
r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity" ]
AllTestClasses.append(SpecialTest1)
class HAETest(CTSTest):
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "HAETest"
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
self.is_loop = True
def setup(self, node):
# Start all remaining nodes
ret = self._startall(None)
if not ret:
return self.failure("Couldn't start all nodes")
return self.success()
def teardown(self, node):
# Stop everything
ret = self.stopall(None)
if not ret:
return self.failure("Couldn't stop all nodes")
return self.success()
def wait_on_state(self, node, resource, expected_clones, attempts=240):
while attempts > 0:
active = 0
(rc, lines) = self._rsh(node, "crm_resource -r %s -W -Q" % resource, verbose=1)
# Hack until crm_resource does the right thing
if rc == 0 and lines:
active = len(lines)
if len(lines) == expected_clones:
return 1
elif rc == 1:
self.debug("Resource %s is still inactive" % resource)
elif rc == 234:
self._logger.log("Unknown resource %s" % resource)
return 0
elif rc == 246:
self._logger.log("Cluster is inactive")
return 0
elif rc != 0:
self._logger.log("Call to crm_resource failed, rc=%d" % rc)
return 0
else:
self.debug("Resource %s is active on %d times instead of %d" % (resource, active, expected_clones))
attempts -= 1
time.sleep(1)
return 0
def find_dlm(self, node):
self.r_dlm = None
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self._cm, line)
if r.rtype == "controld" and r.parent != "NA":
self.debug("Found dlm: %s" % self.r_dlm)
self.r_dlm = r.parent
return 1
return 0
def find_hae_resources(self, node):
self.r_dlm = None
self._r_o2cb = None
self._r_ocfs2 = []
if self.find_dlm(node):
self._find_ocfs2_resources(node)
def is_applicable(self):
if not CTSTest.is_applicable(self):
return False
if self._env["Schema"] == "hae":
return True
return None
class HAERoleTest(HAETest):
def __init__(self, cm):
'''Lars' mount/unmount test for the HA extension. '''
HAETest.__init__(self,cm)
self.name = "HAERoleTest"
def change_state(self, node, resource, target):
(rc, _) = self._rsh(node, "crm_resource -V -r %s -p target-role -v %s --meta" % (resource, target))
return rc
def __call__(self, node):
self.incr("calls")
lpc = 0
failed = 0
delay = 2
done = time.time() + self._env["loop-minutes"]*60
self.find_hae_resources(node)
clone_max = len(self._env["nodes"])
while time.time() <= done and not failed:
lpc = lpc + 1
self.change_state(node, self.r_dlm, "Stopped")
if not self.wait_on_state(node, self.r_dlm, 0):
self.failure("%s did not go down correctly" % self.r_dlm)
failed = lpc
self.change_state(node, self.r_dlm, "Started")
if not self.wait_on_state(node, self.r_dlm, clone_max):
self.failure("%s did not come up correctly" % self.r_dlm)
failed = lpc
if not self.wait_on_state(node, self._r_o2cb, clone_max):
self.failure("%s did not come up correctly" % self._r_o2cb)
failed = lpc
for fs in self._r_ocfs2:
if not self.wait_on_state(node, fs, clone_max):
self.failure("%s did not come up correctly" % fs)
failed = lpc
if failed:
return self.failure("iteration %d failed" % failed)
return self.success()
AllTestClasses.append(HAERoleTest)
class HAEStandbyTest(HAETest):
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
HAETest.__init__(self,cm)
self.name = "HAEStandbyTest"
def change_state(self, node, resource, target):
(rc, _) = self._rsh(node, "crm_standby -V -l reboot -v %s" % (target))
return rc
def __call__(self, node):
self.incr("calls")
lpc = 0
failed = 0
done = time.time() + self._env["loop-minutes"]*60
self.find_hae_resources(node)
clone_max = len(self._env["nodes"])
while time.time() <= done and not failed:
lpc = lpc + 1
self.change_state(node, self.r_dlm, "true")
if not self.wait_on_state(node, self.r_dlm, clone_max-1):
self.failure("%s did not go down correctly" % self.r_dlm)
failed = lpc
self.change_state(node, self.r_dlm, "false")
if not self.wait_on_state(node, self.r_dlm, clone_max):
self.failure("%s did not come up correctly" % self.r_dlm)
failed = lpc
if not self.wait_on_state(node, self._r_o2cb, clone_max):
self.failure("%s did not come up correctly" % self._r_o2cb)
failed = lpc
for fs in self._r_ocfs2:
if not self.wait_on_state(node, fs, clone_max):
self.failure("%s did not come up correctly" % fs)
failed = lpc
if failed:
return self.failure("iteration %d failed" % failed)
return self.success()
AllTestClasses.append(HAEStandbyTest)
class NearQuorumPointTest(CTSTest):
'''
This test brings larger clusters near the quorum point (50%).
In addition, it will test doing starts and stops at the same time.
Here is how I think it should work:
- loop over the nodes and decide randomly which will be up and which
will be down Use a 50% probability for each of up/down.
- figure out what to do to get into that state from the current state
- in parallel, bring up those going up and bring those going down.
'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "NearQuorumPoint"
def __call__(self, dummy):
'''Perform the 'NearQuorumPoint' test. '''
self.incr("calls")
startset = []
stopset = []
stonith = self._cm.prepare_fencing_watcher("NearQuorumPoint")
#decide what to do with each node
for node in self._env["nodes"]:
action = self._env.random_gen.choice(["start","stop"])
#action = self._env.random_gen.choice(["start","stop","no change"])
if action == "start" :
startset.append(node)
elif action == "stop" :
stopset.append(node)
self.debug("start nodes:" + repr(startset))
self.debug("stop nodes:" + repr(stopset))
#add search patterns
watchpats = [ ]
for node in stopset:
if self._cm.ShouldBeStatus[node] == "up":
watchpats.append(self.templates["Pat:We_stopped"] % node)
for node in startset:
if self._cm.ShouldBeStatus[node] == "down":
#watchpats.append(self.templates["Pat:NonDC_started"] % node)
watchpats.append(self.templates["Pat:Local_started"] % node)
else:
for stopping in stopset:
if self._cm.ShouldBeStatus[stopping] == "up":
watchpats.append(self.templates["Pat:They_stopped"] % (node, self._cm.key_for_node(stopping)))
if len(watchpats) == 0:
return self.skipped()
if len(startset) != 0:
watchpats.append(self.templates["Pat:DC_IDLE"])
watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
watch.set_watch()
#begin actions
for node in stopset:
if self._cm.ShouldBeStatus[node] == "up":
self._cm.StopaCMnoBlock(node)
for node in startset:
if self._cm.ShouldBeStatus[node] == "down":
self._cm.StartaCMnoBlock(node)
#get the result
if watch.look_for_all():
self._cm.cluster_stable()
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
return self.success()
self._logger.log("Warn: Patterns not found: " + repr(watch.unmatched))
#get the "bad" nodes
upnodes = []
for node in stopset:
if self._cm.StataCM(node) == 1:
upnodes.append(node)
downnodes = []
for node in startset:
if self._cm.StataCM(node) == 0:
downnodes.append(node)
self._cm.fencing_cleanup("NearQuorumPoint", stonith)
if upnodes == [] and downnodes == []:
self._cm.cluster_stable()
# Make sure they're completely down with no residule
for node in stopset:
self._rsh(node, self.templates["StopCmd"])
return self.success()
if len(upnodes) > 0:
self._logger.log("Warn: Unstoppable nodes: " + repr(upnodes))
if len(downnodes) > 0:
self._logger.log("Warn: Unstartable nodes: " + repr(downnodes))
return self.failure()
def is_applicable(self):
return True
AllTestClasses.append(NearQuorumPointTest)
class RollingUpgradeTest(CTSTest):
'''Perform a rolling upgrade of the cluster'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "RollingUpgrade"
self._start = StartTest(cm)
self._stop = StopTest(cm)
self.stopall = SimulStopLite(cm)
self._startall = SimulStartLite(cm)
def setup(self, node):
# Start all remaining nodes
ret = self.stopall(None)
if not ret:
return self.failure("Couldn't stop all nodes")
for node in self._env["nodes"]:
if not self.downgrade(node, None):
return self.failure("Couldn't downgrade %s" % node)
ret = self._startall(None)
if not ret:
return self.failure("Couldn't start all nodes")
return self.success()
def teardown(self, node):
# Stop everything
ret = self.stopall(None)
if not ret:
return self.failure("Couldn't stop all nodes")
for node in self._env["nodes"]:
if not self.upgrade(node, None):
return self.failure("Couldn't upgrade %s" % node)
return self.success()
def install(self, node, version, start=1, flags="--force"):
target_dir = "/tmp/rpm-%s" % version
src_dir = "%s/%s" % (self._env["rpm-dir"], version)
self._logger.log("Installing %s on %s with %s" % (version, node, flags))
if not self._stop(node):
return self.failure("stop failure: "+node)
self._rsh(node, "mkdir -p %s" % target_dir)
self._rsh(node, "rm -f %s/*.rpm" % target_dir)
(_, lines) = self._rsh(node, "ls -1 %s/*.rpm" % src_dir, verbose=1)
for line in lines:
line = line[:-1]
rc = self._rsh.copy("%s" % (line), "%s:%s/" % (node, target_dir))
self._rsh(node, "rpm -Uvh %s %s/*.rpm" % (flags, target_dir))
if start and not self._start(node):
return self.failure("start failure: "+node)
return self.success()
def upgrade(self, node, start=1):
return self.install(node, self._env["current-version"], start)
def downgrade(self, node, start=1):
return self.install(node, self._env["previous-version"], start, "--force --nodeps")
def __call__(self, node):
'''Perform the 'Rolling Upgrade' test. '''
self.incr("calls")
for node in self._env["nodes"]:
if self.upgrade(node):
return self.failure("Couldn't upgrade %s" % node)
self._cm.cluster_stable()
return self.success()
def is_applicable(self):
if not CTSTest.is_applicable(self):
return None
- if not "rpm-dir" in list(self._env.keys()):
+ if "rpm-dir" not in self._env:
return None
- if not "current-version" in list(self._env.keys()):
+ if "current-version" not in self._env:
return None
- if not "previous-version" in list(self._env.keys()):
+ if "previous-version" not in self._env:
return None
return 1
# Register RestartTest as a good test to run
AllTestClasses.append(RollingUpgradeTest)
class BSC_AddResource(CTSTest):
'''Add a resource to the cluster'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name = "AddResource"
self.resource_offset = 0
self.cib_cmd = """cibadmin -C -o %s -X '%s' """
def __call__(self, node):
self.incr("calls")
self.resource_offset = self.resource_offset + 1
r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset)
start_pat = "pacemaker-controld.*%s_start_0.*confirmed.*ok"
patterns = []
patterns.append(start_pat % r_id)
watch = self.create_watch(patterns, self._env["DeadTime"])
watch.set_watch()
ip = self.NextIP()
if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip):
return self.failure("Make resource %s failed" % r_id)
failed = 0
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self._logger.log ("Warn: Pattern not found: %s" % (regex))
failed = 1
if failed:
return self.failure("Resource pattern(s) not found")
if not self._cm.cluster_stable(self._env["DeadTime"]):
return self.failure("Unstable cluster")
return self.success()
def NextIP(self):
ip = self._env["IPBase"]
if ":" in ip:
fields = ip.rpartition(":")
fields[2] = str(hex(int(fields[2], 16)+1))
print(str(hex(int(f[2], 16)+1)))
else:
fields = ip.rpartition('.')
fields[2] = str(int(fields[2])+1)
ip = fields[0] + fields[1] + fields[3];
self._env["IPBase"] = ip
return ip.strip()
def make_ip_resource(self, node, id, rclass, type, ip):
self._logger.log("Creating %s:%s:%s (%s) on %s" % (rclass,type,id,ip,node))
rsc_xml="""
""" % (id, rclass, type, id, id, ip)
node_constraint = """
""" % (id, id, id, id, node)
rc = 0
(rc, _) = self._rsh(node, self.cib_cmd % ("constraints", node_constraint), verbose=1)
if rc != 0:
self._logger.log("Constraint creation failed: %d" % rc)
return None
(rc, _) = self._rsh(node, self.cib_cmd % ("resources", rsc_xml), verbose=1)
if rc != 0:
self._logger.log("Resource creation failed: %d" % rc)
return None
return 1
def is_applicable(self):
if self._env["DoBSC"]:
return True
return None
AllTestClasses.append(BSC_AddResource)
def TestList(cm, audits):
result = []
for testclass in AllTestClasses:
bound_test = testclass(cm)
if bound_test.is_applicable():
bound_test.audits = audits
result.append(bound_test)
return result
class RemoteLXC(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "RemoteLXC"
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
self.num_containers = 2
self.is_container = True
self.fail_string = ""
def start_lxc_simple(self, node):
# restore any artifacts laying around from a previous test.
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
# generate the containers, put them in the config, add some resources to them
pats = [ ]
watch = self.create_watch(pats, 120)
watch.set_watch()
pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc1"))
pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc2"))
pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc-ms"))
pats.append(self.templates["Pat:RscOpOK"] % ("promote", "lxc-ms"))
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -g -a -m -s -c %d &>/dev/null" % self.num_containers)
with Timer(self._logger, self.name, "remoteSimpleInit"):
watch.look_for_all()
if watch.unmatched:
self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
self.failed = True
def cleanup_lxc_simple(self, node):
pats = [ ]
# if the test failed, attempt to clean up the cib and libvirt environment
# as best as possible
if self.failed:
# restore libvirt and cib
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
return
watch = self.create_watch(pats, 120)
watch.set_watch()
pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container1"))
pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container2"))
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -p &>/dev/null")
with Timer(self._logger, self.name, "remoteSimpleCleanup"):
watch.look_for_all()
if watch.unmatched:
self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
self.failed = True
# cleanup libvirt
self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
def __call__(self, node):
'''Perform the 'RemoteLXC' test. '''
self.incr("calls")
ret = self._startall(None)
if not ret:
return self.failure("Setup failed, start all nodes failed.")
(rc, _) = self._rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -v &>/dev/null")
if rc == 1:
self.log("Environment test for lxc support failed.")
return self.skipped()
self.start_lxc_simple(node)
self.cleanup_lxc_simple(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Updating failcount for ping",
r"schedulerd.*: Recover\s+(ping|lxc-ms|container)\s+\(.*\)",
# The orphaned lxc-ms resource causes an expected transition error
# that is a result of the scheduler not having knowledge that the
# promotable resource used to be a clone. As a result, it looks like that
# resource is running in multiple locations when it shouldn't... But in
# this instance we know why this error is occurring and that it is expected.
r"Calculated [Tt]ransition .*pe-error",
r"Resource lxc-ms .* is active on 2 nodes attempting recovery",
r"Unknown operation: fail",
r"VirtualDomain.*ERROR: Unable to determine emulator" ]
AllTestClasses.append(RemoteLXC)
class RemoteBasic(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteBasic"
def __call__(self, node):
'''Perform the 'RemoteBaremetal' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.test_attributes(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
AllTestClasses.append(RemoteBasic)
class RemoteStonithd(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteStonithd"
def __call__(self, node):
'''Perform the 'RemoteStonithd' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.fail_connection(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
def is_applicable(self):
if not RemoteDriver.is_applicable(self):
return False
- if "DoFencing" in list(self._env.keys()):
+ if "DoFencing" in self._env:
return self._env["DoFencing"]
return True
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"Lost connection to Pacemaker Remote node",
r"Software caused connection abort",
r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
r"error: Result of monitor operation for .* on remote-.*: Internal communication failure" ] + super().errors_to_ignore
AllTestClasses.append(RemoteStonithd)
class RemoteMigrate(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteMigrate"
def __call__(self, node):
'''Perform the 'RemoteMigrate' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
self.migrate_connection(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
def is_applicable(self):
if not RemoteDriver.is_applicable(self):
return 0
# This test requires at least three nodes: one to convert to a
# remote node, one to host the connection originally, and one
# to migrate the connection to.
if len(self._env["nodes"]) < 3:
return 0
return 1
AllTestClasses.append(RemoteMigrate)
class RemoteRscFailure(RemoteDriver):
def __init__(self, cm):
RemoteDriver.__init__(self, cm)
self.name = "RemoteRscFailure"
def __call__(self, node):
'''Perform the 'RemoteRscFailure' test. '''
if not self.start_new_test(node):
return self.failure(self.fail_string)
# This is an important step. We are migrating the connection
# before failing the resource. This verifies that the migration
# has properly maintained control over the remote-node.
self.migrate_connection(node)
self.fail_rsc(node)
self.cleanup_metal(node)
self.debug("Waiting for the cluster to recover")
self._cm.cluster_stable()
if self.failed:
return self.failure(self.fail_string)
return self.success()
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
r"Dummy.*: No process state file found" ] + super().errors_to_ignore
def is_applicable(self):
if not RemoteDriver.is_applicable(self):
return 0
# This test requires at least three nodes: one to convert to a
# remote node, one to host the connection originally, and one
# to migrate the connection to.
if len(self._env["nodes"]) < 3:
return 0
return 1
AllTestClasses.append(RemoteRscFailure)
# vim:ts=4:sw=4:et:
diff --git a/python/pacemaker/_cts/CTS.py b/python/pacemaker/_cts/CTS.py
index f31fd9016e..b41f776301 100644
--- a/python/pacemaker/_cts/CTS.py
+++ b/python/pacemaker/_cts/CTS.py
@@ -1,237 +1,239 @@
""" Main classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["CtsLab", "NodeStatus", "Process"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys
import time
import traceback
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.input import should_continue
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
class CtsLab:
""" A class that defines the Lab Environment for the Cluster Test System.
It defines those things which are expected to change from test
environment to test environment for the same cluster manager.
This is where you define the set of nodes that are in your test lab,
what kind of reset mechanism you use, etc. All this data is stored
as key/value pairs in an Environment instance constructed from arguments
passed to this class.
The CTS code ignores names it doesn't know about or need. Individual
tests have access to this information, and it is perfectly acceptable
to provide hints, tweaks, fine-tuning directions, or other information
to the tests through this mechanism.
"""
def __init__(self, args=None):
""" Create a new CtsLab instance. This class can be treated kind
of like a dictionary due to the presence of typical dict functions
like has_key, __getitem__, and __setitem__. However, it is not a
dictionary so do not rely on standard dictionary behavior.
Arguments:
args -- A list of command line parameters, minus the program name.
"""
self._env = EnvFactory().getInstance(args)
self._logger = LogFactory()
def dump(self):
""" Print the current environment """
self._env.dump()
def has_key(self, key):
""" Does the given environment key exist? """
- return key in list(self._env.keys())
+ # pylint gets confused because of EnvFactory here.
+ # pylint: disable=unsupported-membership-test
+ return key in self._env
def __getitem__(self, key):
""" Return the given environment key, or raise KeyError if it does
not exist
"""
# Throughout this file, pylint has trouble understanding that EnvFactory
# and RemoteFactory are singleton instances that can be treated as callable
# and subscriptable objects. Various warnings are disabled because of this.
# See also a comment about self._rsh in environment.py.
# pylint: disable=unsubscriptable-object
return self._env[key]
def __setitem__(self, key, value):
""" Set the given environment key to the given value, overriding any
previous value
"""
# pylint: disable=unsupported-assignment-operation
self._env[key] = value
def run(self, scenario, iterations):
""" Run the given scenario the given number of times.
Returns:
ExitStatus.OK on success, or ExitStatus.ERROR on error
"""
if not scenario:
self._logger.log("No scenario was defined")
return ExitStatus.ERROR
self._logger.log("Cluster nodes: ")
# pylint: disable=unsubscriptable-object
for node in self._env["nodes"]:
self._logger.log(" * %s" % (node))
if not scenario.SetUp():
return ExitStatus.ERROR
# We want to alert on any exceptions caused by running a scenario, so
# here it's okay to disable the pylint warning.
# pylint: disable=bare-except
try:
scenario.run(iterations)
except:
self._logger.log("Exception by %s" % sys.exc_info()[0])
self._logger.traceback(traceback)
scenario.summarize()
scenario.TearDown()
return ExitStatus.ERROR
scenario.TearDown()
scenario.summarize()
if scenario.Stats["failure"] > 0:
return ExitStatus.ERROR
if scenario.Stats["success"] != iterations:
self._logger.log("No failure count but success != requested iterations")
return ExitStatus.ERROR
return ExitStatus.OK
class NodeStatus:
""" A class for querying the status of cluster nodes - are nodes up? Do
they respond to SSH connections?
"""
def __init__(self, env):
""" Create a new NodeStatus instance
Arguments:
env -- An Environment instance
"""
self._env = env
def _node_booted(self, node):
""" Return True if the given node is booted (responds to pings) """
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, verbose=0)
return rc == 0
def _sshd_up(self, node):
""" Return true if sshd responds on the given node """
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()(node, "true", verbose=0)
return rc == 0
def wait_for_node(self, node, timeout=300):
""" Wait for a node to become available. Should the timeout be reached,
the user will be given a choice whether to continue or not. If not,
ValueError will be raised.
Returns:
True when the node is available, or False if the timeout is reached.
"""
initial_timeout = timeout
anytimeouts = False
while timeout > 0:
if self._node_booted(node) and self._sshd_up(node):
if anytimeouts:
# Fudge to wait for the system to finish coming up
time.sleep(30)
LogFactory().debug("Node %s now up" % node)
return True
time.sleep(30)
if not anytimeouts:
LogFactory().debug("Waiting for node %s to come up" % node)
anytimeouts = True
timeout -= 1
LogFactory().log("%s did not come up within %d tries" % (node, initial_timeout))
if not should_continue(self._env["continue"]):
raise ValueError("%s did not come up within %d tries" % (node, initial_timeout))
return False
def wait_for_all_nodes(self, nodes, timeout=300):
""" Return True when all nodes come up, or False if the timeout is reached """
for node in nodes:
if not self.wait_for_node(node, timeout):
return False
return True
class Process:
""" A class for managing a Pacemaker daemon """
# pylint: disable=invalid-name
def __init__(self, cm, name, dc_only=False, pats=None, dc_pats=None,
badnews_ignore=None):
""" Create a new Process instance.
Arguments:
cm -- A ClusterManager instance
name -- The command being run
dc_only -- Should this daemon be killed only on the DC?
pats -- Regexes we expect to find in log files
dc_pats -- Additional DC-specific regexes we expect to find
in log files
badnews_ignore -- Regexes for lines in the log that can be ignored
"""
self._cm = cm
self.badnews_ignore = badnews_ignore
self.dc_only = dc_only
self.dc_pats = dc_pats
self.name = name
self.pats = pats
if self.badnews_ignore is None:
self.badnews_ignore = []
if self.dc_pats is None:
self.dc_pats = []
if self.pats is None:
self.pats = []
def kill(self, node):
""" Kill the instance of this process running on the given node """
(rc, _) = self._cm.rsh(node, "killall -9 %s" % self.name)
if rc != 0:
self._cm.log ("ERROR: Kill %s failed on node %s" % (self.name, node))
diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
index e4d70e635d..5aab141c6b 100644
--- a/python/pacemaker/_cts/environment.py
+++ b/python/pacemaker/_cts/environment.py
@@ -1,651 +1,648 @@
""" Test environment classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["EnvFactory"]
__copyright__ = "Copyright 2014-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import random
import socket
import sys
import time
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogKind
class Environment:
""" A class for managing the CTS environment, consisting largely of processing
and storing command line parameters
"""
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
def __init__(self, args):
""" Create a new Environment instance. This class can be treated kind
of like a dictionary due to the presence of typical dict functions
- like has_key, __getitem__, and __setitem__. However, it is not a
+ like __contains__, __getitem__, and __setitem__. However, it is not a
dictionary so do not rely on standard dictionary behavior.
Arguments:
args -- A list of command line parameters, minus the program name.
If None, sys.argv will be used.
"""
self.data = {}
self._nodes = []
# Set some defaults before processing command line arguments. These are
# either not set by any command line parameter, or they need a default
# that can't be set in add_argument.
self["DeadTime"] = 300
self["StartTime"] = 300
self["StableTime"] = 30
self["tests"] = []
self["IPagent"] = "IPaddr2"
self["DoFencing"] = True
self["ClobberCIB"] = False
self["CIBfilename"] = None
self["CIBResource"] = False
self["LogWatcher"] = LogKind.ANY
self["node-limit"] = 0
self["scenario"] = "random"
self.random_gen = random.Random()
self._logger = LogFactory()
self._rsh = RemoteFactory().getInstance()
self._target = "localhost"
self._seed_random()
self._parse_args(args)
if not self["ListTests"]:
self._validate()
self._discover()
def _seed_random(self, seed=None):
""" Initialize the random number generator with the given seed, or use
the current time if None
"""
if not seed:
seed = int(time.time())
self["RandSeed"] = seed
self.random_gen.seed(str(seed))
def dump(self):
""" Print the current environment """
keys = []
for key in list(self.data.keys()):
keys.append(key)
keys.sort()
for key in keys:
s = "Environment[%s]" % key
self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key])))
def keys(self):
""" Return a list of all environment keys stored in this instance """
return list(self.data.keys())
- def has_key(self, key):
+ def __contains__(self, key):
""" Does the given environment key exist? """
if key == "nodes":
return True
return key in self.data
def __getitem__(self, key):
""" Return the given environment key, or None if it does not exist """
if str(key) == "0":
raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead")
if key == "nodes":
return self._nodes
if key == "Name":
return self._get_stack_short()
- if key in self.data:
- return self.data[key]
-
- return None
+ return self.data.get(key)
def __setitem__(self, key, value):
""" Set the given environment key to the given value, overriding any
previous value
"""
if key == "Stack":
self._set_stack(value)
elif key == "node-limit":
self.data[key] = value
self._filter_nodes()
elif key == "nodes":
self._nodes = []
for node in value:
# I don't think I need the IP address, etc. but this validates
# the node name against /etc/hosts and/or DNS, so it's a
# GoodThing(tm).
try:
n = node.strip()
socket.gethostbyname_ex(n)
self._nodes.append(n)
except:
self._logger.log("%s not found in DNS... aborting" % node)
raise
self._filter_nodes()
else:
self.data[key] = value
def random_node(self):
""" Choose a random node from the cluster """
return self.random_gen.choice(self["nodes"])
def _set_stack(self, name):
""" Normalize the given cluster stack name """
if name in ["corosync", "cs", "mcp"]:
self.data["Stack"] = "corosync 2+"
else:
raise ValueError("Unknown stack: %s" % name)
def _get_stack_short(self):
""" Return the short name for the currently set cluster stack """
if "Stack" not in self.data:
return "unknown"
if self.data["Stack"] == "corosync 2+":
return "crm-corosync"
LogFactory().log("Unknown stack: %s" % self["stack"])
raise ValueError("Unknown stack: %s" % self["stack"])
def _detect_systemd(self):
""" Detect whether systemd is in use on the target node """
if "have_systemd" not in self.data:
(rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0)
self["have_systemd"] = rc == 0
def _detect_syslog(self):
""" Detect the syslog variant in use on the target node """
if "syslogd" not in self.data:
if self["have_systemd"]:
# Systemd
(_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1)
self["syslogd"] = lines[0].strip()
else:
# SYS-V
(_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1)
self["syslogd"] = lines[0].strip()
if "syslogd" not in self.data or not self["syslogd"]:
# default
self["syslogd"] = "rsyslog"
def disable_service(self, node, service):
""" Disable the given service on the given node """
if self["have_systemd"]:
# Systemd
(rc, _) = self._rsh(node, "systemctl disable %s" % service)
return rc
# SYS-V
(rc, _) = self._rsh(node, "chkconfig %s off" % service)
return rc
def enable_service(self, node, service):
""" Enable the given service on the given node """
if self["have_systemd"]:
# Systemd
(rc, _) = self._rsh(node, "systemctl enable %s" % service)
return rc
# SYS-V
(rc, _) = self._rsh(node, "chkconfig %s on" % service)
return rc
def service_is_enabled(self, node, service):
""" Is the given service enabled on the given node? """
if self["have_systemd"]:
# Systemd
# With "systemctl is-enabled", we should check if the service is
# explicitly "enabled" instead of the return code. For example it returns
# 0 if the service is "static" or "indirect", but they don't really count
# as "enabled".
(rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service)
return rc == 0
# SYS-V
(rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service)
return rc == 0
def _detect_at_boot(self):
""" Detect if the cluster starts at boot """
if "at-boot" not in self.data:
self["at-boot"] = self.service_is_enabled(self._target, "corosync") \
or self.service_is_enabled(self._target, "pacemaker")
def _detect_ip_offset(self):
""" Detect the offset for IPaddr resources """
if self["CIBResource"] and "IPBase" not in self.data:
(_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
network = lines[0].strip()
(_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)
try:
self["IPBase"] = lines[0].strip()
except (IndexError, TypeError):
self["IPBase"] = None
if not self["IPBase"]:
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.")
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
return
# pylint thinks self["IPBase"] is a list, not a string, which causes it
# to error out because a list doesn't have split().
# pylint: disable=no-member
if int(self["IPBase"].split('.')[3]) >= 240:
self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
% (self["IPBase"], self["IPBase"].split('.')[3]))
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
def _filter_nodes(self):
""" If --limit-nodes is given, keep that many nodes from the front of the
list of cluster nodes and drop the rest
"""
if self["node-limit"] > 0:
if len(self["nodes"]) > self["node-limit"]:
# pylint thinks self["node-limit"] is a list even though we initialize
# it as an int in __init__ and treat it as an int everywhere.
# pylint: disable=bad-string-format-type
self._logger.log("Limiting the number of nodes configured=%d (max=%d)"
%(len(self["nodes"]), self["node-limit"]))
while len(self["nodes"]) > self["node-limit"]:
self["nodes"].pop(len(self["nodes"])-1)
def _validate(self):
""" Were we given all the required command line parameters? """
if not self["nodes"]:
raise ValueError("No nodes specified!")
def _discover(self):
""" Probe cluster nodes to figure out how to log and manage services """
self._target = random.Random().choice(self["nodes"])
exerciser = socket.gethostname()
# Use the IP where possible to avoid name lookup failures
for ip in socket.gethostbyname_ex(exerciser)[2]:
if ip != "127.0.0.1":
exerciser = ip
break
self["cts-exerciser"] = exerciser
self._detect_systemd()
self._detect_syslog()
self._detect_at_boot()
self._detect_ip_offset()
def _parse_args(self, argv):
""" Parse and validate command line parameters, setting the appropriate
values in the environment dictionary. If argv is None, use sys.argv
instead.
"""
if not argv:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0])
grp1 = parser.add_argument_group("Common options")
grp1.add_argument("-g", "--dsh-group", "--group",
metavar="GROUP", dest="group",
help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)")
grp1.add_argument("-l", "--limit-nodes",
type=int, default=0,
metavar="MAX",
help="Only use the first MAX cluster nodes supplied with --nodes")
grp1.add_argument("--benchmark",
action="store_true",
help="Add timing information")
grp1.add_argument("--list", "--list-tests",
action="store_true", dest="list_tests",
help="List the valid tests")
grp1.add_argument("--nodes",
metavar="NODES",
help="List of cluster nodes separated by whitespace")
grp1.add_argument("--stack",
default="corosync",
metavar="STACK",
help="Which cluster stack is installed")
grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
grp2.add_argument("-L", "--logfile",
metavar="PATH",
help="Where to look for logs from cluster nodes")
grp2.add_argument("--at-boot", "--cluster-starts-at-boot",
choices=["1", "0", "yes", "no"],
help="Does the cluster software start at boot time?")
grp2.add_argument("--facility", "--syslog-facility",
default="daemon",
metavar="NAME",
help="Which syslog facility to log to")
grp2.add_argument("--ip", "--test-ip-base",
metavar="IP",
help="Offset for generated IP address resources")
grp3 = parser.add_argument_group("Options for release testing")
grp3.add_argument("-r", "--populate-resources",
action="store_true",
help="Generate a sample configuration")
grp3.add_argument("--choose",
metavar="NAME",
help="Run only the named test")
grp3.add_argument("--fencing", "--stonith",
choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
default="1",
help="What fencing agent to use")
grp3.add_argument("--once",
action="store_true",
help="Run all valid tests once")
grp4 = parser.add_argument_group("Additional (less common) options")
grp4.add_argument("-c", "--clobber-cib",
action="store_true",
help="Erase any existing configuration")
grp4.add_argument("-y", "--yes",
action="store_true", dest="always_continue",
help="Continue to run whenever prompted")
grp4.add_argument("--boot",
action="store_true",
help="")
grp4.add_argument("--bsc",
action="store_true",
help="")
grp4.add_argument("--cib-filename",
metavar="PATH",
help="Install the given CIB file to the cluster")
grp4.add_argument("--container-tests",
action="store_true",
help="Include pacemaker_remote tests that run in lxc container resources")
grp4.add_argument("--experimental-tests",
action="store_true",
help="Include experimental tests")
grp4.add_argument("--loop-minutes",
type=int, default=60,
help="")
grp4.add_argument("--no-loop-tests",
action="store_true",
help="Don't run looping/time-based tests")
grp4.add_argument("--no-unsafe-tests",
action="store_true",
help="Don't run tests that are unsafe for use with ocfs2/drbd")
grp4.add_argument("--notification-agent",
metavar="PATH",
default="/var/lib/pacemaker/notify.sh",
help="Script to configure for Pacemaker alerts")
grp4.add_argument("--notification-recipient",
metavar="R",
default="/var/lib/pacemaker/notify.log",
help="Recipient to pass to alert script")
grp4.add_argument("--oprofile",
metavar="NODES",
help="List of cluster nodes to run oprofile on")
grp4.add_argument("--outputfile",
metavar="PATH",
help="Location to write logs to")
grp4.add_argument("--qarsh",
action="store_true",
help="Use QARSH to access nodes instead of SSH")
grp4.add_argument("--schema",
metavar="SCHEMA",
default="pacemaker-3.0",
help="Create a CIB conforming to the given schema")
grp4.add_argument("--seed",
metavar="SEED",
help="Use the given string as the random number seed")
grp4.add_argument("--set",
action="append",
metavar="ARG",
default=[],
help="Set key=value pairs (can be specified multiple times)")
grp4.add_argument("--stonith-args",
metavar="ARGS",
default="hostlist=all,livedangerously=yes",
help="")
grp4.add_argument("--stonith-type",
metavar="TYPE",
default="external/ssh",
help="")
grp4.add_argument("--trunc",
action="store_true", dest="truncate",
help="Truncate log file before starting")
grp4.add_argument("--valgrind-procs",
metavar="PROCS",
default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd",
help="Run valgrind against the given space-separated list of processes")
grp4.add_argument("--valgrind-tests",
action="store_true",
help="Include tests using valgrind")
grp4.add_argument("--warn-inactive",
action="store_true",
help="Warn if a resource is assigned to an inactive node")
parser.add_argument("iterations",
nargs='?',
type=int, default=1,
help="Number of tests to run")
args = parser.parse_args(args=argv)
# Set values on this object based on what happened with command line
# processing. This has to be done in several blocks.
# These values can always be set. They get a default from the add_argument
# calls, only do one thing, and they do not have any side effects.
self["ClobberCIB"] = args.clobber_cib
self["ListTests"] = args.list_tests
self["Schema"] = args.schema
self["Stack"] = args.stack
self["SyslogFacility"] = args.facility
self["TruncateLog"] = args.truncate
self["at-boot"] = args.at_boot in ["1", "yes"]
self["benchmark"] = args.benchmark
self["continue"] = args.always_continue
self["container-tests"] = args.container_tests
self["experimental-tests"] = args.experimental_tests
self["iterations"] = args.iterations
self["loop-minutes"] = args.loop_minutes
self["loop-tests"] = not args.no_loop_tests
self["notification-agent"] = args.notification_agent
self["notification-recipient"] = args.notification_recipient
self["node-limit"] = args.limit_nodes
self["stonith-params"] = args.stonith_args
self["stonith-type"] = args.stonith_type
self["unsafe-tests"] = not args.no_unsafe_tests
self["valgrind-procs"] = args.valgrind_procs
self["valgrind-tests"] = args.valgrind_tests
self["warn-inactive"] = args.warn_inactive
# Nodes and groups are mutually exclusive, so their defaults cannot be
# set in their add_argument calls. Additionally, groups does more than
# just set a value. Here, set nodes first and then if a group is
# specified, override the previous nodes value.
if args.nodes:
self["nodes"] = args.nodes.split(" ")
else:
self["nodes"] = []
if args.group:
self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group)
LogFactory().add_file(self["OutputFile"], "CTS")
dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group)
if os.path.isfile(dsh_file):
self["nodes"] = []
with open(dsh_file, "r", encoding="utf-8") as f:
for line in f:
l = line.strip()
if not l.startswith('#'):
self["nodes"].append(l)
else:
print("Unknown DSH group: %s" % args.dsh_group)
# Everything else either can't have a default set in an add_argument
# call (likely because we don't want to always have a value set for it)
# or it does something fancier than just set a single value. However,
# order does not matter for these as long as the user doesn't provide
# conflicting arguments on the command line. So just do Everything
# alphabetically.
if args.boot:
self["scenario"] = "boot"
if args.bsc:
self["DoBSC"] = True
self["scenario"] = "basic-sanity"
if args.cib_filename:
self["CIBfilename"] = args.cib_filename
else:
self["CIBfilename"] = None
if args.choose:
self["scenario"] = "sequence"
self["tests"].append(args.choose)
if args.fencing:
if args.fencing in ["0", "no"]:
self["DoFencing"] = False
else:
self["DoFencing"] = True
if args.fencing in ["rhcs", "virt", "xvm"]:
self["stonith-type"] = "fence_xvm"
elif args.fencing == "scsi":
self["stonith-type"] = "fence_scsi"
elif args.fencing in ["lha", "ssh"]:
self["stonith-params"] = "hostlist=all,livedangerously=yes"
self["stonith-type"] = "external/ssh"
elif args.fencing == "openstack":
self["stonith-type"] = "fence_openstack"
print("Obtaining OpenStack credentials from the current environment")
self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % (
os.environ['OS_REGION_NAME'],
os.environ['OS_TENANT_NAME'],
os.environ['OS_AUTH_URL'],
os.environ['OS_USERNAME'],
os.environ['OS_PASSWORD']
)
elif args.fencing == "rhevm":
self["stonith-type"] = "fence_rhevm"
print("Obtaining RHEV-M credentials from the current environment")
self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
os.environ['RHEVM_USERNAME'],
os.environ['RHEVM_PASSWORD'],
os.environ['RHEVM_SERVER'],
os.environ['RHEVM_PORT'],
)
if args.ip:
self["CIBResource"] = True
self["ClobberCIB"] = True
self["IPBase"] = args.ip
if args.logfile:
self["LogAuditDisabled"] = True
self["LogFileName"] = args.logfile
self["LogWatcher"] = LogKind.REMOTE_FILE
else:
# We can't set this as the default on the parser.add_argument call
# for this option because then args.logfile will be set, which means
# the above branch will be taken and those other values will also be
# set.
self["LogFileName"] = "/var/log/messages"
if args.once:
self["scenario"] = "all-once"
if args.oprofile:
self["oprofile"] = args.oprofile.split(" ")
else:
self["oprofile"] = []
if args.outputfile:
self["OutputFile"] = args.outputfile
LogFactory().add_file(self["OutputFile"])
if args.populate_resources:
self["CIBResource"] = True
self["ClobberCIB"] = True
if args.qarsh:
self._rsh.enable_qarsh()
for kv in args.set:
(name, value) = kv.split("=")
self[name] = value
print("Setting %s = %s" % (name, value))
class EnvFactory:
""" A class for constructing a singleton instance of an Environment object """
instance = None
# pylint: disable=invalid-name
def getInstance(self, args=None):
""" Returns the previously created instance of Environment, or creates a
new instance if one does not already exist.
"""
if not EnvFactory.instance:
EnvFactory.instance = Environment(args)
return EnvFactory.instance