Page MenuHomeClusterLabs Projects

No OneTemporary

This file is larger than 256 KB, so syntax highlighting was skipped.
diff --git a/cts/lab/CIB.py b/cts/lab/CIB.py
index 3c2dfb7632..2cbed3580e 100644
--- a/cts/lab/CIB.py
+++ b/cts/lab/CIB.py
@@ -1,518 +1,518 @@
""" CIB generator for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import warnings
import tempfile
from pacemaker.buildoptions import BuildOptions
class CibBase(object):
    """Common base for objects describing one element of a generated CIB.

    Holds the element tag, its ID (stored as ``name``), a dict of keyword
    attributes and an ordered list of children.
    """

    def __init__(self, Factory, tag, _id, **kwargs):
        """Remember the owning factory, the tag, the ID and any attributes."""
        self.Factory = Factory
        self.tag = tag
        self.name = _id
        self.kwargs = kwargs
        self.children = []

    def __repr__(self):
        """Return ``<tag>-<name>``."""
        return "%s-%s" % (self.tag, self.name)

    def add_child(self, child):
        """Append *child* to the list of children."""
        self.children.append(child)

    def __setitem__(self, key, value):
        """Set attribute *key*; a falsy *value* removes the attribute instead."""
        if not value:
            self.kwargs.pop(key, None)
            return
        self.kwargs[key] = value
from cts.cib_xml import *
class ConfigBase(object):
    """Base class for CIB generators.

    Keeps a reference to the cluster manager (``CM``) and to the factory that
    created the generator, and makes sure the factory has a scratch file name
    (``Factory.tmpfile``) to build the CIB in.
    """

    cts_cib = None
    version = "unknown"
    Factory = None

    def __init__(self, CM, factory, tmpfile=None):
        """Store *CM* and *factory*; pick a temporary file name if none given.

        When *tmpfile* is falsy, a name is obtained from
        ``tempfile.NamedTemporaryFile``. Only the name is kept; the local
        file itself is closed (and thereby deleted) immediately.
        """
        self.CM = CM
        self.Factory = factory

        if not tmpfile:
            warnings.filterwarnings("ignore")
            with tempfile.NamedTemporaryFile(delete=True) as f:
                tmpfile = f.name
            warnings.resetwarnings()

        self.Factory.tmpfile = tmpfile

    # NOTE(review): this definition rebinds the class attribute ``version``
    # declared above, and subclasses that assign a string to ``version`` hide
    # the method again. Kept as-is for interface compatibility.
    def version(self):
        return self.version

    def NextIP(self):
        """Advance ``CM.Env["IPBase"]`` by one and return the new address.

        For an address containing ":" the text after the last ":" is
        incremented as a hexadecimal number; otherwise the text after the
        last "." is incremented as a decimal number. The new address is
        written back to ``CM.Env["IPBase"]``.

        Raises:
            ValueError: if the final component is not a valid number.
        """
        ip = self.CM.Env["IPBase"]
        if ":" in ip:
            (prefix, sep, suffix) = ip.rpartition(":")
            # format(..., "x") yields bare hex digits. The previous
            # hex(...).lstrip("0x") stripped a *set of characters*, not a
            # prefix, and so could also eat significant zeros.
            suffix = format(int(suffix, 16) + 1, "x")
        else:
            (prefix, sep, suffix) = ip.rpartition(".")
            suffix = str(int(suffix) + 1)

        ip = prefix + sep + suffix
        self.CM.Env["IPBase"] = ip
        return ip.strip()
class CIB12(ConfigBase):
version = "pacemaker-1.2"
counter = 1
def _show(self, command=""):
    """Query the CIB file on the target and return its text.

    Runs ``cibadmin -Ql`` (plus *command*) against ``Factory.tmpfile`` on
    ``Factory.target``, logs every returned line at debug level and returns
    the lines concatenated.
    """
    query = "HOME=/root CIB_file=" + self.Factory.tmpfile + " cibadmin -Ql " + command
    (_, lines) = self.Factory.rsh(self.Factory.target, query, verbose=1)

    collected = []
    for text in lines:
        collected.append(text)
        self.Factory.debug("Generated Config: " + text)
    return "".join(collected)
def NewIP(self, name=None, standard="ocf"):
    """Create an IP-style resource with a 5s monitor and return it.

    With the ``IPaddr2`` agent the next free address from ``NextIP()`` is
    used and the netmask (plus ``nic`` for addresses containing ":") is set.
    With any other agent only a resource named after the agent and a running
    counter is created.

    Args:
        name: resource ID; generated when falsy.
        standard: resource standard passed through to ``Resource``.
    """
    agent = self.CM.Env["IPagent"]

    if agent != "IPaddr2":
        if not name:
            name = "r%s%d" % (agent, self.counter)
            self.counter = self.counter + 1
        rsc = Resource(self.Factory, name, agent, standard)
    else:
        addr = self.NextIP()
        is_v6 = ":" in addr
        if not name:
            # For ":"-separated addresses only the last group goes in the ID
            name = "r" + (addr.rpartition(":")[2] if is_v6 else addr)

        rsc = Resource(self.Factory, name, agent, standard)
        rsc["ip"] = addr
        if is_v6:
            rsc["cidr_netmask"] = "64"
            rsc["nic"] = "eth0"
        else:
            rsc["cidr_netmask"] = "32"

    rsc.add_op("monitor", "5s")
    return rsc
def get_node_id(self, node_name):
    """ Check the cluster configuration for a node ID.

    Returns the integer after "nodeid:" from the corosync.conf "node {"
    record whose "ring0_addr:" or "name:" equals *node_name*. Returns 0 when
    the command fails, does not yield exactly one line, or the line is not
    an integer.
    """
    # We can't account for every possible configuration, so anything other
    # than an explicit, unambiguous nodeid yields 0.

    # awkward command: use } as record separator
    # so each corosync.conf "object" is one record;
    # match the "node {" record that has "ring0_addr: node_name";
    # then print the substring of that record after "nodeid:"
    awk_template = (
        r"""awk -v RS="}" """
        r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/"""
        r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s"""
    )
    (rc, output) = self.Factory.rsh(
        self.Factory.target,
        awk_template % (node_name, BuildOptions.COROSYNC_CONFIG_FILE),
        verbose=1)

    if rc != 0 or len(output) != 1:
        return 0

    try:
        return int(output[0])
    except ValueError:
        return 0
def install(self, target):
    """Generate the CIB directly into the installed CIB location on *target*.

    Temporarily points ``Factory.tmpfile`` at ``<CIB_DIR>/cib.xml``, forces
    ``contents()`` to rebuild, then hands ownership of the file to the
    daemon user. ``Factory.tmpfile`` is restored afterwards, even when the
    rebuild or the chown raises.
    """
    old = self.Factory.tmpfile

    # Force a rebuild
    self.cts_cib = None

    self.Factory.tmpfile = BuildOptions.CIB_DIR + "/cib.xml"
    try:
        self.contents(target)
        self.Factory.rsh(self.Factory.target, "chown " + BuildOptions.DAEMON_USER + " " + self.Factory.tmpfile)
    finally:
        # Never leave the factory pointing at the live CIB path
        self.Factory.tmpfile = old
def contents(self, target=None):
"""Build the CIB (or return the cached one) and return it as text.

If self.cts_cib is already set it is returned unchanged.  Otherwise an
empty CIB of schema self.version is created in self.Factory.tmpfile via
"cibadmin --empty", fencing resources and topology, cluster options,
operation defaults and (optionally) nodes, alerts and test resources are
committed, and the result is read back with self._show().

target: optional node; when given it replaces self.Factory.target.
"""
# NOTE(review): lines starting with "- " / "+ " below are unified-diff
# markers from the patch this text was captured from, not Python.
# fencing resource
if self.cts_cib:
return self.cts_cib
if target:
self.Factory.target = target
self.Factory.rsh(self.Factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self.Factory.tmpfile))
self.num_nodes = len(self.CM.Env["nodes"])
no_quorum = "stop"
if self.num_nodes < 3:
no_quorum = "ignore"
self.Factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self.num_nodes)
# We don't need a nodes section unless we add attributes
stn = None
# Fencing resource
# Define first so that the shell doesn't reject every update
if self.CM.Env["DoFencing"]:
# Define the "real" fencing device
st = Resource(self.Factory, "Fencing", ""+self.CM.Env["stonith-type"], "stonith")
# Set a threshold for unreliable stonith devices such as the vmware one
st.add_meta("migration-threshold", "5")
st.add_op("monitor", "120s", timeout="120s")
st.add_op("stop", "0", timeout="60s")
st.add_op("start", "0", timeout="60s")
# For remote node tests, a cluster node is stopped and brought back up
# as a remote node with the name "remote-OLDNAME". To allow fencing
# devices to fence these nodes, create a list of all possible node names.
all_node_names = [ prefix+n for n in self.CM.Env["nodes"] for prefix in ('', 'remote-') ]
# Add all parameters specified by user
entries = self.CM.Env["stonith-params"].split(',')
for entry in entries:
try:
(name, value) = entry.split('=', 1)
except ValueError:
print("Warning: skipping invalid fencing parameter: %s" % entry)
continue
# Allow user to specify "all" as the node list, and expand it here
if name in [ "hostlist", "pcmk_host_list" ] and value == "all":
value = ' '.join(all_node_names)
st[name] = value
st.commit()
# Test advanced fencing logic
# (the condition is a literal True, so this section always runs)
if True:
stf_nodes = []
stt_nodes = []
attr_nodes = {}
# Create the levels
stl = FencingTopology(self.Factory)
for node in self.CM.Env["nodes"]:
# Remote node tests will rename the node
remote_node = "remote-" + node
# Randomly assign node to a fencing method
# (the padded literals "levels-or " / "broadcast " are compared verbatim below)
- ftype = self.CM.Env.RandomGen.choice(["levels-and", "levels-or ", "broadcast "])
+ ftype = self.CM.Env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
# For levels-and, randomly choose targeting by node name or attribute
by = ""
if ftype == "levels-and":
node_id = self.get_node_id(node)
- if node_id == 0 or self.CM.Env.RandomGen.choice([True, False]):
+ if node_id == 0 or self.CM.Env.random_gen.choice([True, False]):
by = " (by name)"
else:
attr_nodes[node] = node_id
by = " (by attribute)"
self.CM.log(" - Using %s fencing for node: %s%s" % (ftype, node, by))
if ftype == "levels-and":
# If targeting by name, add a topology level for this node
if node not in attr_nodes:
stl.level(1, node, "FencingPass,Fencing")
# Always target remote nodes by name, otherwise we would need to add
# an attribute to the remote node only during remote tests (we don't
# want nonexistent remote nodes showing up in the non-remote tests).
# That complexity is not worth the effort.
stl.level(1, remote_node, "FencingPass,Fencing")
# Add the node (and its remote equivalent) to the list of levels-and nodes.
stt_nodes.extend([node, remote_node])
elif ftype == "levels-or ":
for n in [ node, remote_node ]:
stl.level(1, n, "FencingFail")
stl.level(2, n, "Fencing")
stf_nodes.extend([node, remote_node])
# If any levels-and nodes were targeted by attribute,
# create the attributes and a level for the attribute.
if attr_nodes:
stn = Nodes(self.Factory)
for (node_name, node_id) in list(attr_nodes.items()):
stn.add_node(node_name, node_id, { "cts-fencing" : "levels-and" })
stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and")
# Create a Dummy agent that always passes for levels-and
if len(stt_nodes):
stt = Resource(self.Factory, "FencingPass", "fence_dummy", "stonith")
stt["pcmk_host_list"] = " ".join(stt_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stt["random_sleep_range"] = "30"
stt["mode"] = "pass"
stt.commit()
# Create a Dummy agent that always fails for levels-or
if len(stf_nodes):
stf = Resource(self.Factory, "FencingFail", "fence_dummy", "stonith")
stf["pcmk_host_list"] = " ".join(stf_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stf["random_sleep_range"] = "30"
stf["mode"] = "fail"
stf.commit()
# Now commit the levels themselves
stl.commit()
# Cluster-wide options
o = Option(self.Factory)
o["stonith-enabled"] = self.CM.Env["DoFencing"]
o["start-failure-is-fatal"] = "false"
o["pe-input-series-max"] = "5000"
o["shutdown-escalation"] = "5min"
o["batch-limit"] = "10"
o["dc-deadtime"] = "5s"
o["no-quorum-policy"] = no_quorum
- if self.CM.Env["DoBSC"] == 1:
+ if self.CM.Env["DoBSC"]:
o["ident-string"] = "Linux-HA TEST configuration file - REMOVEME!!"
o.commit()
# Operation defaults (the name "o" is reused for a new object)
o = OpDefaults(self.Factory)
o["timeout"] = "90s"
o.commit()
# Commit the nodes section if we defined one
if stn is not None:
stn.commit()
# Add an alerts section if possible
if self.Factory.rsh.exists_on_all(self.CM.Env["notification-agent"], self.CM.Env["nodes"]):
alerts = Alerts(self.Factory)
alerts.add_alert(self.CM.Env["notification-agent"],
self.CM.Env["notification-recipient"])
alerts.commit()
# Add resources?
- if self.CM.Env["CIBResource"] == 1:
+ if self.CM.Env["CIBResource"]:
self.add_resources()
if self.CM.cluster_monitor == 1:
mon = Resource(self.Factory, "cluster_mon", "ocf", "ClusterMon", "pacemaker")
mon.add_op("start", "0", requires="nothing")
mon.add_op("monitor", "5s", requires="nothing")
mon["update"] = "10"
mon["extra_options"] = "-r -n"
mon["user"] = "abeekhof"
mon["htmlfile"] = "/suse/abeekhof/Export/cluster.html"
mon.commit()
#self._create('''location prefer-dc cluster_mon rule -INFINITY: \#is_dc eq false''')
# generate cib
self.cts_cib = self._show()
# Remove the scratch file unless it is the installed CIB path
if self.Factory.tmpfile != BuildOptions.CIB_DIR + "/cib.xml":
self.Factory.rsh(self.Factory.target, "rm -f "+self.Factory.tmpfile)
return self.cts_cib
def add_resources(self):
"""Commit the standard set of test resources to the CIB being built.

Creates, in this order: one IP resource per node (preferring that node),
a "migrator" Dummy, a cloned ping resource, a promotable Stateful clone
tied to connectivity, a resource group ordered and colocated after the
promoted instance, and an LSB resource ordered and colocated after the
group.
"""
# Per-node resources
for node in self.CM.Env["nodes"]:
name = "rsc_"+node
r = self.NewIP(name)
r.prefer(node, "100")
r.commit()
# Migrator
# Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach
m = Resource(self.Factory, "migrator","Dummy", "ocf", "pacemaker")
m["passwd"] = "whatever"
m.add_meta("resource-stickiness","1")
m.add_meta("allow-migrate", "1")
m.add_op("monitor", "P10S")
m.commit()
# Ping the test exerciser
p = Resource(self.Factory, "ping-1","ping", "ocf", "pacemaker")
p.add_op("monitor", "60s")
p["host_list"] = self.CM.Env["cts-exerciser"]
p["name"] = "connected"
p["debug"] = "true"
c = Clone(self.Factory, "Connectivity", p)
c["globally-unique"] = "false"
c.commit()
# promotable clone resource
s = Resource(self.Factory, "stateful-1", "Stateful", "ocf", "pacemaker")
s.add_op("monitor", "15s", timeout="60s")
s.add_op("monitor", "16s", timeout="60s", role="Promoted")
ms = Clone(self.Factory, "promotable-1", s)
ms["promotable"] = "true"
# self.num_nodes is assigned in contents() before this method is called from there
ms["clone-max"] = self.num_nodes
ms["clone-node-max"] = 1
ms["promoted-max"] = 1
ms["promoted-node-max"] = 1
# Require connectivity to run the promotable clone
r = Rule(self.Factory, "connected", "-INFINITY", op="or")
r.add_child(Expression(self.Factory, "m1-connected-1", "connected", "lt", "1"))
r.add_child(Expression(self.Factory, "m1-connected-2", "connected", "not_defined", None))
ms.prefer("connected", rule=r)
ms.commit()
# Group Resource
g = Group(self.Factory, "group-1")
g.add_child(self.NewIP())
# The middle member is a systemd service when available, otherwise another IP
if self.CM.Env["have_systemd"]:
sysd = Resource(self.Factory, "petulant",
"pacemaker-cts-dummyd@10", "service")
sysd.add_op("monitor", "P10S")
g.add_child(sysd)
else:
g.add_child(self.NewIP())
g.add_child(self.NewIP())
# Make group depend on the promotable clone
g.after("promotable-1", first="promote", then="start")
g.colocate("promotable-1", "INFINITY", withrole="Promoted")
g.commit()
# LSB resource
lsb = Resource(self.Factory, "lsb-dummy", "LSBDummy", "lsb")
lsb.add_op("monitor", "5s")
# LSB with group
lsb.after("group-1")
lsb.colocate("group-1")
lsb.commit()
# Same generator as CIB12; only the schema version string differs.
class CIB20(CIB12):
version = "pacemaker-2.5"
# Same generator as CIB12; only the schema version string differs.
class CIB30(CIB12):
version = "pacemaker-3.7"
#class HASI(CIB10):
# def add_resources(self):
# # DLM resource
# self._create('''primitive dlm ocf:pacemaker:controld op monitor interval=120s''')
# self._create('''clone dlm-clone dlm meta globally-unique=false interleave=true''')
# O2CB resource
# self._create('''primitive o2cb ocf:ocfs2:o2cb op monitor interval=120s''')
# self._create('''clone o2cb-clone o2cb meta globally-unique=false interleave=true''')
# self._create('''colocation o2cb-with-dlm INFINITY: o2cb-clone dlm-clone''')
# self._create('''order start-o2cb-after-dlm mandatory: dlm-clone o2cb-clone''')
class ConfigFactory(object):
def __init__(self, CM):
"""Register the known CIB generators and pick the default target node.

CM: cluster manager; its ``rsh`` attribute is reused as ``self.rsh``.
"""
# NOTE(review): the "- " / "+ " lines below are unified-diff markers from
# the patch this text was captured from, not Python.
self.CM = CM
self.rsh = self.CM.rsh
# Each registration becomes a callable attribute of the same name
self.register("pacemaker12", CIB12, CM, self)
self.register("pacemaker20", CIB20, CM, self)
self.register("pacemaker30", CIB30, CM, self)
# self.register("hae", HASI, CM, self)
- if self.CM.Env["ListTests"] == 0:
+ if not self.CM.Env["ListTests"]:
self.target = self.CM.Env["nodes"][0]
self.tmpfile = None
def log(self, args):
self.CM.log("cib: %s" % args)
def debug(self, args):
self.CM.debug("cib: %s" % args)
def register(self, methodName, constructor, *args, **kargs):
    """register a constructor

    Binds *constructor* together with *args* / *kargs* in a
    ConfigFactoryItem and exposes it as the attribute *methodName*.
    """
    item = ConfigFactoryItem(constructor, *args, **kargs)
    setattr(self, methodName, item)
def unregister(self, methodName):
"""unregister a constructor"""
delattr(self, methodName)
def createConfig(self, name="pacemaker-1.0"):
if name == "pacemaker-1.0":
name = "pacemaker10";
elif name == "pacemaker-1.2":
name = "pacemaker12";
elif name == "pacemaker-2.0":
name = "pacemaker20";
elif name.startswith("pacemaker-3."):
name = "pacemaker30";
elif name == "hasi":
name = "hae";
if hasattr(self, name):
return getattr(self, name)()
else:
self.CM.log("Configuration variant '%s' is unknown. Defaulting to latest config" % name)
return self.pacemaker30()
class ConfigFactoryItem(object):
    """A callable that remembers a function plus pre-bound arguments."""

    def __init__(self, function, *args, **kargs):
        """Store *function* and the arguments to prepend on every call."""
        self._function = function
        self._args = args
        self._kargs = kargs

    def __call__(self, *args, **kargs):
        """call function

        Positional arguments are appended to the stored ones; keyword
        arguments given here override stored ones of the same name.
        """
        positional = tuple(self._args) + args
        keywords = dict(self._kargs)
        keywords.update(kargs)
        return self._function(*positional, **keywords)
if __name__ == '__main__':
    # Unit test (pass cluster node names as command line arguments)
    import sys

    # "import cts.CTS" binds only the top-level name "cts", which left the
    # bare CTS / CM_corosync references below undefined (NameError).
    from cts import CTS
    from cts import CM_corosync

    if len(sys.argv) < 2:
        print("Usage: %s <node> ..." % sys.argv[0])
        sys.exit(1)

    args = [
        "--nodes", " ".join(sys.argv[1:]),
        "--clobber-cib",
        "--populate-resources",
        "--stack", "corosync",
        "--test-ip-base", "fe80::1234:56:7890:1000",
        "--stonith", "rhcs",
    ]

    # Constructed for its side effects; the object itself is not used below
    env = CTS.CtsLab(args)
    cm = CM_corosync.crm_corosync()
    CibFactory = ConfigFactory(cm)
    cib = CibFactory.createConfig("pacemaker-3.0")
    print(cib.contents())
diff --git a/cts/lab/CTS.py b/cts/lab/CTS.py
index 632f7a3b05..741dc1b64c 100644
--- a/cts/lab/CTS.py
+++ b/cts/lab/CTS.py
@@ -1,185 +1,184 @@
""" Main classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys
import time
import traceback
-from cts.environment import EnvFactory
-
+from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
class CtsLab(object):
    '''Lab environment for the Cluster Test System.

    Models the things expected to differ between test environments for the
    same cluster manager: the set of nodes in the lab, the reset mechanism
    in use, and so on. The data is intended to be static configuration or
    environmental information that does not change while tests run.

    Well-known names (keys) are an important concept here. The following
    names are standard at this time:

        nodes           An array of the nodes in the cluster
        reset           A ResetMechanism object
        logger          An array of objects that log strings...
        CMclass         The type of ClusterManager we are running
                        (This is a class object, not a class instance)
        RandSeed        Random seed.  It is a triple of bytes. (optional)

    Names the CTS code does not know about are ignored, so this mechanism
    may also carry hints, tweaks or fine-tuning directions for individual
    tests.
    '''

    def __init__(self, args=None):
        """Create the environment from *args* and set up logging and rsh."""
        self.Env = EnvFactory().getInstance(args)
        self.Scenario = None
        self.logger = LogFactory()
        self.rsh = RemoteFactory().getInstance()

    def dump(self):
        """Dump the environment."""
        self.Env.dump()

    def has_key(self, key):
        """Return whether *key* is one of the environment's keys."""
        known = list(self.Env.keys())
        return key in known

    def __getitem__(self, key):
        return self.Env[key]

    def __setitem__(self, key, value):
        self.Env[key] = value

    def run(self, Scenario, Iterations):
        """Run *Scenario* for *Iterations* iterations and return an exit code.

        Returns 0 on full success, the failure count when there were
        failures, and 1 when there is no scenario, setup fails, the run
        raises, or the success count does not match *Iterations*.
        """
        if not Scenario:
            self.logger.log("No scenario was defined")
            return 1

        self.logger.log("Cluster nodes: ")
        for node in self.Env["nodes"]:
            self.logger.log(" * %s" % (node))

        if not Scenario.SetUp():
            return 1

        try:
            Scenario.run(Iterations)
        except BaseException:
            # Deliberately catches everything, exactly like a bare "except:"
            self.logger.log("Exception by %s" % sys.exc_info()[0])
            self.logger.traceback(traceback)

            Scenario.summarize()
            Scenario.TearDown()
            return 1

        #ClusterManager.oprofileSave(Iterations)
        Scenario.TearDown()
        Scenario.summarize()

        failures = Scenario.Stats["failure"]
        if failures > 0:
            return failures

        if Scenario.Stats["success"] != Iterations:
            self.logger.log("No failure count but success != requested iterations")
            return 1

        return 0

    def __CheckNode(self, node):
        "Raise a ValueError if the given node isn't valid"
        if self.IsValidNode(node):
            return
        raise ValueError("Invalid node [%s] in CheckNode" % node)
class NodeStatus(object):
    """Helpers for checking whether lab nodes are up and reachable."""

    def __init__(self, env):
        """Keep a reference to the test environment."""
        self.Env = env

    def IsNodeBooted(self, node):
        '''Return TRUE if the given node is booted (responds to pings)'''
        remote = RemoteFactory().getInstance()
        (rc, _) = remote("localhost", "ping -nq -c1 -w1 %s" % node, verbose=0)
        return rc == 0

    def IsSshdUp(self, node):
        """Return True if a trivial remote command succeeds on *node*."""
        remote = RemoteFactory().getInstance()
        (rc, _) = remote(node, "true", verbose=0)
        return rc == 0

    def WaitForNodeToComeUp(self, node, Timeout=300):
        '''Return TRUE when given node comes up, or None/FALSE if timeout

        *Timeout* counts tries, with a 30 second sleep between them. When
        the tries are used up, the run either continues automatically
        (Env["continue"] set) or the user is asked; answering "n" (or end
        of input) raises ValueError.
        '''
        tries_left = Timeout
        had_to_wait = False

        while tries_left > 0:
            if self.IsNodeBooted(node) and self.IsSshdUp(node):
                if had_to_wait:
                    # Fudge to wait for the system to finish coming up
                    time.sleep(30)
                    LogFactory().debug("Node %s now up" % node)
                return 1

            time.sleep(30)
            if not had_to_wait:
                LogFactory().debug("Waiting for node %s to come up" % node)

            had_to_wait = True
            tries_left = tries_left - 1

        LogFactory().log("%s did not come up within %d tries" % (node, Timeout))

        if self.Env["continue"]:
            answer = "Y"
        else:
            try:
                answer = input('Continue? [nY]')
            except EOFError:
                answer = "n"

        if answer == "n":
            raise ValueError("%s did not come up within %d tries" % (node, Timeout))

    def WaitForAllNodesToComeUp(self, nodes, timeout=300):
        '''Return TRUE when all nodes come up, or FALSE if timeout'''
        for candidate in nodes:
            if self.WaitForNodeToComeUp(candidate, timeout):
                continue
            return None
        return 1
class Component(object):
    """Base class for cluster components that tests can kill."""

    def kill(self, node):
        """Do nothing; subclasses provide the real implementation."""
        return None


class Process(Component):
    """A named daemon process that can be killed on a node with killall."""

    def __init__(self, cm, name, process=None, dc_only=0, pats=None, dc_pats=None, badnews_ignore=None, common_ignore=None, triggersreboot=0):
        """Describe a process and the log patterns associated with it.

        Args:
            cm: cluster manager providing ``rsh`` and ``log``.
            name: display name; also the process name unless *process* is set.
            process: executable name passed to killall, if different.
            dc_only: stored as given.
            pats, dc_pats: pattern lists, stored as given (empty if omitted).
            badnews_ignore: patterns to ignore; copied, never modified.
            common_ignore: further patterns appended to the ignore list.
            triggersreboot: stored as given.
        """
        self.name = str(name)
        self.dc_only = dc_only
        self.pats = [] if pats is None else pats
        self.dc_pats = [] if dc_pats is None else dc_pats
        self.CM = cm

        # Build a private list. Extending the argument in place used to
        # grow the shared default list (and the caller's list) on every
        # construction.
        self.badnews_ignore = list(badnews_ignore) if badnews_ignore else []
        if common_ignore:
            self.badnews_ignore.extend(common_ignore)

        self.triggersreboot = triggersreboot

        if process:
            self.proc = str(process)
        else:
            self.proc = str(name)
        self.KillCmd = "killall -9 " + self.proc

    def kill(self, node):
        """Kill the process on *node*; return 1 on success, None on failure."""
        (rc, _) = self.CM.rsh(node, self.KillCmd)
        if rc != 0:
            self.CM.log ("ERROR: Kill %s failed on node %s" % (self.name,node))
            return None
        return 1
diff --git a/cts/lab/CTSaudits.py b/cts/lab/CTSaudits.py
index 40b6e2ff58..db69a6276d 100755
--- a/cts/lab/CTSaudits.py
+++ b/cts/lab/CTSaudits.py
@@ -1,875 +1,875 @@
""" Auditing classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import time, re, uuid
from cts.watcher import LogWatcher
from pacemaker.buildoptions import BuildOptions
class ClusterAudit(object):
    """Abstract base class for cluster audits.

    Subclasses must override __call__, is_applicable and name; the versions
    here only raise ValueError.
    """

    def __init__(self, cm):
        """Keep a reference to the cluster manager."""
        self.CM = cm

    def __call__(self):
        raise ValueError("Abstract Class member (__call__)")

    def is_applicable(self):
        '''Return TRUE if we are applicable in the current test configuration'''
        raise ValueError("Abstract Class member (is_applicable)")

    def log(self, args):
        """Log *args* through the cluster manager with an "audit: " prefix."""
        text = "audit: %s" % args
        self.CM.log(text)

    def debug(self, args):
        """Debug-log *args* through the cluster manager with an "audit: " prefix."""
        text = "audit: %s" % args
        self.CM.debug(text)

    def name(self):
        raise ValueError("Abstract Class member (name)")
AllAuditClasses = [ ]
# Audit that verifies test messages written with "logger" on every node show
# up in the watched logs, restarting the logging services and retrying when
# they do not.
class LogAudit(ClusterAudit):
def name(self):
return "LogAudit"
def __init__(self, cm):
self.CM = cm
# Log reader kinds tried, in this order, when Env["LogWatcher"] is "any"
self.kinds = [ "combined syslog", "journal", "remote" ]
def RestartClusterLogging(self, nodes=None):
"""Restart the logging services on *nodes* (default: all Env["nodes"]).

Failures are logged as errors; nothing is returned.
"""
if not nodes:
nodes = self.CM.Env["nodes"]
self.CM.debug("Restarting logging on: %s" % repr(nodes))
for node in nodes:
if self.CM.Env["have_systemd"]:
(rc, _) = self.CM.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self.CM.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node)
(rc, _) = self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"])
if rc != 0:
self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node))
def TestLogging(self):
"""Write a uniquely tagged message on every node and look for it.

Returns 1 as soon as one watcher has matched every pattern, otherwise
logs the unmatched patterns and returns 0.
"""
patterns = []
prefix = "Test message from"
# Random suffix so messages from this run cannot match older log entries
suffix = str(uuid.uuid4())
watch = {}
for node in self.CM.Env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
# Set up one watcher per kind ("any"), or only the configured kind
watch_pref = self.CM.Env["LogWatcher"]
if watch_pref == "any":
for k in self.kinds:
watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, "LogAudit", 5, silent=True, hosts=self.CM.Env["nodes"], kind=k)
watch[k].setwatch()
else:
k = watch_pref
watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, "LogAudit", 5, silent=True, hosts=self.CM.Env["nodes"], kind=k)
watch[k].setwatch()
if watch_pref == "any": self.CM.log("Writing log with key: %s" % (suffix))
for node in self.CM.Env["nodes"]:
cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self.CM.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in self.kinds:
if k in watch:
w = watch[k]
if watch_pref == "any": self.CM.log("Testing for %s logs" % (k))
w.lookforall(silent=True)
if not w.unmatched:
if watch_pref == "any":
self.CM.log ("Continuing with %s-based log reader" % (w.kind))
self.CM.Env["LogWatcher"] = w.kind
return 1
# No watcher matched everything: report what is missing
for k in list(watch.keys()):
w = watch[k]
if w.unmatched:
for regex in w.unmatched:
self.CM.log ("Test message [%s] not found in %s logs." % (regex, w.kind))
return 0
def __call__(self):
"""Run the audit; return 1 when logging works, 0 when it cannot be fixed."""
# NOTE: the local name "max" shadows the builtin within this method
max = 3
attempt = 0
self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"])
# Each failed attempt restarts logging and waits one minute longer
while attempt <= max and self.TestLogging() == 0:
attempt = attempt + 1
self.RestartClusterLogging()
time.sleep(60*attempt)
if attempt > max:
self.CM.log("ERROR: Cluster logging unrecoverable.")
return 0
return 1
def is_applicable(self):
# Not applicable when Env["DoBSC"] or Env["LogAuditDisabled"] is set
if self.CM.Env["DoBSC"]:
return 0
if self.CM.Env["LogAuditDisabled"]:
return 0
return 1
class DiskAudit(ClusterAudit):
    """Audit that checks the free space of each node's log filesystem."""

    def name(self):
        return "DiskspaceAudit"

    def __init__(self, cm):
        self.CM = cm

    def __call__(self):
        """Check log disk usage on every node.

        Returns 1 when no node is critically low and None otherwise. A node
        is critical below 10MB free or above 95% used; in that case the run
        either continues automatically (Env["continue"] set) or the user is
        asked, and answering "n" (or end of input) raises ValueError. A
        warning is logged below 100MB free or above 90% used.
        """
        result = 1

        # @TODO Use directory of PCMK_logfile if set on host
        dfcmd = "df -BM " + BuildOptions.LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'"

        self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"])
        for node in self.CM.Env["nodes"]:
            (_, dfout) = self.CM.rsh(node, dfcmd, verbose=1)
            if not dfout:
                self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
                continue

            dfout = dfout[0].strip()

            # Pre-bind both names: if split() does not yield exactly two
            # fields, the warning below must still be able to format them.
            used = remain = None
            try:
                (used, remain) = dfout.split()
                used_percent = int(used)
                remaining_mb = int(remain)
            except (ValueError, TypeError):
                self.CM.log("Warning: df output '%s' from %s was invalid [%s, %s]"
                            % (dfout, node, used, remain))
                continue

            if remaining_mb < 10 or used_percent > 95:
                self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
                            % (node, used_percent, remaining_mb))
                result = None

                if self.CM.Env["continue"]:
                    answer = "Y"
                else:
                    try:
                        answer = input('Continue? [nY]')
                    except EOFError:
                        answer = "n"

                if answer and answer == "n":
                    raise ValueError("Disk full on %s" % (node))

            elif remaining_mb < 100 or used_percent > 90:
                self.CM.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))

        return result

    def is_applicable(self):
        """Return 0 when Env["DoBSC"] is set, otherwise 1."""
        if self.CM.Env["DoBSC"]:
            return 0
        return 1
class FileAudit(ClusterAudit):
    """Audit that looks for core files and stale IPC files on the nodes."""

    def name(self):
        return "FileAudit"

    def __init__(self, cm):
        self.CM = cm
        # Core-file listings already reported, so each is warned about once
        self.known = []

    def __call__(self):
        """Return 1 when nothing new was found, 0 otherwise."""
        result = 1

        core_checks = (
            ("ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]",
             "Warning: Pacemaker core file on %s: %s"),
            ("ls -al /var/lib/corosync | grep core.[0-9]",
             "Warning: Corosync core file on %s: %s"),
        )

        self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"])
        for node in self.CM.Env["nodes"]:

            for (command, warning) in core_checks:
                (_, listing) = self.CM.rsh(node, command, verbose=1)
                for entry in listing:
                    entry = entry.strip()
                    if entry in self.known:
                        continue
                    result = 0
                    self.known.append(entry)
                    self.CM.log(warning % (node, entry))

            expected = self.CM.ShouldBeStatus
            if node in expected and expected[node] == "down":
                stale_found = 0
                (_, listing) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
                for entry in listing:
                    result = 0
                    stale_found = 1
                    self.CM.log("Warning: Stale IPC file on %s: %s" % (node, entry))

                if stale_found:
                    (_, listing) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
                    for entry in listing:
                        self.CM.debug("ps[%s]: %s" % (node, entry))

                    self.CM.rsh(node, "rm -rf /dev/shm/qb-*")

            else:
                self.CM.debug("Skipping %s" % node)

        return result

    def is_applicable(self):
        return 1
class AuditResource(object):
    """One "Resource" line of resource listing output, split into fields.

    The line is split on whitespace; fields 1 to 11 are stored under the
    attribute names below (field 10 as an integer bit mask).
    """

    _FIELD_NAMES = ("type", "id", "clone_id", "parent", "rprovider",
                    "rclass", "rtype", "host", "needs_quorum")

    _FLAG_ORPHAN = 0x00000001
    _FLAG_MANAGED = 0x00000002
    _FLAG_UNIQUE = 0x00000020

    def __init__(self, cm, line):
        fields = line.split()
        self.CM = cm
        self.line = line

        # Fields 1..9 map one-to-one onto the named attributes
        for position, attribute in enumerate(self._FIELD_NAMES, start=1):
            setattr(self, attribute, fields[position])

        self.flags = int(fields[10])
        self.flags_s = fields[11]

        if self.parent == "NA":
            self.parent = None

    def unique(self):
        """Return 1 if bit 0x20 is set in the flags, else 0."""
        return 1 if self.flags & self._FLAG_UNIQUE else 0

    def orphan(self):
        """Return 1 if bit 0x01 is set in the flags, else 0."""
        return 1 if self.flags & self._FLAG_ORPHAN else 0

    def managed(self):
        """Return 1 if bit 0x02 is set in the flags, else 0."""
        return 1 if self.flags & self._FLAG_MANAGED else 0
class AuditConstraint(object):
    """One "Constraint" line of resource listing output, split into fields.

    The line is split on whitespace; fields 1 to 7 are stored under the
    attribute names below. A role of "NA" is stored as None.
    """

    _FIELD_NAMES = ("type", "id", "rsc", "target", "score",
                    "rsc_role", "target_role")

    def __init__(self, cm, line):
        fields = line.split()
        self.CM = cm
        self.line = line

        for position, attribute in enumerate(self._FIELD_NAMES, start=1):
            setattr(self, attribute, fields[position])

        if self.rsc_role == "NA":
            self.rsc_role = None
        if self.target_role == "NA":
            self.target_role = None
# Audit of primitive resources based on the output of "crm_resource -c".
# Also the base class of the group, clone and colocation audits, which reuse
# setup().
# NOTE(review): the "- " / "+ " lines inside doResourceAudit are unified-diff
# markers from the patch this text was captured from, not Python.
class PrimitiveAudit(ClusterAudit):
def name(self):
return "PrimitiveAudit"
def __init__(self, cm):
self.CM = cm
def doResourceAudit(self, resource, quorum):
"""Check where one resource is active; return 1 if acceptable, else 0.

resource: an entry of self.resources (built in setup()).
quorum: truthy when the cluster has quorum.
"""
rc = 1
active = self.CM.ResourceLocation(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %s" % (resource.id, repr(active)))
# NOTE(review): AuditResource stores needs_quorum as a string field, so
# this comparison with the integer 1 looks like it can never be true - confirm
elif resource.needs_quorum == 1:
self.CM.log("Resource %s active without quorum: %s"
% (resource.id, repr(active)))
rc = 0
elif not resource.managed():
self.CM.log("Resource %s not managed. Active on %s"
% (resource.id, repr(active)))
elif not resource.unique():
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %s"
% (resource.id, repr(active)))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self.CM.log("Resource %s is active multiple times: %s"
% (resource.id, repr(active)))
rc = 0
elif resource.orphan():
self.debug("Resource %s is an inactive orphan" % resource.id)
elif len(self.inactive_nodes) == 0:
self.CM.log("WARN: Resource %s not served anywhere" % resource.id)
rc = 0
- elif self.CM.Env["warn-inactive"] == 1:
+ elif self.CM.Env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)"
% (resource.id, repr(self.inactive_nodes)))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %s)"
% (resource.id, repr(self.inactive_nodes)))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %s)"
% (resource.id, repr(self.inactive_nodes)))
return rc
def setup(self):
"""Collect node state and parse "crm_resource -c" output.

Fills self.active_nodes, self.inactive_nodes, self.resources and
self.constraints, and sets self.target to the first node expected to
be up.  Returns 1 on success, 0 when no node is expected to be up.
"""
self.target = None
self.resources = []
self.constraints = []
self.active_nodes = []
self.inactive_nodes = []
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == "up":
self.active_nodes.append(node)
else:
self.inactive_nodes.append(node)
for node in self.CM.Env["nodes"]:
if self.target == None and self.CM.ShouldBeStatus[node] == "up":
self.target = node
if not self.target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name())
return 0
(_, lines) = self.CM.rsh(self.target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self.resources.append(AuditResource(self.CM, line))
elif re.search("^Constraint", line):
self.constraints.append(AuditConstraint(self.CM, line))
else:
self.CM.log("Unknown entry: %s" % line);
return 1
def __call__(self):
"""Audit every primitive; return 0 if any check failed, else 1.

Also returns 1 when setup() reports that there is nothing to audit.
"""
rc = 1
if not self.setup():
return 1
quorum = self.CM.HasQuorum(None)
for resource in self.resources:
if resource.type == "primitive":
if self.doResourceAudit(resource, quorum) == 0:
rc = 0
return rc
def is_applicable(self):
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self.CM["Name"] == "crm-corosync":
# return 1
return 0
# Audit that checks the children of each resource group: no child may be
# active more than once, and active children must share one node.
class GroupAudit(PrimitiveAudit):
def name(self):
return "GroupAudit"
def __call__(self):
"""Return 0 if any group child is misplaced, else 1.

Also returns 1 when setup() reports that there is nothing to audit.
"""
rc = 1
if not self.setup():
return 1
for group in self.resources:
if group.type == "group":
first_match = 1
# Node the group is expected on, taken from its first child
group_location = None
for child in self.resources:
if child.parent == group.id:
nodes = self.CM.ResourceLocation(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = 0
if len(nodes) > 1:
rc = 0
self.CM.log("Child %s of %s is active more than once: %s"
% (child.id, group.id, repr(nodes)))
elif len(nodes) == 0:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
rc = 0
self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s"
% (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return rc
class CloneAudit(PrimitiveAudit):
    """Audit of clone resources; currently only logs the children it visits."""

    def name(self):
        return "CloneAudit"

    def __call__(self):
        """Visit every primitive child of every clone; always returns 1."""
        rc = 1
        if not self.setup():
            return 1

        clones = [entry for entry in self.resources if entry.type == "clone"]
        for clone in clones:
            for child in self.resources:
                if child.parent != clone.id or child.type != "primitive":
                    continue
                self.debug("Checking child %s of %s..." % (child.id, clone.id))
                # Check max and node_max
                # Obtain with:
                #    crm_resource -g clone_max --meta -r child.id
                #    crm_resource -g clone_node_max --meta -r child.id

        return rc
class ColocationAudit(PrimitiveAudit):
    """Audit that colocated resources run only where their targets run."""

    def name(self):
        """Return the label used when reporting on this audit."""
        return "ColocationAudit"

    def crm_location(self, resource):
        """Return the first field of each crm_resource -W output line.

        An empty list is returned when the command exits non-zero.
        """
        (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q"%resource, verbose=1)
        if rc != 0:
            return []
        return [line.split()[0] for line in lines]

    def __call__(self):
        """Check every rsc_colocation constraint; return 1 if all hold, else 0.

        Returns 1 without checking anything when setup() fails.
        """
        if not self.setup():
            return 1

        verdict = 1
        for coloc in self.constraints:
            if coloc.type != "rsc_colocation":
                continue

            running_on = self.crm_location(coloc.rsc)
            allowed_on = self.crm_location(coloc.target)

            if len(running_on) == 0:
                self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
                continue

            for node in running_on:
                if node in allowed_on:
                    self.debug("Colocation audit (%s): %s running on %s (in %s)"
                               % (coloc.id, coloc.rsc, node, repr(allowed_on)))
                else:
                    verdict = 0
                    self.CM.log("Colocation audit (%s): %s running on %s (not in %s)"
                                % (coloc.id, coloc.rsc, node, repr(allowed_on)))

        return verdict
class ControllerStateAudit(ClusterAudit):
    """Audit that each node's cluster manager is up or down as expected."""

    def __init__(self, cm):
        """Store the cluster manager and start with zeroed statistics."""
        self.CM = cm
        self.Stats = {
            "calls": 0,
            "success": 0,
            "failure": 0,
            "skipped": 0,
            "auditfail": 0,
        }

    def has_key(self, key):
        """Return True if a statistic named key exists."""
        return key in self.Stats

    def __setitem__(self, key, value):
        """Set the statistic named key."""
        self.Stats[key] = value

    def __getitem__(self, key):
        """Return the statistic named key (KeyError if absent)."""
        return self.Stats[key]

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        self.Stats[name] = self.Stats.get(name, 0) + 1

    def __call__(self):
        """Compare every node's state with its expected state.

        Returns 1 when nothing is wrong, 0 when any node is unstable, is
        unexpectedly up, or is unexpectedly down.
        """
        unexpectedly_down = 0
        unexpectedly_up = 0
        unstable = []

        for node in self.CM.Env["nodes"]:
            expected = self.CM.ShouldBeStatus[node]
            status = self.CM.test_node_CM(node)

            if status > 0:
                if expected == "down":
                    unexpectedly_up += 1
                # A status of exactly 1 is what gets reported as "not stable"
                if status == 1:
                    unstable.append(node)

            elif expected == "up":
                unexpectedly_down += 1

        ok = 1

        if len(unstable) > 0:
            ok = 0
            self.CM.log("Cluster is not stable: %d (of %d): %s"
                        % (len(unstable), self.CM.upcount(), repr(unstable)))

        if unexpectedly_down > 0:
            ok = 0
            self.CM.log("%d (of %d) nodes expected to be up were down."
                        % (unexpectedly_down, len(self.CM.Env["nodes"])))

        if unexpectedly_up > 0:
            ok = 0
            self.CM.log("%d (of %d) nodes expected to be down were up."
                        % (unexpectedly_up, len(self.CM.Env["nodes"])))

        return ok

    def name(self):
        """Return the label used when reporting on this audit."""
        return "ControllerStateAudit"

    def is_applicable(self):
        """Report whether this audit should run; it is currently always disabled."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        #if self.CM["Name"] == "crm-corosync":
        #    return 1
        return 0
class CIBAudit(ClusterAudit):
    """Audit that every node in a partition holds the same CIB contents."""

    def __init__(self, cm):
        """Store the cluster manager and start with zeroed statistics."""
        self.CM = cm
        self.Stats = {"calls":0
        ,        "success":0
        ,        "failure":0
        ,        "skipped":0
        ,        "auditfail":0}

    def has_key(self, key):
        """Return True if a statistic named key exists."""
        return key in self.Stats

    def __setitem__(self, key, value):
        """Set the statistic named key."""
        self.Stats[key] = value

    def __getitem__(self, key):
        """Return the statistic named key (KeyError if absent)."""
        return self.Stats[key]

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if not name in self.Stats:
            self.Stats[name] = 0
        self.Stats[name] = self.Stats[name]+1

    def __call__(self):
        """Audit each partition; return 1 if all are consistent, else 0."""
        passed = 1
        ccm_partitions = self.CM.find_partitions()

        if len(ccm_partitions) == 0:
            self.debug("\tNo partitions to audit")
            return 1

        for partition in ccm_partitions:
            self.debug("\tAuditing CIB consistency for: %s" % partition)
            if self.audit_cib_contents(partition) == 0:
                passed = 0

        return passed

    def audit_cib_contents(self, hostlist):
        """Diff every node's CIB against the first node that supplied one.

        hostlist is a whitespace-separated string of node names.  Returns 1
        when every CIB was retrieved and every diff was empty, else 0.
        """
        passed = 1
        node0 = None        # first node that yielded a configuration
        node0_xml = None    # file holding that node's configuration

        for node in hostlist.split():
            node_xml = self.store_remote_cib(node, node0)

            if node_xml is None:
                self.CM.log("Could not perform audit: No configuration from %s" % node)
                passed = 0

            elif node0 is None:
                # node0 and node0_xml are always assigned together, so
                # node0_xml can never be None once node0 is set.
                node0 = node
                node0_xml = node_xml

            else:
                (rc, result) = self.CM.rsh(
                    node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)

                if rc != 0:
                    self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
                    passed = 0

                for line in result:
                    # Any output other than an empty <diff/> is a difference
                    if not re.search("<diff/>", line):
                        passed = 0
                        self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
                    else:
                        self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))

                # self.CM.rsh(node0, "rm -f %s" % node_xml)

        # self.CM.rsh(node0, "rm -f %s" % node0_xml)
        return passed

    def store_remote_cib(self, node, target):
        """Fetch node's CIB, write it to a local file, and copy it to target.

        target defaults to node when it is not given.  Returns the file name
        on success, or None when the query or the copy fails.
        """
        filename = "/tmp/ctsaudit.%s.xml" % node

        if not target:
            target = node

        (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], verbose=1)
        if rc != 0:
            self.CM.log("Could not retrieve configuration")
            return None

        self.CM.rsh("localhost", "rm -f %s" % filename)
        for line in lines:
            # line[:-1] drops the last character of each output line
            # (presumably the trailing newline -- TODO confirm).
            # NOTE(review): a line containing a single quote would break
            # this shell command.
            self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)

        if self.CM.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
            self.CM.log("Could not store configuration")
            return None

        return filename

    def name(self):
        """Return the label used when reporting on this audit."""
        return "CibAudit"

    def is_applicable(self):
        """Report whether this audit should run; it is currently always disabled."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        #if self.CM["Name"] == "crm-corosync":
        #    return 1
        return 0
class PartitionAudit(ClusterAudit):
    """Audit the number of cluster partitions and the DC within each one."""

    def __init__(self, cm):
        """Store the cluster manager; start with zeroed stats and empty caches."""
        self.CM = cm
        self.Stats = {"calls":0
        ,        "success":0
        ,        "failure":0
        ,        "skipped":0
        ,        "auditfail":0}
        self.NodeEpoch = {}
        self.NodeState = {}
        self.NodeQuorum = {}

    def has_key(self, key):
        """Return True if a statistic named key exists."""
        return key in self.Stats

    def __setitem__(self, key, value):
        """Set the statistic named key."""
        self.Stats[key] = value

    def __getitem__(self, key):
        """Return the statistic named key (KeyError if absent)."""
        return self.Stats[key]

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if not name in self.Stats:
            self.Stats[name] = 0
        self.Stats[name] = self.Stats[name]+1

    def __call__(self):
        """Audit every partition; return 1 if all pass, else 0.

        Returns 1 immediately when no partitions are found.
        """
        passed = 1
        ccm_partitions = self.CM.find_partitions()

        if ccm_partitions is None or len(ccm_partitions) == 0:
            return 1

        self.CM.cluster_stable(double_check=True)

        if len(ccm_partitions) != self.CM.partitions_expected:
            self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
            passed = 0
            for partition in ccm_partitions:
                self.CM.log("\t %s" % partition)

        for partition in ccm_partitions:
            if self.audit_partition(partition) == 0:
                passed = 0

        return passed

    def trim_string(self, avalue):
        """Return avalue without its last character.

        Returns None for an empty/None value and for a one-character value.
        """
        if not avalue:
            return None
        if len(avalue) > 1:
            return avalue[:-1]
        return None

    def trim2int(self, avalue):
        """Return avalue, minus its last character, converted to int.

        Returns None for an empty/None value and for a one-character value.
        """
        if not avalue:
            return None
        if len(avalue) > 1:
            return int(avalue[:-1])
        return None

    def _first_line(self, out):
        """Return the stripped first line of command output, or "" if there is none."""
        if not out:
            return ""
        return out[0].strip()

    def audit_partition(self, partition):
        """Audit one partition, given as a whitespace-separated node list.

        Returns 1 on success, or 0 when no lowest epoch could be determined,
        when a DC is not the node with the lowest epoch, or when more than
        one DC is found.
        """
        passed = 1
        dc_found = []
        # NOTE(review): nothing ever adds to this list, so the "DC not found"
        # message below always reports 0 allowed nodes.
        dc_allowed_list = []
        lowest_epoch = None
        node_list = partition.split()

        self.debug("Auditing partition: %s" % (partition))
        for node in node_list:
            if self.CM.ShouldBeStatus[node] != "up":
                self.CM.log("Warn: Node %s appeared out of nowhere" % (node))
                self.CM.ShouldBeStatus[node] = "up"
                # not in itself a reason to fail the audit (not what we're
                #  checking for in this audit)

            # Empty command output is treated as a missing value so that the
            # "no epoch" handling below runs instead of an IndexError.
            (_, out) = self.CM.rsh(node, self.CM["StatusCmd"] % node, verbose=1)
            self.NodeState[node] = self._first_line(out)

            (_, out) = self.CM.rsh(node, self.CM["EpochCmd"], verbose=1)
            self.NodeEpoch[node] = self._first_line(out)

            (_, out) = self.CM.rsh(node, self.CM["QuorumCmd"], verbose=1)
            self.NodeQuorum[node] = self._first_line(out)

            self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoch[node], self.NodeQuorum[node]))

            # NOTE(review): the values were already strip()ped above, so these
            # calls drop one more (non-whitespace) character -- confirm that
            # this is intended before enabling the audit.
            self.NodeState[node] = self.trim_string(self.NodeState[node])
            self.NodeEpoch[node] = self.trim2int(self.NodeEpoch[node])
            self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node])

            if not self.NodeEpoch[node]:
                self.CM.log("Warn: Node %s dissappeared: cant determin epoch" % (node))
                self.CM.ShouldBeStatus[node] = "down"
                # not in itself a reason to fail the audit (not what we're
                #  checking for in this audit)
            elif lowest_epoch is None or self.NodeEpoch[node] < lowest_epoch:
                lowest_epoch = self.NodeEpoch[node]

        if not lowest_epoch:
            self.CM.log("Lowest epoch not determined in %s" % (partition))
            passed = 0

        for node in node_list:
            if self.CM.ShouldBeStatus[node] == "up":
                if self.CM.is_node_dc(node, self.NodeState[node]):
                    dc_found.append(node)
                    if self.NodeEpoch[node] == lowest_epoch:
                        self.debug("%s: OK" % node)
                    elif not self.NodeEpoch[node]:
                        self.debug("Check on %s ignored: no node epoch" % node)
                    elif not lowest_epoch:
                        self.debug("Check on %s ignored: no lowest epoch" % node)
                    else:
                        self.CM.log("DC %s is not the oldest node (%d vs. %d)"
                                    % (node, self.NodeEpoch[node], lowest_epoch))
                        passed = 0

        if len(dc_found) == 0:
            # Logged only; a missing DC does not fail the audit
            self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)"
                        % (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))

        elif len(dc_found) > 1:
            self.CM.log("%d DCs (%s) found in cluster partition: %s"
                        % (len(dc_found), str(dc_found), str(node_list)))
            passed = 0

        if passed == 0:
            for node in node_list:
                if self.CM.ShouldBeStatus[node] == "up":
                    self.CM.log("epoch %s : %s"
                                % (self.NodeEpoch[node], self.NodeState[node]))

        return passed

    def name(self):
        """Return the label used when reporting on this audit."""
        return "PartitionAudit"

    def is_applicable(self):
        """Report whether this audit should run; it is currently always disabled."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        #if self.CM["Name"] == "crm-corosync":
        #    return 1
        return 0
# Register the audit classes in the order AuditList() instantiates them
AllAuditClasses.extend([
    DiskAudit,
    FileAudit,
    LogAudit,
    ControllerStateAudit,
    PartitionAudit,
    PrimitiveAudit,
    GroupAudit,
    CloneAudit,
    ColocationAudit,
    CIBAudit,
])
def AuditList(cm):
    """Instantiate every registered audit class; return the applicable ones.

    Each class in AllAuditClasses is constructed with cm, in registration
    order, and kept only if its is_applicable() returns a true value.
    """
    instances = (auditclass(cm) for auditclass in AllAuditClasses)
    return [audit for audit in instances if audit.is_applicable()]
diff --git a/cts/lab/CTSlab.py.in b/cts/lab/CTSlab.py.in
index 88f23cf36f..7b229847c9 100644
--- a/cts/lab/CTSlab.py.in
+++ b/cts/lab/CTSlab.py.in
@@ -1,137 +1,137 @@
#!@PYTHON@
""" Command-line interface to Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2001-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys, signal, os
pdir = os.path.dirname(sys.path[0])
sys.path.insert(0, pdir) # So that things work from the source directory
try:
from cts.CTS import *
from cts.CM_corosync import *
from cts.CTSaudits import AuditList
from cts.CTStests import TestList
from cts.CTSscenarios import *
from pacemaker._cts.logging import LogFactory
except ImportError as e:
sys.stderr.write("abort: %s\n" % e)
sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" %
' '.join(sys.path))
sys.exit(1)
# These are globals so they can be used by the signal handler.
scenario = None
LogFactory().add_stderr()
def sig_handler(signum, frame):
    """Summarize the running scenario on a signal; tear down and exit on signal 15.

    Any handled signal logs the interruption and prints the scenario
    summary.  Only signal 15 additionally tears the scenario down and exits
    with status 1.
    """
    LogFactory().log("Interrupted by signal %d"%signum)

    if scenario:
        scenario.summarize()

    if signum != 15:
        return

    if scenario:
        scenario.TearDown()
    sys.exit(1)
def plural_s(n, uppercase=False):
    """Return the plural suffix for a count of n.

    The result is "" when n equals 1; otherwise "S" if uppercase is true,
    or "s" if it is not.
    """
    if n == 1:
        return ""
    return "S" if uppercase else "s"
if __name__ == '__main__':
    # Build the lab environment from the command line
    Environment = CtsLab(sys.argv[1:])
    NumIter = Environment["iterations"]
    if NumIter is None:
        NumIter = 1

    Tests = []

    # Set the signal handler
    signal.signal(15, sig_handler)
    signal.signal(10, sig_handler)

    # Create the Cluster Manager object
    cm = None
    if Environment["Stack"] == "corosync 2+":
        cm = crm_corosync()

    else:
        # Read the same "Stack" key that was just tested, and format with %s
        # so that an unset value cannot raise TypeError while reporting.
        LogFactory().log("Unknown stack: %s" % Environment["Stack"])
        sys.exit(1)

    if Environment["TruncateLog"]:
        if Environment["OutputFile"] is None:
            LogFactory().log("Ignoring truncate request because no output file specified")
        else:
            LogFactory().log("Truncating %s" % Environment["OutputFile"])
            with open(Environment["OutputFile"], "w") as outputfile:
                outputfile.truncate(0)

    Audits = AuditList(cm)

    if Environment["ListTests"]:
        Tests = TestList(cm, Audits)
        LogFactory().log("Total %d tests"%len(Tests))
        for test in Tests:
            LogFactory().log(str(test.name))
        sys.exit(0)

    elif len(Environment["tests"]) == 0:
        Tests = TestList(cm, Audits)

    else:
        Chosen = Environment["tests"]
        for TestCase in Chosen:
            match = None

            # A fresh TestList is built for every requested name, so a test
            # requested twice is represented by two separate objects.
            for test in TestList(cm, Audits):
                if test.name == TestCase:
                    match = test

            if not match:
                LogFactory().log("--choose: No applicable/valid tests chosen")
                sys.exit(1)
            else:
                Tests.append(match)

    # Scenario selection
    if Environment["scenario"] == "basic-sanity":
        scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests)

    elif Environment["scenario"] == "all-once":
        NumIter = len(Tests)
        scenario = AllOnce(
            cm, [ BootCluster(Environment) ], Audits, Tests)

    elif Environment["scenario"] == "sequence":
        scenario = Sequence(
            cm, [ BootCluster(Environment) ], Audits, Tests)

    elif Environment["scenario"] == "boot":
        scenario = Boot(cm, [ LeaveBooted(Environment)], Audits, [])

    else:
        scenario = RandomTests(
            cm, [ BootCluster(Environment) ], Audits, Tests)

    # Record the run's configuration before starting
    LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TEST" + plural_s(NumIter, True) + " ")
    LogFactory().log("Stack:                  %s (%s)" % (Environment["Stack"], Environment["Name"]))
    LogFactory().log("Schema:                 %s" % Environment["Schema"])
    LogFactory().log("Scenario:               %s" % scenario.__doc__)
    LogFactory().log("CTS Exerciser:          %s" % Environment["cts-exerciser"])
    LogFactory().log("CTS Logfile:            %s" % Environment["OutputFile"])
    LogFactory().log("Random Seed:            %s" % Environment["RandSeed"])
    LogFactory().log("Syslog variant:         %s" % Environment["syslogd"].strip())
    LogFactory().log("System log files:       %s" % Environment["LogFileName"])

    if Environment.has_key("IPBase"):
        LogFactory().log("Base IP for resources:  %s" % Environment["IPBase"])

    LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"])

    Environment.dump()
    rc = Environment.run(scenario, NumIter)
    sys.exit(rc)
diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py
index e6f7bb5123..15caac6564 100644
--- a/cts/lab/CTSscenarios.py
+++ b/cts/lab/CTSscenarios.py
@@ -1,601 +1,562 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2021 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import sys
import time
from cts.CTStests import CTSTest
from cts.CTSaudits import ClusterAudit
from cts.watcher import LogWatcher
class ScenarioComponent(object):
    """Abstract base for one step of a scenario.

    Subclasses are expected to override IsApplicable(), SetUp() and
    TearDown(); the versions here only raise ValueError.
    """

    def __init__(self, Env):
        """Store the lab environment as self.Env."""
        self.Env = Env

    def IsApplicable(self):
        '''Return True if the current ScenarioComponent is applicable
        in the given LabEnvironment given to the constructor.
        '''
        raise ValueError("Abstract Class member (IsApplicable)")

    def SetUp(self, CM):
        '''Set up the given ScenarioComponent'''
        raise ValueError("Abstract Class member (Setup)")

    def TearDown(self, CM):
        '''Tear down (undo) the given ScenarioComponent'''
        # Name the method that is actually missing, not Setup
        raise ValueError("Abstract Class member (TearDown)")
class Scenario(object):
    '''The basic idea of a scenario is that of an ordered list of
    ScenarioComponent objects.  Each ScenarioComponent is SetUp() in turn,
    and then after the tests have been run, they are torn down using TearDown()
    (in reverse order).

    A Scenario is applicable to a particular cluster manager iff each
    ScenarioComponent is applicable.

    A partially set up scenario is torn down if it fails during setup.
    '''

    def __init__(self, ClusterManager, Components, Audits, Tests):
        "Initialize the Scenario from the list of ScenarioComponents"
        self.ClusterManager = ClusterManager
        self.Components = Components
        self.Audits = Audits
        self.Tests = Tests

        self.BadNews = None
        self.TestSets = []
        self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0}
        self.Sets = []

        #self.ns=CTS.NodeStatus(self.Env)

        for comp in Components:
            if not isinstance(comp, ScenarioComponent):
                raise ValueError("Init value must be subclass of ScenarioComponent")

        for audit in Audits:
            if not isinstance(audit, ClusterAudit):
                raise ValueError("Init value must be subclass of ClusterAudit")

        for test in Tests:
            if not isinstance(test, CTSTest):
                raise ValueError("Init value must be a subclass of CTSTest")

    def IsApplicable(self):
        '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
        '''
        for comp in self.Components:
            if not comp.IsApplicable():
                return None
        return True

    def SetUp(self):
        '''Set up the Scenario. Return 1 on success, None on failure.'''
        self.ClusterManager.prepare()
        self.audit() # Also detects remote/local log config
        self.ClusterManager.ns.WaitForAllNodesToComeUp(self.ClusterManager.Env["nodes"])

        self.audit()
        self.ClusterManager.install_support()

        self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
                                  self.ClusterManager.templates.get_patterns("BadNews"),
                                  "BadNews", 0,
                                  kind=self.ClusterManager.Env["LogWatcher"],
                                  hosts=self.ClusterManager.Env["nodes"])
        self.BadNews.setwatch() # Call after we've figured out what type of log watching to do in LogAudit

        for j, comp in enumerate(self.Components):
            if not comp.SetUp(self.ClusterManager):
                # OOPS!  We failed.  Tear partial setups down.
                self.audit()
                self.ClusterManager.log("Tearing down partial setup")
                self.TearDown(j)
                return None

        self.audit()
        return 1

    def TearDown(self, max=None):
        '''Tear Down the Scenario - in reverse order.

        max is the index of the last component to tear down; it defaults to
        the final component.
        '''
        if max is None:
            max = len(self.Components)-1

        for j in range(max, -1, -1):
            self.Components[j].TearDown(self.ClusterManager)

        self.audit()
        self.ClusterManager.install_support("uninstall")

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        self.Stats[name] = self.Stats.get(name, 0) + 1

    def run(self, Iterations):
        """Run run_loop() between oprofileStart() and oprofileStop().

        oprofileStop() is called exactly once whether or not run_loop()
        raises; any exception is propagated to the caller.
        """
        self.ClusterManager.oprofileStart()
        try:
            self.run_loop(Iterations)
        finally:
            self.ClusterManager.oprofileStop()

    def run_loop(self, Iterations):
        """Run the scenario's tests; subclasses must override this."""
        raise ValueError("Abstract Class member (run_loop)")

    def _continue_answer(self, prompt):
        """Return "Y" when Env["continue"] is set, else the operator's reply.

        End-of-file on standard input is treated as the reply "n".
        """
        if self.ClusterManager.Env["continue"]:
            return "Y"
        try:
            return input(prompt)
        except EOFError:
            return "n"

    def run_test(self, test, testcount):
        """Run one test on a randomly chosen node and record the outcome.

        Handles the test's setup and teardown, updates its timing statistics,
        counts success or failure, and runs the scenario audits afterwards.
        Returns 1 if the test ran or failed, and 0 if it was skipped.
        Raises ValueError if teardown fails and the operator answers "n".
        """
        nodechoice = self.ClusterManager.Env.random_node()

        ret = 1
        did_run = 0

        self.ClusterManager.instance_errorstoignore_clear()
        self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]")

        starttime = test.set_timer()
        if not test.setup(nodechoice):
            self.ClusterManager.log("Setup failed")
            ret = 0

        elif not test.canrunnow(nodechoice):
            self.ClusterManager.log("Skipped")
            test.skipped()

        else:
            did_run = 1
            ret = test(nodechoice)

        if not test.teardown(nodechoice):
            self.ClusterManager.log("Teardown failed")
            answer = self._continue_answer('Continue? [nY]')
            if answer and answer == "n":
                raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
            ret = 0

        stoptime = time.time()
        self.ClusterManager.oprofileSave(testcount)

        elapsed_time = stoptime - starttime
        test_time = stoptime - test.get_timer()
        if not test["min_time"]:
            # First timed run of this test
            test["elapsed_time"] = elapsed_time
            test["min_time"] = test_time
            test["max_time"] = test_time
        else:
            test["elapsed_time"] = test["elapsed_time"] + elapsed_time
            if test_time < test["min_time"]:
                test["min_time"] = test_time
            if test_time > test["max_time"]:
                test["max_time"] = test_time

        if ret:
            self.incr("success")
            test.log_timer()
        else:
            self.incr("failure")
            self.ClusterManager.statall()
            did_run = 1  # Force the test count to be incremented anyway so test extraction works

        self.audit(test.errorstoignore())
        return did_run

    def summarize(self):
        """Log the overall statistics, then a summary and details for each test."""
        self.ClusterManager.log("****************")
        self.ClusterManager.log("Overall Results:" + repr(self.Stats))
        self.ClusterManager.log("****************")

        summary_keys = ("calls", "failure", "skipped", "auditfail")

        self.ClusterManager.log("Test Summary")
        for test in self.Tests:
            stat_filter = {key: test.Stats[key] for key in summary_keys}
            self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter))

        self.ClusterManager.debug("Detailed Results")
        for test in self.Tests:
            self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.Stats))

        self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")

    def audit(self, LocalIgnore=None):
        """Run every audit, then drain the BadNews watcher.

        LocalIgnore is an optional list of regular expressions for log lines
        to ignore in addition to the cluster manager's own lists.  Returns
        the number of audits that failed.  After 1000 BadNews lines the
        operator is asked whether to continue; answering "n" tears the
        scenario down and raises ValueError.
        """
        errcount = 0

        ignorelist = ["CTS:"]
        ignorelist.extend([] if LocalIgnore is None else LocalIgnore)
        ignorelist.extend(self.ClusterManager.errorstoignore())
        ignorelist.extend(self.ClusterManager.instance_errorstoignore())

        # This makes sure everything is stabilized before starting...
        failed = 0
        for audit in self.Audits:
            if not audit():
                self.ClusterManager.log("Audit " + audit.name() + " FAILED.")
                failed += 1
            else:
                self.ClusterManager.debug("Audit " + audit.name() + " passed.")

        while errcount < 1000:
            match = None
            if self.BadNews:
                match = self.BadNews.look(0)

            if not match:
                break

            if not any(re.search(ignore, match) for ignore in ignorelist):
                self.ClusterManager.log("BadNews: " + match)
                self.incr("BadNews")
                errcount = errcount + 1

        else:
            # Reached only when the loop ends without break, i.e. the error
            # limit was hit.
            answer = self._continue_answer('Big problems. Continue? [nY]')
            if answer and answer == "n":
                self.ClusterManager.log("Shutting down.")
                self.summarize()
                self.TearDown()
                raise ValueError("Looks like we hit a BadNews jackpot!")

        if self.BadNews:
            self.BadNews.end()
        return failed
class AllOnce(Scenario):
    '''Every Test Once''' # Accessable as __doc__
    def run_loop(self, Iterations):
        # Iterations is ignored: each test runs exactly once, numbered from 1
        for testcount, test in enumerate(self.Tests, 1):
            self.run_test(test, testcount)
class RandomTests(Scenario):
    '''Random Test Execution'''
    def run_loop(self, Iterations):
        # Run Iterations tests, each drawn at random from self.Tests and
        # numbered from 1.
        for testcount in range(1, Iterations + 1):
            chosen = self.ClusterManager.Env.random_gen.choice(self.Tests)
            self.run_test(chosen, testcount)
class BasicSanity(Scenario):
    '''Basic Cluster Sanity'''
    def run_loop(self, Iterations):
        # Scenario never sets self.Environment; the lab environment is
        # reached through the cluster manager, as RandomTests does.
        testcount = 1
        while testcount <= Iterations:
            test = self.ClusterManager.Env.random_gen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1
class Sequence(Scenario):
    '''Named Tests in Sequence'''
    def run_loop(self, Iterations):
        # With no tests the counter below could never advance, so the outer
        # loop would never finish.
        if not self.Tests:
            return

        testcount = 1
        while testcount <= Iterations:
            # A pass through the list always completes, even if that takes
            # the counter beyond Iterations.
            for test in self.Tests:
                self.run_test(test, testcount)
                testcount += 1
class Boot(Scenario):
    '''Start the Cluster'''
    def run_loop(self, Iterations):
        # This scenario runs no tests, so there is nothing to do here
        return None
class BootCluster(ScenarioComponent):
    '''BootCluster is the most basic of ScenarioComponents.
    This ScenarioComponent simply starts the cluster manager on all the nodes.
    It is fairly robust as it waits for all nodes to come up before starting
    as they might have been rebooted or crashed for some reason beforehand.
    '''

    def __init__(self, Env):
        # Env is accepted for interface compatibility but is not stored
        pass

    def IsApplicable(self):
        '''BootCluster is so generic it is always Applicable'''
        return True

    def SetUp(self, CM):
        '''Basic Cluster Manager startup.  Start everything'''
        CM.prepare()

        # Clear out the cobwebs ;-)
        CM.stopall(verbose=True, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all nodes.")
        started = CM.startall(verbose=True, quick=True)
        return started

    def TearDown(self, CM, force=False):
        '''Stop the cluster manager on all nodes and return stopall()'s result'''
        CM.log("Stopping Cluster Manager on all nodes")
        stopped = CM.stopall(verbose=True, force=force)
        return stopped
class LeaveBooted(BootCluster):
    """BootCluster variant that leaves the cluster running at teardown."""

    def TearDown(self, CM):
        '''Log that the cluster is being left running; always return 1'''
        # Nothing is stopped here, unlike BootCluster.TearDown()
        CM.log("Leaving Cluster running on all nodes")
        return 1
class PingFest(ScenarioComponent):
    '''PingFest does a flood ping to each node in the cluster from the test machine.

    If the LabEnvironment Parameter PingSize is set, it will be used as the size
    of ping packet requested (via the -s option).  If it is not set, it defaults
    to 1024 bytes.

    According to the manual page for ping:
        Outputs packets as fast as they come back or one hundred times per
        second, whichever is more.  For every ECHO_REQUEST sent a period ``.''
        is printed, while for every ECHO_REPLY received a backspace is printed.
        This provides a rapid display of how many packets are being dropped.
        Only the super-user may use this option.  This can be very hard on a net-
        work and should be used with caution.
    '''

    def __init__(self, Env):
        """Store the lab environment as self.Env."""
        self.Env = Env

    def IsApplicable(self):
        '''PingFests are always applicable ;-)
        '''
        return True

    def SetUp(self, CM):
        '''Start the PingFest!'''
        self.PingSize = 1024
        if "PingSize" in list(CM.Env.keys()):
            self.PingSize = CM.Env["PingSize"]

        CM.log("Starting %d byte flood pings" % self.PingSize)

        # One child process per node; an entry is None if its fork failed
        self.PingPids = []
        for node in CM.Env["nodes"]:
            self.PingPids.append(self._pingchild(node))

        CM.log("Ping PIDs: " + repr(self.PingPids))
        return 1

    def TearDown(self, CM):
        '''Stop it right now!  My ears are pinging!!'''
        # Imported here because this file does not import signal at the top
        import signal

        for pid in self.PingPids:
            if pid is not None:
                CM.log("Stopping ping process %d" % pid)
                os.kill(pid, signal.SIGKILL)

    def _pingchild(self, node):
        """Fork a child that execs a flood ping of node.

        Returns the child's PID in the parent, or None if the fork failed.
        """
        Args = ["ping", "-qfn", "-s", str(self.PingSize), node]

        sys.stdin.flush()
        sys.stdout.flush()
        sys.stderr.flush()

        # os.fork() reports failure by raising OSError, not by returning < 0
        try:
            pid = os.fork()
        except OSError:
            self.Env.log("Cannot fork ping child")
            return None

        if pid > 0:
            return pid

        # Otherwise, we're the child process.
        try:
            os.execvp("ping", Args)
        except OSError:
            self.Env.log("Cannot execvp ping: " + repr(Args))

        # Reached only if the exec failed.  Leave the child immediately so it
        # does not continue running the parent's code.
        os._exit(1)
-class PacketLoss(ScenarioComponent):
- (
-'''
-It would be useful to do some testing of CTS with a modest amount of packet loss
-enabled - so we could see that everything runs like it should with a certain
-amount of packet loss present.
-''')
-
- def IsApplicable(self):
- '''always Applicable'''
- return 1
-
- def SetUp(self, CM):
- '''Reduce the reliability of communications'''
- if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 :
- return 1
-
- for node in CM.Env["nodes"]:
- CM.reducecomm_node(node)
-
- CM.log("Reduce the reliability of communications")
-
- return 1
-
- def TearDown(self, CM):
- '''Fix the reliability of communications'''
-
- if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 :
- return 1
-
- for node in CM.Env["nodes"]:
- CM.unisolate_node(node)
-
- CM.log("Fix the reliability of communications")
-
-
class BasicSanityCheck(ScenarioComponent):
    """Component that restarts the cluster manager for a basic sanity check."""

    def IsApplicable(self):
        """Return the environment's "DoBSC" setting."""
        return self.Env["DoBSC"]

    def SetUp(self, CM):
        """Stop, then start, the cluster manager; return startall()'s result."""
        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on BSC node(s).")
        started = CM.startall()
        return started

    def TearDown(self, CM):
        """Stop the cluster manager; return stopall()'s result."""
        CM.log("Stopping Cluster Manager on BSC node(s).")
        stopped = CM.stopall()
        return stopped
class Benchmark(ScenarioComponent):
    """Component that restarts the cluster manager for benchmark runs."""

    def IsApplicable(self):
        """Return the environment's "benchmark" setting."""
        return self.Env["benchmark"]

    def SetUp(self, CM):
        """Force-stop, then start, the cluster manager; return startall()'s result."""
        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all node(s).")
        return CM.startall()

    def TearDown(self, CM, force=False):
        """Stop the cluster manager; return stopall()'s result.

        force is passed to stopall() only when true, so the default call is
        the same stopall() with no arguments as before.  SetUp() relies on
        this parameter.
        """
        CM.log("Stopping Cluster Manager on all node(s).")
        if force:
            return CM.stopall(force=True)
        return CM.stopall()
class RollingUpgrade(ScenarioComponent):
    '''
    Test a rolling upgrade between two versions of the stack
    '''

    def __init__(self, Env):
        """Store the lab environment; the cluster manager is set later."""
        self.Env = Env
        # Assigned by SetUp()/TearDown(); install() needs it
        self.CM = None

    def IsApplicable(self):
        """Return True if the RPM directory and both versions are configured, else None."""
        for key in ("rpm-dir", "current-version", "previous-version"):
            if not self.Env[key]:
                return None
        return True

    def install(self, node, version):
        """Copy the RPMs for version to node and install them there.

        Uses the cluster manager recorded by SetUp()/TearDown().  Always
        returns 1; the results of the copy and install commands are not
        checked.
        """
        target_dir = "/tmp/rpm-%s" % version
        src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)

        self.CM.rsh(node, "mkdir -p %s" % target_dir)
        self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
        self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))

        # This class has no success() method, so report success directly
        return 1

    def upgrade(self, node):
        """Install the current version on node."""
        return self.install(node, self.CM.Env["current-version"])

    def downgrade(self, node):
        """Install the previous version on node."""
        return self.install(node, self.CM.Env["previous-version"])

    def SetUp(self, CM):
        """Stop the cluster and downgrade every node; return 1, or None on failure."""
        # Record the cluster manager so install() can use it
        self.CM = CM

        print(repr(self)+"prepare")
        CM.prepare()

        # Clear out the cobwebs
        CM.stopall(force=True)

        CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])

        for node in self.Env["nodes"]:
            if not self.downgrade(node):
                CM.log("Couldn't downgrade %s" % node)
                return None

        return 1

    def TearDown(self, CM):
        """Stop the cluster and upgrade every node; return 1, or None on failure."""
        # Record the cluster manager so install() can use it
        self.CM = CM

        # Stop everything
        CM.log("Stopping Cluster Manager on Upgrade nodes.")
        CM.stopall()

        CM.log("Upgrading all nodes to %s." % self.Env["current-version"])
        for node in self.Env["nodes"]:
            if not self.upgrade(node):
                CM.log("Couldn't upgrade %s" % node)
                return None

        return 1
diff --git a/cts/lab/CTStests.py b/cts/lab/CTStests.py
index f238a24199..d32f4a4b94 100644
--- a/cts/lab/CTStests.py
+++ b/cts/lab/CTStests.py
@@ -1,3178 +1,3178 @@
""" Test-specific classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
#
# SPECIAL NOTE:
#
# Tests may NOT implement any cluster-manager-specific code in them.
# EXTEND the ClusterManager object to provide the base capabilities
# the test needs if you need to do something that the current CM classes
# do not. Otherwise you screw up the whole point of the object structure
# in CTS.
#
# Thank you.
#
import os
import re
import time
import subprocess
import tempfile
from stat import *
from cts import CTS
from cts.CTSaudits import *
from cts.watcher import LogWatcher
-from cts.environment import EnvFactory
from pacemaker import BuildOptions
+from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
AllTestClasses = [ ]
class CTSTest(object):
'''
A Cluster test.
We implement the basic set of properties and behaviors for a generic
cluster test.
Cluster tests track their own statistics.
We keep each of the kinds of counts we track as separate {name,value}
pairs.
'''
def __init__(self, cm):
    """Set up statistics, shared helpers and default flags for a test.

    cm is the cluster manager; it is stored as self.CM and its "Name" entry
    selects the log pattern templates.
    """
    #self.name="the unnamed test"
    self.Stats = {
        "calls": 0,
        "success": 0,
        "failure": 0,
        "skipped": 0,
        "auditfail": 0,
    }

#        if not issubclass(cm.__class__, ClusterManager):
#            raise ValueError("Must be a ClusterManager object")

    # Cluster manager and the shared helper objects
    self.CM = cm
    self.Env = EnvFactory().getInstance()
    self.rsh = RemoteFactory().getInstance()
    self.logger = LogFactory()
    self.templates = PatternSelector(cm["Name"])

    self.Audits = []
    self.timeout = 120
    self.passed = 1

    # Flags that subclasses may override
    self.is_loop = 0
    self.is_unsafe = 0
    self.is_experimental = 0
    self.is_container = 0
    self.is_valgrind = 0

    self.benchmark = 0  # which tests to benchmark
    self.timer = {}  # timers
def log(self, args):
self.logger.log(args)
def debug(self, args):
self.logger.debug(args)
def has_key(self, key):
return key in self.Stats
def __setitem__(self, key, value):
self.Stats[key] = value
def __getitem__(self, key):
if str(key) == "0":
raise ValueError("Bad call to 'foo in X', should reference 'foo in X.Stats' instead")
if key in self.Stats:
return self.Stats[key]
return None
def log_mark(self, msg):
self.debug("MARK: test %s %s %d" % (self.name,msg,time.time()))
return
def get_timer(self,key = "test"):
try: return self.timer[key]
except: return 0
def set_timer(self,key = "test"):
self.timer[key] = time.time()
return self.timer[key]
def log_timer(self,key = "test"):
elapsed = 0
if key in self.timer:
elapsed = time.time() - self.timer[key]
s = key == "test" and self.name or "%s:%s" % (self.name,key)
self.debug("%s runtime: %.2f" % (s, elapsed))
del self.timer[key]
return elapsed
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not name in self.Stats:
self.Stats[name] = 0
self.Stats[name] = self.Stats[name]+1
# Reset the test passed boolean
if name == "calls":
self.passed = 1
def failure(self, reason="none"):
'''Increment the failure count'''
self.passed = 0
self.incr("failure")
self.logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
return None
def success(self):
'''Increment the success count'''
self.incr("success")
return 1
def skipped(self):
'''Increment the skipped count'''
self.incr("skipped")
return 1
def __call__(self, node):
'''Perform the given test'''
raise ValueError("Abstract Class member (__call__)")
self.incr("calls")
return self.failure()
def audit(self):
passed = 1
if len(self.Audits) > 0:
for audit in self.Audits:
if not audit():
self.logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name()))
self.incr("auditfail")
passed = 0
return passed
def setup(self, node):
'''Setup the given test'''
return self.success()
def teardown(self, node):
'''Tear down the given test'''
return self.success()
def create_watch(self, patterns, timeout, name=None):
if not name:
name = self.name
return LogWatcher(self.Env["LogFileName"], patterns, name, timeout, kind=self.Env["LogWatcher"], hosts=self.Env["nodes"])
def local_badnews(self, prefix, watch, local_ignore=[]):
errcount = 0
if not prefix:
prefix = "LocalBadNews:"
ignorelist = []
ignorelist.append(" CTS: ")
ignorelist.append(prefix)
ignorelist.extend(local_ignore)
while errcount < 100:
match = watch.look(0)
if match:
add_err = 1
for ignore in ignorelist:
if add_err == 1 and re.search(ignore, match):
add_err = 0
if add_err == 1:
self.logger.log(prefix + " " + match)
errcount = errcount + 1
else:
break
else:
self.logger.log("Too many errors!")
watch.end()
return errcount
def is_applicable(self):
return self.is_applicable_common()
def is_applicable_common(self):
- '''Return TRUE if we are applicable in the current test configuration'''
+ '''Return True if we are applicable in the current test configuration'''
#raise ValueError("Abstract Class member (is_applicable)")
if self.is_loop and not self.Env["loop-tests"]:
- return 0
+ return False
elif self.is_unsafe and not self.Env["unsafe-tests"]:
- return 0
+ return False
elif self.is_valgrind and not self.Env["valgrind-tests"]:
- return 0
+ return False
elif self.is_experimental and not self.Env["experimental-tests"]:
- return 0
+ return False
elif self.is_container and not self.Env["container-tests"]:
- return 0
+ return False
elif self.Env["benchmark"] and self.benchmark == 0:
- return 0
+ return False
- return 1
+ return True
def find_ocfs2_resources(self, node):
self.r_o2cb = None
self.r_ocfs2 = []
(_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
r = AuditResource(self.CM, line)
if r.rtype == "o2cb" and r.parent != "NA":
self.debug("Found o2cb: %s" % self.r_o2cb)
self.r_o2cb = r.parent
if re.search("^Constraint", line):
c = AuditConstraint(self.CM, line)
if c.type == "rsc_colocation" and c.target == self.r_o2cb:
self.r_ocfs2.append(c.rsc)
self.debug("Found ocfs2 filesystems: %s" % repr(self.r_ocfs2))
return len(self.r_ocfs2)
def canrunnow(self, node):
'''Return TRUE if we can meaningfully run right now'''
return 1
def errorstoignore(self):
'''Return list of errors which are 'normal' and should be ignored'''
return []
class StopTest(CTSTest):
    '''Shut down the cluster manager on one node and check the cluster noticed'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "Stop"

    def __call__(self, node):
        '''Stop node; skipped unless the node is expected to be up'''
        self.incr("calls")
        if self.CM.ShouldBeStatus[node] != "up":
            return self.skipped()

        # Technically we should always be able to notice ourselves stopping
        expected = [self.templates["Pat:We_stopped"] % node]

        # Any active node needs to notice this one left
        # (note that this won't work if we have multiple partitions)
        for peer in self.Env["nodes"]:
            if self.CM.ShouldBeStatus[peer] == "up" and peer != node:
                expected.append(self.templates["Pat:They_stopped"] %(peer, self.CM.key_for_node(node)))

        watch = self.create_watch(expected, self.Env["DeadTime"])
        watch.setwatch()

        # Track whether we stopped ourselves, the last node, or a peer
        if node == self.CM.OurNode:
            bucket = "us"
        elif self.CM.upcount() <= 1:
            bucket = "all"
        else:
            bucket = "them"
        self.incr(bucket)

        self.CM.StopaCM(node)
        watch.lookforall()

        reason = None
        missing = "||"
        if watch.unmatched:
            # Collect some diagnostics about what is still running
            for command in ("/bin/ps axf", "/usr/sbin/dlm_tool dump 2>/dev/null"):
                (_, output) = self.rsh(node, command, verbose=1)
                for line in output:
                    self.debug(line)

            for regex in watch.unmatched:
                self.logger.log ("ERROR: Shutdown pattern not found: %s" % (regex))
                missing = missing + regex + "||"
                reason = "Missing shutdown pattern"

        self.CM.cluster_stable(self.Env["DeadTime"])

        if not watch.unmatched or self.CM.upcount() == 0:
            return self.success()

        if len(watch.unmatched) >= self.CM.upcount():
            return self.failure("no match against (%s)" % missing)

        if reason is not None:
            return self.failure(reason)
        return self.success()
#
# We don't register StopTest because it's better when called by
# another test...
#
class StartTest(CTSTest):
    '''Start (activate) the cluster manager on a node'''

    def __init__(self, cm, debug=None):
        '''Create the test.

        cm    -- cluster manager to operate on
        debug -- optional replacement for the inherited debug() logger
        '''
        CTSTest.__init__(self, cm)
        self.name = "start"
        # Only override the inherited debug() method when a replacement was
        # supplied; storing None here would make every inherited helper
        # that logs through self.debug() fail.
        if debug is not None:
            self.debug = debug

    def __call__(self, node):
        '''Perform the 'start' test.

        Skipped unless the node is expected to be down; otherwise succeeds
        or fails according to the cluster manager's StartaCM() result.
        '''
        self.incr("calls")

        # Count whether this node is the first one up or joins others
        if self.CM.upcount() == 0:
            self.incr("us")
        else:
            self.incr("them")

        if self.CM.ShouldBeStatus[node] != "down":
            return self.skipped()
        elif self.CM.StartaCM(node):
            return self.success()
        else:
            return self.failure("Startup %s on node %s failed"
                                % (self.Env["Name"], node))
#
# We don't register StartTest because it's better when called by
# another test...
#
class FlipTest(CTSTest):
    '''Toggle a node: stop it when it is up, start it when it is down'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "Flip"
        self.start = StartTest(cm)
        self.stop = StopTest(cm)

    def __call__(self, node):
        '''Flip node to the opposite state; skipped for any other status'''
        self.incr("calls")

        if self.CM.ShouldBeStatus[node] == "up":
            self.incr("stopped")
            outcome = self.stop(node)
            transition = "up->down"
            # Give the cluster time to recognize it's gone...
            time.sleep(self.Env["StableTime"])
        elif self.CM.ShouldBeStatus[node] == "down":
            self.incr("started")
            outcome = self.start(node)
            transition = "down->up"
        else:
            return self.skipped()

        self.incr(transition)
        if not outcome:
            return self.failure("%s failure" % transition)
        return self.success()
# Register FlipTest as a good test to run
AllTestClasses.append(FlipTest)
class RestartTest(CTSTest):
    '''Stop a node and bring it back up again'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "Restart"
        self.start = StartTest(cm)
        self.stop = StopTest(cm)
        self.benchmark = 1

    def __call__(self, node):
        '''Restart node, failing on the first step that does not succeed'''
        self.incr("calls")
        self.incr("node:" + node)

        # Setup step, run before the timed stop/start cycle
        if self.CM.StataCM(node):
            self.incr("WasStopped")
            if not self.start(node):
                return self.failure("start (setup) failure: "+node)

        self.set_timer()

        # Run stop then start, reporting whichever step fails first
        for step, label in ((self.stop, "stop"), (self.start, "start")):
            if not step(node):
                return self.failure(label + " failure: "+node)

        return self.success()
# Register RestartTest as a good test to run
AllTestClasses.append(RestartTest)
class StonithdTest(CTSTest):
    '''Fence a node with stonith_admin and wait for the cluster to recover'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "Stonithd"
        self.startall = SimulStartLite(cm)
        self.benchmark = 1

    def __call__(self, node):
        '''Fence node from a randomly chosen origin; needs at least two nodes'''
        self.incr("calls")
        if len(self.Env["nodes"]) < 2:
            return self.skipped()

        if not self.startall(None):
            return self.failure("Setup failed")

        self.CM.is_node_dc(node)

        watchpats = [
            self.templates["Pat:Fencing_ok"] % node,
            self.templates["Pat:NodeFenced"] % node,
        ]

        if not self.Env["at-boot"]:
            self.debug("Expecting %s to stay down" % node)
            self.CM.ShouldBeStatus[node] = "down"
        else:
            self.debug("Expecting %s to come up again %d" % (node, self.Env["at-boot"]))
            watchpats.append("%s.* S_STARTING -> S_PENDING" % node)
            watchpats.append("%s.* S_PENDING -> S_NOT_DC" % node)

        wait_time = 30 + self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"]
        watch = self.create_watch(watchpats, wait_time)
        watch.setwatch()

        origin = self.Env.random_gen.choice(self.Env["nodes"])

        (rc, _) = self.rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)

        if rc == 124: # CRM_EX_TIMEOUT
            # Look for the patterns, usually this means the required
            # device was running on the node to be fenced - or that
            # the required devices were in the process of being loaded
            # and/or moved
            #
            # Effectively the node committed suicide so there will be
            # no confirmation, but pacemaker should be watching and
            # fence the node again
            self.logger.log("Fencing command on %s to fence %s timed out" % (origin, node))

        elif origin != node and rc != 0:
            self._wait_for_recovery()
            self.logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))

        elif origin == node and rc != 255:
            # 255 == broken pipe, ie. the node was fenced as expected
            self.logger.log("Locally originated fencing returned %d" % rc)

        self.set_timer("fence")
        matched = watch.lookforall()
        self.log_timer("fence")
        self.set_timer("reform")
        if watch.unmatched:
            self.logger.log("Patterns not found: " + repr(watch.unmatched))

        self._wait_for_recovery()

        self.debug("Waiting for the cluster to re-stabilize with all nodes")
        is_stable = self.CM.cluster_stable(self.Env["StartTime"])

        if not matched:
            return self.failure("Didn't find all expected patterns")
        if not is_stable:
            return self.failure("Cluster did not become stable")

        self.log_timer("reform")
        return self.success()

    def _wait_for_recovery(self):
        '''Wait for the cluster to settle and for every node to come back up'''
        self.debug("Waiting for the cluster to recover")
        self.CM.cluster_stable()

        self.debug("Waiting for fenced node to come back up")
        self.CM.ns.WaitForAllNodesToComeUp(self.Env["nodes"], 600)

    def errorstoignore(self):
        '''Return the log patterns that fencing is expected to produce'''
        return [
            self.templates["Pat:Fencing_start"] % ".*",
            self.templates["Pat:Fencing_ok"] % ".*",
            self.templates["Pat:Fencing_active"],
            r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired",
        ]

    def is_applicable(self):
        '''Applicable when the common checks pass and fencing is not disabled'''
        if not self.is_applicable_common():
            return False

        # An explicit DoFencing setting decides; otherwise assume fencing works
        if "DoFencing" in self.Env.keys():
            return self.Env["DoFencing"]

        return True
AllTestClasses.append(StonithdTest)
class StartOnebyOne(CTSTest):
    '''Stop the whole cluster, then start the nodes one after another'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "StartOnebyOne"
        self.stopall = SimulStopLite(cm)
        self.start = StartTest(cm)
        self.ns = CTS.NodeStatus(cm.Env)

    def __call__(self, dummy):
        '''Run the test; the node argument is ignored'''
        self.incr("calls")

        # Shut down all the nodes...
        if not self.stopall(None):
            return self.failure("Test setup failed")

        self.set_timer()

        # Try every node, remembering the ones that would not start
        failed = [node for node in self.Env["nodes"] if not self.start(node)]
        if failed:
            return self.failure("Some node failed to start: " + repr(failed))

        return self.success()
# Register StartOnebyOne as a good test to run
AllTestClasses.append(StartOnebyOne)
class SimulStart(CTSTest):
    '''Stop the whole cluster, then start all nodes at roughly the same time'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "SimulStart"
        self.stopall = SimulStopLite(cm)
        self.startall = SimulStartLite(cm)

    def __call__(self, dummy):
        '''Run the test; the node argument is ignored'''
        self.incr("calls")

        # Shut down all the nodes first, then start them together
        steps = (
            (self.stopall, "Setup failed"),
            (self.startall, "Startall failed"),
        )
        for step, complaint in steps:
            if not step(None):
                return self.failure(complaint)

        return self.success()
# Register SimulStart as a good test to run
AllTestClasses.append(SimulStart)
class SimulStop(CTSTest):
    '''Start the whole cluster, then stop all nodes at roughly the same time'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "SimulStop"
        self.startall = SimulStartLite(cm)
        self.stopall = SimulStopLite(cm)

    def __call__(self, dummy):
        '''Run the test; the node argument is ignored'''
        self.incr("calls")

        # Start up all the nodes first, then stop them together
        steps = (
            (self.startall, "Setup failed"),
            (self.stopall, "Stopall failed"),
        )
        for step, complaint in steps:
            if not step(None):
                return self.failure(complaint)

        return self.success()
# Register SimulStop as a good test to run
AllTestClasses.append(SimulStop)
class StopOnebyOne(CTSTest):
    '''Start the whole cluster, then stop the nodes one after another'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "StopOnebyOne"
        self.startall = SimulStartLite(cm)
        self.stop = StopTest(cm)

    def __call__(self, dummy):
        '''Run the test; the node argument is ignored'''
        self.incr("calls")

        # Start up all the nodes...
        if not self.startall(None):
            return self.failure("Setup failed")

        self.set_timer()

        # Try every node, remembering the ones that would not stop
        failed = [node for node in self.Env["nodes"] if not self.stop(node)]
        if failed:
            return self.failure("Some node failed to stop: " + repr(failed))

        return self.success()
# Register StopOnebyOne as a good test to run
AllTestClasses.append(StopOnebyOne)
class RestartOnebyOne(CTSTest):
    '''Start the whole cluster, then restart the nodes one after another'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "RestartOnebyOne"
        self.startall = SimulStartLite(cm)

    def __call__(self, dummy):
        '''Run the test; the node argument is ignored'''
        self.incr("calls")

        # Start up all the nodes...
        if not self.startall(None):
            return self.failure("Setup failed")

        self.set_timer()
        self.restart = RestartTest(self.CM)

        # Try every node, remembering the ones that would not restart
        did_fail = [node for node in self.Env["nodes"] if not self.restart(node)]
        if did_fail:
            return self.failure("Could not restart %d nodes: %s"
                                % (len(did_fail), repr(did_fail)))

        return self.success()
# Register RestartOnebyOne as a good test to run
AllTestClasses.append(RestartOnebyOne)
class PartialStart(CTSTest):
    '''Begin starting a node, then stop it before startup has completed'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "PartialStart"
        self.startall = SimulStartLite(cm)
        self.stopall = SimulStopLite(cm)
        self.stop = StopTest(cm)
        #self.is_unsafe = 1

    def __call__(self, node):
        '''Stop everything, kick off a non-blocking start of node, then stop it'''
        self.incr("calls")

        if not self.stopall(None):
            return self.failure("Setup failed")

        # Wait only until the controller begins connecting to the cluster layer
        watch = self.create_watch(
            ["pacemaker-controld.*Connecting to .* cluster infrastructure"],
            self.Env["DeadTime"]+10)
        watch.setwatch()

        self.CM.StartaCMnoBlock(node)
        if not watch.lookforall():
            self.logger.log("Patterns not found: " + repr(watch.unmatched))
            return self.failure("Setup of %s failed" % node)

        if not self.stop(node):
            return self.failure("%s did not stop in time" % node)

        return self.success()

    def errorstoignore(self):
        '''Return list of errors which should be ignored'''
        # We might do some fencing in the 2-node case if we make it up far enough
        return [
            r"Executing reboot fencing operation",
            r"Requesting fencing \([^)]+\) of node ",
        ]
# Register PartialStart as a good test to run
AllTestClasses.append(PartialStart)
class StandbyTest(CTSTest):
    '''Put a node into standby and bring it back, checking each step.

    Sequence:
      - make sure the node is active
      - set the node to standby mode
      - check that no resource is still running on the node
      - set the node back to active mode
    '''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "Standby"
        self.benchmark = 1
        self.start = StartTest(cm)
        self.startall = SimulStartLite(cm)

    def __call__(self, node):
        '''Run the standby cycle on node'''
        self.incr("calls")

        if not self.startall(None):
            return self.failure("Start all nodes failed")

        self.debug("Make sure node %s is active" % node)
        if self.CM.StandbyStatus(node) != "off":
            if not self.CM.SetStandbyMode(node, "off"):
                return self.failure("can't set node %s to active mode" % node)

        self.CM.cluster_stable()

        current = self.CM.StandbyStatus(node)
        if current != "off":
            return self.failure("standby status of %s is [%s] but we expect [off]" % (node, current))

        self.debug("Getting resources running on node %s" % node)
        self.CM.active_resources(node)

        # The scheduler must be invoked in response to the standby change
        watch = self.create_watch([r"State transition .* -> S_POLICY_ENGINE"],
                                  self.Env["DeadTime"]+10)
        watch.setwatch()

        self.debug("Setting node %s to standby mode" % node)
        if not self.CM.SetStandbyMode(node, "on"):
            return self.failure("can't set node %s to standby mode" % node)

        self.set_timer("on")

        if not watch.lookforall():
            self.logger.log("Patterns not found: " + repr(watch.unmatched))
            self.CM.SetStandbyMode(node, "off")
            return self.failure("cluster didn't react to standby change on %s" % node)

        self.CM.cluster_stable()

        current = self.CM.StandbyStatus(node)
        if current != "on":
            return self.failure("standby status of %s is [%s] but we expect [on]" % (node, current))
        self.log_timer("on")

        self.debug("Checking resources")
        still_active = self.CM.active_resources(node)
        if len(still_active) > 0:
            # Record the failure first, then put the node back into service
            verdict = self.failure("%s set to standby, %s is still running on it" % (node, repr(still_active)))
            self.debug("Setting node %s to active mode" % node)
            self.CM.SetStandbyMode(node, "off")
            return verdict

        self.debug("Setting node %s to active mode" % node)
        if not self.CM.SetStandbyMode(node, "off"):
            return self.failure("can't set node %s to active mode" % node)

        self.set_timer("off")
        self.CM.cluster_stable()

        current = self.CM.StandbyStatus(node)
        if current != "off":
            return self.failure("standby status of %s is [%s] but we expect [off]" % (node, current))
        self.log_timer("off")

        return self.success()
AllTestClasses.append(StandbyTest)
class ValgrindTest(CTSTest):
    '''Check for memory leaks'''

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "Valgrind"
        self.stopall = SimulStopLite(cm)
        self.startall = SimulStartLite(cm)
        self.is_valgrind = 1
        self.is_loop = 1

    def setup(self, node):
        '''Restart the whole cluster before the test body runs'''
        self.incr("calls")

        ret = self.stopall(None)
        if not ret:
            return self.failure("Stop all nodes failed")

        # @TODO Edit /etc/sysconfig/pacemaker on all nodes to enable valgrind,
        # and clear any valgrind logs from previous runs. For now, we rely on
        # the user to do this manually.

        ret = self.startall(None)
        if not ret:
            return self.failure("Start all nodes failed")

        return self.success()

    def teardown(self, node):
        '''Stop the whole cluster once the test body has run'''
        # Return all nodes to normal
        # @TODO Edit /etc/sysconfig/pacemaker on all nodes to disable valgrind
        ret = self.stopall(None)
        if not ret:
            return self.failure("Stop all nodes failed")

        return self.success()

    def find_leaks(self):
        '''Stop every node and scan its valgrind logs for reported errors.

        Records a failure for each node that cannot be stopped or whose
        logs contain errors.  Returns the list of nodes with errors.
        '''
        # Check for leaks
        # (no longer used but kept in case feature is restored)
        leaked = []
        self.stop = StopTest(self.CM)

        for node in self.Env["nodes"]:
            rc = self.stop(node)
            if not rc:
                self.failure("Couldn't shut down %s" % node)

            (rc, _) = self.rsh(node, "grep -e indirectly.*lost:.*[1-9] -e definitely.*lost:.*[1-9] -e (ERROR|error).*SUMMARY:.*[1-9].*errors %s" % self.logger.logPat)
            # grep exits with 1 only when nothing matched
            if rc != 1:
                leaked.append(node)
                self.failure("Valgrind errors detected on %s" % node)
                (_, output) = self.rsh(node, "grep -e lost: -e SUMMARY: %s" % self.logger.logPat, verbose=1)
                for line in output:
                    self.logger.log(line)
                (_, output) = self.rsh(node, "cat %s" % self.logger.logPat, verbose=1)
                for line in output:
                    self.debug(line)

            # Clean up on every node so stale logs cannot leak into a later
            # run, and so that an empty node list is handled gracefully
            self.rsh(node, "rm -f %s" % self.logger.logPat, verbose=1)

        return leaked

    def __call__(self, node):
        '''Always succeeds; the leak check itself is currently disabled'''
        #leaked = self.find_leaks()
        #if len(leaked) > 0:
        #    return self.failure("Nodes %s leaked" % repr(leaked))

        return self.success()

    def errorstoignore(self):
        '''Return list of errors which should be ignored'''
        return [
            r"pacemaker-based.*: \*\*\*\*\*\*\*\*\*\*\*\*\*",
            r"pacemaker-based.*: .* avoid confusing Valgrind",
            r"HA_VALGRIND_ENABLED",
        ]
class StandbyLoopTest(ValgrindTest):
    '''Check for memory leaks by putting a node in and out of standby for an hour'''
    # @TODO This is not a useful test for memory leaks

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "StandbyLoop"

    def __call__(self, node):
        '''Toggle standby on node until the configured loop time has elapsed'''
        pause = 2
        iteration = 0
        failed_at = 0
        deadline = time.time() + self.Env["loop-minutes"] * 60

        while time.time() <= deadline and not failed_at:
            iteration += 1

            time.sleep(pause)
            if not self.CM.SetStandbyMode(node, "on"):
                self.failure("can't set node %s to standby mode" % node)
                failed_at = iteration

            time.sleep(pause)
            if not self.CM.SetStandbyMode(node, "off"):
                self.failure("can't set node %s to active mode" % node)
                failed_at = iteration

        leaked = self.find_leaks()
        if failed_at:
            return self.failure("Iteration %d failed" % failed_at)
        if len(leaked) > 0:
            return self.failure("Nodes %s leaked" % repr(leaked))

        return self.success()
#AllTestClasses.append(StandbyLoopTest)
class BandwidthTest(CTSTest):
    '''Test the bandwidth which the cluster uses'''
    # Tests should not be cluster-manager-specific
    # If you need to find out cluster manager configuration to do this, then
    # it should be added to the generic cluster manager API.

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "Bandwidth"
        self.start = StartTest(cm)
        self.__setitem__("min", 0)
        self.__setitem__("max", 0)
        self.__setitem__("totalbandwidth", 0)
        # Local scratch file that receives the copied tcpdump capture
        (handle, self.tempfile) = tempfile.mkstemp(".cts")
        os.close(handle)
        self.startall = SimulStartLite(cm)

    def __call__(self, node):
        '''Perform the Bandwidth test.

        Captures cluster UDP traffic on node with tcpdump, copies the
        capture locally and updates the min/max/totalbandwidth statistics.
        '''
        self.incr("calls")

        if self.CM.upcount() < 1:
            return self.skipped()

        Path = self.CM.InternalCommConfig()
        if "ip" not in Path["mediatype"]:
            return self.skipped()

        port = int(Path["port"][0])

        ret = self.startall(None)
        if not ret:
            return self.failure("Test setup failed")
        time.sleep(5)  # We get extra messages right after startup.

        fstmpfile = "/var/run/band_estimate"
        dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
                  % (port, fstmpfile)

        (rc, _) = self.rsh(node, dumpcmd)
        if rc != 0:
            return self.failure("no response from tcpdump command [%d]!" % rc)

        farfile = "root@%s:%s" % (node, fstmpfile)
        self.rsh.copy(farfile, self.tempfile)
        Bandwidth = self.countbandwidth(self.tempfile)
        if not Bandwidth:
            self.logger.log("Could not compute bandwidth.")
            return self.success()

        intband = int(Bandwidth + 0.5)
        self.logger.log("...bandwidth: %d bits/sec" % intband)
        self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth

        # A minimum of 0 means "not measured yet"
        if self.Stats["min"] == 0:
            self.Stats["min"] = Bandwidth
        if Bandwidth > self.Stats["max"]:
            self.Stats["max"] = Bandwidth
        if Bandwidth < self.Stats["min"]:
            self.Stats["min"] = Bandwidth

        self.rsh(node, "rm -f %s" % fstmpfile)
        os.unlink(self.tempfile)
        return self.success()

    @staticmethod
    def _udp_length(fields):
        '''Return the integer following the "udp"/"length:" token in fields.

        When neither token is present the last field is used.  Raises
        ValueError when no usable number can be found.
        '''
        if len(fields) < 2:
            raise ValueError("no length field")

        pos = len(fields) - 2
        for idx, token in enumerate(fields[:-1]):
            if token in ("udp", "length:"):
                pos = idx
                break
        return int(fields[pos + 1])

    @staticmethod
    def _timestamp_seconds(stamp):
        '''Convert an HH:MM:SS.ffffff stamp to seconds (ValueError if malformed)'''
        parts = stamp.split(":")
        if len(parts) < 3:
            raise ValueError("bad timestamp: %s" % stamp)

        seconds = parts[2].split(".")
        if len(seconds) < 2:
            raise ValueError("bad timestamp: %s" % stamp)

        return ((int(parts[0]) * 60 + int(parts[1])) * 60
                + int(seconds[0]) + int(seconds[1]) * 0.000001)

    def countbandwidth(self, file):
        '''Estimate bandwidth in bits/sec from the tcpdump text capture in file.

        Uses the first 100 UDP packet lines: sums their lengths and divides
        by the time between the first and the last of them.  Returns None
        when the capture holds fewer than 100 such lines or a line cannot
        be parsed, and 0 when no time elapsed between first and last.
        '''
        packets = 0
        total_bytes = 0
        first_stamp = None
        last_stamp = None

        with open(file, "r") as capture:
            for line in capture:
                if "udp" not in line and "UDP," not in line:
                    continue

                fields = line.split(" ")
                try:
                    total_bytes += self._udp_length(fields)
                except ValueError:
                    self.logger.log("Invalid tcpdump line: %s" % line)
                    return None

                packets += 1
                if first_stamp is None:
                    first_stamp = fields[0]
                last_stamp = fields[0]
                if packets >= 100:
                    break

        if packets < 100:
            return None

        try:
            elapsed = (self._timestamp_seconds(last_stamp)
                       - self._timestamp_seconds(first_stamp))
        except ValueError:
            self.logger.log("Invalid tcpdump line: %s" % last_stamp)
            return None

        if elapsed <= 0:
            return 0
        return int((total_bytes * 8) / elapsed)

    def is_applicable(self):
        '''BandwidthTest never applicable'''
        return False
AllTestClasses.append(BandwidthTest)
###################################################################
class MaintenanceMode(CTSTest):
    '''Toggle maintenance mode and check resources become unmanaged and back'''

    def __init__(self, cm):
        super().__init__(cm)
        self.name = "MaintenanceMode"
        self.start = StartTest(cm)
        self.startall = SimulStartLite(cm)
        self.max = 30
        #self.is_unsafe = 1
        self.benchmark = 1
        self.action = "asyncmon"
        self.interval = 0
        self.rid = "maintenanceDummy"

    def toggleMaintenanceMode(self, node, action):
        '''Switch maintenance mode "On" or "Off" from node.

        Returns repr() of the log patterns that were not seen, or "" when
        all of them were found.
        '''
        expected = [self.templates["Pat:DC_IDLE"]]

        # fail the resource right after turning Maintenance mode on
        # verify it is not recovered until maintenance mode is turned off
        if action == "On":
            expected.append(self.templates["Pat:RscOpFail"] % (self.action, self.rid))
        else:
            expected.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
            expected.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))

        watch = self.create_watch(expected, 60)
        watch.setwatch()

        self.debug("Turning maintenance mode %s" % action)
        self.rsh(node, self.templates["MaintenanceMode%s" % (action)])
        if action == "On":
            self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))

        timer_name = "recover%s" % (action)
        self.set_timer(timer_name)
        watch.lookforall()
        self.log_timer(timer_name)

        if not watch.unmatched:
            return ""

        self.debug("Failed to find patterns when turning maintenance mode %s" % action)
        return repr(watch.unmatched)

    def insertMaintenanceDummy(self, node):
        '''Add the dummy resource on node; return unmatched patterns or ""'''
        start_pattern = ("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self.rid))
        watch = self.create_watch([start_pattern], 60)
        watch.setwatch()

        self.CM.AddDummyRsc(node, self.rid)

        self.set_timer("addDummy")
        watch.lookforall()
        self.log_timer("addDummy")

        if not watch.unmatched:
            return ""

        self.debug("Failed to find patterns when adding maintenance dummy resource")
        return repr(watch.unmatched)

    def removeMaintenanceDummy(self, node):
        '''Remove the dummy resource via node; return unmatched patterns or ""'''
        stop_pattern = self.templates["Pat:RscOpOK"] % ("stop", self.rid)
        watch = self.create_watch([stop_pattern], 60)
        watch.setwatch()

        self.CM.RemoveDummyRsc(node, self.rid)

        self.set_timer("removeDummy")
        watch.lookforall()
        self.log_timer("removeDummy")

        if not watch.unmatched:
            return ""

        self.debug("Failed to find patterns when removing maintenance dummy resource")
        return repr(watch.unmatched)

    def managedRscList(self, node):
        '''Return the ids of all managed resources listed by "crm_resource -c"'''
        managed_ids = []

        (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
        for line in lines:
            if not re.search("^Resource", line):
                continue
            resource = AuditResource(self.CM, line)
            if resource.managed():
                managed_ids.append(resource.id)

        return managed_ids

    def verifyResources(self, node, rscList, managed):
        '''Check that every resource in rscList is in the wanted managed state.

        Returns True when each id in rscList was seen with a managed state
        matching the managed argument, False otherwise.
        '''
        pending = list(rscList)
        managed_str = "managed" if managed else "unmanaged"

        (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
        for line in lines:
            if not re.search("^Resource", line):
                continue
            resource = AuditResource(self.CM, line)
            # Only resources in the wanted state count as verified
            if bool(resource.managed()) != bool(managed):
                continue
            if resource.id in pending:
                pending.remove(resource.id)

        if len(pending) == 0:
            self.debug("Found all %s resources on %s" % (managed_str, node))
            return True

        self.logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, pending))
        return False

    def __call__(self, node):
        '''Perform the 'MaintenanceMode' test. '''
        self.incr("calls")

        if not self.startall(None):
            return self.failure("Setup failed")

        # get a list of all the managed resources. We use this list
        # after enabling maintenance mode to verify all managed resources
        # become un-managed.  After maintenance mode is turned off, we use
        # this list to verify all the resources become managed again.
        managedResources = self.managedRscList(node)
        if len(managedResources) == 0:
            self.logger.log("No managed resources on %s" % node)
            return self.skipped()

        unmatched = []

        # insert a fake resource we can fail during maintenance mode
        # so we can verify recovery does not take place until after maintenance
        # mode is disabled.
        unmatched.append(self.insertMaintenanceDummy(node))

        # toggle maintenance mode ON, then fail dummy resource.
        unmatched.append(self.toggleMaintenanceMode(node, "On"))

        # verify all the resources are now unmanaged
        verify_unmanaged = bool(self.verifyResources(node, managedResources, False))

        # Toggle maintenance mode OFF, verify dummy is recovered.
        unmatched.append(self.toggleMaintenanceMode(node, "Off"))

        # verify all the resources are now managed again
        verify_managed = bool(self.verifyResources(node, managedResources, True))

        # Remove our maintenance dummy resource.
        unmatched.append(self.removeMaintenanceDummy(node))

        self.CM.cluster_stable()

        failPat = "".join(unmatched)
        if failPat != "":
            return self.failure("Unmatched patterns: %s" % (failPat))
        if not verify_unmanaged:
            return self.failure("Failed to verify resources became unmanaged during maintenance mode")
        if not verify_managed:
            return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")

        return self.success()

    def errorstoignore(self):
        '''Return list of errors which should be ignored'''
        return [
            r"Updating failcount for %s" % self.rid,
            r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self.rid,
            r"Unknown operation: fail",
            self.templates["Pat:RscOpOK"] % (self.action, self.rid),
            r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval),
        ]
AllTestClasses.append(MaintenanceMode)
class ResourceRecover(CTSTest):
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name = "ResourceRecover"
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
self.max = 30
self.rid = None
self.rid_alt = None
#self.is_unsafe = 1
self.benchmark = 1
# these are the values used for the new LRM API call
self.action = "asyncmon"
self.interval = 0
def __call__(self, node):
'''Perform the 'ResourceRecover' test. '''
self.incr("calls")
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self.CM.active_resources(node)
if len(resourcelist) == 0:
self.logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self.choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self.rid)
if rsc.id == rsc.clone_id:
self.debug("Failing " + rsc.id)
else:
self.debug("Failing " + rsc.id + " (also known as " + rsc.clone_id + ")")
# Log patterns to watch for (failure, plus restart if managed)
pats = []
pats.append(self.templates["Pat:CloneOpFail"] % (self.action, rsc.id, rsc.clone_id))
if rsc.managed():
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
if rsc.unique():
pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self.fail_resource(rsc, node, pats) is None:
return None # self.failure() already called
return self.success()
def choose_resource(self, node, resourcelist):
""" Choose a random resource to target """
- self.rid = self.Env.RandomGen.choice(resourcelist)
+ self.rid = self.Env.random_gen.choice(resourcelist)
self.rid_alt = self.rid
(_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self.CM, line)
if rsc.id == self.rid:
# Handle anonymous clones that get renamed
self.rid = rsc.clone_id
return rsc
return None
def get_failcount(self, node):
    """Return the fail count of the targeted resource on ``node``.

    Runs crm_failcount on the node for ``self.rid`` / ``self.action`` /
    ``self.interval``.  Returns -1 (after logging) when the command fails,
    prints anything other than exactly one line, or prints a non-integer.
    """
    command = ("crm_failcount --quiet --query --resource %s "
               "--operation %s --interval %d "
               "--node %s" % (self.rid, self.action, self.interval, node))
    (rc, lines) = self.rsh(node, command, verbose=1)

    if rc != 0 or len(lines) != 1:
        details = " // ".join(map(str.strip, lines))
        self.logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, details))
        return -1

    try:
        return int(lines[0])
    except (IndexError, ValueError):
        self.logger.log("crm_failcount output on %s unparseable: %s" % (node,
                                                                         ' '.join(lines)))
        return -1
def fail_resource(self, rsc, node, pats):
    """Fail the targeted resource on ``node`` and verify the outcome.

    Watches the logs for ``pats`` while "crm_resource -F" is run, waits for
    the cluster to settle, then checks where the resource is running and
    that its fail count grew by exactly one.

    Returns 0 on success; on any problem returns the result of
    self.failure() (callers treat None as "failure already recorded").
    """
    expected_failcount = self.get_failcount(node) + 1

    watch = self.create_watch(pats, 60)
    watch.setwatch()

    self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))

    self.set_timer("recover")
    watch.lookforall()
    self.log_timer("recover")

    self.CM.cluster_stable()
    recovered = self.CM.ResourceLocation(self.rid)

    if watch.unmatched:
        return self.failure("Patterns not found: %s" % repr(watch.unmatched))

    if rsc.unique() and len(recovered) > 1:
        return self.failure("%s is now active on more than one node: %s"%(self.rid, repr(recovered)))

    if len(recovered) > 0:
        self.debug("%s is running on: %s" % (self.rid, repr(recovered)))
    elif rsc.managed():
        # A managed resource must be running somewhere after recovery
        return self.failure("%s was not recovered and is inactive" % self.rid)

    new_failcount = self.get_failcount(node)
    if new_failcount != expected_failcount:
        return self.failure("%s fail count is %d not %d" % (self.rid,
                            new_failcount, expected_failcount))

    return 0  # Anything but None is success
def errorstoignore(self):
    """Return the list of log patterns this test expects and tolerates."""
    rid = self.rid
    action = self.action

    ignored = []
    ignored.append(r"Updating failcount for %s" % rid)
    ignored.append(r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (rid, self.rid_alt))
    ignored.append(r"Unknown operation: fail")
    ignored.append(self.templates["Pat:RscOpOK"] % (action, rid))
    ignored.append(r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (rid, action, self.interval))
    return ignored
# Add ResourceRecover to the list of test classes that TestList() instantiates
AllTestClasses.append(ResourceRecover)
class ComponentFail(CTSTest):
    """Kill one cluster component on a node and verify the cluster recovers."""

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "ComponentFail"
        self.startall = SimulStartLite(cm)
        self.complist = cm.Components()
        self.patterns = []
        self.okerrpatterns = []
        self.is_unsafe = 1

    def __call__(self, node):
        """Perform the 'ComponentFail' test.

        Picks a random component, kills it on ``node``, and then either
        accepts that the node was fenced or requires that all the
        component's expected log patterns are seen and the cluster is
        stable again.
        """
        self.incr("calls")
        self.patterns = []
        self.okerrpatterns = []

        # start all nodes
        ret = self.startall(None)
        if not ret:
            return self.failure("Setup failed")

        if not self.CM.cluster_stable(self.Env["StableTime"]):
            return self.failure("Setup failed - unstable")

        node_is_dc = self.CM.is_node_dc(node, None)

        # select a component to kill (DC-only components need the node to be DC)
        chosen = self.Env.random_gen.choice(self.complist)
        while chosen.dc_only == 1 and node_is_dc == 0:
            chosen = self.Env.random_gen.choice(self.complist)

        self.debug("...component %s (dc=%d,boot=%d)" % (chosen.name, node_is_dc,chosen.triggersreboot))
        self.incr(chosen.name)

        if chosen.name != "corosync":
            self.patterns.append(self.templates["Pat:ChildKilled"] %(node, chosen.name))
            self.patterns.append(self.templates["Pat:ChildRespawn"] %(node, chosen.name))

        self.patterns.extend(chosen.pats)
        if node_is_dc:
            self.patterns.extend(chosen.dc_pats)

        # @TODO this should be a flag in the Component
        if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]:
            # Ignore actions for fence devices if fencer will respawn
            # (their registration will be lost, and probes will fail)
            self.okerrpatterns = [ self.templates["Pat:Fencing_active"] ]
            (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
            for line in lines:
                if re.search("^Resource", line):
                    r = AuditResource(self.CM, line)
                    if r.rclass == "stonith":
                        self.okerrpatterns.append(self.templates["Pat:Fencing_recover"] % r.id)
                        self.okerrpatterns.append(self.templates["Pat:Fencing_probe"] % r.id)

        # Watch a copy: self.patterns is extended below with patterns that
        # should be ignored, not waited for
        tmpPats = list(self.patterns)
        self.patterns.extend(chosen.badnews_ignore)

        # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
        stonithPats = [self.templates["Pat:Fencing_ok"] % node]
        stonith = self.create_watch(stonithPats, 0)
        stonith.setwatch()

        # set the watch for stable
        watch = self.create_watch(
            tmpPats, self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"])
        watch.setwatch()

        # kill the component
        chosen.kill(node)

        self.debug("Waiting for the cluster to recover")
        self.CM.cluster_stable()

        self.debug("Waiting for any fenced node to come back up")
        self.CM.ns.WaitForAllNodesToComeUp(self.Env["nodes"], 600)

        self.debug("Waiting for the cluster to re-stabilize with all nodes")
        self.CM.cluster_stable(self.Env["StartTime"])

        self.debug("Checking if %s was shot" % node)
        shot = stonith.look(60)
        if shot:
            self.debug("Found: " + repr(shot))
            self.okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)

            if not self.Env["at-boot"]:
                self.CM.ShouldBeStatus[node] = "down"

            # If fencing occurred, chances are many (if not all) the expected logs
            # will not be sent - or will be lost when the node reboots
            return self.success()

        # check for logs indicating a graceful recovery
        matched = watch.lookforall(allow_multiple_matches=1)
        if watch.unmatched:
            self.logger.log("Patterns not found: " + repr(watch.unmatched))

        self.debug("Waiting for the cluster to re-stabilize with all nodes")
        is_stable = self.CM.cluster_stable(self.Env["StartTime"])

        if not matched:
            return self.failure("Didn't find all expected %s patterns" % chosen.name)
        elif not is_stable:
            return self.failure("Cluster did not become stable after killing %s" % chosen.name)

        return self.success()

    def errorstoignore(self):
        """Return list of errors which should be ignored.

        The patterns come from the most recent run of this test.  A new list
        is built on every call so that calling this method repeatedly does
        not keep growing self.okerrpatterns with duplicate entries.
        """
        return self.okerrpatterns + self.patterns


# Add ComponentFail to the list of test classes that TestList() instantiates
AllTestClasses.append(ComponentFail)
class SplitBrainTest(CTSTest):
    """Test split-brain handling.

    The nodes are randomly divided into several isolated partitions, the
    partitioned cluster is audited, and the partitions are then healed and
    expected to merge back into a single cluster.
    """

    def __init__(self,cm):
        CTSTest.__init__(self,cm)
        self.name = "SplitBrain"
        self.start = StartTest(cm)
        self.startall = SimulStartLite(cm)
        self.is_experimental = 1

    def _nodes_outside(self, partition, describe_partition=False):
        """Return the configured nodes that are not members of ``partition``."""
        other_nodes = list(self.Env["nodes"])
        for node in partition:
            try:
                other_nodes.remove(node)
            except ValueError:
                msg = "Node "+node+" not in " + repr(self.Env["nodes"])
                if describe_partition:
                    msg += " from " +repr(partition)
                self.logger.log(msg)
        return other_nodes

    def _wait_for_partition_count(self, expected, attempts=30):
        """Poll every 10 seconds until ``expected`` partitions exist.

        Returns True as soon as the count matches, False once ``attempts``
        polls have been used up.
        """
        for _ in range(attempts):
            if len(self.CM.find_partitions()) == expected:
                return True
            time.sleep(10)
        return False

    def isolate_partition(self, partition):
        """Cut ``partition`` off from all other nodes.

        Returns 1 on success (or when there is nobody to isolate from),
        0 if any node could not be isolated.
        """
        other_nodes = self._nodes_outside(partition, describe_partition=True)
        if len(other_nodes) == 0:
            return 1

        self.debug("Creating partition: " + repr(partition))
        self.debug("Everyone else: " + repr(other_nodes))

        for node in partition:
            if not self.CM.isolate_node(node, other_nodes):
                self.logger.log("Could not isolate %s" % node)
                return 0

        return 1

    def heal_partition(self, partition):
        """Reconnect ``partition`` to all other nodes.  Always returns 1."""
        other_nodes = self._nodes_outside(partition)
        if len(other_nodes) == 0:
            return 1

        self.debug("Healing partition: " + repr(partition))
        self.debug("Everyone else: " + repr(other_nodes))

        for node in partition:
            self.CM.unisolate_node(node, other_nodes)

        # Match the early-exit path above instead of implicitly returning None
        return 1

    def __call__(self, node):
        """Perform split-brain test"""
        self.incr("calls")
        self.passed = 1

        ret = self.startall(None)
        if not ret:
            return self.failure("Setup failed")

        while True:
            # Retry until we get multiple partitions
            partitions = {}
            p_max = len(self.Env["nodes"])
            # Use a separate loop variable: ``node`` is still needed below
            for member in self.Env["nodes"]:
                p = self.Env.random_gen.randint(1, p_max)
                partitions.setdefault(p, []).append(member)
            p_max = len(partitions)
            if p_max > 1:
                break
            # else, try again

        self.debug("Created %d partitions" % p_max)
        for key in list(partitions.keys()):
            self.debug("Partition["+str(key)+"]:\t"+repr(partitions[key]))

        # Disabling STONITH to reduce test complexity for now
        self.rsh(node, "crm_attribute -V -n stonith-enabled -v false")

        for key in list(partitions.keys()):
            self.isolate_partition(partitions[key])

        # Bounded wait (30 polls) for the expected partitions to appear
        if not self._wait_for_partition_count(p_max):
            self.failure("Expected partitions were not created")

        # Target number of partitions formed - wait for stability
        if not self.CM.cluster_stable():
            self.failure("Partitioned cluster not stable")

        # Now audit the cluster state
        self.CM.partitions_expected = p_max
        if not self.audit():
            self.failure("Audits failed")
        self.CM.partitions_expected = 1

        # And heal them again
        for key in list(partitions.keys()):
            self.heal_partition(partitions[key])

        # Wait for a single partition to form
        if not self._wait_for_partition_count(1):
            self.failure("Cluster did not reform")

        # Wait for it to have the right number of members
        count = 30
        while count > 0:
            members = []
            found = self.CM.find_partitions()
            if len(found) > 0:
                members = found[0].split()

            if len(members) != len(self.Env["nodes"]):
                time.sleep(10)
                count -= 1
            else:
                break
        else:
            self.failure("Cluster did not completely reform")

        # Wait up to 20 minutes - the delay is more preferable than
        # trying to continue with in a messed up state
        if not self.CM.cluster_stable(1200):
            self.failure("Reformed cluster not stable")
            if self.Env["continue"]:
                answer = "Y"
            else:
                try:
                    answer = input('Continue? [nY]')
                except EOFError:
                    answer = "n"
            if answer and answer == "n":
                raise ValueError("Reformed cluster not stable")

        # Turn fencing back on
        if self.Env["DoFencing"]:
            self.rsh(node, "crm_attribute -V -D -n stonith-enabled")

        self.CM.cluster_stable()

        if self.passed:
            return self.success()
        return self.failure("See previous errors")

    def errorstoignore(self):
        """Return list of errors which are 'normal' and should be ignored"""
        return [
            r"Another DC detected:",
            r"(ERROR|error).*: .*Application of an update diff failed",
            r"pacemaker-controld.*:.*not in our membership list",
            r"CRIT:.*node.*returning after partition",
        ]

    def is_applicable(self):
        """Applicable only when the common checks pass and there are more than two nodes."""
        if not self.is_applicable_common():
            return False
        return len(self.Env["nodes"]) > 2


# Add SplitBrainTest to the list of test classes that TestList() instantiates
AllTestClasses.append(SplitBrainTest)
class Reattach(CTSTest):
    """Restart the cluster with resource management disabled.

    Verifies that no resource is started, stopped, promoted, demoted or
    migrated while the cluster is shut down and brought back up, nor after
    management is re-enabled.
    """

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "Reattach"
        self.startall = SimulStartLite(cm)
        self.restart1 = RestartTest(cm)
        self.stopall = SimulStopLite(cm)
        self.is_unsafe = 0 # Handled by canrunnow()

    def _is_managed(self, node):
        """Return True if the is-managed resource default reads "true" on ``node``.

        Empty command output is treated as "not managed" rather than
        raising IndexError.
        """
        (_, lines) = self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
        if not lines:
            return False
        return lines[0].strip() == "true"

    def _set_unmanaged(self, node):
        """Set the is-managed resource default to false."""
        self.debug("Disable resource management")
        self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")

    def _set_managed(self, node):
        """Delete the is-managed resource default."""
        self.debug("Re-enable resource management")
        self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")

    def setup(self, node):
        """Start all nodes and wait for stability; return 1 on success, None otherwise."""
        attempt = 0
        if not self.startall(None):
            return None

        # Make sure we are really _really_ stable and that all
        # resources, including those that depend on transient node
        # attributes, are started
        while not self.CM.cluster_stable(double_check=True):
            if attempt < 5:
                attempt += 1
                self.debug("Not stable yet, re-testing")
            else:
                self.logger.log("Cluster is not stable")
                return None

        return 1

    def teardown(self, node):
        """Make sure resource management is enabled again; return 1 on success, 0 otherwise."""
        # Make sure 'node' is up
        start = StartTest(self.CM)
        start(node)

        if not self._is_managed(node):
            self.logger.log("Attempting to re-enable resource management on %s" % node)
            self._set_managed(node)
            self.CM.cluster_stable()
            if not self._is_managed(node):
                self.logger.log("Could not re-enable resource management")
                return 0

        return 1

    def canrunnow(self, node):
        """Return TRUE if we can meaningfully run right now"""
        if self.find_ocfs2_resources(node):
            self.logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present")
            return 0
        return 1

    def __call__(self, node):
        """Perform the 'Reattach' test."""
        self.incr("calls")

        # Conveniently, the scheduler will display this message when disabling
        # management, even if fencing is not enabled, so we can rely on it.
        managed = self.create_watch(["No fencing will be done"], 60)
        managed.setwatch()

        self._set_unmanaged(node)

        if not managed.lookforall():
            self.logger.log("Patterns not found: " + repr(managed.unmatched))
            return self.failure("Resource management not disabled")

        # Any of these resource actions during the restart is bad news
        pats = [
            self.templates["Pat:RscOpOK"] % ("start", ".*"),
            self.templates["Pat:RscOpOK"] % ("stop", ".*"),
            self.templates["Pat:RscOpOK"] % ("promote", ".*"),
            self.templates["Pat:RscOpOK"] % ("demote", ".*"),
            self.templates["Pat:RscOpOK"] % ("migrate", ".*"),
        ]

        watch = self.create_watch(pats, 60, "ShutdownActivity")
        watch.setwatch()

        self.debug("Shutting down the cluster")
        ret = self.stopall(None)
        if not ret:
            self._set_managed(node)
            return self.failure("Couldn't shut down the cluster")

        self.debug("Bringing the cluster back up")
        ret = self.startall(None)
        time.sleep(5) # allow ping to update the CIB
        if not ret:
            self._set_managed(node)
            return self.failure("Couldn't restart the cluster")

        if self.local_badnews("ResourceActivity:", watch):
            self._set_managed(node)
            return self.failure("Resources stopped or started during cluster restart")

        watch = self.create_watch(pats, 60, "StartupActivity")
        watch.setwatch()

        # Re-enable resource management (and verify it happened).
        self._set_managed(node)
        self.CM.cluster_stable()
        if not self._is_managed(node):
            return self.failure("Could not re-enable resource management")

        # Ignore actions for STONITH resources
        ignore = []
        (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
        for line in lines:
            if re.search("^Resource", line):
                r = AuditResource(self.CM, line)
                if r.rclass == "stonith":
                    self.debug("Ignoring start actions for %s" % r.id)
                    ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))

        if self.local_badnews("ResourceActivity:", watch, ignore):
            return self.failure("Resources stopped or started after resource management was re-enabled")

        # Report through success() like every other outcome of this test,
        # instead of handing back the nested startall result
        return self.success()

    def errorstoignore(self):
        """Return list of errors which should be ignored"""
        return [
            r"resource( was|s were) active at shutdown",
        ]

    def is_applicable(self):
        """This test is always applicable."""
        return True


# Add Reattach to the list of test classes that TestList() instantiates
AllTestClasses.append(Reattach)
class SpecialTest1(CTSTest):
    """Custom test that provokes quorum failure issues.

    Stops every node, removes the CIB files on one node, then restarts that
    node first and the rest of the cluster afterwards.
    """

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "SpecialTest1"
        self.startall = SimulStartLite(cm)
        self.restart1 = RestartTest(cm)
        self.stopall = SimulStopLite(cm)

    def __call__(self, node):
        """Perform the 'SpecialTest1' test."""
        self.incr("calls")

        # Bring the whole cluster down first
        if not self.stopall(None):
            return self.failure("Could not stop all nodes")

        # Wipe the CIB so configuration recovery is exercised once the
        # other nodes come up
        self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*")

        # The selected node goes first ...
        if not self.restart1(node):
            return self.failure("Could not start "+node)

        # ... followed by everybody else
        if not self.startall(None):
            return self.failure("Could not start the remaining nodes")

        return self.success()

    def errorstoignore(self):
        """Return list of errors which should be ignored"""
        # All of these are consequences of the CIB having been wiped
        wiped_cib_errors = [
            r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
            r"error.*: Resource start-up disabled since no STONITH resources have been defined",
            r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
            r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity",
        ]
        return wiped_cib_errors


# Add SpecialTest1 to the list of test classes that TestList() instantiates
AllTestClasses.append(SpecialTest1)
class HAETest(CTSTest):
    """Common base for the HAE loop tests.

    Provides setup/teardown that start and stop the whole cluster, plus
    helpers for locating the dlm resource and waiting for a resource to be
    active on an expected number of nodes.
    """

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "HAETest"
        self.stopall = SimulStopLite(cm)
        self.startall = SimulStartLite(cm)
        self.is_loop = 1

    def setup(self, node):
        """Start all nodes."""
        ret = self.startall(None)
        if not ret:
            return self.failure("Couldn't start all nodes")
        return self.success()

    def teardown(self, node):
        """Stop all nodes."""
        ret = self.stopall(None)
        if not ret:
            return self.failure("Couldn't stop all nodes")
        return self.success()

    def wait_on_state(self, node, resource, expected_clones, attempts=240):
        """Poll crm_resource once a second until ``resource`` has the expected instance count.

        Returns 1 when the number of output lines equals
        ``expected_clones``; returns 0 on an unrecoverable crm_resource
        error or when ``attempts`` polls have been used up.
        """
        while attempts > 0:
            active = 0
            (rc, lines) = self.rsh(node, "crm_resource -r %s -W -Q" % resource, verbose=1)

            # Hack until crm_resource does the right thing
            if rc == 0 and lines:
                active = len(lines)

            if len(lines) == expected_clones:
                return 1

            elif rc == 1:
                self.debug("Resource %s is still inactive" % resource)

            elif rc == 234:
                self.logger.log("Unknown resource %s" % resource)
                return 0

            elif rc == 246:
                self.logger.log("Cluster is inactive")
                return 0

            elif rc != 0:
                self.logger.log("Call to crm_resource failed, rc=%d" % rc)
                return 0

            else:
                self.debug("Resource %s is active on %d times instead of %d" % (resource, active, expected_clones))

            attempts -= 1
            time.sleep(1)

        return 0

    def find_dlm(self, node):
        """Locate the parent of a "controld" resource and store it in self.r_dlm.

        Returns 1 if one was found, 0 otherwise (self.r_dlm is then None).
        """
        self.r_dlm = None

        (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
        for line in lines:
            if re.search("^Resource", line):
                r = AuditResource(self.CM, line)
                if r.rtype == "controld" and r.parent != "NA":
                    # Assign before logging so the message names the
                    # resource that was found instead of None
                    self.r_dlm = r.parent
                    self.debug("Found dlm: %s" % self.r_dlm)
                    return 1
        return 0

    def find_hae_resources(self, node):
        """Reset the cached resource names, then look up dlm and the OCFS2 resources."""
        self.r_dlm = None
        self.r_o2cb = None
        self.r_ocfs2 = []

        if self.find_dlm(node):
            self.find_ocfs2_resources(node)

    def is_applicable(self):
        """Applicable only when the common checks pass and the schema is "hae"."""
        if not self.is_applicable_common():
            return False
        if self.Env["Schema"] == "hae":
            return True
        return False
class HAERoleTest(HAETest):
    """Lars' mount/unmount test for the HA extension.

    Repeatedly sets the dlm resource's target-role to Stopped and back to
    Started for Env["loop-minutes"] minutes, checking each time that the
    dependent resources follow.
    """

    def __init__(self, cm):
        HAETest.__init__(self,cm)
        self.name = "HAERoleTest"

    def change_state(self, node, resource, target):
        """Set the target-role meta attribute of ``resource``; return the exit code."""
        command = "crm_resource -V -r %s -p target-role -v %s --meta" % (resource, target)
        (rc, _) = self.rsh(node, command)
        return rc

    def __call__(self, node):
        """Run stop/start iterations until time is up or one iteration fails."""
        self.incr("calls")

        iteration = 0
        failed = 0
        deadline = time.time() + self.Env["loop-minutes"]*60
        self.find_hae_resources(node)

        clone_max = len(self.Env["nodes"])
        while time.time() <= deadline and not failed:
            iteration += 1

            # Take dlm down: it must end up active nowhere
            self.change_state(node, self.r_dlm, "Stopped")
            if not self.wait_on_state(node, self.r_dlm, 0):
                self.failure("%s did not go down correctly" % self.r_dlm)
                failed = iteration

            # Bring it back: dlm, o2cb and every filesystem must be
            # active on all nodes again
            self.change_state(node, self.r_dlm, "Started")
            for rsc in [self.r_dlm, self.r_o2cb] + list(self.r_ocfs2):
                if not self.wait_on_state(node, rsc, clone_max):
                    self.failure("%s did not come up correctly" % rsc)
                    failed = iteration

        if failed:
            return self.failure("iteration %d failed" % failed)
        return self.success()


# Add HAERoleTest to the list of test classes that TestList() instantiates
AllTestClasses.append(HAERoleTest)
class HAEStandbyTest(HAETest):
    """Standby/active loop test for the HA extension.

    Repeatedly puts a node into standby and back for Env["loop-minutes"]
    minutes, checking each time that the dlm, o2cb and OCFS2 resources
    follow.
    """

    def __init__(self, cm):
        HAETest.__init__(self,cm)
        self.name = "HAEStandbyTest"

    def change_state(self, node, resource, target):
        """Run crm_standby on ``node`` with value ``target``; return the exit code.

        ``resource`` is accepted for signature compatibility and not used.
        """
        (rc, _) = self.rsh(node, "crm_standby -V -l reboot -v %s" % (target))
        return rc

    def __call__(self, node):
        """Run standby/active iterations until time is up or one iteration fails."""
        self.incr("calls")

        iteration = 0
        failed = 0
        deadline = time.time() + self.Env["loop-minutes"]*60
        self.find_hae_resources(node)

        clone_max = len(self.Env["nodes"])
        while time.time() <= deadline and not failed:
            iteration += 1

            # With one node in standby, dlm should be active on one node fewer
            self.change_state(node, self.r_dlm, "true")
            if not self.wait_on_state(node, self.r_dlm, clone_max-1):
                self.failure("%s did not go down correctly" % self.r_dlm)
                failed = iteration

            # Out of standby: everything must be active on all nodes again
            self.change_state(node, self.r_dlm, "false")
            for rsc in [self.r_dlm, self.r_o2cb] + list(self.r_ocfs2):
                if not self.wait_on_state(node, rsc, clone_max):
                    self.failure("%s did not come up correctly" % rsc)
                    failed = iteration

        if failed:
            return self.failure("iteration %d failed" % failed)
        return self.success()


# Add HAEStandbyTest to the list of test classes that TestList() instantiates
AllTestClasses.append(HAEStandbyTest)
class NearQuorumPointTest(CTSTest):
    """
    This test brings larger clusters near the quorum point (50%).
    In addition, it will test doing starts and stops at the same time.

    How it works:

    - loop over the nodes and decide randomly which will be up and which
      will be down, using a 50% probability for each of up/down
    - figure out what to do to get into that state from the current state
    - in parallel, bring up those going up and bring down those going down
    """

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "NearQuorumPoint"

    def __call__(self, dummy):
        """Perform the 'NearQuorumPoint' test."""
        self.incr("calls")
        startset = []
        stopset = []

        stonith = self.CM.prepare_fencing_watcher("NearQuorumPoint")

        # Decide at random what happens to each node
        buckets = {"start": startset, "stop": stopset}
        for node in self.Env["nodes"]:
            action = self.Env.random_gen.choice(["start","stop"])
            buckets[action].append(node)

        self.debug("start nodes:" + repr(startset))
        self.debug("stop nodes:" + repr(stopset))

        # Collect the log patterns the planned actions should produce
        watchpats = []
        for node in stopset:
            if self.CM.ShouldBeStatus[node] == "up":
                watchpats.append(self.templates["Pat:We_stopped"] % node)

        for node in startset:
            if self.CM.ShouldBeStatus[node] == "down":
                watchpats.append(self.templates["Pat:Local_started"] % node)
                continue
            # A node that stays up should notice every peer that goes down
            for stopping in stopset:
                if self.CM.ShouldBeStatus[stopping] == "up":
                    watchpats.append(self.templates["Pat:They_stopped"] % (node, self.CM.key_for_node(stopping)))

        if len(watchpats) == 0:
            return self.skipped()

        if len(startset) != 0:
            watchpats.append(self.templates["Pat:DC_IDLE"])

        watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
        watch.setwatch()

        # Kick off the stops and starts without waiting for any of them
        for node in stopset:
            if self.CM.ShouldBeStatus[node] == "up":
                self.CM.StopaCMnoBlock(node)

        for node in startset:
            if self.CM.ShouldBeStatus[node] == "down":
                self.CM.StartaCMnoBlock(node)

        # Evaluate the result
        if watch.lookforall():
            self.CM.cluster_stable()
            self.CM.fencing_cleanup("NearQuorumPoint", stonith)
            return self.success()

        self.logger.log("Warn: Patterns not found: " + repr(watch.unmatched))

        # Work out which nodes did not end up in the intended state
        upnodes = [node for node in stopset if self.CM.StataCM(node) == 1]
        downnodes = [node for node in startset if self.CM.StataCM(node) == 0]

        self.CM.fencing_cleanup("NearQuorumPoint", stonith)
        if upnodes == [] and downnodes == []:
            self.CM.cluster_stable()

            # Make sure they're completely down with no residue
            for node in stopset:
                self.rsh(node, self.templates["StopCmd"])

            return self.success()

        if len(upnodes) > 0:
            self.logger.log("Warn: Unstoppable nodes: " + repr(upnodes))

        if len(downnodes) > 0:
            self.logger.log("Warn: Unstartable nodes: " + repr(downnodes))

        return self.failure()

    def is_applicable(self):
        """This test is always applicable."""
        return True


# Add NearQuorumPointTest to the list of test classes that TestList() instantiates
AllTestClasses.append(NearQuorumPointTest)
class RollingUpgradeTest(CTSTest):
    """Perform a rolling upgrade of the cluster"""

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "RollingUpgrade"
        self.start = StartTest(cm)
        self.stop = StopTest(cm)
        self.stopall = SimulStopLite(cm)
        self.startall = SimulStartLite(cm)

    def setup(self, node):
        """Stop the cluster, downgrade every node, then start the cluster again."""
        ret = self.stopall(None)
        if not ret:
            return self.failure("Couldn't stop all nodes")

        # start=None: the nodes are started together below, not one by one
        for target in self.Env["nodes"]:
            if not self.downgrade(target, None):
                return self.failure("Couldn't downgrade %s" % target)

        ret = self.startall(None)
        if not ret:
            return self.failure("Couldn't start all nodes")
        return self.success()

    def teardown(self, node):
        """Stop the cluster and upgrade every node (without restarting them)."""
        ret = self.stopall(None)
        if not ret:
            return self.failure("Couldn't stop all nodes")

        for target in self.Env["nodes"]:
            if not self.upgrade(target, None):
                return self.failure("Couldn't upgrade %s" % target)

        return self.success()

    def install(self, node, version, start=1, flags="--force"):
        """Install the RPMs of ``version`` on ``node``.

        Stops the node, copies every RPM found under
        Env["rpm-dir"]/<version> into /tmp/rpm-<version> on the node, runs
        "rpm -Uvh" with ``flags``, and restarts the node when ``start`` is
        true.  Returns the result of self.success() or self.failure().
        """
        target_dir = "/tmp/rpm-%s" % version
        src_dir = "%s/%s" % (self.Env["rpm-dir"], version)

        self.logger.log("Installing %s on %s with %s" % (version, node, flags))
        if not self.stop(node):
            return self.failure("stop failure: "+node)

        self.rsh(node, "mkdir -p %s" % target_dir)
        self.rsh(node, "rm -f %s/*.rpm" % target_dir)
        (_, lines) = self.rsh(node, "ls -1 %s/*.rpm" % src_dir, verbose=1)
        for line in lines:
            # Drop only the line terminator; slicing off the last character
            # would truncate the file name when there is no trailing newline
            rpm_path = line.rstrip("\n")
            self.rsh.copy("%s" % (rpm_path), "%s:%s/" % (node, target_dir))
        self.rsh(node, "rpm -Uvh %s %s/*.rpm" % (flags, target_dir))

        if start and not self.start(node):
            return self.failure("start failure: "+node)

        return self.success()

    def upgrade(self, node, start=1):
        """Install Env["current-version"] on ``node``."""
        return self.install(node, self.Env["current-version"], start)

    def downgrade(self, node, start=1):
        """Install Env["previous-version"] on ``node``, ignoring dependencies."""
        return self.install(node, self.Env["previous-version"], start, "--force --nodeps")

    def __call__(self, node):
        """Perform the 'Rolling Upgrade' test."""
        self.incr("calls")

        for target in self.Env["nodes"]:
            # upgrade() is truthy on success, so fail only when it is not
            if not self.upgrade(target):
                return self.failure("Couldn't upgrade %s" % target)

            self.CM.cluster_stable()

        return self.success()

    def is_applicable(self):
        """Applicable only when the common checks pass and all RPM settings are present."""
        if not self.is_applicable_common():
            return None

        env_keys = list(self.Env.keys())
        for required in ("rpm-dir", "current-version", "previous-version"):
            if required not in env_keys:
                return None

        return 1


# Add RollingUpgradeTest to the list of test classes that TestList() instantiates
AllTestClasses.append(RollingUpgradeTest)
class BSC_AddResource(CTSTest):
    """Add a resource to the cluster"""

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "AddResource"
        self.resource_offset = 0
        self.cib_cmd = """cibadmin -C -o %s -X '%s' """

    def __call__(self, node):
        """Create a new IP resource preferring ``node`` and wait for it to start."""
        self.incr("calls")
        self.resource_offset = self.resource_offset + 1

        r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset)
        start_pat = "pacemaker-controld.*%s_start_0.*confirmed.*ok"

        patterns = [start_pat % r_id]

        watch = self.create_watch(patterns, self.Env["DeadTime"])
        watch.setwatch()

        ip = self.NextIP()
        if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip):
            return self.failure("Make resource %s failed" % r_id)

        watch.lookforall()
        if watch.unmatched:
            for regex in watch.unmatched:
                self.logger.log ("Warn: Pattern not found: %s" % (regex))
            return self.failure("Resource pattern(s) not found")

        if not self.CM.cluster_stable(self.Env["DeadTime"]):
            return self.failure("Unstable cluster")

        return self.success()

    def NextIP(self):
        """Advance Env["IPBase"] by one and return the new address.

        The last group of the address is incremented: as hexadecimal when
        the address contains ":" (IPv6), as decimal otherwise.
        """
        ip = self.Env["IPBase"]
        # rpartition() returns an immutable 3-tuple, so unpack it instead
        # of assigning into it
        if ":" in ip:
            (prefix, sep, suffix) = ip.rpartition(":")
            suffix = "%x" % (int(suffix, 16) + 1)
        else:
            (prefix, sep, suffix) = ip.rpartition(".")
            suffix = str(int(suffix) + 1)

        ip = prefix + sep + suffix
        self.Env["IPBase"] = ip
        return ip.strip()

    def make_ip_resource(self, node, id, rclass, type, ip):
        """Create the location constraint and the primitive for a new IP resource.

        Returns 1 on success, None if either cibadmin call fails.
        """
        self.logger.log("Creating %s:%s:%s (%s) on %s" % (rclass,type,id,ip,node))
        rsc_xml = (
            "\n"
            '<primitive id="%s" class="%s" type="%s" provider="heartbeat">\n'
            '<instance_attributes id="%s"><attributes>\n'
            '<nvpair id="%s" name="ip" value="%s"/>\n'
            '</attributes></instance_attributes>\n'
            '</primitive>'
        ) % (id, rclass, type, id, id, ip)

        node_constraint = (
            "\n"
            '<rsc_location id="run_%s" rsc="%s">\n'
            '<rule id="pref_run_%s" score="100">\n'
            '<expression id="%s_loc_expr" attribute="#uname" operation="eq" value="%s"/>\n'
            '</rule>\n'
            '</rsc_location>'
        ) % (id, id, id, id, node)

        # The constraint goes in first, then the resource itself
        (rc, _) = self.rsh(node, self.cib_cmd % ("constraints", node_constraint), verbose=1)
        if rc != 0:
            self.logger.log("Constraint creation failed: %d" % rc)
            return None

        (rc, _) = self.rsh(node, self.cib_cmd % ("resources", rsc_xml), verbose=1)
        if rc != 0:
            self.logger.log("Resource creation failed: %d" % rc)
            return None

        return 1

    def is_applicable(self):
        """Applicable only when Env["DoBSC"] is set."""
        if self.Env["DoBSC"]:
            return True
        return None


# Add BSC_AddResource to the list of test classes that TestList() instantiates
AllTestClasses.append(BSC_AddResource)
class SimulStopLite(CTSTest):
    """Stop any active nodes ~ simultaneously"""

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "SimulStopLite"

    def __call__(self, dummy):
        """Perform the 'SimulStopLite' setup work.

        The argument is ignored; every node expected to be up is stopped.
        """
        self.incr("calls")
        self.debug("Setup: " + self.name)

        all_nodes = self.Env["nodes"]

        # One "stopped" pattern per node that is expected to be up
        watchpats = []
        for node in all_nodes:
            if self.CM.ShouldBeStatus[node] == "up":
                self.incr("WasStarted")
                watchpats.append(self.templates["Pat:We_stopped"] % node)

        if len(watchpats) == 0:
            return self.success()

        # Stop all the nodes - at about the same time...
        watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
        watch.setwatch()

        self.set_timer()
        for node in all_nodes:
            if self.CM.ShouldBeStatus[node] == "up":
                self.CM.StopaCMnoBlock(node)

        if watch.lookforall():
            # Make sure they're completely down with no residue
            for node in all_nodes:
                self.rsh(node, self.templates["StopCmd"])
            return self.success()

        # Not every stop message was seen: see which nodes are still up
        up_nodes = [node for node in all_nodes if self.CM.StataCM(node) == 1]
        if up_nodes:
            return self.failure("Active nodes exist: " + repr(up_nodes))

        self.logger.log("Warn: All nodes stopped but CTS didn't detect: "
                        + repr(watch.unmatched))

        return self.failure("Missing log message: "+repr(watch.unmatched))

    def is_applicable(self):
        """SimulStopLite is a setup test and never applicable"""
        return False
class SimulStartLite(CTSTest):
    """Start any stopped nodes ~ simultaneously"""

    def __init__(self, cm):
        CTSTest.__init__(self,cm)
        self.name = "SimulStartLite"

    def _forget_pattern(self, watch, pattern):
        """Drop ``pattern`` from watch.unmatched, noting when it is not there."""
        try:
            watch.unmatched.remove(pattern)
        except ValueError:
            # remove() raises ValueError when the pattern is absent,
            # i.e. it was matched already
            self.debug("Already matched: %s" % pattern)

    def __call__(self, dummy):
        """Perform the 'SimulStartLite' setup work.

        The argument is ignored; every node expected to be down is started,
        repeating for any nodes returned by fencing_cleanup().
        """
        self.incr("calls")
        self.debug("Setup: " + self.name)

        node_list = []
        for node in self.Env["nodes"]:
            if self.CM.ShouldBeStatus[node] == "down":
                self.incr("WasStopped")
                node_list.append(node)

        self.set_timer()
        while len(node_list) > 0:
            # Repeat until all nodes come up
            watchpats = []

            uppat = self.templates["Pat:NonDC_started"]
            if self.CM.upcount() == 0:
                uppat = self.templates["Pat:Local_started"]

            watchpats.append(self.templates["Pat:DC_IDLE"])
            for node in node_list:
                watchpats.append(uppat % node)
                watchpats.append(self.templates["Pat:InfraUp"] % node)
                watchpats.append(self.templates["Pat:PacemakerUp"] % node)

            # Start all the nodes - at about the same time...
            watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
            watch.setwatch()

            stonith = self.CM.prepare_fencing_watcher(self.name)

            for node in node_list:
                self.CM.StartaCMnoBlock(node)

            watch.lookforall()

            node_list = self.CM.fencing_cleanup(self.name, stonith)

            if node_list is None:
                return self.failure("Cluster did not stabilize")

            # Remove node_list messages from watch.unmatched
            for node in node_list:
                self.logger.debug("Dealing with stonith operations for %s" % repr(node_list))
                if watch.unmatched:
                    self._forget_pattern(watch, uppat % node)
                    self._forget_pattern(watch, self.templates["Pat:InfraUp"] % node)
                    self._forget_pattern(watch, self.templates["Pat:PacemakerUp"] % node)

            if watch.unmatched:
                for regex in watch.unmatched:
                    self.logger.log ("Warn: Startup pattern not found: %s" %(regex))

            if not self.CM.cluster_stable():
                return self.failure("Cluster did not stabilize")

        did_fail = 0
        unstable = []
        for node in self.Env["nodes"]:
            if self.CM.StataCM(node) == 0:
                did_fail = 1
                unstable.append(node)

        if did_fail:
            return self.failure("Unstarted nodes exist: " + repr(unstable))

        unstable = []
        for node in self.Env["nodes"]:
            if not self.CM.node_stable(node):
                did_fail = 1
                unstable.append(node)

        if did_fail:
            return self.failure("Unstable cluster nodes exist: " + repr(unstable))

        return self.success()

    def is_applicable(self):
        """SimulStartLite is a setup test and never applicable"""
        return False
def TestList(cm, audits):
    '''Instantiate every registered test class and return the applicable ones.

    Each class in AllTestClasses is constructed with cm.  Instances whose
    is_applicable() is true get audits assigned to their Audits attribute and
    are returned in registration order.
    '''
    applicable = []
    for test_class in AllTestClasses:
        candidate = test_class(cm)
        if not candidate.is_applicable():
            continue
        candidate.Audits = audits
        applicable.append(candidate)
    return applicable
class RemoteLXC(CTSTest):
    """Remote-node test driven by the lxc_autogen.sh helper script.

    The script generates containers and resources on a node; the test then
    watches the logs for the expected start/promote/stop results.
    """

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "RemoteLXC"
        self.start = StartTest(cm)
        self.startall = SimulStartLite(cm)
        self.num_containers = 2
        self.is_container = 1
        self.failed = 0
        self.fail_string = ""

    def start_lxc_simple(self, node):
        """Generate the containers on node and wait for their resources to start.

        Sets self.failed and self.fail_string if any expected pattern is not
        seen in the logs.
        """
        # restore any artifacts laying around from a previous test.
        self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")

        # generate the containers, put them in the config, add some resources to them
        pats = [ ]
        watch = self.create_watch(pats, 120)
        watch.setwatch()
        # The patterns are appended to the same list object handed to create_watch()
        pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc1"))
        pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc2"))
        pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc-ms"))
        pats.append(self.templates["Pat:RscOpOK"] % ("promote", "lxc-ms"))

        self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -g -a -m -s -c %d &>/dev/null" % self.num_containers)

        self.set_timer("remoteSimpleInit")
        watch.lookforall()
        self.log_timer("remoteSimpleInit")

        if watch.unmatched:
            self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
            self.failed = 1

    def cleanup_lxc_simple(self, node):
        """Remove the containers from node and restore libvirt and the CIB."""
        pats = [ ]

        # if the test failed, attempt to clean up the cib and libvirt environment
        # as best as possible
        if self.failed == 1:
            # restore libvirt and cib
            self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
            return

        watch = self.create_watch(pats, 120)
        watch.setwatch()

        pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container1"))
        pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container2"))

        self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -p &>/dev/null")

        self.set_timer("remoteSimpleCleanup")
        watch.lookforall()
        self.log_timer("remoteSimpleCleanup")

        if watch.unmatched:
            self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
            self.failed = 1

        # cleanup libvirt
        self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")

    def __call__(self, node):
        '''Perform the 'RemoteLXC' test. '''
        self.incr("calls")

        # Start from a clean slate so that a failure recorded by an earlier
        # call on this object is not reported again by this one.
        self.failed = 0
        self.fail_string = ""

        ret = self.startall(None)
        if not ret:
            return self.failure("Setup failed, start all nodes failed.")

        # The script's -v mode checks the environment; rc 1 skips the test
        (rc, _) = self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -v &>/dev/null")
        if rc == 1:
            self.log("Environment test for lxc support failed.")
            return self.skipped()

        self.start_lxc_simple(node)
        self.cleanup_lxc_simple(node)

        self.debug("Waiting for the cluster to recover")
        self.CM.cluster_stable()

        if self.failed == 1:
            return self.failure(self.fail_string)

        return self.success()

    def errorstoignore(self):
        '''Return list of errors which should be ignored'''
        return [
            r"Updating failcount for ping",
            r"schedulerd.*: Recover\s+(ping|lxc-ms|container)\s+\(.*\)",
            # The orphaned lxc-ms resource causes an expected transition error
            # that is a result of the scheduler not having knowledge that the
            # promotable resource used to be a clone. As a result, it looks like that
            # resource is running in multiple locations when it shouldn't... But in
            # this instance we know why this error is occurring and that it is expected.
            r"Calculated [Tt]ransition .*pe-error",
            r"Resource lxc-ms .* is active on 2 nodes attempting recovery",
            r"Unknown operation: fail",
            r"VirtualDomain.*ERROR: Unable to determine emulator",
        ]
AllTestClasses.append(RemoteLXC)
class RemoteDriver(CTSTest):
def __init__(self, cm):
    """Create the helper tests and constants shared by the remote tests."""
    CTSTest.__init__(self, cm)

    # Each subclass is named after its own class
    self.name = type(self).__name__

    # Helper tests used to bring cluster nodes up and down
    self.start = StartTest(cm)
    self.startall = SimulStartLite(cm)
    self.stop = StopTest(cm)

    self.remote_rsc = "remote-rsc"
    # Filled in with (CIB section, XML) by add_rsc()
    self.cib_cmd = """cibadmin -C -o %s -X '%s' """

    self.reset()
def reset(self):
self.pcmk_started = 0
self.failed = False
self.fail_string = ""
self.remote_node_added = 0
self.remote_rsc_added = 0
- self.remote_use_reconnect_interval = self.Env.RandomGen.choice([True,False])
+ self.remote_use_reconnect_interval = self.Env.random_gen.choice([True,False])
def fail(self, msg):
    """ Mark the test as failed, log msg, and keep the first message as the status. """
    self.failed = True

    # Every failure is logged, even when it is not the first one.
    self.logger.log(msg)

    # Only the first failure becomes the test status, as it is likely to be
    # the most useful one.
    if self.fail_string:
        return
    self.fail_string = msg
def get_othernode(self, node):
    """Return the first configured node that is not node, or None if there is none."""
    # we don't want to try and use the cib that we just shutdown.
    # find a cluster node that is not our soon to be remote-node.
    candidates = (other for other in self.Env["nodes"] if other != node)
    return next(candidates, None)
def del_rsc(self, node, rsc):
    """Delete primitive rsc via crm_resource, run on a node other than node."""
    runner = self.get_othernode(node)
    command = "crm_resource -D -r %s -t primitive" % (rsc)
    (rc, _) = self.rsh(runner, command)
    if rc == 0:
        return
    self.fail("Removal of resource '%s' failed" % rsc)
def add_rsc(self, node, rsc_xml):
    """Create rsc_xml in the CIB resources section, run on a node other than node."""
    runner = self.get_othernode(node)
    command = self.cib_cmd % ("resources", rsc_xml)
    (rc, _) = self.rsh(runner, command)
    if rc == 0:
        return
    self.fail("resource creation failed")
def add_primitive_rsc(self, node):
    """Add the Dummy primitive named self.remote_rsc to the CIB.

    Sets self.remote_rsc_added to 1 unless the test has been marked failed.
    """
    template = """
<primitive class="ocf" id="%(node)s" provider="heartbeat" type="Dummy">
<meta_attributes id="%(node)s-meta_attributes"/>
<operations>
<op id="%(node)s-monitor-interval-20s" interval="20s" name="monitor"/>
</operations>
</primitive>"""
    self.add_rsc(node, template % { "node": self.remote_rsc })
    if self.failed:
        return
    self.remote_rsc_added = 1
def add_connection_rsc(self, node):
    """Add the ocf:pacemaker:remote connection resource for self.remote_node.

    node is used as the resource's "server" value.  A reconnect_interval
    nvpair is included when self.remote_use_reconnect_interval is set.
    Sets self.remote_node_added to 1 unless the test has been marked failed.
    """
    pieces = ["""
<primitive class="ocf" id="%(node)s" provider="pacemaker" type="remote">
<instance_attributes id="%(node)s-instance_attributes">
<nvpair id="%(node)s-instance_attributes-server" name="server" value="%(server)s"/>
""" % { "node": self.remote_node, "server": node }]

    if self.remote_use_reconnect_interval:
        # Set reconnect interval on resource
        pieces.append("""
<nvpair id="%s-instance_attributes-reconnect_interval" name="reconnect_interval" value="60s"/>
""" % (self.remote_node))

    pieces.append("""
</instance_attributes>
<operations>
<op id="%(node)s-start" name="start" interval="0" timeout="120s"/>
<op id="%(node)s-monitor-20s" name="monitor" interval="20s" timeout="45s"/>
</operations>
</primitive>
""" % { "node": self.remote_node })

    self.add_rsc(node, "".join(pieces))
    if self.failed:
        return
    self.remote_node_added = 1
def disable_services(self, node):
    """Record whether corosync and pacemaker are enabled on node, and disable those that are.

    The recorded flags are consumed by restore_services().
    """
    env = self.Env

    self.corosync_enabled = env.service_is_enabled(node, "corosync")
    if self.corosync_enabled:
        env.disable_service(node, "corosync")

    self.pacemaker_enabled = env.service_is_enabled(node, "pacemaker")
    if self.pacemaker_enabled:
        env.disable_service(node, "pacemaker")
def restore_services(self, node):
    """Re-enable on node whichever services disable_services() recorded as enabled."""
    env = self.Env
    if self.corosync_enabled:
        env.enable_service(node, "corosync")
    if self.pacemaker_enabled:
        env.enable_service(node, "pacemaker")
def stop_pcmk_remote(self, node):
    """Stop the pacemaker_remote service on node, trying up to 10 times.

    Sleeps 6 seconds after each failed attempt.
    """
    # disable pcmk remote
    for _attempt in range(10):
        (rc, _) = self.rsh(node, "service pacemaker_remote stop")
        if rc == 0:
            break
        time.sleep(6)
def start_pcmk_remote(self, node):
    """Start the pacemaker_remote service on node, trying up to 10 times.

    Sleeps 6 seconds after each failed attempt and sets self.pcmk_started
    to 1 once the command succeeds.
    """
    for _attempt in range(10):
        (rc, _) = self.rsh(node, "service pacemaker_remote start")
        if rc == 0:
            self.pcmk_started = 1
            break
        time.sleep(6)
def freeze_pcmk_remote(self, node):
    """ Simulate a Pacemaker Remote daemon failure by sending it SIGSTOP. """
    # We freeze the process.
    freeze_cmd = "killall -STOP pacemaker-remoted"
    self.rsh(node, freeze_cmd)
def resume_pcmk_remote(self, node):
    """ Let a frozen Pacemaker Remote daemon continue by sending it SIGCONT. """
    # We resume the process.
    resume_cmd = "killall -CONT pacemaker-remoted"
    self.rsh(node, resume_cmd)
def start_metal(self, node):
    """Turn cluster node `node` into a Pacemaker Remote node.

    Stops the cluster stack on node, starts pacemaker_remote there, adds the
    connection resource and waits for it to start.  Problems are recorded
    through self.fail(); nothing is returned.
    """
    # Cluster nodes are reused as remote nodes in remote tests. If cluster
    # services were enabled at boot, in case the remote node got fenced, the
    # cluster node would join instead of the expected remote one. Meanwhile
    # pacemaker_remote would not be able to start. Depending on the chances,
    # the situations might not be able to be orchestrated gracefully any more.
    #
    # Temporarily disable any enabled cluster services.
    self.disable_services(node)

    # make sure the resource doesn't already exist for some reason
    self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_rsc))
    self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_node))

    if not self.stop(node):
        self.fail("Failed to shutdown cluster node %s" % node)
        return

    # start_pcmk_remote() sets self.pcmk_started to 1 on success
    self.start_pcmk_remote(node)

    if self.pcmk_started == 0:
        self.fail("Failed to start pacemaker_remote on node %s" % node)
        return

    # Convert node to baremetal now that it has shutdown the cluster stack
    pats = [ ]
    watch = self.create_watch(pats, 120)
    watch.setwatch()
    # The patterns are appended to the same list object handed to create_watch()
    pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node))
    pats.append(self.templates["Pat:DC_IDLE"])

    self.add_connection_rsc(node)

    self.set_timer("remoteMetalInit")
    watch.lookforall()
    self.log_timer("remoteMetalInit")
    if watch.unmatched:
        self.fail("Unmatched patterns: %s" % watch.unmatched)
def migrate_connection(self, node):
    """Move the remote connection resource with crm_resource -M and wait for the migration.

    Does nothing if the test has already failed; problems are recorded
    through self.fail().
    """
    if self.failed:
        return

    rsc_op_ok = self.templates["Pat:RscOpOK"]
    pats = [
        rsc_op_ok % ("migrate_to", self.remote_node),
        rsc_op_ok % ("migrate_from", self.remote_node),
        self.templates["Pat:DC_IDLE"],
    ]
    watch = self.create_watch(pats, 120)
    watch.setwatch()

    (rc, _) = self.rsh(node, "crm_resource -M -r %s" % (self.remote_node), verbose=1)
    if rc != 0:
        self.fail("failed to move remote node connection resource")
        return

    self.set_timer("remoteMetalMigrate")
    watch.lookforall()
    self.log_timer("remoteMetalMigrate")

    if watch.unmatched:
        self.fail("Unmatched patterns: %s" % watch.unmatched)
    return
def fail_rsc(self, node):
    """Make the dummy resource fail by removing its state files, then wait for its recovery.

    Does nothing if the test has already failed; problems are recorded
    through self.fail().
    """
    if self.failed:
        return

    remote_op_ok = self.templates["Pat:RscRemoteOpOK"]
    watchpats = [
        remote_op_ok % ("stop", self.remote_rsc, self.remote_node),
        remote_op_ok % ("start", self.remote_rsc, self.remote_node),
        self.templates["Pat:DC_IDLE"],
    ]
    watch = self.create_watch(watchpats, 120)
    watch.setwatch()

    self.debug("causing dummy rsc to fail.")
    self.rsh(node, "rm -f /var/run/resource-agents/Dummy*")

    self.set_timer("remoteRscFail")
    watch.lookforall()
    self.log_timer("remoteRscFail")

    if watch.unmatched:
        self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched)
def fail_connection(self, node):
    """Freeze the remote daemon, wait for the remote node to be fenced, then wait for it to rejoin.

    Does nothing if the test has already failed; problems are recorded
    through self.fail().
    """
    if self.failed:
        return

    fence_pats = [
        self.templates["Pat:Fencing_ok"] % self.remote_node,
        self.templates["Pat:NodeFenced"] % self.remote_node,
    ]
    fence_watch = self.create_watch(fence_pats, 120)
    fence_watch.setwatch()

    # freeze the pcmk remote daemon. this will result in fencing
    self.debug("Force stopped active remote node")
    self.freeze_pcmk_remote(node)

    self.debug("Waiting for remote node to be fenced.")
    self.set_timer("remoteMetalFence")
    fence_watch.lookforall()
    self.log_timer("remoteMetalFence")
    if fence_watch.unmatched:
        self.fail("Unmatched patterns: %s" % fence_watch.unmatched)
        return

    self.debug("Waiting for the remote node to come back up")
    self.CM.ns.WaitForNodeToComeUp(node, 120)

    rejoin_pats = [ ]
    rejoin_watch = self.create_watch(rejoin_pats, 240)
    rejoin_watch.setwatch()
    # The patterns are appended to the same list object handed to create_watch()
    rejoin_pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node))
    if self.remote_rsc_added == 1:
        rejoin_pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node))

    # start the remote node again watch it integrate back into cluster.
    self.start_pcmk_remote(node)
    if self.pcmk_started == 0:
        self.fail("Failed to start pacemaker_remote on node %s" % node)
        return

    self.debug("Waiting for remote node to rejoin cluster after being fenced.")
    self.set_timer("remoteMetalRestart")
    rejoin_watch.lookforall()
    self.log_timer("remoteMetalRestart")
    if rejoin_watch.unmatched:
        self.fail("Unmatched patterns: %s" % rejoin_watch.unmatched)
    return
def add_dummy_rsc(self, node):
    """Add the dummy resource, move it to the remote node, and wait for it to start there.

    Does nothing if the test has already failed; problems are recorded
    through self.fail().
    """
    if self.failed:
        return

    # verify we can put a resource on the remote node
    pats = [ ]
    watch = self.create_watch(pats, 120)
    watch.setwatch()
    # The patterns are appended to the same list object handed to create_watch()
    pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node))
    pats.append(self.templates["Pat:DC_IDLE"])

    # Add a resource that must live on remote-node
    self.add_primitive_rsc(node)

    # force that rsc to prefer the remote node.
    move_cmd = "crm_resource -M -r %s -N %s -f" % (self.remote_rsc, self.remote_node)
    (rc, _) = self.CM.rsh(node, move_cmd, verbose=1)
    if rc != 0:
        self.fail("Failed to place remote resource on remote node.")
        return

    self.set_timer("remoteMetalRsc")
    watch.lookforall()
    self.log_timer("remoteMetalRsc")
    if watch.unmatched:
        self.fail("Unmatched patterns: %s" % watch.unmatched)
def test_attributes(self, node):
if self.failed:
return
# This verifies permanent attributes can be set on a remote-node. It also
# verifies the remote-node can edit its own cib node section remotely.
(rc, line) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % (self.remote_node), verbose=1)
if rc != 0:
self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line))
return
(rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % (self.remote_node), verbose=1)
if rc != 0:
self.fail("Failed to get remote-node attribute")
return
(rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % (self.remote_node), verbose=1)
if rc != 0:
self.fail("Failed to delete remote-node attribute")
return
def cleanup_metal(self, node):
    """Undo what the remote test set up on node.

    Restores the boot-time services, then (only if pacemaker_remote was
    started) removes the resources that were added, stops pacemaker_remote
    and removes the remote node's entry.
    """
    self.restore_services(node)

    if self.pcmk_started == 0:
        return

    stop_pats = [ ]
    watch = self.create_watch(stop_pats, 120)
    watch.setwatch()

    # The patterns are appended to the same list object handed to create_watch()
    rsc_op_ok = self.templates["Pat:RscOpOK"]
    if self.remote_rsc_added == 1:
        stop_pats.append(rsc_op_ok % ("stop", self.remote_rsc))
    if self.remote_node_added == 1:
        stop_pats.append(rsc_op_ok % ("stop", self.remote_node))

    self.set_timer("remoteMetalCleanup")

    self.resume_pcmk_remote(node)

    if self.remote_rsc_added == 1:
        # Remove dummy resource added for remote node tests
        self.debug("Cleaning up dummy rsc put on remote node")
        self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % self.remote_rsc)
        self.del_rsc(node, self.remote_rsc)

    if self.remote_node_added == 1:
        # Remove remote node's connection resource
        self.debug("Cleaning up remote node connection resource")
        self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % (self.remote_node))
        self.del_rsc(node, self.remote_node)

    watch.lookforall()
    self.log_timer("remoteMetalCleanup")

    if watch.unmatched:
        self.fail("Unmatched patterns: %s" % watch.unmatched)

    self.stop_pcmk_remote(node)

    self.debug("Waiting for the cluster to recover")
    self.CM.cluster_stable()

    if self.remote_node_added == 1:
        # Remove remote node itself
        self.debug("Cleaning up node entry for remote node")
        self.rsh(self.get_othernode(node), "crm_node --force --remove %s" % self.remote_node)
def setup_env(self, node):
    """Name the remote node and make sure every cluster node has a remote authkey.

    Sets self.remote_node to "remote-<node>".  If /etc/pacemaker/authkey is
    missing on any configured node, a new 4096-byte key is generated locally
    and copied to every node.

    Raises subprocess.CalledProcessError if generating the key fails.
    """
    self.remote_node = "remote-%s" % (node)

    # we are assuming if all nodes have a key, that it is
    # the right key... If any node doesn't have a remote
    # key, we regenerate it everywhere.
    if self.rsh.exists_on_all("/etc/pacemaker/authkey", self.Env["nodes"]):
        return

    # create key locally
    (handle, keyfile) = tempfile.mkstemp(".cts")
    os.close(handle)
    try:
        subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"],
                              stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # sync key throughout the cluster
        for cluster_node in self.Env["nodes"]:
            self.rsh(cluster_node, "mkdir -p --mode=0750 /etc/pacemaker")
            self.rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % cluster_node)
            self.rsh(cluster_node, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey")
            self.rsh(cluster_node, "chmod 0640 /etc/pacemaker/authkey")
    finally:
        # Never leave the key material behind, even if a step above raised
        os.unlink(keyfile)
def is_applicable(self):
    """Return True if the common checks pass and pacemaker-remoted is found on every node."""
    if not self.is_applicable_common():
        return False

    # Stops at the first node where "which" fails
    return all(
        self.rsh(node, "which pacemaker-remoted >/dev/null 2>&1")[0] == 0
        for node in self.Env["nodes"]
    )
def start_new_test(self, node):
    """Run the setup shared by the remote tests.

    Counts the call, resets per-run state, starts all nodes, then prepares
    the environment, converts node to a remote node and adds the dummy
    resource.  Returns True, or the result of self.failure() if the nodes
    could not be started.
    """
    self.incr("calls")
    self.reset()

    if not self.startall(None):
        return self.failure("setup failed: could not start all nodes")

    self.setup_env(node)
    self.start_metal(node)
    self.add_dummy_rsc(node)
    return True
def __call__(self, node):
    """Always report a failure: this class only provides helpers for its subclasses."""
    message = "This base class is not meant to be called directly."
    return self.failure(message)
def errorstoignore(self):
    '''Return list of errors which should be ignored'''
    ignored = []
    ignored.append(r"""is running on remote.*which isn't allowed""")
    ignored.append(r"""Connection terminated""")
    ignored.append(r"""Could not send remote""")
    return ignored
# RemoteDriver is just a base class for other tests, so it is not added to AllTestClasses
class RemoteBasic(RemoteDriver):
    """Remote-node test that checks attribute handling on the remote node."""

    def __call__(self, node):
        '''Perform the 'RemoteBaremetal' test. '''
        if self.start_new_test(node):
            self.test_attributes(node)
            self.cleanup_metal(node)

            self.debug("Waiting for the cluster to recover")
            self.CM.cluster_stable()

            if not self.failed:
                return self.success()

        # Either setup did not complete or one of the steps recorded a failure
        return self.failure(self.fail_string)
AllTestClasses.append(RemoteBasic)
class RemoteStonithd(RemoteDriver):
    """Remote-node test that freezes the remote daemon and expects fencing and a rejoin."""

    def __call__(self, node):
        '''Perform the 'RemoteStonithd' test. '''
        if self.start_new_test(node):
            self.fail_connection(node)
            self.cleanup_metal(node)

            self.debug("Waiting for the cluster to recover")
            self.CM.cluster_stable()

            if not self.failed:
                return self.success()

        # Either setup did not complete or one of the steps recorded a failure
        return self.failure(self.fail_string)

    def is_applicable(self):
        """Apply the RemoteDriver checks, then follow Env["DoFencing"] when it is set."""
        if not RemoteDriver.is_applicable(self):
            return False

        if "DoFencing" not in list(self.Env.keys()):
            return True

        return self.Env["DoFencing"]

    def errorstoignore(self):
        """Return this test's expected error patterns followed by RemoteDriver's."""
        expected = [
            r"Lost connection to Pacemaker Remote node",
            r"Software caused connection abort",
            r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
            r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
            r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
            r"error: Result of monitor operation for .* on remote-.*: Internal communication failure",
        ]
        expected += RemoteDriver.errorstoignore(self)
        return expected
AllTestClasses.append(RemoteStonithd)
class RemoteMigrate(RemoteDriver):
    """Remote-node test that migrates the remote connection resource."""

    def __call__(self, node):
        '''Perform the 'RemoteMigrate' test. '''
        if not self.start_new_test(node):
            return self.failure(self.fail_string)

        self.migrate_connection(node)
        self.cleanup_metal(node)

        self.debug("Waiting for the cluster to recover")
        self.CM.cluster_stable()
        if self.failed:
            return self.failure(self.fail_string)

        return self.success()

    def is_applicable(self):
        """Return True if the RemoteDriver checks pass and there are at least three nodes.

        Returns a boolean, like RemoteDriver.is_applicable().
        """
        if not RemoteDriver.is_applicable(self):
            return False

        # This test requires at least three nodes: one to convert to a
        # remote node, one to host the connection originally, and one
        # to migrate the connection to.
        return len(self.Env["nodes"]) >= 3
AllTestClasses.append(RemoteMigrate)
class RemoteRscFailure(RemoteDriver):
    """Remote-node test that fails a resource on the remote node after a migration."""

    def __call__(self, node):
        '''Perform the 'RemoteRscFailure' test. '''
        if not self.start_new_test(node):
            return self.failure(self.fail_string)

        # This is an important step. We are migrating the connection
        # before failing the resource. This verifies that the migration
        # has properly maintained control over the remote-node.
        self.migrate_connection(node)

        self.fail_rsc(node)
        self.cleanup_metal(node)

        self.debug("Waiting for the cluster to recover")
        self.CM.cluster_stable()
        if self.failed:
            return self.failure(self.fail_string)

        return self.success()

    def errorstoignore(self):
        """Return this test's expected error patterns followed by RemoteDriver's."""
        ignore_pats = [
            r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
            r"Dummy.*: No process state file found",
        ]

        ignore_pats.extend(RemoteDriver.errorstoignore(self))
        return ignore_pats

    def is_applicable(self):
        """Return True if the RemoteDriver checks pass and there are at least three nodes.

        Returns a boolean, like RemoteDriver.is_applicable().
        """
        if not RemoteDriver.is_applicable(self):
            return False

        # This test requires at least three nodes: one to convert to a
        # remote node, one to host the connection originally, and one
        # to migrate the connection to.
        return len(self.Env["nodes"]) >= 3
AllTestClasses.append(RemoteRscFailure)
# vim:ts=4:sw=4:et:
diff --git a/cts/lab/ClusterManager.py b/cts/lab/ClusterManager.py
index 8cf84cf9e9..f26998c774 100644
--- a/cts/lab/ClusterManager.py
+++ b/cts/lab/ClusterManager.py
@@ -1,1085 +1,942 @@
""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
Certain portions by Huang Zhen <zhenhltc@cn.ibm.com> are copyright 2004
International Business Machines. The version control history for this file
may have further details."""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import time
from collections import UserDict
from cts.CIB import ConfigFactory
from cts.CTS import NodeStatus, Process
from cts.CTStests import AuditResource
from cts.watcher import LogWatcher
-from cts.environment import EnvFactory
from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
-has_log_stats = {}
-log_stats_bin = BuildOptions.DAEMON_DIR + "/cts_log_stats.sh"
-log_stats = """
-#!%s
-# Tool for generating system load reports while CTS runs
-
-trap "" 1
-
-f=$1; shift
-action=$1; shift
-base=`basename $0`
-
-if [ ! -e $f ]; then
- echo "Time, Load 1, Load 5, Load 15, Test Marker" > $f
-fi
-
-function killpid() {
- if [ -e $f.pid ]; then
- kill -9 `cat $f.pid`
- rm -f $f.pid
- fi
-}
-
-function status() {
- if [ -e $f.pid ]; then
- kill -0 `cat $f.pid`
- return $?
- else
- return 1
- fi
-}
-
-function start() {
- # Is it already running?
- if
- status
- then
- return
- fi
-
- echo Active as $$
- echo $$ > $f.pid
-
- while [ 1 = 1 ]; do
- uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
- #top -b -c -n1 | grep -e usr/libexec/pacemaker | grep -v -e grep -e python | head -n 1 | sed s@/usr/libexec/pacemaker/@@ | awk '{print " 0, "$9", "$10", "$12}' | tr '\\n' ',' >> $f
- echo 0 >> $f
- sleep 5
- done
-}
-
-case $action in
- start)
- start
- ;;
- start-bg|bg)
- # Use c --ssh -- ./stats.sh file start-bg
- nohup $0 $f start >/dev/null 2>&1 </dev/null &
- ;;
- stop)
- killpid
- ;;
- delete)
- killpid
- rm -f $f
- ;;
- mark)
- uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
- echo " $*" >> $f
- start
- ;;
- *)
- echo "Unknown action: $action."
- ;;
-esac
-""" % (BuildOptions.BASH_PATH)
-
class ClusterManager(UserDict):
'''The Cluster Manager class.
This is a subclass of the Python dictionary class.
(this is because it contains lots of {name,value} pairs,
not because its behavior is that terribly similar to a
dictionary in other ways.)
This is an abstract class which implements high-level
operations on the cluster and/or its cluster managers.
Actual cluster manager classes are subclassed from this type.
One of the things we do is track the state we think every node should
be in.
'''
def __InitialConditions(self):
    '''Placeholder for start-up precondition checks; currently does nothing.'''
    #if os.geteuid() != 0:
    #  raise ValueError("Must Be Root!")
    pass
def _finalConditions(self):
    '''Check that no entry of this mapping has been left as None.

    Raises ValueError naming the first key whose value is None.
    '''
    for key in list(self.keys()):
        if self[key] is None:
            # %-formatting keeps the message usable for non-string keys too
            raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key)
def __init__(self):
# Set up the state shared by all cluster manager operations.
# The order matters: self.Env is read by several of the later statements.
self.Env = EnvFactory().getInstance()
# Patterns and command templates, selected by the configured cluster name
self.templates = PatternSelector(self.Env["Name"])
self.__InitialConditions()
self.logger = LogFactory()
self.TestLoggingLevel=0
# Backing store read and written by __getitem__ and __setitem__
self.data = {}
self.name = self.Env["Name"]
# Used to run commands on cluster nodes
self.rsh = RemoteFactory().getInstance()
# Maps node name -> the state ("up", "down", "any") we think it should be in
self.ShouldBeStatus={}
self.ns = NodeStatus(self.Env)
# Lower-cased host name of the machine running this code
self.OurNode = os.uname()[1].lower()
# Per-test-instance error patterns; see instance_errorstoignore()
self.__instance_errorstoignore = []
self.fastfail = 0
self.cib_installed = 0
self.config = None
self.cluster_monitor = 0
self.use_short_names = 1
if self.Env["DoBSC"]:
del self.templates["Pat:They_stopped"]
# self.data is still empty here, so this check has no keys to examine
self._finalConditions()
self.check_transitions = 0
self.check_elections = 0
self.CIBsync = {}
# CIB generator for the configured schema
self.CibFactory = ConfigFactory(self)
self.cib = self.CibFactory.createConfig(self.Env["Schema"])
def __getitem__(self, key):
    '''Look up key: "Name" gives the cluster name, otherwise self.data, then the templates.

    Every lookup other than "Name" prints a FIXME notice first.
    '''
    if key == "Name":
        return self.name

    print("FIXME: Getting %s from %s" % (key, repr(self)))
    if key not in self.data:
        return self.templates.get_patterns(key)
    return self.data[key]
def __setitem__(self, key, value):
    '''Store value under key in self.data, printing a FIXME notice first.'''
    notice = "FIXME: Setting %s=%s on %s" % (key, value, repr(self))
    print(notice)
    self.data[key] = value
def key_for_node(self, node):
    '''Return the key used to refer to node in commands; here, node itself.'''
    return node
def instance_errorstoignore_clear(self):
    '''Start a fresh, empty list of instance errors to ignore.

    Allows the test scenario to reset the list on each iteration.
    '''
    self.__instance_errorstoignore = []
def instance_errorstoignore(self):
    '''Return the list of errors that are 'normal' for a specific test instance.

    The list object itself is returned, not a copy.
    '''
    return self.__instance_errorstoignore
def log(self, args):
    '''Pass args to the logger's log() method.'''
    logger = self.logger
    logger.log(args)
def debug(self, args):
    '''Pass args to the logger's debug() method.'''
    logger = self.logger
    logger.debug(args)
def upcount(self):
    '''How many nodes are up?'''
    expected = self.ShouldBeStatus
    return sum(1 for node in self.Env["nodes"] if expected[node] == "up")
def install_support(self, command="install"):
    '''Run the cts-support helper with command on every configured node.'''
    for node in self.Env["nodes"]:
        support_cmd = BuildOptions.DAEMON_DIR + "/cts-support " + command
        self.rsh(node, support_cmd)
def prepare_fencing_watcher(self, name):
    '''Return a started LogWatcher for fencing of nodes that are not up, or None.

    None is returned when the cluster already has quorum, or when either
    fencing pattern is not defined.
    '''
    # If we don't have quorum now but get it as a result of starting this node,
    # then a bunch of nodes might get fenced
    if self.HasQuorum(None):
        self.debug("Have quorum")
        return None

    required = (
        ("Pat:Fencing_start", "No start pattern"),
        ("Pat:Fencing_ok", "No ok pattern"),
    )
    for (pattern_name, complaint) in required:
        if not self.templates[pattern_name]:
            print(complaint)
            return None

    fence_pats = []
    for peer in self.Env["nodes"]:
        if self.ShouldBeStatus[peer] == "up":
            continue
        fence_pats.append(self.templates["Pat:Fencing_ok"] % peer)
        fence_pats.append(self.templates["Pat:Fencing_start"] % peer)

    watcher = LogWatcher(self.Env["LogFileName"], fence_pats, "StartupFencing", 0, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
    watcher.setwatch()
    return watcher
def fencing_cleanup(self, node, stonith):
    '''Work out which peers were fenced as a result of node starting.

    stonith is the watcher returned by prepare_fencing_watcher() (or None).
    The expected status of every fenced peer is updated, and each peer is
    waited for.

    Returns the list of fenced peers (possibly empty), or None if a fenced
    peer failed to restart while Env["at-boot"] is set.
    '''
    peer_list = []
    peer_state = {}

    self.debug("Looking for nodes that were fenced as a result of %s starting" % node)

    # If we just started a node, we may now have quorum (and permission to fence)
    if not stonith:
        self.debug("Nothing to do")
        return peer_list

    q = self.HasQuorum(None)
    if not q and len(self.Env["nodes"]) > 2:
        # We didn't gain quorum - we shouldn't have shot anyone
        self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"])))
        return peer_list

    for n in self.Env["nodes"]:
        peer_state[n] = "unknown"

    # Now see if any states need to be updated
    self.debug("looking for: " + repr(stonith.regexes))
    shot = stonith.look(0)
    while shot:
        line = repr(shot)
        self.debug("Found: " + line)
        del stonith.regexes[stonith.whichmatch]

        # Extract node name.  Start from "no peer" for every matched line so
        # that an unrecognised line is reported instead of raising or being
        # attributed to the peer found for a previous line.
        peer = None
        for n in self.Env["nodes"]:
            if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
                peer = n
                peer_state[peer] = "complete"
                self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer)

            elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
                # TODO: Correctly detect multiple fencing operations for the same host
                peer = n
                peer_state[peer] = "in-progress"
                self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer)

        if not peer:
            self.logger.log("ERROR: Unknown stonith match: %s" % line)

        elif peer not in peer_list:
            self.debug("Found peer: " + peer)
            peer_list.append(peer)

        # Get the next one
        shot = stonith.look(60)

    for peer in peer_list:

        self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
        if self.Env["at-boot"]:
            self.ShouldBeStatus[peer] = "up"
        else:
            self.ShouldBeStatus[peer] = "down"

        if peer_state[peer] == "in-progress":
            # Wait for any in-progress operations to complete
            shot = stonith.look(60)
            while len(stonith.regexes) and shot:
                line = repr(shot)
                self.debug("Found: " + line)
                del stonith.regexes[stonith.whichmatch]
                shot = stonith.look(60)

        # Now make sure the node is alive too
        self.ns.WaitForNodeToComeUp(peer, self.Env["DeadTime"])

        # Poll until it comes up
        if self.Env["at-boot"]:
            if not self.StataCM(peer):
                time.sleep(self.Env["StartTime"])

            if not self.StataCM(peer):
                self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
                return None

    return peer_list
def StartaCM(self, node, verbose=False):
    '''Start up the cluster manager on a given node

    Returns 1 if the node is (or already was) started, None if the start
    command failed or the node did not come up.
    '''
    announce = self.logger.log if verbose else self.debug
    announce("Starting %s on node %s" % (self.templates["Name"], node))

    if node not in self.ShouldBeStatus:
        self.ShouldBeStatus[node] = "down"

    if self.ShouldBeStatus[node] != "down":
        return 1

    # Technically we should always be able to notice ourselves starting
    patterns = [self.templates["Pat:Local_started"] % node]
    if self.upcount() == 0:
        patterns.append(self.templates["Pat:DC_started"] % node)
    else:
        patterns.append(self.templates["Pat:NonDC_started"] % node)

    watch = LogWatcher(
        self.Env["LogFileName"], patterns, "StartaCM", self.Env["StartTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])

    self.install_config(node)

    self.ShouldBeStatus[node] = "any"
    if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
        self.logger.log ("%s was already started" % (node))
        return 1

    stonith = self.prepare_fencing_watcher(node)
    watch.setwatch()

    (rc, _) = self.rsh(node, self.templates["StartCmd"])
    if rc != 0:
        self.logger.log ("Warn: Start command failed on node %s" % (node))
        self.fencing_cleanup(node, stonith)
        return None

    self.ShouldBeStatus[node] = "up"
    watch_result = watch.lookforall()

    for regex in watch.unmatched or []:
        self.logger.log ("Warn: Startup pattern not found: %s" % (regex))

    # Started if all patterns were seen, or failing that if the node reports
    # itself up; in both cases the cluster must also be stable.
    started = ((watch_result and self.cluster_stable(self.Env["DeadTime"]))
               or (self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"])))
    if started:
        self.fencing_cleanup(node, stonith)
        return 1

    self.logger.log ("Warn: Start failed for node %s" % (node))
    return None
def StartaCMnoBlock(self, node, verbose=False):
    '''Start up the cluster manager on a given node with none-block mode

    The start command is issued with synchronous=False and the node is then
    expected to be "up".  Always returns 1.
    '''
    announce = self.logger.log if verbose else self.debug
    announce("Starting %s on node %s" % (self["Name"], node))

    self.install_config(node)
    self.rsh(node, self.templates["StartCmd"], synchronous=False)
    self.ShouldBeStatus[node] = "up"
    return 1
def StopaCM(self, node, verbose=False, force=False):
    '''Stop the cluster manager on a given node

    Returns 1 if the node was stopped or was not expected to be up (and
    force is False); returns None if the stop command failed.
    '''
    announce = self.logger.log if verbose else self.debug
    announce("Stopping %s on node %s" % (self["Name"], node))

    if self.ShouldBeStatus[node] != "up" and force == False:
        return 1

    (rc, _) = self.rsh(node, self.templates["StopCmd"])
    if rc != 0:
        self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
        return None

    # Make sure we can continue even if corosync leaks
    # fdata-* is the old name
    #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*")
    self.ShouldBeStatus[node] = "down"
    self.cluster_stable(self.Env["DeadTime"])
    return 1
def StopaCMnoBlock(self, node):
    '''Stop the cluster manager on a given node with none-block mode

    The stop command is issued with synchronous=False and the node is then
    expected to be "down".  Always returns 1.
    '''
    message = "Stopping %s on node %s" % (self["Name"], node)
    self.debug(message)

    self.rsh(node, self.templates["StopCmd"], synchronous=False)
    self.ShouldBeStatus[node] = "down"
    return 1
def RereadCM(self, node):
    '''Force the cluster manager on a given node to reread its config
    This may be a no-op on certain cluster managers.

    Returns 1 on success, None (after logging) if the command failed.
    '''
    (rc, _) = self.rsh(node, self.templates["RereadCmd"])
    if rc == 0:
        return 1

    self.logger.log ("Could not force %s on node %s to reread its config"
                     % (self["Name"], node))
    return None
def startall(self, nodelist=None, verbose=False, quick=False):
'''Start the cluster manager on every node in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
# Returns 1 on success and 0 on failure.
# NOTE(review): map is assigned but never read in this method, and the name
# shadows the builtin.
map = {}
if not nodelist:
nodelist = self.Env["nodes"]
# Wait for the machines themselves before starting any cluster manager
for node in nodelist:
if self.ShouldBeStatus[node] == "down":
self.ns.WaitForAllNodesToComeUp(nodelist, 300)
# NOTE(review): the original indentation is not visible here, so it is unclear
# whether this branch sits inside the loop above or after it - confirm which
# node the StartaCM() call below is meant to receive.
if not quick:
# This is used for "basic sanity checks", so only start one node ...
if not self.StartaCM(node, verbose=verbose):
return 0
return 1
# Approximation of SimulStartList for --boot
watchpats = [ ]
watchpats.append(self.templates["Pat:DC_IDLE"])
for node in nodelist:
watchpats.append(self.templates["Pat:InfraUp"] % node)
watchpats.append(self.templates["Pat:PacemakerUp"] % node)
watchpats.append(self.templates["Pat:Local_started"] % node)
watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node))
# Start all the nodes - at about the same time...
watch = LogWatcher(self.Env["LogFileName"], watchpats, "fast-start", self.Env["DeadTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
watch.setwatch()
# The first node is started synchronously; a failure here aborts the start
if not self.StartaCM(nodelist[0], verbose=verbose):
return 0
# The non-blocking start is issued for every node in the list, first included
for node in nodelist:
self.StartaCMnoBlock(node, verbose=verbose)
watch.lookforall()
# Missing start-up patterns are only logged as warnings
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % (regex))
if not self.cluster_stable():
self.logger.log("Cluster did not stabilize")
return 0
return 1
def stopall(self, nodelist=None, verbose=False, force=False):
    '''Stop the cluster managers on every node in the cluster.
    We can do it on a subset of the cluster if nodelist is not None.

    A node is stopped when it is expected to be "up", or unconditionally
    when `force` is set.  Returns 1 if every attempted stop succeeded and
    0 if any StopaCM() call failed.
    '''
    if not nodelist:
        nodelist = self.Env["nodes"]

    ret = 1
    # Iterate over the requested subset; the previous code always walked
    # self.Env["nodes"] and silently ignored the nodelist argument.
    for node in nodelist:
        if self.ShouldBeStatus[node] == "up" or force:
            if not self.StopaCM(node, verbose=verbose, force=force):
                ret = 0
    return ret
def rereadall(self, nodelist=None):
    '''Force the cluster managers on every node in the cluster
    to reread their config files.  We can do it on a subset of the
    cluster if nodelist is not None.

    Only nodes whose expected status is "up" are asked to reread.
    '''
    if not nodelist:
        nodelist = self.Env["nodes"]

    # Iterate over the requested subset; the previous code always walked
    # self.Env["nodes"] and silently ignored the nodelist argument.
    for node in nodelist:
        if self.ShouldBeStatus[node] == "up":
            self.RereadCM(node)
def statall(self, nodelist=None):
    """Map each node to "up" or "down" according to StataCM().

    Checks every node in self.Env["nodes"] unless a non-empty nodelist
    is supplied, in which case only those nodes are checked.
    """
    targets = nodelist if nodelist else self.Env["nodes"]
    return {
        node: ("up" if self.StataCM(node) else "down")
        for node in targets
    }
def isolate_node(self, target, nodes=None):
    '''isolate the communication between the nodes

    Runs the "BreakCommCmd" template on `target` once for every other
    node in `nodes` (default: self.Env["nodes"]).

    Returns 1 when every command succeeded, or None (after logging) as
    soon as one of them exits with a non-zero status.
    '''
    if not nodes:
        nodes = self.Env["nodes"]

    for node in nodes:
        if node == target:
            continue

        # rsh returns (exit status, output) like at every other call site;
        # comparing the whole tuple to 0 made this always look like a failure
        # and broke the %d formatting below.
        (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node))
        if rc != 0:
            self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
            return None

        self.debug("Communication cut between %s and %s" % (target, node))

    return 1
def unisolate_node(self, target, nodes=None):
    '''fix the communication between the nodes

    For every node in `nodes` (default: self.Env["nodes"]) other than
    `target`, runs the "FixCommCmd" template on both ends of the link
    without waiting for either command to finish.
    '''
    if not nodes:
        nodes = self.Env["nodes"]

    for node in nodes:
        if node == target:
            continue

        # Limit the amount of time we have asynchronous connectivity for
        # Restore both sides as simultaneously as possible
        self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False)
        self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False)
        self.debug("Communication restored between %s and %s" % (target, node))
- def reducecomm_node(self,node):
- '''reduce the communication between the nodes'''
- (rc, _) = self.rsh(node, self.templates["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"]))
- if rc == 0:
- return 1
- else:
- self.logger.log("Could not reduce the communication between the nodes from node: %s" % node)
- return None
-
- def restorecomm_node(self,node):
- '''restore the saved communication between the nodes'''
- rc = 0
- if float(self.Env["XmitLoss"]) != 0 or float(self.Env["RecvLoss"]) != 0 :
- (rc, _) = self.rsh(node, self.templates["RestoreCommCmd"])
- if rc == 0:
- return 1
- else:
- self.logger.log("Could not restore the communication between the nodes from node: %s" % node)
- return None
-
def oprofileStart(self, node=None):
    """Enable oprofile on `node`, or on every configured node if none is given.

    Nodes that are not listed in self.Env["oprofile"] are left untouched.
    """
    profiled = self.Env["oprofile"]

    if not node:
        for member in profiled:
            self.oprofileStart(member)
        return

    if node not in profiled:
        return

    self.debug("Enabling oprofile on %s" % node)
    for command in (
        "opcontrol --init",
        "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all",
        "opcontrol --start",
        "opcontrol --reset",
    ):
        self.rsh(node, command)
def oprofileSave(self, test, node=None):
    """Dump and save the oprofile session for test number `test`.

    With no `node`, the session is saved on every node listed in
    self.Env["oprofile"]; a node that is not in that list is ignored.
    After saving, oprofile is stopped and started again on the node.
    """
    if not node:
        for n in self.Env["oprofile"]:
            self.oprofileSave(test, n)

    elif node in self.Env["oprofile"]:
        self.rsh(node, "opcontrol --dump")
        self.rsh(node, "opcontrol --save=cts.%d" % test)
        # Read back with: opreport -l session:cts.0 image:<directory>/c*

        # The former `if None:` alternative (a plain "opcontrol --reset")
        # could never run, so only the restart path is kept.
        self.oprofileStop(node)
        self.oprofileStart(node)
def oprofileStop(self, node=None):
    """Reset and shut down oprofile on `node`, or on every configured node.

    Nodes that are not listed in self.Env["oprofile"] are left untouched.
    """
    profiled = self.Env["oprofile"]

    if not node:
        for member in profiled:
            self.oprofileStop(member)
        return

    if node not in profiled:
        return

    self.debug("Stopping oprofile on %s" % node)
    self.rsh(node, "opcontrol --reset")
    # NOTE(review): with "2>&1 > /dev/null" only stdout is discarded; stderr
    # still goes to the original stdout -- confirm that is intended.
    self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
-
- def StatsExtract(self):
- if not self.Env["stats"]:
- return
-
- for host in self.Env["nodes"]:
- log_stats_file = "%s/cts-stats.csv" % BuildOptions.DAEMON_DIR
- if host in has_log_stats:
- self.rsh(host, '''bash %s %s stop''' % (log_stats_bin, log_stats_file))
- (_, lines) = self.rsh(host, '''cat %s''' % log_stats_file, verbose=1)
- self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
-
- fname = "cts-stats-%d-nodes-%s.csv" % (len(self.Env["nodes"]), host)
- print("Extracted stats: %s" % fname)
- fd = open(fname, "a")
- fd.writelines(lines)
- fd.close()
-
- def StatsMark(self, testnum):
- '''Mark the test number in the stats log'''
-
- global has_log_stats
- if not self.Env["stats"]:
- return
-
- for host in self.Env["nodes"]:
- log_stats_file = "%s/cts-stats.csv" % BuildOptions.DAEMON_DIR
- if not host in has_log_stats:
-
- global log_stats
- global log_stats_bin
- script=log_stats
- #script = re.sub("\\\\", "\\\\", script)
- script = re.sub('\"', '\\\"', script)
- script = re.sub("'", "\'", script)
- script = re.sub("`", "\`", script)
- script = re.sub("\$", "\\\$", script)
-
- self.debug("Installing %s on %s" % (log_stats_bin, host))
- self.rsh(host, '''echo "%s" > %s''' % (script, log_stats_bin), verbose=0)
- self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
- has_log_stats[host] = 1
-
- # Now mark it
- self.rsh(host, '''bash %s %s mark %s''' % (log_stats_bin, log_stats_file, testnum), synchronous=False)
-
def errorstoignore(self):
    """Return the known error messages ("BadNewsIgnore" patterns) to ignore."""
    # At some point implement a more elegant solution that
    # also produces a report at the end
    ignorable = self.templates.get_patterns("BadNewsIgnore")
    return ignorable
def install_config(self, node):
    """Remove old CIB files from `node` and install the test CIB once.

    Returns None in every case.  The CIB is only installed from the first
    node that reaches this point (tracked by self.cib_installed); later
    nodes just have their old CIB files removed.

    Raises ValueError if copying self.Env["CIBfilename"] to the node fails.
    """
    if not self.ns.WaitForNodeToComeUp(node):
        self.log("Node %s is not up." % node)
        return None

    if node not in self.CIBsync and self.Env["ClobberCIB"]:
        self.CIBsync[node] = 1
        self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*")

        # Only install the CIB on the first node, all the other ones will pick it up from there
        if self.cib_installed == 1:
            return None

        self.cib_installed = 1
        if self.Env["CIBfilename"] is None:
            self.log("Installing Generated CIB on node %s" % (node))
            self.cib.install(node)

        else:
            self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node))
            if self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) != 0:
                # The old format string had two specifiers for one argument,
                # so a TypeError was raised instead of this ValueError.
                raise ValueError("Can not scp file to %s" % node)

    self.rsh(node, "chown " + BuildOptions.DAEMON_USER + " " + BuildOptions.CIB_DIR + "/cib.xml")
def prepare(self):
    """Finish initialization: reset expectations and probe every node.

    For each node the expected status is cleared, communication is
    restored when experimental tests are enabled, and StataCM() is called.
    """
    self.partitions_expected = 1

    for member in self.Env["nodes"]:
        self.ShouldBeStatus[member] = ""

        experimental = self.Env["experimental-tests"]
        if experimental:
            self.unisolate_node(member)

        self.StataCM(member)
def test_node_CM(self, node):
'''Report the status of the cluster manager on a given node

Returns 0 when the status command output does not contain 'ok',
1 when the node is up but no idle state was seen, and 2 when the node
is up and stable.  self.ShouldBeStatus[node] is updated to match.
'''
watchpats = [ ]
watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)")
watchpats.append(self.templates["Pat:NonDC_started"] % node)
watchpats.append(self.templates["Pat:DC_started"] % node)
# Start watching the log before running the status command below.
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, "ClusterIdle", hosts=[node], kind=self.Env["LogWatcher"])
idle_watch.setwatch()
(_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1)
# Only the first line of the command output is examined.
if not out:
out = ""
else:
out = out[0].strip()
self.debug("Node %s status: '%s'" %(node, out))
# No 'ok' in the output: the node is treated as down.
if out.find('ok') < 0:
if self.ShouldBeStatus[node] == "up":
self.log(
"Node status for %s is %s but we think it should be %s"
% (node, "down", self.ShouldBeStatus[node]))
self.ShouldBeStatus[node] = "down"
return 0
if self.ShouldBeStatus[node] == "down":
self.log(
"Node status for %s is %s but we think it should be %s: %s"
% (node, "up", self.ShouldBeStatus[node], out))
self.ShouldBeStatus[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" % (node, out))
return 1
# Up and stable
return 2
# Is the node up or is the node down
def StataCM(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_CM(node) > 0:
return 1
return None
# Being up and being stable is not the same question...
def node_stable(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_CM(node) == 2:
return 1
self.log("Warn: Node %s not stable" % (node))
return None
def partition_stable(self, nodes, timeout=None):
    """Wait for the partition `nodes` (space-separated names) to go idle.

    Returns 1 if the partition string is shorter than three characters, or
    once a watched log line mentioning one of the partition's nodes is
    seen.  Returns None if no such line shows up; `timeout` defaults to
    self.Env["DeadTime"].
    """
    idle_patterns = [
        "Current ping state: S_IDLE",
        self.templates["Pat:DC_IDLE"],
    ]

    self.debug("Waiting for cluster stability...")

    if timeout is None:
        timeout = self.Env["DeadTime"]

    if len(nodes) < 3:
        self.debug("Cluster is inactive")
        return 1

    idle_watch = LogWatcher(self.Env["LogFileName"], idle_patterns, "ClusterStable", timeout, hosts=nodes.split(), kind=self.Env["LogWatcher"])
    idle_watch.setwatch()

    # have each node dump its current state
    for member in nodes.split():
        self.rsh(member, self.templates["StatusCmd"] % member, verbose=1)

    line = idle_watch.look()
    while line:
        self.debug(line)
        if any(re.search(member, line) for member in nodes.split()):
            return 1
        line = idle_watch.look()

    self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout))
    return None
def cluster_stable(self, timeout=None, double_check=False):
    """Return 1 if every partition is stable, otherwise None.

    With double_check set, the partitions found at the start are checked
    a second time after a five second pause.
    """
    partitions = self.find_partitions()

    rounds = 2 if double_check else 1
    for attempt in range(rounds):
        if attempt > 0:
            # Make sure we are really stable and that all resources,
            # including those that depend on transient node attributes,
            # are started if they were going to be
            time.sleep(5)

        for partition in partitions:
            if not self.partition_stable(partition, timeout):
                return None

    return 1
def is_node_dc(self, node, status_line=None):
    """Return 1 if the status line shows one of the listed DC states, else 0.

    When no status_line is supplied, the first line of the "StatusCmd"
    output from `node` is used instead.
    """
    dc_states = (
        'S_IDLE',
        'S_INTEGRATION',
        'S_FINALIZE_JOIN',
        'S_POLICY_ENGINE',
        'S_TRANSITION_ENGINE',
    )

    if not status_line:
        (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1)
        if out:
            status_line = out[0].strip()

    if not status_line:
        return 0

    for state in dc_states:
        if state in status_line:
            return 1
    return 0
def active_resources(self, node):
    """Return the ids of primitive resources that crm_resource places on `node`.

    Only output lines of "crm_resource -c" that start with "Resource"
    are parsed (via AuditResource).
    """
    (_, output) = self.rsh(node, "crm_resource -c", verbose=1)

    active = []
    for entry in output:
        if not re.search("^Resource", entry):
            continue

        rsc = AuditResource(self, entry)
        if rsc.type == "primitive" and rsc.host == node:
            active.append(rsc.id)

    return active
def ResourceLocation(self, rid):
    """Return the nodes on which the "RscRunning" command succeeds for `rid`.

    Only nodes expected to be "up" are queried.  An exit status of 127 is
    logged together with the command output; any other non-zero status is
    silently treated as "not running here".
    """
    hosts = []

    for node in self.Env["nodes"]:
        if self.ShouldBeStatus[node] != "up":
            continue

        cmd = self.templates["RscRunning"] % (rid)
        (rc, lines) = self.rsh(node, cmd)

        if rc == 0:
            hosts.append(node)
        elif rc == 127:
            self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
            for line in lines:
                self.log("Output: "+line)

    return hosts
def find_partitions(self):
    """Return the distinct partitions reported by the nodes that are up.

    Each partition is a string of node names, sorted and joined with
    single spaces.  Nodes that are down, give no output, or give output
    of two characters or fewer contribute nothing.
    """
    known = []

    for node in self.Env["nodes"]:
        if self.ShouldBeStatus[node] != "up":
            self.debug("Node %s is down... skipping" % node)
            continue

        (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
        if not out:
            self.log("no partition details for %s" % node)
            continue

        reported = out[0].strip()
        if len(reported) <= 2:
            self.log("bad partition details for %s" % node)
            continue

        # Normalize the member order so equal partitions compare equal
        partition = ' '.join(sorted(reported.split()))

        if partition in known:
            self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
        else:
            self.debug("Adding partition from %s: %s" % (node, partition))
            known.append(partition)

    self.debug("Found partitions: %s" % repr(known) )
    return known
def HasQuorum(self, node_list):
    """Return 1 if the first answering node reports quorum, otherwise 0.

    If we are auditing a partition, then one side will have quorum and the
    other not, so the caller needs to tell us which we are checking.
    If no value for node_list is specified, all nodes are assumed.

    Only nodes expected to be "up" are asked.  A node that gives no output,
    or output containing neither "1" nor "0", is skipped with a debug
    message; 0 is returned if no node gives a usable answer.
    """
    if not node_list:
        node_list = self.Env["nodes"]

    for node in node_list:
        if self.ShouldBeStatus[node] != "up":
            continue

        (_, output) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
        if not output:
            # Previously output[0] raised IndexError when the command
            # printed nothing; try the next node instead.
            self.debug("WARN: No quorum test result from " + node)
            continue

        quorum = output[0].strip()
        if quorum.find("1") != -1:
            return 1
        if quorum.find("0") != -1:
            return 0

        self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum)

    return 0
def Components(self):
"""Build and return the list of Process objects describing the cluster components.

The list holds the "stoniths" entry first when self.Env["DoFencing"] is
set, followed by ccm, pacemaker-based, pacemaker-execd,
pacemaker-controld and pacemaker-schedulerd.
"""
complist = []
# Messages passed as badnews_ignore to every component below.
common_ignore = [
"Pending action:",
"(ERROR|error): crm_log_message_adv:",
"(ERROR|error): MSG: No message to dump",
"pending LRM operations at shutdown",
"Lost connection to the CIB manager",
"Connection to the CIB terminated...",
"Sending message to the CIB manager FAILED",
"Action A_RECOVER .* not supported",
"(ERROR|error): stonithd_op_result_ready: not signed on",
"pingd.*(ERROR|error): send_update: Could not send update",
"send_ipc_message: IPC Channel to .* is not connected",
"unconfirmed_actions: Waiting on .* unconfirmed actions",
"cib_native_msgready: Message pending on command channel",
r": Performing A_EXIT_1 - forcefully exiting ",
r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.",
]
# The fencing component ignores these in addition to common_ignore.
stonith_ignore = [
r"Updating failcount for child_DoFencing",
r"error.*: Fencer connection failed \(will retry\)",
"pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.",
]
stonith_ignore.extend(common_ignore)
ccm = Process(self, "ccm", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
"pacemaker-controld.*Action A_RECOVER .* not supported",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
"pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
r"attrd.*exited with status 1",
r"cib.*exited with status 2",
# Not if it was fenced
# "A new node joined the cluster",
# "WARN: determine_online_status: Node .* is unclean",
# "Scheduling node .* for fencing",
# "Executing .* fencing operation",
# "tengine_stonith_callback: .*result=0",
# "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE",
# "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN",
"State transition S_STARTING -> S_PENDING",
], badnews_ignore = common_ignore)
based = Process(self, "pacemaker-based", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
"Lost connection to the CIB manager",
"Connection to the CIB manager terminated",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
"pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
r"pacemaker-controld.*: Could not recover from internal error",
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
r"attrd.*exited with status 1",
], badnews_ignore = common_ignore)
execd = Process(self, "pacemaker-execd", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
"LRM Connection failed",
"pacemaker-controld.*I_ERROR.*lrm_connection_destroy",
"State transition S_STARTING -> S_PENDING",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
# this status number is likely wrong now
r"pacemaker-controld.*exited with status 2",
], badnews_ignore = common_ignore)
controld = Process(self, "pacemaker-controld", triggersreboot=self.fastfail,
pats = [
# "WARN: determine_online_status: Node .* is unclean",
# "Scheduling node .* for fencing",
# "Executing .* fencing operation",
# "tengine_stonith_callback: .*result=0",
"State transition .* S_IDLE",
"State transition S_STARTING -> S_PENDING",
], badnews_ignore = common_ignore)
schedulerd = Process(self, "pacemaker-schedulerd", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed",
"pacemaker-controld.*I_ERROR.*save_cib_contents",
# this status number is likely wrong now
r"pacemaker-controld.*exited with status 2",
], badnews_ignore = common_ignore, dc_only=1)
# The fencing component is only included when fencing is enabled.
- if self.Env["DoFencing"] == 1 :
+ if self.Env["DoFencing"]:
complist.append(Process(self, "stoniths", triggersreboot=self.fastfail, dc_pats = [
r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed",
"Attempting connection to fencing daemon",
], badnews_ignore = stonith_ignore))
# NOTE(review): the patterns added here already appear in (or are covered by)
# the pats lists built above -- confirm whether the duplication is intended.
if self.fastfail == 0:
ccm.pats.extend([
# these status numbers are likely wrong now
r"attrd.*exited with status 1",
r"pacemaker-(based|controld).*exited with status 2",
])
based.pats.extend([
# these status numbers are likely wrong now
r"attrd.*exited with status 1",
r"pacemaker-controld.*exited with status 2",
])
execd.pats.extend([
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
])
complist.append(ccm)
complist.append(based)
complist.append(execd)
complist.append(controld)
complist.append(schedulerd)
return complist
def StandbyStatus(self, node):
    """Return the stripped first line of the standby query, or "off" if empty."""
    (_, lines) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
    if lines:
        status = lines[0].strip()
        self.debug("Standby result: "+status)
        return status
    return "off"
# status == "on" : Enter Standby mode
# status == "off": Enter Active mode
def SetStandbyMode(self, node, status):
    """Query the current standby state, then run "StandbyCmd" with `status`.

    Always returns True; the result of the command is not checked.
    """
    # The query result is not used, but the query itself is still issued.
    self.StandbyStatus(node)
    self.rsh(node, self.templates["StandbyCmd"] % (node, status))
    return True
def AddDummyRsc(self, node, rid):
    """Add a Dummy resource `rid` and a location constraint pinning it to `node`.

    Both XML fragments are single-quoted and passed to the "CibAddXml"
    command template, which is run on `node`.
    """
    # The <op> element previously ended in "/" without the closing ">",
    # which made the resource XML malformed.
    rsc_xml = """ '<resources>
<primitive class=\"ocf\" id=\"%s\" provider=\"pacemaker\" type=\"Dummy\">
<operations>
<op id=\"%s-interval-10s\" interval=\"10s\" name=\"monitor\"/>
</operations>
</primitive>
</resources>'""" % (rid, rid)
    constraint_xml = """ '<constraints>
<rsc_location id=\"location-%s-%s\" node=\"%s\" rsc=\"%s\" score=\"INFINITY\"/>
</constraints>'
""" % (rid, node, node, rid)

    self.rsh(node, self.templates['CibAddXml'] % (rsc_xml))
    self.rsh(node, self.templates['CibAddXml'] % (constraint_xml))
def RemoveDummyRsc(self, node, rid):
    """Delete the location constraint for `rid`, then the primitive itself.

    Each XPath is double-quoted and passed to the "CibDelXpath" command
    template, which is run on `node`.
    """
    xpaths = (
        "\"//rsc_location[@rsc='%s']\"" % (rid),
        "\"//primitive[@id='%s']\"" % (rid),
    )
    for xpath in xpaths:
        self.rsh(node, self.templates['CibDelXpath'] % xpath)
diff --git a/cts/lab/Makefile.am b/cts/lab/Makefile.am
index 7d824172fc..0591f1044f 100644
--- a/cts/lab/Makefile.am
+++ b/cts/lab/Makefile.am
@@ -1,34 +1,33 @@
#
# Copyright 2001-2023 the Pacemaker project contributors
#
# The version control history for this file may have further details.
#
# This source code is licensed under the GNU General Public License version 2
# or later (GPLv2+) WITHOUT ANY WARRANTY.
#
MAINTAINERCLEANFILES = Makefile.in
noinst_SCRIPTS = cluster_test \
OCFIPraTest.py
# Commands intended to be run only via other commands
halibdir = $(CRM_DAEMON_DIR)
dist_halib_SCRIPTS = cts-log-watcher
ctslibdir = $(pythondir)/cts
ctslib_PYTHON = __init__.py \
CIB.py \
cib_xml.py \
ClusterManager.py \
CM_corosync.py \
CTS.py \
CTSaudits.py \
CTSscenarios.py \
CTStests.py \
- environment.py \
watcher.py
ctsdir = $(datadir)/$(PACKAGE)/tests/cts
cts_SCRIPTS = CTSlab.py \
cts
diff --git a/cts/lab/__init__.py b/cts/lab/__init__.py
index 890355a193..abed502d1e 100644
--- a/cts/lab/__init__.py
+++ b/cts/lab/__init__.py
@@ -1,16 +1,15 @@
"""Python modules for Pacemaker's Cluster Test Suite (CTS)
This package provides the following modules:
CIB
cib_xml
CM_common
CM_corosync
CTSaudits
CTS
CTSscenarios
CTStests
-environment
patterns
watcher
"""
diff --git a/cts/lab/environment.py b/cts/lab/environment.py
deleted file mode 100644
index 7b1c3f6d5f..0000000000
--- a/cts/lab/environment.py
+++ /dev/null
@@ -1,648 +0,0 @@
-""" Test environment classes for Pacemaker's Cluster Test Suite (CTS)
-"""
-
-__copyright__ = "Copyright 2014-2023 the Pacemaker project contributors"
-__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
-
-import sys, time, os, socket, random
-
-from pacemaker._cts.logging import LogFactory
-from pacemaker._cts.remote import RemoteFactory
-
-class Environment(object):
-
- def __init__(self, args):
- self.data = {}
- self.Nodes = []
-
- self["DeadTime"] = 300
- self["StartTime"] = 300
- self["StableTime"] = 30
- self["tests"] = []
- self["IPagent"] = "IPaddr2"
- self["DoStandby"] = 1
- self["DoFencing"] = 1
- self["XmitLoss"] = "0.0"
- self["RecvLoss"] = "0.0"
- self["ClobberCIB"] = 0
- self["CIBfilename"] = None
- self["CIBResource"] = 0
- self["DoBSC"] = 0
- self["oprofile"] = []
- self["warn-inactive"] = 0
- self["ListTests"] = 0
- self["benchmark"] = 0
- self["LogWatcher"] = "any"
- self["SyslogFacility"] = "daemon"
- self["LogFileName"] = "/var/log/messages"
- self["Schema"] = "pacemaker-3.0"
- self["Stack"] = "corosync"
- self["stonith-type"] = "external/ssh"
- self["stonith-params"] = "hostlist=all,livedangerously=yes"
- self["notification-agent"] = "/var/lib/pacemaker/notify.sh"
- self["notification-recipient"] = "/var/lib/pacemaker/notify.log"
- self["loop-minutes"] = 60
- self["valgrind-procs"] = "pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd"
- self["experimental-tests"] = 0
- self["container-tests"] = 0
- self["valgrind-tests"] = 0
- self["unsafe-tests"] = 1
- self["loop-tests"] = 1
- self["scenario"] = "random"
- self["stats"] = 0
- self["continue"] = 0
-
- self.RandomGen = random.Random()
- self.logger = LogFactory()
-
- self.SeedRandom()
- self.rsh = RemoteFactory().getInstance()
-
- self.target = "localhost"
-
- self.parse_args(args)
- if self["ListTests"] == 0:
- self.validate()
- self.discover()
-
- def SeedRandom(self, seed=None):
- if not seed:
- seed = int(time.time())
-
- self["RandSeed"] = seed
- self.RandomGen.seed(str(seed))
-
- def dump(self):
- keys = []
- for key in list(self.data.keys()):
- keys.append(key)
-
- keys.sort()
- for key in keys:
- self.logger.debug("Environment["+key+"]:\t"+str(self[key]))
-
- def keys(self):
- return list(self.data.keys())
-
- def has_key(self, key):
- if key == "nodes":
- return True
-
- return key in self.data
-
- def __getitem__(self, key):
- if str(key) == "0":
- raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead")
-
- if key == "nodes":
- return self.Nodes
-
- elif key == "Name":
- return self.get_stack_short()
-
- elif key in self.data:
- return self.data[key]
-
- else:
- return None
-
- def __setitem__(self, key, value):
- if key == "Stack":
- self.set_stack(value)
-
- elif key == "node-limit":
- self.data[key] = value
- self.filter_nodes()
-
- elif key == "nodes":
- self.Nodes = []
- for node in value:
- # I don't think I need the IP address, etc. but this validates
- # the node name against /etc/hosts and/or DNS, so it's a
- # GoodThing(tm).
- try:
- n = node.strip()
- socket.gethostbyname_ex(n)
- self.Nodes.append(n)
- except:
- self.logger.log(node+" not found in DNS... aborting")
- raise
-
- self.filter_nodes()
-
- else:
- self.data[key] = value
-
- def RandomNode(self):
- '''Choose a random node from the cluster'''
- return self.RandomGen.choice(self["nodes"])
-
- def set_stack(self, name):
- # Normalize stack names
- if name == "corosync" or name == "cs" or name == "mcp":
- self.data["Stack"] = "corosync 2+"
-
- else:
- raise ValueError("Unknown stack: "+name)
-
- def get_stack_short(self):
- # Create the Cluster Manager object
- if not "Stack" in self.data:
- return "unknown"
-
- elif self.data["Stack"] == "corosync 2+":
- return "crm-corosync"
-
- else:
- LogFactory().log("Unknown stack: "+self["stack"])
- raise ValueError("Unknown stack: "+self["stack"])
-
- def detect_syslog(self):
- # Detect syslog variant
- if not "syslogd" in self.data:
- if self["have_systemd"]:
- # Systemd
- (_, lines) = self.rsh(self.target, "systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1)
- self["syslogd"] = lines[0].strip()
- else:
- # SYS-V
- (_, lines) = self.rsh(self.target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1)
- self["syslogd"] = lines[0].strip()
-
- if not "syslogd" in self.data or not self["syslogd"]:
- # default
- self["syslogd"] = "rsyslog"
-
- def disable_service(self, node, service):
- if self["have_systemd"]:
- # Systemd
- (rc, _) = self.rsh(node, "systemctl disable %s" % service)
- return rc
-
- else:
- # SYS-V
- (rc, _) = self.rsh(node, "chkconfig %s off" % service)
- return rc
-
- def enable_service(self, node, service):
- if self["have_systemd"]:
- # Systemd
- (rc, _) = self.rsh(node, "systemctl enable %s" % service)
- return rc
-
- else:
- # SYS-V
- (rc, _) = self.rsh(node, "chkconfig %s on" % service)
- return rc
-
- def service_is_enabled(self, node, service):
- if self["have_systemd"]:
- # Systemd
-
- # With "systemctl is-enabled", we should check if the service is
- # explicitly "enabled" instead of the return code. For example it returns
- # 0 if the service is "static" or "indirect", but they don't really count
- # as "enabled".
- (rc, _) = self.rsh(node, "systemctl is-enabled %s | grep enabled" % service)
- return rc == 0
-
- else:
- # SYS-V
- (rc, _) = self.rsh(node, "chkconfig --list | grep -e %s.*on" % service)
- return rc == 0
-
- def detect_at_boot(self):
- # Detect if the cluster starts at boot
- if not "at-boot" in self.data:
- self["at-boot"] = self.service_is_enabled(self.target, "corosync") \
- or self.service_is_enabled(self.target, "pacemaker")
-
- def detect_ip_offset(self):
- # Try to determine an offset for IPaddr resources
- if self["CIBResource"] and not "IPBase" in self.data:
- (_, lines) = self.rsh(self.target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
- network = lines[0].strip()
-
- (_, lines) = self.rsh(self.target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)
- self["IPBase"] = lines[0].strip()
-
- if not self["IPBase"]:
- self["IPBase"] = " fe80::1234:56:7890:1000"
- self.logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.")
- self.logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
-
- elif int(self["IPBase"].split('.')[3]) >= 240:
- self.logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
- % (self["IPBase"], self["IPBase"].split('.')[3]))
- self["IPBase"] = " fe80::1234:56:7890:1000"
- self.logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
-
- def filter_nodes(self):
- if self['node-limit'] is not None and self["node-limit"] > 0:
- if len(self["nodes"]) > self["node-limit"]:
- self.logger.log("Limiting the number of nodes configured=%d (max=%d)"
- %(len(self["nodes"]), self["node-limit"]))
- while len(self["nodes"]) > self["node-limit"]:
- self["nodes"].pop(len(self["nodes"])-1)
-
- def validate(self):
- if len(self["nodes"]) < 1:
- print("No nodes specified!")
- sys.exit(1)
-
- def discover(self):
- self.target = random.Random().choice(self["nodes"])
-
- exerciser = socket.gethostname()
-
- # Use the IP where possible to avoid name lookup failures
- for ip in socket.gethostbyname_ex(exerciser)[2]:
- if ip != "127.0.0.1":
- exerciser = ip
- break;
- self["cts-exerciser"] = exerciser
-
- if not "have_systemd" in self.data:
- (rc, _) = self.rsh(self.target, "systemctl list-units", verbose=0)
- self["have_systemd"] = rc == 0
-
- self.detect_syslog()
- self.detect_at_boot()
- self.detect_ip_offset()
-
- def parse_args(self, args):
- skipthis=None
-
- if not args:
- args=sys.argv[1:]
-
- for i in range(0, len(args)):
- if skipthis:
- skipthis=None
- continue
-
- elif args[i] == "-l" or args[i] == "--limit-nodes":
- skipthis=1
- self["node-limit"] = int(args[i+1])
-
- elif args[i] == "-r" or args[i] == "--populate-resources":
- self["CIBResource"] = 1
- self["ClobberCIB"] = 1
-
- elif args[i] == "--outputfile":
- skipthis=1
- self["OutputFile"] = args[i+1]
- LogFactory().add_file(self["OutputFile"])
-
- elif args[i] == "-L" or args[i] == "--logfile":
- skipthis=1
- self["LogWatcher"] = "remote"
- self["LogAuditDisabled"] = 1
- self["LogFileName"] = args[i+1]
-
- elif args[i] == "--ip" or args[i] == "--test-ip-base":
- skipthis=1
- self["IPBase"] = args[i+1]
- self["CIBResource"] = 1
- self["ClobberCIB"] = 1
-
- elif args[i] == "--oprofile":
- skipthis=1
- self["oprofile"] = args[i+1].split(' ')
-
- elif args[i] == "--trunc":
- self["TruncateLog"]=1
-
- elif args[i] == "--list-tests" or args[i] == "--list" :
- self["ListTests"]=1
-
- elif args[i] == "--benchmark":
- self["benchmark"]=1
-
- elif args[i] == "--bsc":
- self["DoBSC"] = 1
- self["scenario"] = "basic-sanity"
-
- elif args[i] == "--qarsh":
- RemoteFactory().enable_qarsh()
-
- elif args[i] == "--yes" or args[i] == "-y":
- self["continue"] = 1
- elif args[i] == "--stonith" or args[i] == "--fencing":
- skipthis=1
- if args[i+1] == "1" or args[i+1] == "yes":
- self["DoFencing"]=1
- elif args[i+1] == "0" or args[i+1] == "no":
- self["DoFencing"]=0
- elif args[i+1] == "rhcs" or args[i+1] == "xvm" or args[i+1] == "virt":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_xvm"
- elif args[i+1] == "scsi":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_scsi"
- elif args[i+1] == "ssh" or args[i+1] == "lha":
- self["DoStonith"]=1
- self["stonith-type"] = "external/ssh"
- self["stonith-params"] = "hostlist=all,livedangerously=yes"
- elif args[i+1] == "north":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_apc"
- self["stonith-params"] = "ipaddr=north-apc,login=apc,passwd=apc,pcmk_host_map=north-01:2;north-02:3;north-03:4;north-04:5;north-05:6;north-06:7;north-07:9;north-08:10;north-09:11;north-10:12;north-11:13;north-12:14;north-13:15;north-14:18;north-15:17;north-16:19;"
- elif args[i+1] == "south":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_apc"
- self["stonith-params"] = "ipaddr=south-apc,login=apc,passwd=apc,pcmk_host_map=south-01:2;south-02:3;south-03:4;south-04:5;south-05:6;south-06:7;south-07:9;south-08:10;south-09:11;south-10:12;south-11:13;south-12:14;south-13:15;south-14:18;south-15:17;south-16:19;"
- elif args[i+1] == "east":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_apc"
- self["stonith-params"] = "ipaddr=east-apc,login=apc,passwd=apc,pcmk_host_map=east-01:2;east-02:3;east-03:4;east-04:5;east-05:6;east-06:7;east-07:9;east-08:10;east-09:11;east-10:12;east-11:13;east-12:14;east-13:15;east-14:18;east-15:17;east-16:19;"
- elif args[i+1] == "west":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_apc"
- self["stonith-params"] = "ipaddr=west-apc,login=apc,passwd=apc,pcmk_host_map=west-01:2;west-02:3;west-03:4;west-04:5;west-05:6;west-06:7;west-07:9;west-08:10;west-09:11;west-10:12;west-11:13;west-12:14;west-13:15;west-14:18;west-15:17;west-16:19;"
- elif args[i+1] == "openstack":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_openstack"
-
- print("Obtaining OpenStack credentials from the current environment")
- self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % (
- os.environ['OS_REGION_NAME'],
- os.environ['OS_TENANT_NAME'],
- os.environ['OS_AUTH_URL'],
- os.environ['OS_USERNAME'],
- os.environ['OS_PASSWORD']
- )
-
- elif args[i+1] == "rhevm":
- self["DoStonith"]=1
- self["stonith-type"] = "fence_rhevm"
-
- print("Obtaining RHEV-M credentials from the current environment")
- self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
- os.environ['RHEVM_USERNAME'],
- os.environ['RHEVM_PASSWORD'],
- os.environ['RHEVM_SERVER'],
- os.environ['RHEVM_PORT'],
- )
-
- else:
- self.usage(args[i+1])
-
- elif args[i] == "--stonith-type":
- self["stonith-type"] = args[i+1]
- skipthis=1
-
- elif args[i] == "--stonith-args":
- self["stonith-params"] = args[i+1]
- skipthis=1
-
- elif args[i] == "--standby":
- skipthis=1
- if args[i+1] == "1" or args[i+1] == "yes":
- self["DoStandby"] = 1
- elif args[i+1] == "0" or args[i+1] == "no":
- self["DoStandby"] = 0
- else:
- self.usage(args[i+1])
-
- elif args[i] == "--clobber-cib" or args[i] == "-c":
- self["ClobberCIB"] = 1
-
- elif args[i] == "--cib-filename":
- skipthis=1
- self["CIBfilename"] = args[i+1]
-
- elif args[i] == "--xmit-loss":
- try:
- float(args[i+1])
- except ValueError:
- print("--xmit-loss parameter should be float")
- self.usage(args[i+1])
- skipthis=1
- self["XmitLoss"] = args[i+1]
-
- elif args[i] == "--recv-loss":
- try:
- float(args[i+1])
- except ValueError:
- print("--recv-loss parameter should be float")
- self.usage(args[i+1])
- skipthis=1
- self["RecvLoss"] = args[i+1]
-
- elif args[i] == "--choose":
- skipthis=1
- self["tests"].append(args[i+1])
- self["scenario"] = "sequence"
-
- elif args[i] == "--nodes":
- skipthis=1
- self["nodes"] = args[i+1].split(' ')
-
- elif args[i] == "-g" or args[i] == "--group" or args[i] == "--dsh-group":
- skipthis=1
- self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args[i+1])
- LogFactory().add_file(self["OutputFile"], "CTS")
-
- dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args[i+1])
-
- # Hacks to make my life easier
- if args[i+1] == "virt1":
- self["Stack"] = "corosync"
- self["DoStonith"]=1
- self["stonith-type"] = "fence_xvm"
- self["stonith-params"] = "delay=0"
- self["IPBase"] = " fe80::1234:56:7890:1000"
-
- elif args[i+1] == "east16" or args[i+1] == "nsew":
- self["Stack"] = "corosync"
- self["DoStonith"]=1
- self["stonith-type"] = "fence_apc"
- self["stonith-params"] = "ipaddr=east-apc,login=apc,passwd=apc,pcmk_host_map=east-01:2;east-02:3;east-03:4;east-04:5;east-05:6;east-06:7;east-07:9;east-08:10;east-09:11;east-10:12;east-11:13;east-12:14;east-13:15;east-14:18;east-15:17;east-16:19;"
- self["IPBase"] = " fe80::1234:56:7890:2000"
-
- if args[i+1] == "east16":
- # Requires newer python than available via nsew
- self["IPagent"] = "Dummy"
-
- elif args[i+1] == "corosync8":
- self["Stack"] = "corosync"
- self["DoStonith"]=1
- self["stonith-type"] = "fence_rhevm"
-
- print("Obtaining RHEV-M credentials from the current environment")
- self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
- os.environ['RHEVM_USERNAME'],
- os.environ['RHEVM_PASSWORD'],
- os.environ['RHEVM_SERVER'],
- os.environ['RHEVM_PORT'],
- )
- self["IPBase"] = " fe80::1234:56:7890:3000"
-
- if os.path.isfile(dsh_file):
- self["nodes"] = []
- f = open(dsh_file, 'r')
- for line in f:
- l = line.strip().rstrip()
- if not l.startswith('#'):
- self["nodes"].append(l)
- f.close()
-
- else:
- print("Unknown DSH group: %s" % args[i+1])
-
- elif args[i] == "--syslog-facility" or args[i] == "--facility":
- skipthis=1
- self["SyslogFacility"] = args[i+1]
-
- elif args[i] == "--seed":
- skipthis=1
- self.SeedRandom(args[i+1])
-
- elif args[i] == "--warn-inactive":
- self["warn-inactive"] = 1
-
- elif args[i] == "--schema":
- skipthis=1
- self["Schema"] = args[i+1]
-
- elif args[i] == "--at-boot" or args[i] == "--cluster-starts-at-boot":
- skipthis=1
- if args[i+1] == "1" or args[i+1] == "yes":
- self["at-boot"] = 1
- elif args[i+1] == "0" or args[i+1] == "no":
- self["at-boot"] = 0
- else:
- self.usage(args[i+1])
-
- elif args[i] == "--stack":
- if args[i+1] == "fedora" or args[i+1] == "fedora-17" or args[i+1] == "fedora-18":
- self["Stack"] = "corosync"
- elif args[i+1] == "rhel-7":
- self["Stack"] = "corosync"
- else:
- self["Stack"] = args[i+1]
- skipthis=1
-
- elif args[i] == "--once":
- self["scenario"] = "all-once"
-
- elif args[i] == "--boot":
- self["scenario"] = "boot"
-
- elif args[i] == "--notification-agent":
- self["notification-agent"] = args[i+1]
- skipthis = 1
-
- elif args[i] == "--notification-recipient":
- self["notification-recipient"] = args[i+1]
- skipthis = 1
-
- elif args[i] == "--valgrind-tests":
- self["valgrind-tests"] = 1
-
- elif args[i] == "--valgrind-procs":
- self["valgrind-procs"] = args[i+1]
- skipthis = 1
-
- elif args[i] == "--no-loop-tests":
- self["loop-tests"] = 0
-
- elif args[i] == "--loop-minutes":
- skipthis=1
- try:
- self["loop-minutes"]=int(args[i+1])
- except ValueError:
- self.usage(args[i])
-
- elif args[i] == "--no-unsafe-tests":
- self["unsafe-tests"] = 0
-
- elif args[i] == "--experimental-tests":
- self["experimental-tests"] = 1
-
- elif args[i] == "--container-tests":
- self["container-tests"] = 1
-
- elif args[i] == "--set":
- skipthis=1
- (name, value) = args[i+1].split('=')
- self[name] = value
- print("Setting %s = %s" % (name, value))
-
- elif args[i] == "--help":
- self.usage(args[i], 0)
-
- elif args[i] == "--":
- break
-
- else:
- try:
- NumIter=int(args[i])
- self["iterations"] = NumIter
- except ValueError:
- self.usage(args[i])
-
- def usage(self, arg, status=1):
- if status:
- print("Illegal argument %s" % arg)
- print("usage: " + sys.argv[0] +" [options] number-of-iterations")
- print("\nCommon options: ")
- print("\t [--nodes 'node list'] list of cluster nodes separated by whitespace")
- print("\t [--group | -g 'name'] use the nodes listed in the named DSH group (~/.dsh/groups/$name)")
- print("\t [--limit-nodes max] only use the first 'max' cluster nodes supplied with --nodes")
- print("\t [--stack corosync] which cluster stack is installed")
- print("\t [--list-tests] list the valid tests")
- print("\t [--benchmark] add the timing information")
- print("\t ")
- print("Options that CTS will usually auto-detect correctly: ")
- print("\t [--logfile path] where should the test software look for logs from cluster nodes")
- print("\t [--syslog-facility name] which syslog facility should the test software log to")
- print("\t [--at-boot (1|0)] does the cluster software start at boot time")
- print("\t [--test-ip-base ip] offset for generated IP address resources")
- print("\t ")
- print("Options for release testing: ")
- print("\t [--populate-resources | -r] generate a sample configuration")
- print("\t [--choose name] run only the named test")
- print("\t [--stonith (1 | 0 | yes | no | rhcs | ssh)]")
- print("\t [--once] run all valid tests once")
- print("\t ")
- print("Additional (less common) options: ")
- print("\t [--clobber-cib | -c ] erase any existing configuration")
- print("\t [--outputfile path] optional location for the test software to write logs to")
- print("\t [--trunc] truncate logfile before starting")
- print("\t [--xmit-loss lost-rate(0.0-1.0)]")
- print("\t [--recv-loss lost-rate(0.0-1.0)]")
- print("\t [--standby (1 | 0 | yes | no)]")
- print("\t [--fencing (1 | 0 | yes | no | rhcs | lha | openstack )]")
- print("\t [--stonith-type type]")
- print("\t [--stonith-args name=value]")
- print("\t [--bsc]")
- print("\t [--notification-agent path] script to configure for Pacemaker alerts")
- print("\t [--notification-recipient r] recipient to pass to alert script")
- print("\t [--no-loop-tests] don't run looping/time-based tests")
- print("\t [--no-unsafe-tests] don't run tests that are unsafe for use with ocfs2/drbd")
- print("\t [--valgrind-tests] include tests using valgrind")
- print("\t [--experimental-tests] include experimental tests")
- print("\t [--container-tests] include pacemaker_remote tests that run in lxc container resources")
- print("\t [--oprofile 'node list'] list of cluster nodes to run oprofile on]")
- print("\t [--qarsh] use the QARSH backdoor to access nodes instead of SSH")
- print("\t [--seed random_seed]")
- print("\t [--set option=value]")
- print("\t [--yes | -y] continue to run cts when there is an interaction whether to continue running pacemaker-cts")
- print("\t ")
- print("\t Example: ")
- # @PYTHON@ would be better here but not worth making file this a .in
- print("\t python sys.argv[0] -g virt1 -r --stonith ssh --schema pacemaker-2.0 500")
-
- sys.exit(status)
-
-class EnvFactory(object):
- instance = None
- def __init__(self):
- pass
-
- def getInstance(self, args=None):
- if not EnvFactory.instance:
- EnvFactory.instance = Environment(args)
- return EnvFactory.instance
diff --git a/python/pacemaker/_cts/Makefile.am b/python/pacemaker/_cts/Makefile.am
index 73e3d88581..e3ce29f77d 100644
--- a/python/pacemaker/_cts/Makefile.am
+++ b/python/pacemaker/_cts/Makefile.am
@@ -1,21 +1,22 @@
#
# Copyright 2023 the Pacemaker project contributors
#
# The version control history for this file may have further details.
#
# This source code is licensed under the GNU General Public License version 2
# or later (GPLv2+) WITHOUT ANY WARRANTY.
#
MAINTAINERCLEANFILES = Makefile.in
pkgpythondir = $(pythondir)/$(PACKAGE)/_cts
pkgpython_PYTHON = __init__.py \
corosync.py \
+ environment.py \
errors.py \
logging.py \
patterns.py \
process.py \
remote.py \
test.py
diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
new file mode 100644
index 0000000000..6fc902b17e
--- /dev/null
+++ b/python/pacemaker/_cts/environment.py
@@ -0,0 +1,645 @@
+""" Test environment classes for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = ["EnvFactory"]
+__copyright__ = "Copyright 2014-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import argparse
+import os
+import random
+import socket
+import sys
+import time
+
+from pacemaker._cts.logging import LogFactory
+from pacemaker._cts.remote import RemoteFactory
+
+class Environment:
+ """ A class for managing the CTS environment, consisting largely of processing
+ and storing command line parameters
+ """
+
+ # pylint doesn't understand that self._rsh is callable (it stores the
+ # singleton instance of RemoteExec, as returned by the getInstance method
+ # of RemoteFactory). It's possible we could fix this with type annotations,
+ # but those were introduced with python 3.5 and we only support python 3.4.
+ # I think we could also fix this by getting rid of the getInstance methods,
+ # but that's a project for another day. For now, just disable the warning.
+ # pylint: disable=not-callable
+
+    def __init__(self, args):
+        """ Build the CTS environment from command line parameters.
+
+            has_key, __getitem__, and __setitem__ make this object look a bit
+            like a dictionary, but it is not one, so standard dictionary
+            behavior must not be assumed.
+
+            Arguments:
+
+            args -- A list of command line parameters, minus the program name.
+                    If None, sys.argv will be used.
+        """
+
+        self.data = {}
+        self._nodes = []
+
+        # Defaults that have to exist before the command line is processed,
+        # either because no option sets them or because the default cannot be
+        # expressed in add_argument.  Each one goes through __setitem__, in
+        # this order.
+        defaults = (
+            ("DeadTime", 300),
+            ("StartTime", 300),
+            ("StableTime", 30),
+            ("tests", []),
+            ("IPagent", "IPaddr2"),
+            ("DoFencing", True),
+            ("ClobberCIB", False),
+            ("CIBfilename", None),
+            ("CIBResource", False),
+            ("LogWatcher", "any"),
+            ("node-limit", 0),
+            ("scenario", "random"),
+        )
+
+        for (key, value) in defaults:
+            self[key] = value
+
+        self.random_gen = random.Random()
+
+        self._logger = LogFactory()
+        self._rsh = RemoteFactory().getInstance()
+        self._target = "localhost"
+
+        self._seed_random()
+        self._parse_args(args)
+
+        # Listing the tests needs neither a node list nor any probing
+        if self["ListTests"]:
+            return
+
+        self._validate()
+        self._discover()
+
+    def _seed_random(self, seed=None):
+        """ Seed the random number generator.  When no seed (or a false-y one)
+            is supplied, the current time in whole seconds is used instead.
+            The seed actually used is recorded under the "RandSeed" key.
+        """
+
+        chosen = seed if seed else int(time.time())
+
+        self["RandSeed"] = chosen
+        # The generator is always seeded with the string form of the seed
+        self.random_gen.seed(str(chosen))
+
+    def dump(self):
+        """ Write every stored environment key and its value to the debug log,
+            one line per key, in sorted key order
+        """
+
+        for key in sorted(self.data):
+            label = "Environment[%s]" % key
+            self._logger.debug("{key:35}: {val}".format(key=label, val=str(self[key])))
+
+    def keys(self):
+        """ Return a new list holding every key stored in the data dictionary """
+
+        return list(self.data)
+
+    def has_key(self, key):
+        """ Report whether the given environment key is available.  "nodes" is
+            always reported as present, since the node list is kept outside the
+            data dictionary.
+        """
+
+        return key == "nodes" or key in self.data
+
+    def __getitem__(self, key):
+        """ Look up an environment key.  "nodes" yields the node list, "Name"
+            yields the short name of the cluster stack, and any other key is
+            read from the data dictionary.  Unknown keys yield None rather than
+            raising KeyError.
+        """
+
+        # This class defines neither __contains__ nor __iter__, so "foo in env"
+        # makes Python probe __getitem__ with 0, 1, 2, ...  Catch that misuse
+        # on the very first probe.
+        if str(key) == "0":
+            raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead")
+
+        if key == "nodes":
+            return self._nodes
+
+        if key == "Name":
+            return self._get_stack_short()
+
+        return self.data.get(key)
+
+    def __setitem__(self, key, value):
+        """ Set the given environment key to the given value, overriding any
+            previous value.  Three keys get special treatment:
+
+            "Stack"      -- the value is normalized by _set_stack, which raises
+                            ValueError for an unknown stack name
+            "node-limit" -- the value is stored and the node list is re-limited
+            "nodes"      -- the value is an iterable of node names; each name is
+                            stripped and must resolve, after which the node
+                            list is re-limited.  The lookup error is re-raised
+                            for a name that cannot be resolved.
+        """
+
+        if key == "Stack":
+            self._set_stack(value)
+
+        elif key == "node-limit":
+            self.data[key] = value
+            self._filter_nodes()
+
+        elif key == "nodes":
+            self._nodes = []
+            for node in value:
+                # I don't think I need the IP address, etc. but this validates
+                # the node name against /etc/hosts and/or DNS, so it's a
+                # GoodThing(tm).
+                name = node.strip()
+
+                try:
+                    socket.gethostbyname_ex(name)
+                except (OSError, UnicodeError):
+                    # Only genuine lookup failures are reported as such.
+                    # Resolver errors derive from OSError, and a name that
+                    # cannot be IDNA-encoded raises UnicodeError.
+                    self._logger.log("%s not found in DNS... aborting" % node)
+                    raise
+
+                self._nodes.append(name)
+
+            self._filter_nodes()
+
+        else:
+            self.data[key] = value
+
+    def random_node(self):
+        """ Pick one node from the node list using this environment's seeded
+            random number generator
+        """
+
+        candidates = self["nodes"]
+        return self.random_gen.choice(candidates)
+
+    def _set_stack(self, name):
+        """ Store the canonical form of the given cluster stack name, raising
+            ValueError if the name is not a recognized alias
+        """
+
+        if name not in ["corosync", "cs", "mcp"]:
+            raise ValueError("Unknown stack: %s" % name)
+
+        # Every accepted alias maps to the same canonical name
+        self.data["Stack"] = "corosync 2+"
+
+    def _get_stack_short(self):
+        """ Return the short name for the currently set cluster stack, or
+            "unknown" if no stack has been set.  Raises ValueError (after
+            logging) if the stored stack name is not recognized.
+        """
+
+        if "Stack" not in self.data:
+            return "unknown"
+
+        stack = self.data["Stack"]
+
+        if stack == "corosync 2+":
+            return "crm-corosync"
+
+        # Report the value that was actually stored.  Keys are case sensitive,
+        # so looking up "stack" here would always yield None.
+        self._logger.log("Unknown stack: %s" % stack)
+        raise ValueError("Unknown stack: %s" % stack)
+
+    def _detect_syslog(self):
+        """ Detect the syslog variant in use on the target node and store it
+            under the "syslogd" key.  Nothing is done if that key is already
+            set.  If the remote query yields no usable name, "rsyslog" is
+            stored as the default.
+        """
+
+        if "syslogd" not in self.data:
+            if self["have_systemd"]:
+                # Systemd
+                cmd = r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'"
+            else:
+                # SYS-V
+                cmd = "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1"
+
+            (_, lines) = self._rsh(self._target, cmd, verbose=1)
+
+            # The pipeline prints nothing when no syslog service matches.
+            # Leave the key unset in that case so the default below applies,
+            # instead of failing on lines[0].
+            if lines:
+                self["syslogd"] = lines[0].strip()
+
+            if "syslogd" not in self.data or not self["syslogd"]:
+                # default
+                self["syslogd"] = "rsyslog"
+
+    def disable_service(self, node, service):
+        """ Disable the given service on the given node and return the return
+            code of the remote command
+        """
+
+        # systemctl on systemd hosts, chkconfig (SYS-V) everywhere else
+        template = "systemctl disable %s" if self["have_systemd"] else "chkconfig %s off"
+
+        (rc, _) = self._rsh(node, template % service)
+        return rc
+
+    def enable_service(self, node, service):
+        """ Enable the given service on the given node and return the return
+            code of the remote command
+        """
+
+        # systemctl on systemd hosts, chkconfig (SYS-V) everywhere else
+        template = "systemctl enable %s" if self["have_systemd"] else "chkconfig %s on"
+
+        (rc, _) = self._rsh(node, template % service)
+        return rc
+
+    def service_is_enabled(self, node, service):
+        """ Return True if the given service is enabled on the given node, as
+            judged by the remote command succeeding with return code 0
+        """
+
+        if self["have_systemd"]:
+            # Systemd
+
+            # "systemctl is-enabled" also returns 0 for services that are
+            # "static" or "indirect", which don't really count as "enabled".
+            # So grep its output for the word instead of trusting its return
+            # code.
+            query = "systemctl is-enabled %s | grep enabled" % service
+        else:
+            # SYS-V
+            query = "chkconfig --list | grep -e %s.*on" % service
+
+        (rc, _) = self._rsh(node, query)
+        return rc == 0
+
+    def _detect_at_boot(self):
+        """ Record under "at-boot" whether corosync or pacemaker is enabled on
+            the target node, unless that key has already been set
+        """
+
+        if "at-boot" in self.data:
+            return
+
+        # pacemaker is only queried when corosync is not enabled
+        starts_at_boot = self.service_is_enabled(self._target, "corosync") \
+            or self.service_is_enabled(self._target, "pacemaker")
+
+        self["at-boot"] = starts_at_boot
+
+    def _detect_ip_offset(self):
+        """ Detect the offset for IPaddr resources and store it under "IPBase".
+
+            Nothing is done unless "CIBResource" is set and "IPBase" has not
+            already been given.  The target node is asked for its first
+            matching IPv4 network, and nmap is then used to find the highest
+            address in use on it.  If either step yields nothing, or the last
+            octet found is 240 or above, a fixed default is stored instead.
+        """
+
+        if not self["CIBResource"] or "IPBase" in self.data:
+            return
+
+        (_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
+
+        # The network query can come back empty just like the nmap one below,
+        # so guard it the same way instead of failing on lines[0].
+        try:
+            network = lines[0].strip()
+        except (IndexError, TypeError):
+            network = None
+
+        self["IPBase"] = None
+
+        if network:
+            (_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)
+
+            try:
+                self["IPBase"] = lines[0].strip()
+            except (IndexError, TypeError):
+                self["IPBase"] = None
+
+        if not self["IPBase"]:
+            self["IPBase"] = " fe80::1234:56:7890:1000"
+            self._logger.log("Could not determine an offset for IPaddr resources.  Perhaps nmap is not installed on the nodes.")
+            self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
+            return
+
+        # pylint thinks self["IPBase"] is a list, not a string, which causes it
+        # to error out because a list doesn't have split().
+        # pylint: disable=no-member
+        if int(self["IPBase"].split('.')[3]) >= 240:
+            self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
+                             % (self["IPBase"], self["IPBase"].split('.')[3]))
+            self["IPBase"] = " fe80::1234:56:7890:1000"
+            self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
+
+    def _filter_nodes(self):
+        """ Apply --limit-nodes: when a positive limit is set and the node list
+            is longer than it, log the fact and keep only that many nodes from
+            the front of the list
+        """
+
+        limit = self["node-limit"]
+
+        if limit > 0 and len(self["nodes"]) > limit:
+            # pylint thinks self["node-limit"] is a list even though we initialize
+            # it as an int in __init__ and treat it as an int everywhere.
+            # pylint: disable=bad-string-format-type
+            self._logger.log("Limiting the number of nodes configured=%d (max=%d)"
+                             % (len(self["nodes"]), limit))
+
+            # Truncate in place so the list object behind self["nodes"] is kept
+            del self["nodes"][limit:]
+
+    def _validate(self):
+        """ Check that the required command line parameters were given.
+            Raises ValueError if the node list is empty.
+        """
+
+        nodes = self["nodes"]
+
+        if not nodes:
+            raise ValueError("No nodes specified!")
+
+    def _discover(self):
+        """ Probe the cluster to figure out how to log and manage services.
+
+            A random node becomes the probe target, the address of the machine
+            running the tests is recorded under "cts-exerciser", and the
+            presence of systemd, the syslog variant, start-at-boot, and the
+            IP offset are detected in that order.
+        """
+
+        # Note this uses a fresh generator, not the seeded self.random_gen
+        self._target = random.Random().choice(self["nodes"])
+
+        hostname = socket.gethostname()
+        addresses = socket.gethostbyname_ex(hostname)[2]
+
+        # Use the IP where possible to avoid name lookup failures: take the
+        # first address that is not 127.0.0.1, else fall back to the host name
+        self["cts-exerciser"] = next((ip for ip in addresses if ip != "127.0.0.1"), hostname)
+
+        if "have_systemd" not in self.data:
+            (rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0)
+            self["have_systemd"] = rc == 0
+
+        self._detect_syslog()
+        self._detect_at_boot()
+        self._detect_ip_offset()
+
+    def _parse_args(self, argv):
+        """ Parse and validate command line parameters, setting the appropriate
+            values in the environment dictionary.  If argv is None (or empty),
+            use sys.argv instead.
+
+            argparse handles invalid options itself by printing a usage message
+            and exiting.  Some values raise instead: an unknown --stack raises
+            ValueError, an unresolvable node name re-raises the lookup error,
+            and the openstack/rhevm fencing choices raise KeyError if their
+            credentials are missing from the process environment.
+        """
+
+        if not argv:
+            argv = sys.argv[1:]
+
+        parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0])
+
+        grp1 = parser.add_argument_group("Common options")
+        grp1.add_argument("-g", "--dsh-group", "--group",
+                          metavar="GROUP", dest="group",
+                          help="Use the nodes listed in the named DSH group (~/.dsh/group/$name)")
+        grp1.add_argument("-l", "--limit-nodes",
+                          type=int, default=0,
+                          metavar="MAX",
+                          help="Only use the first MAX cluster nodes supplied with --nodes")
+        grp1.add_argument("--benchmark",
+                          action="store_true",
+                          help="Add timing information")
+        grp1.add_argument("--list", "--list-tests",
+                          action="store_true", dest="list_tests",
+                          help="List the valid tests")
+        grp1.add_argument("--nodes",
+                          metavar="NODES",
+                          help="List of cluster nodes separated by whitespace")
+        grp1.add_argument("--stack",
+                          default="corosync",
+                          metavar="STACK",
+                          help="Which cluster stack is installed")
+
+        grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
+        grp2.add_argument("-L", "--logfile",
+                          metavar="PATH",
+                          help="Where to look for logs from cluster nodes")
+        grp2.add_argument("--at-boot", "--cluster-starts-at-boot",
+                          choices=["1", "0", "yes", "no"],
+                          help="Does the cluster software start at boot time?")
+        grp2.add_argument("--facility", "--syslog-facility",
+                          default="daemon",
+                          metavar="NAME",
+                          help="Which syslog facility to log to")
+        grp2.add_argument("--ip", "--test-ip-base",
+                          metavar="IP",
+                          help="Offset for generated IP address resources")
+
+        grp3 = parser.add_argument_group("Options for release testing")
+        grp3.add_argument("-r", "--populate-resources",
+                          action="store_true",
+                          help="Generate a sample configuration")
+        grp3.add_argument("--choose",
+                          metavar="NAME",
+                          help="Run only the named test")
+        grp3.add_argument("--fencing", "--stonith",
+                          choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
+                          default="1",
+                          help="What fencing agent to use")
+        grp3.add_argument("--once",
+                          action="store_true",
+                          help="Run all valid tests once")
+
+        grp4 = parser.add_argument_group("Additional (less common) options")
+        grp4.add_argument("-c", "--clobber-cib",
+                          action="store_true",
+                          help="Erase any existing configuration")
+        grp4.add_argument("-y", "--yes",
+                          action="store_true", dest="always_continue",
+                          help="Continue to run whenever prompted")
+        grp4.add_argument("--boot",
+                          action="store_true",
+                          help="")
+        grp4.add_argument("--bsc",
+                          action="store_true",
+                          help="")
+        grp4.add_argument("--cib-filename",
+                          metavar="PATH",
+                          help="Install the given CIB file to the cluster")
+        grp4.add_argument("--container-tests",
+                          action="store_true",
+                          help="Include pacemaker_remote tests that run in lxc container resources")
+        grp4.add_argument("--experimental-tests",
+                          action="store_true",
+                          help="Include experimental tests")
+        grp4.add_argument("--loop-minutes",
+                          type=int, default=60,
+                          help="")
+        grp4.add_argument("--no-loop-tests",
+                          action="store_true",
+                          help="Don't run looping/time-based tests")
+        grp4.add_argument("--no-unsafe-tests",
+                          action="store_true",
+                          help="Don't run tests that are unsafe for use with ocfs2/drbd")
+        grp4.add_argument("--notification-agent",
+                          metavar="PATH",
+                          default="/var/lib/pacemaker/notify.sh",
+                          help="Script to configure for Pacemaker alerts")
+        grp4.add_argument("--notification-recipient",
+                          metavar="R",
+                          default="/var/lib/pacemaker/notify.log",
+                          help="Recipient to pass to alert script")
+        grp4.add_argument("--oprofile",
+                          metavar="NODES",
+                          help="List of cluster nodes to run oprofile on")
+        grp4.add_argument("--outputfile",
+                          metavar="PATH",
+                          help="Location to write logs to")
+        grp4.add_argument("--qarsh",
+                          action="store_true",
+                          help="Use QARSH to access nodes instead of SSH")
+        grp4.add_argument("--schema",
+                          metavar="SCHEMA",
+                          default="pacemaker-3.0",
+                          help="Create a CIB conforming to the given schema")
+        grp4.add_argument("--seed",
+                          metavar="SEED",
+                          help="Use the given string as the random number seed")
+        grp4.add_argument("--set",
+                          action="append",
+                          metavar="ARG",
+                          default=[],
+                          help="Set key=value pairs (can be specified multiple times)")
+        grp4.add_argument("--stonith-args",
+                          metavar="ARGS",
+                          default="hostlist=all,livedangerously=yes",
+                          help="")
+        grp4.add_argument("--stonith-type",
+                          metavar="TYPE",
+                          default="external/ssh",
+                          help="")
+        grp4.add_argument("--trunc",
+                          action="store_true", dest="truncate",
+                          help="Truncate log file before starting")
+        grp4.add_argument("--valgrind-procs",
+                          metavar="PROCS",
+                          default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd",
+                          help="Run valgrind against the given space-separated list of processes")
+        grp4.add_argument("--valgrind-tests",
+                          action="store_true",
+                          help="Include tests using valgrind")
+        grp4.add_argument("--warn-inactive",
+                          action="store_true",
+                          help="Warn if a resource is assigned to an inactive node")
+
+        parser.add_argument("iterations",
+                            type=int,
+                            help="Number of tests to run")
+
+        args = parser.parse_args(args=argv)
+
+        # Set values on this object based on what happened with command line
+        # processing.  This has to be done in several blocks.
+
+        # These values can always be set.  They get a default from the add_argument
+        # calls, only do one thing, and they do not have any side effects.
+        self["ClobberCIB"] = args.clobber_cib
+        self["ListTests"] = args.list_tests
+        self["Schema"] = args.schema
+        self["Stack"] = args.stack
+        self["SyslogFacility"] = args.facility
+        self["TruncateLog"] = args.truncate
+        self["benchmark"] = args.benchmark
+        self["continue"] = args.always_continue
+        self["container-tests"] = args.container_tests
+        self["experimental-tests"] = args.experimental_tests
+        self["iterations"] = args.iterations
+        self["loop-minutes"] = args.loop_minutes
+        self["loop-tests"] = not args.no_loop_tests
+        self["notification-agent"] = args.notification_agent
+        self["notification-recipient"] = args.notification_recipient
+        self["node-limit"] = args.limit_nodes
+        self["stonith-params"] = args.stonith_args
+        self["stonith-type"] = args.stonith_type
+        self["unsafe-tests"] = not args.no_unsafe_tests
+        self["valgrind-procs"] = args.valgrind_procs
+        self["valgrind-tests"] = args.valgrind_tests
+        self["warn-inactive"] = args.warn_inactive
+
+        # Nodes and groups are mutually exclusive, so their defaults cannot be
+        # set in their add_argument calls.  Additionally, groups does more than
+        # just set a value.  Here, set nodes first and then if a group is
+        # specified, override the previous nodes value.
+        if args.nodes:
+            self["nodes"] = args.nodes.split(" ")
+        else:
+            self["nodes"] = []
+
+        # The option's dest is "group", so that is the only attribute name
+        # under which argparse stores its value.
+        if args.group:
+            self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.group)
+            LogFactory().add_file(self["OutputFile"], "CTS")
+
+            dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.group)
+
+            if os.path.isfile(dsh_file):
+                self["nodes"] = []
+
+                with open(dsh_file, "r", encoding="utf-8") as f:
+                    for line in f:
+                        name = line.strip()
+
+                        # Skip comments, and blank lines so that they do not
+                        # end up as empty node names
+                        if name and not name.startswith('#'):
+                            self["nodes"].append(name)
+            else:
+                print("Unknown DSH group: %s" % args.group)
+
+        # Everything else either can't have a default set in an add_argument
+        # call (likely because we don't want to always have a value set for it)
+        # or it does something fancier than just set a single value.  However,
+        # order does not matter for these as long as the user doesn't provide
+        # conflicting arguments on the command line.  So just do everything
+        # alphabetically.
+
+        # Only record "at-boot" when the user gave it.  _detect_at_boot probes
+        # the cluster only while the key is absent, so always storing a value
+        # here would turn auto-detection off.
+        if args.at_boot:
+            self["at-boot"] = args.at_boot in ["1", "yes"]
+
+        if args.boot:
+            self["scenario"] = "boot"
+
+        if args.bsc:
+            self["DoBSC"] = True
+            self["scenario"] = "basic-sanity"
+
+        if args.cib_filename:
+            self["CIBfilename"] = args.cib_filename
+        else:
+            self["CIBfilename"] = None
+
+        if args.choose:
+            self["scenario"] = "sequence"
+            self["tests"].append(args.choose)
+
+        if args.fencing:
+            if args.fencing in ["0", "no"]:
+                self["DoFencing"] = False
+            else:
+                self["DoFencing"] = True
+
+                if args.fencing in ["rhcs", "virt", "xvm"]:
+                    self["stonith-type"] = "fence_xvm"
+
+                elif args.fencing == "scsi":
+                    self["stonith-type"] = "fence_scsi"
+
+                elif args.fencing in ["lha", "ssh"]:
+                    self["stonith-params"] = "hostlist=all,livedangerously=yes"
+                    self["stonith-type"] = "external/ssh"
+
+                elif args.fencing == "openstack":
+                    self["stonith-type"] = "fence_openstack"
+
+                    print("Obtaining OpenStack credentials from the current environment")
+                    self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % (
+                        os.environ['OS_REGION_NAME'],
+                        os.environ['OS_TENANT_NAME'],
+                        os.environ['OS_AUTH_URL'],
+                        os.environ['OS_USERNAME'],
+                        os.environ['OS_PASSWORD']
+                    )
+
+                elif args.fencing == "rhevm":
+                    self["stonith-type"] = "fence_rhevm"
+
+                    print("Obtaining RHEV-M credentials from the current environment")
+                    self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
+                        os.environ['RHEVM_USERNAME'],
+                        os.environ['RHEVM_PASSWORD'],
+                        os.environ['RHEVM_SERVER'],
+                        os.environ['RHEVM_PORT'],
+                    )
+
+        if args.ip:
+            self["CIBResource"] = True
+            self["ClobberCIB"] = True
+            self["IPBase"] = args.ip
+
+        if args.logfile:
+            self["LogAuditDisabled"] = True
+            self["LogFileName"] = args.logfile
+            self["LogWatcher"] = "remote"
+        else:
+            # We can't set this as the default on the parser.add_argument call
+            # for this option because then args.logfile will be set, which means
+            # the above branch will be taken and those other values will also be
+            # set.
+            self["LogFileName"] = "/var/log/messages"
+
+        if args.once:
+            self["scenario"] = "all-once"
+
+        if args.oprofile:
+            self["oprofile"] = args.oprofile.split(" ")
+        else:
+            self["oprofile"] = []
+
+        if args.outputfile:
+            self["OutputFile"] = args.outputfile
+            LogFactory().add_file(self["OutputFile"])
+
+        if args.populate_resources:
+            self["CIBResource"] = True
+            self["ClobberCIB"] = True
+
+        if args.qarsh:
+            self._rsh.enable_qarsh()
+
+        # Replace the time-based seed chosen at construction with the one the
+        # user asked for
+        if args.seed:
+            self._seed_random(args.seed)
+
+        for kv in args.set:
+            # Split on the first "=" only, so the value may itself contain "="
+            (name, value) = kv.split("=", 1)
+            self[name] = value
+            print("Setting %s = %s" % (name, value))
+
+class EnvFactory:
+    """ Hands out the single Environment object shared by all callers """
+
+    # The shared Environment, created on the first getInstance call
+    instance = None
+
+    # pylint: disable=invalid-name
+    def getInstance(self, args=None):
+        """ Return the shared Environment, creating it from args on the first
+            call.  Once an instance exists, args is ignored.
+        """
+
+        if EnvFactory.instance:
+            return EnvFactory.instance
+
+        EnvFactory.instance = Environment(args)
+        return EnvFactory.instance
diff --git a/python/pacemaker/_cts/patterns.py b/python/pacemaker/_cts/patterns.py
index a50dbb5948..2e4c23d232 100644
--- a/python/pacemaker/_cts/patterns.py
+++ b/python/pacemaker/_cts/patterns.py
@@ -1,411 +1,408 @@
""" Pattern-holding classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["PatternSelector"]
__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+)"
import argparse
from pacemaker.buildoptions import BuildOptions
class BasePatterns:
""" The base class for holding a stack-specific set of command and log
file/stdout patterns. Stack-specific classes need to be built on top
of this one.
"""
def __init__(self):
""" Create a new BasePatterns instance which holds a very minimal set of
basic patterns.
"""
self._bad_news = []
self._components = {}
self._name = "crm-base"
self._ignore = [
"avoid confusing Valgrind",
# Logging bug in some versions of libvirtd
r"libvirtd.*: internal error: Failed to parse PCI config address",
# pcs can log this when node is fenced, but fencing is OK in some
# tests (and we will catch it in pacemaker logs when not OK)
r"pcs.daemon:No response from: .* request: get_configs, error:",
]
self._commands = {
"StatusCmd" : "crmadmin -t 60 -S %s 2>/dev/null",
"CibQuery" : "cibadmin -Ql",
"CibAddXml" : "cibadmin --modify -c --xml-text %s",
"CibDelXpath" : "cibadmin --delete --xpath %s",
"RscRunning" : BuildOptions.DAEMON_DIR + "/cts-exec-helper -R -r %s",
"CIBfile" : "%s:" + BuildOptions.CIB_DIR + "/cib.xml",
"TmpDir" : "/tmp",
"BreakCommCmd" : "iptables -A INPUT -s %s -j DROP >/dev/null 2>&1",
"FixCommCmd" : "iptables -D INPUT -s %s -j DROP >/dev/null 2>&1",
- "ReduceCommCmd" : "",
- "RestoreCommCmd" : "tc qdisc del dev lo root",
-
"MaintenanceModeOn" : "cibadmin --modify -c --xml-text '<cluster_property_set id=\"cib-bootstrap-options\"><nvpair id=\"cts-maintenance-mode-setting\" name=\"maintenance-mode\" value=\"true\"/></cluster_property_set>'",
"MaintenanceModeOff" : "cibadmin --delete --xpath \"//nvpair[@name='maintenance-mode']\"",
"StandbyCmd" : "crm_attribute -Vq -U %s -n standby -l forever -v %s 2>/dev/null",
"StandbyQueryCmd" : "crm_attribute -qG -U %s -n standby -l forever -d off 2>/dev/null",
}
self._search = {
"Pat:DC_IDLE" : r"pacemaker-controld.*State transition.*-> S_IDLE",
# This won't work if we have multiple partitions
"Pat:Local_started" : r"%s\W.*controller successfully started",
"Pat:NonDC_started" : r"%s\W.*State transition.*-> S_NOT_DC",
"Pat:DC_started" : r"%s\W.*State transition.*-> S_IDLE",
"Pat:We_stopped" : r"%s\W.*OVERRIDE THIS PATTERN",
"Pat:They_stopped" : r"%s\W.*LOST:.* %s ",
"Pat:They_dead" : r"node %s.*: is dead",
"Pat:They_up" : r"%s %s\W.*OVERRIDE THIS PATTERN",
"Pat:TransitionComplete" : "Transition status: Complete: complete",
"Pat:Fencing_start" : r"Requesting peer fencing .* targeting %s",
"Pat:Fencing_ok" : r"pacemaker-fenced.*:\s*Operation .* targeting %s by .* for .*@.*: OK",
"Pat:Fencing_recover" : r"pacemaker-schedulerd.*: Recover\s+%s",
"Pat:Fencing_active" : r"stonith resource .* is active on 2 nodes (attempting recovery)",
"Pat:Fencing_probe" : r"pacemaker-controld.* Result of probe operation for %s on .*: Error",
"Pat:RscOpOK" : r"pacemaker-controld.*:\s+Result of %s operation for %s.*: (0 \()?ok",
"Pat:RscOpFail" : r"pacemaker-schedulerd.*:.*Unexpected result .* recorded for %s of %s ",
"Pat:CloneOpFail" : r"pacemaker-schedulerd.*:.*Unexpected result .* recorded for %s of (%s|%s) ",
"Pat:RscRemoteOpOK" : r"pacemaker-controld.*:\s+Result of %s operation for %s on %s: (0 \()?ok",
"Pat:NodeFenced" : r"pacemaker-controld.*:\s* Peer %s was terminated \(.*\) by .* on behalf of .*: OK",
}
def get_component(self, key):
    """ Look up the patterns registered for a single component.

        key is typically the name of some subprogram (pacemaker-based,
        pacemaker-fenced, etc.) or one of the special purpose keys.  The list
        stored under that key is returned.  For an unknown key, a message is
        printed and an empty list is returned instead.
    """
    if key not in self._components:
        print("Unknown component '%s' for %s" % (key, self._name))
        return []

    return self._components[key]
def get_patterns(self, key):
    """ Return one of the pattern collections held by this object.

        key selects the collection: "BadNews", "BadNewsIgnore", "Commands",
        "Search", or "Components".  Depending on the key, the result is either
        a list or a hash.  For any other key, a message is printed and None
        is returned.
    """
    # Checked in order; the attribute is only read once its kind has matched.
    kinds = (
        ("BadNews", "_bad_news"),
        ("BadNewsIgnore", "_ignore"),
        ("Commands", "_commands"),
        ("Search", "_search"),
        ("Components", "_components"),
    )

    for kind, attr in kinds:
        if key == kind:
            return getattr(self, attr)

    print("Unknown pattern '%s' for %s" % (key, self._name))
    return None
def __getitem__(self, key):
    """ Return the template stored under key.

        "Name" yields this object's name.  Any other key is looked up first
        among the commands and then among the search patterns.  If it is
        found in neither, a message is printed and None is returned.
    """
    if key == "Name":
        return self._name

    # Commands take precedence over search patterns, as before.
    for attr in ("_commands", "_search"):
        table = getattr(self, attr)
        if key in table:
            return table[key]

    print("Unknown template '%s' for %s" % (key, self._name))
    return None
class Corosync2Patterns(BasePatterns):
    """ Patterns for Corosync version 2 cluster manager class """

    def __init__(self):
        """ Start from the BasePatterns defaults and specialize them.

            Commands and search patterns are added to (or override) the ones
            set up by BasePatterns, the ignore list is extended, the bad news
            list is replaced outright, and the per-component pattern lists are
            filled in.
        """
        BasePatterns.__init__(self)

        self._name = "crm-corosync"

        self._commands.update({
            "StartCmd": "service corosync start && service pacemaker start",
            "StopCmd": "service pacemaker stop; [ ! -e /usr/sbin/pacemaker-remoted ] || service pacemaker_remote stop; service corosync stop",

            "EpochCmd": "crm_node -e",
            "QuorumCmd": "crm_node -q",
            "PartitionCmd": "crm_node -p",
        })

        self._search.update({
            # Close enough ... "Corosync Cluster Engine exiting normally" isn't
            # printed reliably.
            "Pat:We_stopped": r"%s\W.*Unloading all Corosync service engines",
            "Pat:They_stopped": r"%s\W.*pacemaker-controld.*Node %s(\[|\s).*state is now lost",
            "Pat:They_dead": r"pacemaker-controld.*Node %s(\[|\s).*state is now lost",
            "Pat:They_up": r"\W%s\W.*pacemaker-controld.*Node %s state is now member",

            "Pat:ChildExit": r"\[[0-9]+\] exited with status [0-9]+ \(",
            # "with signal 9" == pcmk_child_exit(), "$" == check_active_before_startup_processes()
            "Pat:ChildKilled": r"%s\W.*pacemakerd.*%s\[[0-9]+\] terminated( with signal 9|$)",
            "Pat:ChildRespawn": r"%s\W.*pacemakerd.*Respawning %s subdaemon after unexpected exit",

            "Pat:InfraUp": r"%s\W.*corosync.*Initializing transport",
            "Pat:PacemakerUp": r"%s\W.*pacemakerd.*Starting Pacemaker",
        })

        self._ignore += [
            r"crm_mon:",
            r"crmadmin:",
            r"update_trace_data",
            r"async_notify:.*strange, client not found",
            r"Parse error: Ignoring unknown option .*nodename",
            r"error.*: Operation 'reboot' .* using FencingFail returned ",
            r"getinfo response error: 1$",
            r"sbd.* error: inquisitor_child: DEBUG MODE IS ACTIVE",
            r"sbd.* pcmk:\s*error:.*Connection to cib_ro.* (failed|closed)",
        ]

        # NOTE: this replaces (rather than extends) whatever bad news list the
        # base class set up.  "input=I_ERROR" used to be listed twice here;
        # the redundant second entry has been dropped.
        self._bad_news = [
            r"[^(]error:",
            r"crit:",
            r"ERROR:",
            r"CRIT:",
            r"Shutting down...NOW",
            r"Timer I_TERMINATE just popped",
            r"input=I_ERROR",
            r"input=I_FAIL",
            r"input=I_INTEGRATED cause=C_TIMER_POPPED",
            r"input=I_FINALIZED cause=C_TIMER_POPPED",
            r"(pacemakerd|pacemaker-execd|pacemaker-controld):.*, exiting",
            r"schedulerd.*Attempting recovery of resource",
            r"is taking more than 2x its timeout",
            r"Confirm not received from",
            r"Welcome reply not received from",
            r"Attempting to schedule .* after a stop",
            r"Resource .* was active at shutdown",
            r"duplicate entries for call_id",
            r"Search terminated:",
            r":global_timer_callback",
            r"Faking parameter digest creation",
            r"Parameters to .* action changed:",
            r"Parameters to .* changed",
            r"pacemakerd.*\[[0-9]+\] terminated( with signal| as IPC server|$)",
            r"pacemaker-schedulerd.*Recover\s+.*\(.* -\> .*\)",
            r"rsyslogd.* imuxsock lost .* messages from pid .* due to rate-limiting",
            r"Peer is not part of our cluster",
            r"We appear to be in an election loop",
            r"Unknown node -> we will not deliver message",
            r"(Blackbox dump requested|Problem detected)",
            r"pacemakerd.*Could not connect to Cluster Configuration Database API",
            r"Receiving messages from a node we think is dead",
            r"share the same cluster nodeid",
            r"share the same name",

            r"pacemaker-controld:.*Transition failed: terminated",
            r"Local CIB .* differs from .*:",
            r"warn.*:\s*Continuing but .* will NOT be used",
            r"warn.*:\s*Cluster configuration file .* is corrupt",
            r"Election storm",
            r"stalled the FSA with pending inputs",
        ]

        self._components["common-ignore"] = [
            r"Pending action:",
            r"resource( was|s were) active at shutdown",
            r"pending LRM operations at shutdown",
            r"Lost connection to the CIB manager",
            r"pacemaker-controld.*:\s*Action A_RECOVER .* not supported",
            r"pacemaker-controld.*:\s*Performing A_EXIT_1 - forcefully exiting ",
            r".*:\s*Requesting fencing \([^)]+\) of node ",
            r"(Blackbox dump requested|Problem detected)",
        ]

        self._components["corosync-ignore"] = [
            r"Could not connect to Corosync CFG: CS_ERR_LIBRARY",
            r"error:.*Connection to the CPG API failed: Library error",
            r"\[[0-9]+\] exited with status [0-9]+ \(",
            r"\[[0-9]+\] terminated with signal 15",
            r"pacemaker-based.*error:.*Corosync connection lost",
            r"pacemaker-fenced.*error:.*Corosync connection terminated",
            r"pacemaker-controld.*State transition .* S_RECOVERY",
            r"pacemaker-controld.*error:.*Input (I_ERROR|I_TERMINATE ) .*received in state",
            r"pacemaker-controld.*error:.*Could not recover from internal error",
            r"error:.*Connection to cib_(shm|rw).* (failed|closed)",
            r"error:.*cib_(shm|rw) IPC provider disconnected while waiting",
            r"error:.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
            r"crit: Fencing daemon connection failed",
            # This is overbroad, but we don't have a way to say that only
            # certain transition errors are acceptable (if the fencer respawns,
            # fence devices may appear multiply active). We have to rely on
            # other causes of a transition error logging their own error
            # message, which is the usual practice.
            r"pacemaker-schedulerd.* Calculated transition .*/pe-error",
        ]

        self._components["corosync"] = [
            # We expect each daemon to lose its cluster connection.
            # However, if the CIB manager loses its connection first,
            # it's possible for another daemon to lose that connection and
            # exit before losing the cluster connection.
            r"pacemakerd.*:\s*warning:.*Lost connection to cluster layer",
            r"pacemaker-attrd.*:\s*(crit|error):.*Lost connection to (cluster layer|the CIB manager)",
            r"pacemaker-based.*:\s*(crit|error):.*Lost connection to cluster layer",
            r"pacemaker-controld.*:\s*(crit|error):.*Lost connection to (cluster layer|the CIB manager)",
            r"pacemaker-fenced.*:\s*(crit|error):.*Lost connection to (cluster layer|the CIB manager)",
            r"schedulerd.*Scheduling node .* for fencing",
            r"pacemaker-controld.*:\s*Peer .* was terminated \(.*\) by .* on behalf of .*:\s*OK",
        ]

        self._components["pacemaker-based"] = [
            r"pacemakerd.* pacemaker-attrd\[[0-9]+\] exited with status 102",
            r"pacemakerd.* pacemaker-controld\[[0-9]+\] exited with status 1",
            r"pacemakerd.* Respawning pacemaker-attrd subdaemon after unexpected exit",
            r"pacemakerd.* Respawning pacemaker-based subdaemon after unexpected exit",
            r"pacemakerd.* Respawning pacemaker-controld subdaemon after unexpected exit",
            r"pacemakerd.* Respawning pacemaker-fenced subdaemon after unexpected exit",
            r"pacemaker-.* Connection to cib_.* (failed|closed)",
            r"pacemaker-attrd.*:.*Lost connection to the CIB manager",
            r"pacemaker-controld.*:.*Lost connection to the CIB manager",
            r"pacemaker-controld.*I_ERROR.*handle_cib_disconnect",
            r"pacemaker-controld.* State transition .* S_RECOVERY",
            r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
            r"pacemaker-controld.*Could not recover from internal error",
        ]

        self._components["pacemaker-based-ignore"] = [
            r"pacemaker-execd.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
            r"pacemaker-controld.*:\s+Result of .* operation for Fencing.*Error \(Lost connection to fencer\)",
            r"pacemaker-controld.*:Could not connect to attrd: Connection refused",
            # This is overbroad, but we don't have a way to say that only
            # certain transition errors are acceptable (if the fencer respawns,
            # fence devices may appear multiply active). We have to rely on
            # other causes of a transition error logging their own error
            # message, which is the usual practice.
            r"pacemaker-schedulerd.* Calculated transition .*/pe-error",
        ]

        self._components["pacemaker-execd"] = [
            r"pacemaker-controld.*Connection to executor failed",
            r"pacemaker-controld.*I_ERROR.*lrm_connection_destroy",
            r"pacemaker-controld.*State transition .* S_RECOVERY",
            r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
            r"pacemaker-controld.*Could not recover from internal error",
            r"pacemakerd.*pacemaker-controld\[[0-9]+\] exited with status 1",
            r"pacemakerd.* Respawning pacemaker-execd subdaemon after unexpected exit",
            r"pacemakerd.* Respawning pacemaker-controld subdaemon after unexpected exit",
        ]

        self._components["pacemaker-execd-ignore"] = [
            r"pacemaker-(attrd|controld).*Connection to lrmd.* (failed|closed)",
            r"pacemaker-(attrd|controld).*Could not execute alert",
        ]

        self._components["pacemaker-controld"] = [
            r"State transition .* -> S_IDLE",
        ]

        self._components["pacemaker-controld-ignore"] = []
        self._components["pacemaker-attrd"] = []
        self._components["pacemaker-attrd-ignore"] = []

        self._components["pacemaker-schedulerd"] = [
            r"State transition .* S_RECOVERY",
            r"pacemakerd.* Respawning pacemaker-controld subdaemon after unexpected exit",
            r"pacemaker-controld\[[0-9]+\] exited with status 1 \(",
            r"Connection to the scheduler failed",
            r"pacemaker-controld.*I_ERROR.*save_cib_contents",
            r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
            r"pacemaker-controld.*Could not recover from internal error",
        ]

        self._components["pacemaker-schedulerd-ignore"] = [
            r"Connection to pengine.* (failed|closed)",
        ]

        self._components["pacemaker-fenced"] = [
            r"error:.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
            r"Fencing daemon connection failed",
            r"pacemaker-controld.*Fencer successfully connected",
        ]

        self._components["pacemaker-fenced-ignore"] = [
            r"(error|warning):.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
            r"crit:.*Fencing daemon connection failed",
            r"error:.*Fencer connection failed \(will retry\)",
            r"pacemaker-controld.*:\s+Result of .* operation for Fencing.*Error \(Lost connection to fencer\)",
            # This is overbroad, but we don't have a way to say that only
            # certain transition errors are acceptable (if the fencer respawns,
            # fence devices may appear multiply active). We have to rely on
            # other causes of a transition error logging their own error
            # message, which is the usual practice.
            r"pacemaker-schedulerd.* Calculated transition .*/pe-error",
        ]

        # The fencer ignore list additionally picks up everything in the
        # common ignore list.
        self._components["pacemaker-fenced-ignore"].extend(self._components["common-ignore"])
# Maps each supported cluster manager name to the class holding its patterns
patternVariants = {
    "crm-base": BasePatterns,
    "crm-corosync": Corosync2Patterns,
}
class PatternSelector:
    """ Picks one of the available Pattern classes and hands out the various
        pieces of information held by an instance of it
    """

    def __init__(self, name="crm-corosync"):
        """ Instantiate the pattern class registered under name.

            An empty name (None or "") selects Corosync2Patterns, which is
            also what the default name "crm-corosync" maps to.  Any other name
            is looked up in patternVariants; only "crm-corosync" and the base
            object are supported at this time.
        """
        self._name = name

        # A given name must be a key of patternVariants; a missing one falls
        # back to the default class.
        if name:
            self._base = patternVariants[name]()
        else:
            self._base = Corosync2Patterns()

    def get_patterns(self, kind):
        """ Forward to get_patterns on the selected pattern object """
        return self._base.get_patterns(kind)

    def get_template(self, key):
        """ Look up a single template on the selected pattern object by
            indexing it with key, and return whatever that yields (a string,
            or None if the object has no template for the key).
        """
        return self._base[key]

    def get_component(self, kind):
        """ Forward to get_component on the selected pattern object """
        return self._base.get_component(kind)

    def __getitem__(self, key):
        """ Subscript access; equivalent to get_template(key) """
        return self.get_template(key)
# PYTHONPATH=python python python/pacemaker/_cts/patterns.py -k crm-corosync -t StartCmd
if __name__ == '__main__':
    # Print the template named by -t from the pattern set named by -k
    cli = argparse.ArgumentParser()
    cli.add_argument("-k", "--kind", metavar="KIND")
    cli.add_argument("-t", "--template", metavar="TEMPLATE")
    options = cli.parse_args()

    selector = PatternSelector(options.kind)
    print(selector[options.template])
diff --git a/python/pylintrc b/python/pylintrc
index db2a661d16..e65110b601 100644
--- a/python/pylintrc
+++ b/python/pylintrc
@@ -1,555 +1,556 @@
# NOTE: Any line with CHANGED: describes something that we changed from the
# default pylintrc configuration.
[MAIN]
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Files or directories to be skipped. They should be base names, not
# paths.
ignore=CVS
# Add files or directories matching the regex patterns to the ignore-list. The
# regex matches against paths and can be in Posix or Windows format.
ignore-paths=
# Files or directories matching the regex patterns are skipped. The regex
# matches against base names, not paths.
ignore-patterns=^\.#
# Pickle collected data for later comparisons.
persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.
jobs=1
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
suggestion-mode=yes
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loaded into the active Python interpreter and may
# run arbitrary code.
extension-pkg-allow-list=
# Minimum supported python version
# CHANGED
py-version = 3.4
# Control the amount of potential inferred values when inferring a single
# object. This can help the performance when dealing with large functions or
# complex, nested conditions.
limit-inference-results=100
# Specify a score threshold under which the program will exit with error.
fail-under=10.0
# Return non-zero exit code if any of these messages/categories are detected,
# even if score is above --fail-under value. Syntax same as enable. Messages
# specified are enabled, while categories only check already-enabled messages.
fail-on=
# Clear in-memory caches upon conclusion of linting. Useful if running pylint in
# a server-like mode.
clear-cache-post-run=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
# confidence=
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifiers separated by commas (,) or put this option
# multiple times (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=
use-symbolic-message-instead,
useless-suppression,
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once). You can also use "--disable=all" to
# disable everything first and then re-enable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W"
# CHANGED
disable=line-too-long,
too-few-public-methods,
too-many-arguments,
too-many-branches,
too-many-instance-attributes,
+ too-many-statements,
unrecognized-option,
useless-option-value
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html. You can also give a reporter class, eg
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages
reports=no
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables 'fatal', 'error', 'warning', 'refactor', 'convention'
# and 'info', which contain the number of messages in each category, as
# well as 'statement', which is the total number of statements analyzed. This
# score is used by the global evaluation report (RP0004).
evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details
#msg-template=
# Activate the evaluation score.
score=yes
[LOGGING]
# Logging modules to check that the string format arguments are in logging
# function parameter format
logging-modules=logging
# The type of string formatting that logging methods do. `old` means using %
# formatting, `new` is for `{}` formatting.
logging-format-style=old
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
# CHANGED: Don't do anything about FIXME, XXX, or TODO
notes=
# Regular expression of note tags to take in consideration.
#notes-rgx=
[SIMILARITIES]
# Minimum number of lines of a similarity.
min-similarity-lines=6
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=yes
# Signatures are removed from the similarity computation
ignore-signatures=yes
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid defining new builtins when possible.
additional-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,_cb
# Tells whether unused global variables should be treated as a violation.
allow-global-unused-variables=yes
# List of names allowed to shadow builtins
allowed-redefined-builtins=
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=100
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
single-line-class-stmt=no
# Maximum number of lines in a module
max-module-lines=2000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
[BASIC]
# Good variable names which should always be accepted, separated by a comma
# CHANGED: Single variable names are handled by variable-rgx below, leaving
# _ here as the name for any variable that should be ignored.
good-names=_
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs=
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
# Bad variable names regexes, separated by a comma. If names match any regex,
# they will always be refused
bad-names-rgxs=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Include a hint for the correct naming format with invalid-name
include-naming-hint=no
# Naming style matching correct function names.
function-naming-style=snake_case
# Regular expression matching correct function names
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming style matching correct variable names.
variable-naming-style=snake_case
# Regular expression matching correct variable names
# CHANGED: One letter variables are fine
variable-rgx=[a-z_][a-z0-9_]{,30}$
# Naming style matching correct constant names.
const-naming-style=UPPER_CASE
# Regular expression matching correct constant names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$
# Naming style matching correct attribute names.
attr-naming-style=snake_case
# Regular expression matching correct attribute names
attr-rgx=[a-z_][a-z0-9_]{2,}$
# Naming style matching correct argument names.
argument-naming-style=snake_case
# Regular expression matching correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming style matching correct class attribute names.
class-attribute-naming-style=any
# Regular expression matching correct class attribute names
class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
# Naming style matching correct class constant names.
class-const-naming-style=UPPER_CASE
# Regular expression matching correct class constant names. Overrides class-
# const-naming-style.
#class-const-rgx=
# Naming style matching correct inline iteration names.
inlinevar-naming-style=any
# Regular expression matching correct inline iteration names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Naming style matching correct class names.
class-naming-style=PascalCase
# Regular expression matching correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Naming style matching correct module names.
module-naming-style=snake_case
# Regular expression matching correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Naming style matching correct method names.
method-naming-style=snake_case
# Regular expression matching correct method names
method-rgx=[a-z_][a-z0-9_]{2,}$
# Regular expression matching correct type variable names
#typevar-rgx=
# Regular expression which should only match function or class names that do
# not require a docstring. Use ^(?!__init__$)_ to also check __init__.
no-docstring-rgx=__.*__
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
# List of decorators that define properties, such as abc.abstractproperty.
property-classes=abc.abstractproperty
[TYPECHECK]
# Regex pattern to define which classes are considered mixins if ignore-mixin-
# members is set to 'yes'
mixin-class-rgx=.*MixIn
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=REQUEST,acl_users,aq_parent,argparse.Namespace
# List of decorators that create context managers from functions, such as
# contextlib.contextmanager.
contextmanager-decorators=contextlib.contextmanager
# Tells whether to warn about missing members when the owner of the attribute
# is inferred to be None.
ignore-none=yes
# This flag controls whether pylint should warn about no-member and similar
# checks whenever an opaque object is returned when inferring. The inference
# can return multiple potential results while evaluating a Python object, but
# some branches might not be evaluated, which results in partial inference. In
# that case, it might be useful to still emit no-member and other checks for
# the rest of the inferred objects.
ignore-on-opaque-inference=yes
# Show a hint with possible names when a member name was not found. The aspect
# of finding the hint is based on edit distance.
missing-member-hint=yes
# The minimum edit distance a name should have in order to be considered a
# similar match for a missing member name.
missing-member-hint-distance=1
# The total number of similar names that should be taken in consideration when
# showing a hint for a missing member.
missing-member-max-choices=1
[SPELLING]
# Spelling dictionary name. Available dictionaries: none. To make it work,
# install the python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# List of comma separated words that should be considered directives if they
# appear at the beginning of a comment and should not be checked.
spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:,pragma:,# noinspection
# A path to a file that contains private dictionary; one word per line.
spelling-private-dict-file=.pyenchant_pylint_custom_dict.txt
# Tells whether to store unknown words to indicated private dictionary in
# --spelling-private-dict-file option instead of raising a message.
spelling-store-unknown-words=no
# Limits count of emitted suggestions for spelling mistakes.
max-spelling-suggestions=2
[DESIGN]
# Maximum number of arguments for function / method
max-args=10
# Maximum number of locals for function / method body
max-locals=25
# Maximum number of return / yield for function / method body
max-returns=11
# Maximum number of branch for function / method body
max-branches=27
# Maximum number of statements in function / method body
max-statements=100
# Maximum number of parents for a class (see R0901).
max-parents=7
# List of qualified class names to ignore when counting class parents (see R0901).
ignored-parents=
# Maximum number of attributes for a class (see R0902).
max-attributes=11
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
# Maximum number of public methods for a class (see R0904).
max-public-methods=25
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
# Maximum number of statements in a try-block
max-try-statements = 14
# List of regular expressions of class ancestor names to
# ignore when counting public methods (see R0903).
exclude-too-few-public-methods=
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp,__post_init__
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=mcs
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,_fields,_replace,_source,_make
# Warn about protected attribute access inside special methods
check-protected-access-in-special-methods=no
[IMPORTS]
# List of modules that can be imported at any level, not just the top level
# one.
allow-any-import-level=
# Allow wildcard imports from modules that define __all__.
allow-wildcard-with-all=no
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Couples of modules and preferred modules, separated by a comma.
preferred-modules=
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=builtins.Exception
[TYPING]
# Set to ``no`` if the app / library does **NOT** need to support runtime
# introspection of type annotations. If you use type annotations
# **exclusively** for type checking of an application, you're probably fine.
# For libraries, evaluate if some users want to access the type hints at
# runtime first, e.g., through ``typing.get_type_hints``. Applies to Python
# versions 3.7 - 3.9
runtime-typing = no
[DEPRECATED_BUILTINS]
# List of builtins function names that should not be used, separated by a comma
bad-functions=map,input
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
# Complete name of functions that never returns. When checking for
# inconsistent-return-statements if a never returning function is called then
# it will be considered as an explicit return statement and no message will be
# printed.
never-returning-functions=sys.exit,argparse.parse_error
[STRING]
# This flag controls whether inconsistent-quotes generates a warning when the
# character used as a quote delimiter is used inconsistently within a module.
check-quote-consistency=no
# This flag controls whether the implicit-str-concat should generate a warning
# on implicit string concatenation in sequences defined over several lines.
check-str-concat-over-line-jumps=no
[CODE_STYLE]
# Max line length for which to still emit suggestions. Used to prevent optional
# suggestions which would get split by a code formatter (e.g., black). Will
# default to the setting for ``max-line-length``.
#max-line-length-suggestions=

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jan 25, 6:33 AM (1 d, 4 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1316047
Default Alt Text
(324 KB)

Event Timeline