diff --git a/cts/lab/CM_corosync.py b/cts/lab/CM_corosync.py
index 13d84b3f97..9a79e46d0a 100644
--- a/cts/lab/CM_corosync.py
+++ b/cts/lab/CM_corosync.py
@@ -1,60 +1,60 @@
""" Corosync-specific class for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2007-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.CTS import Process
from pacemaker._cts.clustermanager import ClusterManager
from pacemaker._cts.patterns import PatternSelector
class crm_corosync(ClusterManager):
'''
Corosync version 2 cluster manager class
'''
def __init__(self, name=None):
if not name: name="crm-corosync"
ClusterManager.__init__(self)
self.fullcomplist = {}
self.templates = PatternSelector(self.name)
@property
def components(self):
complist = []
if not len(list(self.fullcomplist.keys())):
for c in [ "pacemaker-based", "pacemaker-controld", "pacemaker-attrd", "pacemaker-execd", "pacemaker-fenced" ]:
self.fullcomplist[c] = Process(
self, c,
pats = self.templates.get_component(c),
badnews_ignore = self.templates.get_component("%s-ignore" % c) +
self.templates.get_component("common-ignore"))
# the scheduler uses dc_pats instead of pats
self.fullcomplist["pacemaker-schedulerd"] = Process(
self, "pacemaker-schedulerd",
dc_pats = self.templates.get_component("pacemaker-schedulerd"),
badnews_ignore = self.templates.get_component("pacemaker-schedulerd-ignore") +
self.templates.get_component("common-ignore"))
# add (or replace) extra components
self.fullcomplist["corosync"] = Process(
self, "corosync",
pats = self.templates.get_component("corosync"),
badnews_ignore = self.templates.get_component("corosync-ignore") +
self.templates.get_component("common-ignore")
)
# Processes running under valgrind can't be shot with "killall -9 processname",
# so don't include them in the returned list
- vgrind = self.Env["valgrind-procs"].split()
+ vgrind = self.env["valgrind-procs"].split()
for key in list(self.fullcomplist.keys()):
- if self.Env["valgrind-tests"]:
+ if self.env["valgrind-tests"]:
if key in vgrind:
self.log("Filtering %s from the component list as it is being profiled by valgrind" % key)
continue
- if key == "pacemaker-fenced" and not self.Env["DoFencing"]:
+ if key == "pacemaker-fenced" and not self.env["DoFencing"]:
continue
complist.append(self.fullcomplist[key])
return complist
diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index c5521aaded..4584867e01 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1029 +1,1029 @@
""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
""" The base class for various kinds of auditors. Specific audit implementations
should be built on top of this one. Audits can do all kinds of checks on the
system. The basic interface for callers is the `__call__` method, which
returns True if the audit passes and False if it fails.
"""
def __init__(self, cm):
""" Create a new ClusterAudit instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
raise NotImplementedError
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
""" Log a message """
self._cm.log("audit: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
""" Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that
we can read back that test message using logging tools.
"""
def __init__(self, cm):
""" Create a new LogAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
""" Restart logging on the given nodes, or all if none are given """
if not nodes:
- nodes = self._cm.Env["nodes"]
+ nodes = self._cm.env["nodes"]
self._cm.debug("Restarting logging on: %r" % nodes)
for node in nodes:
- if self._cm.Env["have_systemd"]:
+ if self._cm.env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node)
- (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"])
+ (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"])
if rc != 0:
- self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node))
+ self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node))
def _create_watcher(self, patterns, kind):
""" Create a new LogWatcher instance for the given patterns """
- watch = LogWatcher(self._cm.Env["LogFileName"], patterns,
- self._cm.Env["nodes"], kind, "LogAudit", 5,
+ watch = LogWatcher(self._cm.env["LogFileName"], patterns,
+ self._cm.env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
""" Perform the log audit """
patterns = []
prefix = "Test message from"
suffix = str(uuid.uuid4())
watch = {}
- for node in self._cm.Env["nodes"]:
+ for node in self._cm.env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
- watch_pref = self._cm.Env["LogWatcher"]
+ watch_pref = self._cm.env["LogWatcher"]
if watch_pref == LogKind.ANY:
kinds = [ LogKind.FILE ]
- if self._cm.Env["have_systemd"]:
+ if self._cm.env["have_systemd"]:
kinds += [ LogKind.JOURNAL ]
kinds += [ LogKind.REMOTE_FILE ]
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log("Logging test message with identifier %s" % suffix)
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
- for node in self._cm.Env["nodes"]:
- cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix)
+ for node in self._cm.env["nodes"]:
+ cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in list(watch.keys()):
w = watch[k]
if watch_pref == LogKind.ANY:
self._cm.log("Checking for test message in %s logs" % k)
w.look_for_all(silent=True)
if w.unmatched:
for regex in w.unmatched:
self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
else:
if watch_pref == LogKind.ANY:
self._cm.log("Found test message in %s logs" % k)
- self._cm.Env["LogWatcher"] = k
+ self._cm.env["LogWatcher"] = k
return 1
return False
def __call__(self):
max_attempts = 3
attempt = 0
- self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60*attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
return False
return True
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
- if self._cm.Env["LogAuditDisabled"]:
+ if self._cm.env["LogAuditDisabled"]:
return False
return True
class DiskAudit(ClusterAudit):
""" Audit disk usage on cluster nodes to verify that there is enough free
space left on whichever mounted file system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
""" Create a new DiskAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
result = True
# @TODO Use directory of PCMK_logfile if set on host
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
- self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
- for node in self._cm.Env["nodes"]:
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
+ for node in self._cm.env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
% (dfout, node, used, remain))
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
% (node, used_percent, remaining_mb))
result = False
- if not should_continue(self._cm.Env):
+ if not should_continue(self._cm.env):
raise ValueError("Disk full on %s" % node)
elif remaining_mb < 100 or used_percent > 90:
self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class FileAudit(ClusterAudit):
""" Audit the filesystem looking for various failure conditions:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
""" Create a new FileAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def __call__(self):
result = True
- self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
- for node in self._cm.Env["nodes"]:
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
+ for node in self._cm.env["nodes"]:
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
result = False
clean = True
self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug("ps[%s]: %s" % (node, line))
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug("Skipping %s" % node)
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class AuditResource:
""" A base class for storing information about a cluster resource """
def __init__(self, cm, line):
""" Create a new AuditResource instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
@property
def unique(self):
""" Is this resource unique? """
return self.flags & 0x20
@property
def orphan(self):
""" Is this resource an orphan? """
return self.flags & 0x01
@property
def managed(self):
""" Is this resource managed by the cluster? """
return self.flags & 0x02
class AuditConstraint:
""" A base class for storing information about a cluster constraint """
def __init__(self, cm, line):
""" Create a new AuditConstraint instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
""" Audit primitive resources to verify a variety of conditions, including that
they are active and managed only when expected; they are active on the
expected clusted node; and that they are not orphaned.
"""
def __init__(self, cm):
""" Create a new PrimitiveAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
""" Perform the audit of a single resource """
rc = True
active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %r" % (resource.id, active))
elif resource.needs_quorum == 1:
self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
rc = False
elif not resource.managed:
self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
rc = False
elif resource.orphan:
self.debug("Resource %s is an inactive orphan" % resource.id)
elif not self._inactive_nodes:
self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
rc = False
- elif self._cm.Env["warn-inactive"]:
+ elif self._cm.env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
% (resource.id, self._inactive_nodes))
return rc
def _setup(self):
""" Verify cluster nodes are active, and collect resource and colocation
information used for performing the audit.
"""
- for node in self._cm.Env["nodes"]:
+ for node in self._cm.env["nodes"]:
if self._cm.expected_status[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
- for node in self._cm.Env["nodes"]:
+ for node in self._cm.env["nodes"]:
if self._target is None and self._cm.expected_status[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name)
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log("Unknown entry: %s" % line)
return True
def __call__(self):
result = True
if not self._setup():
return result
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
result = False
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class GroupAudit(PrimitiveAudit):
""" Audit group resources to verify that each of its child primitive
resources is active on the expected cluster node.
"""
def __init__(self, cm):
""" Create a new GroupAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
result = True
if not self._setup():
return result
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
result = False
self._cm.log("Child %s of %s is active more than once: %r"
% (child.id, group.id, nodes))
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
result = False
self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
% (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return result
class CloneAudit(PrimitiveAudit):
""" Audit clone resources. NOTE: Currently, this class does not perform
any actual audit functions.
"""
def __init__(self, cm):
""" Create a new CloneAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
result = True
if not self._setup():
return result
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug("Checking child %s of %s..." % (child.id, clone.id))
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return result
class ColocationAudit(PrimitiveAudit):
""" Audit cluster resources to verify that those that should be colocated
with each other actually are.
"""
def __init__(self, cm):
""" Create a new ColocationAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
""" Return a list of cluster nodes where a given resource is running """
(rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
result = True
if not self._setup():
return result
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
else:
for node in source:
if not node in target:
result = False
self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
% (coloc.id, coloc.rsc, node, target))
else:
self.debug("Colocation audit (%s): %s running on %s (in %r)"
% (coloc.id, coloc.rsc, node, target))
return result
class ControllerStateAudit(ClusterAudit):
""" Audit cluster nodes to verify that those we expect to be active are
active, and those that are expected to be inactive are inactive.
"""
def __init__(self, cm):
""" Create a new ControllerStateAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
result = True
up_are_down = 0
down_are_up = 0
unstable_list = []
- for node in self._cm.Env["nodes"]:
+ for node in self._cm.env["nodes"]:
should_be = self._cm.expected_status[node]
rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
result = False
self._cm.log("Cluster is not stable: %d (of %d): %r"
% (len(unstable_list), self._cm.upcount(), unstable_list))
if up_are_down > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be up were down."
- % (up_are_down, len(self._cm.Env["nodes"])))
+ % (up_are_down, len(self._cm.env["nodes"])))
if down_are_up > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be down were up."
- % (down_are_up, len(self._cm.Env["nodes"])))
+ % (down_are_up, len(self._cm.env["nodes"])))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class CIBAudit(ClusterAudit):
""" Audit the CIB by verifying that it is identical across cluster nodes """
def __init__(self, cm):
""" Create a new CIBAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return result
for partition in ccm_partitions:
self.debug("\tAuditing CIB consistency for: %s" % partition)
if self._audit_cib_contents(partition) == 0:
result = False
return result
def _audit_cib_contents(self, hostlist):
""" Perform the CIB audit on the given hosts """
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node)
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node0)
passed = False
else:
(rc, result) = self._cm.rsh(
node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
if rc != 0:
self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
passed = False
for line in result:
if not re.search("", line):
passed = False
self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
else:
self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
return passed
def _store_remote_cib(self, node, target):
""" Store a copy of the given node's CIB on the given target node. If
no target is given, store the CIB on the given node.
"""
filename = "/tmp/ctsaudit.%s.xml" % node
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", "rm -f %s" % filename)
for line in lines:
self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class PartitionAudit(ClusterAudit):
""" Audit each partition in a cluster to verify a variety of conditions:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
""" Create a new PartitionAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return result
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
result = False
for partition in ccm_partitions:
self._cm.log("\t %s" % partition)
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
result = False
return result
def _trim_string(self, avalue):
""" Remove the last character from a multi-character string """
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
""" Remove the last character from a multi-character string and convert
the result to an int.
"""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
def _audit_partition(self, partition):
""" Perform the audit of a single partition """
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug("Auditing partition: %s" % partition)
for node in node_list:
if self._cm.expected_status[node] != "up":
self._cm.log("Warn: Node %s appeared out of nowhere" % node)
self._cm.expected_status[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
self._cm.expected_status[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log("Lowest epoch not determined in %s" % partition)
passed = False
for node in node_list:
if self._cm.expected_status[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug("%s: OK" % node)
elif not self._node_epoch[node]:
self.debug("Check on %s ignored: no node epoch" % node)
elif not lowest_epoch:
self.debug("Check on %s ignored: no lowest epoch" % node)
else:
self._cm.log("DC %s is not the oldest node (%d vs. %d)"
% (node, self._node_epoch[node], lowest_epoch))
passed = False
if not dc_found:
self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
% (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
self._cm.log("%d DCs (%s) found in cluster partition: %s"
% (len(dc_found), str(dc_found), str(node_list)))
passed = False
if not passed:
for node in node_list:
if self._cm.expected_status[node] == "up":
self._cm.log("epoch %s : %s"
% (self._node_epoch[node], self._node_state[node]))
return passed
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
""" Return a list of instances of applicable audits that can be performed
for the given ClusterManager.
"""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
diff --git a/python/pacemaker/_cts/cib.py b/python/pacemaker/_cts/cib.py
index f272e4707c..63295ac0c2 100644
--- a/python/pacemaker/_cts/cib.py
+++ b/python/pacemaker/_cts/cib.py
@@ -1,424 +1,424 @@
""" CIB generator for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["ConfigFactory"]
__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import warnings
import tempfile
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.cibxml import Alerts, Clone, Expression, FencingTopology, Group, Nodes, OpDefaults, Option, Resource, Rule
from pacemaker._cts.network import next_ip
class CIB:
""" A class for generating, representing, and installing a CIB file onto
cluster nodes
"""
def __init__(self, cm, version, factory, tmpfile=None):
""" Create a new CIB instance
Arguments:
cm -- A ClusterManager instance
version -- The schema syntax version
factory -- A ConfigFactory instance
tmpfile -- Where to store the CIB, or None to use a new tempfile
"""
# pylint: disable=invalid-name
self._cib = None
self._cm = cm
self._counter = 1
self._factory = factory
self._num_nodes = 0
self.version = version
if not tmpfile:
warnings.filterwarnings("ignore")
# pylint: disable=consider-using-with
f = tempfile.NamedTemporaryFile(delete=True)
f.close()
tmpfile = f.name
warnings.resetwarnings()
self._factory.tmpfile = tmpfile
def _show(self):
""" Query a cluster node for its generated CIB; log and return the result """
output = ""
(_, result) = self._factory.rsh(self._factory.target, "HOME=/root CIB_file=%s cibadmin -Ql" % self._factory.tmpfile, verbose=1)
for line in result:
output += line
self._factory.debug("Generated Config: %s" % line)
return output
def new_ip(self, name=None):
""" Generate an IP resource for the next available IP address, optionally
specifying the resource's name.
"""
- if self._cm.Env["IPagent"] == "IPaddr2":
- ip = next_ip(self._cm.Env["IPBase"])
+ if self._cm.env["IPagent"] == "IPaddr2":
+ ip = next_ip(self._cm.env["IPBase"])
if not name:
if ":" in ip:
(_, _, suffix) = ip.rpartition(":")
name = "r%s" % suffix
else:
name = "r%s" % ip
- r = Resource(self._factory, name, self._cm.Env["IPagent"], "ocf")
+ r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
r["ip"] = ip
if ":" in ip:
r["cidr_netmask"] = "64"
r["nic"] = "eth0"
else:
r["cidr_netmask"] = "32"
else:
if not name:
- name = "r%s%d" % (self._cm.Env["IPagent"], self._counter)
+ name = "r%s%d" % (self._cm.env["IPagent"], self._counter)
self._counter += 1
- r = Resource(self._factory, name, self._cm.Env["IPagent"], "ocf")
+ r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
r.add_op("monitor", "5s")
return r
def get_node_id(self, node_name):
""" Check the cluster configuration for the node ID for the given node_name """
# We can't account for every possible configuration,
# so we only return a node ID if:
# * The node is specified in /etc/corosync/corosync.conf
# with "ring0_addr:" equal to node_name and "nodeid:"
# explicitly specified.
# In all other cases, we return 0.
node_id = 0
# awkward command: use } as record separator
# so each corosync.conf "object" is one record;
# match the "node {" record that has "ring0_addr: node_name";
# then print the substring of that record after "nodeid:"
(rc, output) = self._factory.rsh(self._factory.target,
r"""awk -v RS="}" """
r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/"""
r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s"""
% (node_name, BuildOptions.COROSYNC_CONFIG_FILE), verbose=1)
if rc == 0 and len(output) == 1:
try:
node_id = int(output[0])
except ValueError:
node_id = 0
return node_id
def install(self, target):
""" Generate a CIB file and install it to the given cluster node """
old = self._factory.tmpfile
# Force a rebuild
self._cib = None
self._factory.tmpfile = "%s/cib.xml" % BuildOptions.CIB_DIR
self.contents(target)
self._factory.rsh(self._factory.target, "chown %s %s" % (BuildOptions.DAEMON_USER, self._factory.tmpfile))
self._factory.tmpfile = old
def contents(self, target):
""" Generate a complete CIB file """
# fencing resource
if self._cib:
return self._cib
if target:
self._factory.target = target
self._factory.rsh(self._factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self._factory.tmpfile))
- self._num_nodes = len(self._cm.Env["nodes"])
+ self._num_nodes = len(self._cm.env["nodes"])
no_quorum = "stop"
if self._num_nodes < 3:
no_quorum = "ignore"
self._factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self._num_nodes)
# We don't need a nodes section unless we add attributes
stn = None
# Fencing resource
# Define first so that the shell doesn't reject every update
- if self._cm.Env["DoFencing"]:
+ if self._cm.env["DoFencing"]:
# Define the "real" fencing device
- st = Resource(self._factory, "Fencing", self._cm.Env["stonith-type"], "stonith")
+ st = Resource(self._factory, "Fencing", self._cm.env["stonith-type"], "stonith")
# Set a threshold for unreliable stonith devices such as the vmware one
st.add_meta("migration-threshold", "5")
st.add_op("monitor", "120s", timeout="120s")
st.add_op("stop", "0", timeout="60s")
st.add_op("start", "0", timeout="60s")
# For remote node tests, a cluster node is stopped and brought back up
# as a remote node with the name "remote-OLDNAME". To allow fencing
# devices to fence these nodes, create a list of all possible node names.
- all_node_names = [ prefix+n for n in self._cm.Env["nodes"] for prefix in ('', 'remote-') ]
+ all_node_names = [ prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-') ]
# Add all parameters specified by user
- entries = self._cm.Env["stonith-params"].split(',')
+ entries = self._cm.env["stonith-params"].split(',')
for entry in entries:
try:
(name, value) = entry.split('=', 1)
except ValueError:
print("Warning: skipping invalid fencing parameter: %s" % entry)
continue
# Allow user to specify "all" as the node list, and expand it here
if name in [ "hostlist", "pcmk_host_list" ] and value == "all":
value = ' '.join(all_node_names)
st[name] = value
st.commit()
# Test advanced fencing logic
stf_nodes = []
stt_nodes = []
attr_nodes = {}
# Create the levels
stl = FencingTopology(self._factory)
- for node in self._cm.Env["nodes"]:
+ for node in self._cm.env["nodes"]:
# Remote node tests will rename the node
remote_node = "remote-%s" % node
# Randomly assign node to a fencing method
- ftype = self._cm.Env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
+ ftype = self._cm.env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
# For levels-and, randomly choose targeting by node name or attribute
by = ""
if ftype == "levels-and":
node_id = self.get_node_id(node)
- if node_id == 0 or self._cm.Env.random_gen.choice([True, False]):
+ if node_id == 0 or self._cm.env.random_gen.choice([True, False]):
by = " (by name)"
else:
attr_nodes[node] = node_id
by = " (by attribute)"
self._cm.log(" - Using %s fencing for node: %s%s" % (ftype, node, by))
if ftype == "levels-and":
# If targeting by name, add a topology level for this node
if node not in attr_nodes:
stl.level(1, node, "FencingPass,Fencing")
# Always target remote nodes by name, otherwise we would need to add
# an attribute to the remote node only during remote tests (we don't
# want nonexistent remote nodes showing up in the non-remote tests).
# That complexity is not worth the effort.
stl.level(1, remote_node, "FencingPass,Fencing")
# Add the node (and its remote equivalent) to the list of levels-and nodes.
stt_nodes.extend([node, remote_node])
elif ftype == "levels-or ":
for n in [ node, remote_node ]:
stl.level(1, n, "FencingFail")
stl.level(2, n, "Fencing")
stf_nodes.extend([node, remote_node])
# If any levels-and nodes were targeted by attribute,
# create the attributes and a level for the attribute.
if attr_nodes:
stn = Nodes(self._factory)
for (node_name, node_id) in attr_nodes.items():
stn.add_node(node_name, node_id, { "cts-fencing" : "levels-and" })
stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and")
# Create a Dummy agent that always passes for levels-and
if stt_nodes:
stt = Resource(self._factory, "FencingPass", "fence_dummy", "stonith")
stt["pcmk_host_list"] = " ".join(stt_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stt["random_sleep_range"] = "30"
stt["mode"] = "pass"
stt.commit()
# Create a Dummy agent that always fails for levels-or
if stf_nodes:
stf = Resource(self._factory, "FencingFail", "fence_dummy", "stonith")
stf["pcmk_host_list"] = " ".join(stf_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stf["random_sleep_range"] = "30"
stf["mode"] = "fail"
stf.commit()
# Now commit the levels themselves
stl.commit()
o = Option(self._factory)
- o["stonith-enabled"] = self._cm.Env["DoFencing"]
+ o["stonith-enabled"] = self._cm.env["DoFencing"]
o["start-failure-is-fatal"] = "false"
o["pe-input-series-max"] = "5000"
o["shutdown-escalation"] = "5min"
o["batch-limit"] = "10"
o["dc-deadtime"] = "5s"
o["no-quorum-policy"] = no_quorum
o.commit()
o = OpDefaults(self._factory)
o["timeout"] = "90s"
o.commit()
# Commit the nodes section if we defined one
if stn is not None:
stn.commit()
# Add an alerts section if possible
- if self._factory.rsh.exists_on_all(self._cm.Env["notification-agent"], self._cm.Env["nodes"]):
+ if self._factory.rsh.exists_on_all(self._cm.env["notification-agent"], self._cm.env["nodes"]):
alerts = Alerts(self._factory)
- alerts.add_alert(self._cm.Env["notification-agent"],
- self._cm.Env["notification-recipient"])
+ alerts.add_alert(self._cm.env["notification-agent"],
+ self._cm.env["notification-recipient"])
alerts.commit()
# Add resources?
- if self._cm.Env["CIBResource"]:
+ if self._cm.env["CIBResource"]:
self.add_resources()
# generate cib
self._cib = self._show()
if self._factory.tmpfile != "%s/cib.xml" % BuildOptions.CIB_DIR:
self._factory.rsh(self._factory.target, "rm -f %s" % self._factory.tmpfile)
return self._cib
def add_resources(self):
""" Add various resources and their constraints to the CIB """
# Per-node resources
- for node in self._cm.Env["nodes"]:
+ for node in self._cm.env["nodes"]:
name = "rsc_%s" % node
r = self.new_ip(name)
r.prefer(node, "100")
r.commit()
# Migrator
# Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach
m = Resource(self._factory, "migrator","Dummy", "ocf", "pacemaker")
m["passwd"] = "whatever"
m.add_meta("resource-stickiness","1")
m.add_meta("allow-migrate", "1")
m.add_op("monitor", "P10S")
m.commit()
# Ping the test exerciser
p = Resource(self._factory, "ping-1","ping", "ocf", "pacemaker")
p.add_op("monitor", "60s")
- p["host_list"] = self._cm.Env["cts-exerciser"]
+ p["host_list"] = self._cm.env["cts-exerciser"]
p["name"] = "connected"
p["debug"] = "true"
c = Clone(self._factory, "Connectivity", p)
c["globally-unique"] = "false"
c.commit()
# promotable clone resource
s = Resource(self._factory, "stateful-1", "Stateful", "ocf", "pacemaker")
s.add_op("monitor", "15s", timeout="60s")
s.add_op("monitor", "16s", timeout="60s", role="Promoted")
ms = Clone(self._factory, "promotable-1", s)
ms["promotable"] = "true"
ms["clone-max"] = self._num_nodes
ms["clone-node-max"] = 1
ms["promoted-max"] = 1
ms["promoted-node-max"] = 1
# Require connectivity to run the promotable clone
r = Rule(self._factory, "connected", "-INFINITY", op="or")
r.add_child(Expression(self._factory, "m1-connected-1", "connected", "lt", "1"))
r.add_child(Expression(self._factory, "m1-connected-2", "connected", "not_defined", None))
ms.prefer("connected", rule=r)
ms.commit()
# Group Resource
g = Group(self._factory, "group-1")
g.add_child(self.new_ip())
- if self._cm.Env["have_systemd"]:
+ if self._cm.env["have_systemd"]:
sysd = Resource(self._factory, "petulant", "pacemaker-cts-dummyd@10", "service")
sysd.add_op("monitor", "P10S")
g.add_child(sysd)
else:
g.add_child(self.new_ip())
g.add_child(self.new_ip())
# Make group depend on the promotable clone
g.after("promotable-1", first="promote", then="start")
g.colocate("promotable-1", "INFINITY", withrole="Promoted")
g.commit()
# LSB resource
lsb = Resource(self._factory, "lsb-dummy", "LSBDummy", "lsb")
lsb.add_op("monitor", "5s")
# LSB with group
lsb.after("group-1")
lsb.colocate("group-1")
lsb.commit()
class ConfigFactory:
""" Singleton to generate a CIB file for the environment's schema version """
def __init__(self, cm):
""" Create a new ConfigFactory instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.rsh = self._cm.rsh
- if not self._cm.Env["ListTests"]:
- self.target = self._cm.Env["nodes"][0]
+ if not self._cm.env["ListTests"]:
+ self.target = self._cm.env["nodes"][0]
self.tmpfile = None
def log(self, args):
""" Log a message """
self._cm.log("cib: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("cib: %s" % args)
def create_config(self, name="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION):
""" Return a CIB object for the given schema version """
return CIB(self._cm, name, self)
diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py
index b7ed1eb666..d3508caa9f 100644
--- a/python/pacemaker/_cts/clustermanager.py
+++ b/python/pacemaker/_cts/clustermanager.py
@@ -1,806 +1,806 @@
""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS)
"""
__all__ = ["ClusterManager"]
__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
Certain portions by Huang Zhen are copyright 2004
International Business Machines. The version control history for this file
may have further details."""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
import time
from collections import UserDict
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.CTS import NodeStatus
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.cib import ConfigFactory
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogWatcher
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
# ClusterManager has a lot of methods.
# pylint: disable=too-many-public-methods
class ClusterManager(UserDict):
'''The Cluster Manager class.
This is an subclass of the Python dictionary class.
(this is because it contains lots of {name,value} pairs,
not because it's behavior is that terribly similar to a
dictionary in other ways.)
This is an abstract class which class implements high-level
operations on the cluster and/or its cluster managers.
Actual cluster managers classes are subclassed from this type.
One of the things we do is track the state we think every node should
be in.
'''
def _final_conditions(self):
for key in list(self.keys()):
if self[key] is None:
raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key)
def __init__(self):
# Eventually, ClusterManager should not be a UserDict subclass. Until
# that point...
# pylint: disable=super-init-not-called
- self.Env = EnvFactory().getInstance()
- self.templates = PatternSelector(self.Env["Name"])
+ self.env = EnvFactory().getInstance()
+ self.templates = PatternSelector(self.env["Name"])
self.logger = LogFactory()
self.data = {}
- self.name = self.Env["Name"]
+ self.name = self.env["Name"]
self.rsh = RemoteFactory().getInstance()
self.expected_status = {}
# pylint: disable=invalid-name
- self.ns = NodeStatus(self.Env)
+ self.ns = NodeStatus(self.env)
self.OurNode = os.uname()[1].lower()
self.__instance_errors_to_ignore = []
self.cib_installed = False
self._final_conditions()
self.CIBsync = {}
self.CibFactory = ConfigFactory(self)
- self.cib = self.CibFactory.create_config(self.Env["Schema"])
+ self.cib = self.CibFactory.create_config(self.env["Schema"])
def __getitem__(self, key):
if key == "Name":
return self.name
print("FIXME: Getting %s from %r" % (key, self))
if key in self.data:
return self.data[key]
return self.templates.get_patterns(key)
def __setitem__(self, key, value):
print("FIXME: Setting %s=%s on %r" % (key, value, self))
self.data[key] = value
def key_for_node(self, node):
return node
def clear_instance_errors_to_ignore(self):
""" Reset instance-specific errors to ignore on each iteration """
self.__instance_errors_to_ignore = []
@property
def instance_errors_to_ignore(self):
""" Return a list of known errors that should be ignored for a specific
test instance
"""
return self.__instance_errors_to_ignore
@property
def errors_to_ignore(self):
""" Return a list of known error messages that should be ignored """
return self.templates.get_patterns("BadNewsIgnore")
def log(self, args):
self.logger.log(args)
def debug(self, args):
self.logger.debug(args)
def upcount(self):
'''How many nodes are up?'''
count = 0
- for node in self.Env["nodes"]:
+ for node in self.env["nodes"]:
if self.expected_status[node] == "up":
count = count + 1
return count
def install_support(self, command="install"):
- for node in self.Env["nodes"]:
+ for node in self.env["nodes"]:
self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command))
def prepare_fencing_watcher(self):
# If we don't have quorum now but get it as a result of starting this node,
# then a bunch of nodes might get fenced
if self.has_quorum(None):
self.debug("Have quorum")
return None
if not self.templates["Pat:Fencing_start"]:
print("No start pattern")
return None
if not self.templates["Pat:Fencing_ok"]:
print("No ok pattern")
return None
stonith = None
stonithPats = []
- for peer in self.Env["nodes"]:
+ for peer in self.env["nodes"]:
if self.expected_status[peer] == "up":
continue
stonithPats.extend([ self.templates["Pat:Fencing_ok"] % peer,
self.templates["Pat:Fencing_start"] % peer ])
- stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0)
+ stonith = LogWatcher(self.env["LogFileName"], stonithPats, self.env["nodes"], self.env["LogWatcher"], "StartupFencing", 0)
stonith.set_watch()
return stonith
def fencing_cleanup(self, node, stonith):
peer_list = []
peer_state = {}
self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
# If we just started a node, we may now have quorum (and permission to fence)
if not stonith:
self.debug("Nothing to do")
return peer_list
q = self.has_quorum(None)
- if not q and len(self.Env["nodes"]) > 2:
+ if not q and len(self.env["nodes"]) > 2:
# We didn't gain quorum - we shouldn't have shot anyone
- self.debug("Quorum: %s Len: %d" % (q, len(self.Env["nodes"])))
+ self.debug("Quorum: %s Len: %d" % (q, len(self.env["nodes"])))
return peer_list
- for n in self.Env["nodes"]:
+ for n in self.env["nodes"]:
peer_state[n] = "unknown"
# Now see if any states need to be updated
self.debug("looking for: %r" % stonith.regexes)
shot = stonith.look(0)
while shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
# Extract node name
- for n in self.Env["nodes"]:
+ for n in self.env["nodes"]:
if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
peer = n
peer_state[peer] = "complete"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer)
elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
# TODO: Correctly detect multiple fencing operations for the same host
peer = n
peer_state[peer] = "in-progress"
self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer)
if not peer:
self.logger.log("ERROR: Unknown stonith match: %r" % shot)
elif not peer in peer_list:
self.debug("Found peer: %s" % peer)
peer_list.append(peer)
# Get the next one
shot = stonith.look(60)
for peer in peer_list:
self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
- if self.Env["at-boot"]:
+ if self.env["at-boot"]:
self.expected_status[peer] = "up"
else:
self.expected_status[peer] = "down"
if peer_state[peer] == "in-progress":
# Wait for any in-progress operations to complete
shot = stonith.look(60)
while stonith.regexes and shot:
self.debug("Found: %r" % shot)
del stonith.regexes[stonith.whichmatch]
shot = stonith.look(60)
# Now make sure the node is alive too
- self.ns.wait_for_node(peer, self.Env["DeadTime"])
+ self.ns.wait_for_node(peer, self.env["DeadTime"])
# Poll until it comes up
- if self.Env["at-boot"]:
+ if self.env["at-boot"]:
if not self.stat_cm(peer):
- time.sleep(self.Env["StartTime"])
+ time.sleep(self.env["StartTime"])
if not self.stat_cm(peer):
self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
return None
return peer_list
def start_cm(self, node, verbose=False):
""" Start up the cluster manager on a given node """
if verbose:
self.logger.log("Starting %s on node %s" % (self.templates["Name"], node))
else:
self.debug("Starting %s on node %s" % (self.templates["Name"], node))
if not node in self.expected_status:
self.expected_status[node] = "down"
if self.expected_status[node] != "down":
return True
# Technically we should always be able to notice ourselves starting
patterns = [ self.templates["Pat:Local_started"] % node ]
if self.upcount() == 0:
patterns.append(self.templates["Pat:DC_started"] % node)
else:
patterns.append(self.templates["Pat:NonDC_started"] % node)
watch = LogWatcher(
- self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10)
+ self.env["LogFileName"], patterns, self.env["nodes"], self.env["LogWatcher"], "StartaCM", self.env["StartTime"]+10)
self.install_config(node)
self.expected_status[node] = "any"
- if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]):
+ if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
self.logger.log ("%s was already started" % node)
return True
stonith = self.prepare_fencing_watcher()
watch.set_watch()
(rc, _) = self.rsh(node, self.templates["StartCmd"])
if rc != 0:
self.logger.log ("Warn: Start command failed on node %s" % node)
self.fencing_cleanup(node, stonith)
return False
self.expected_status[node] = "up"
watch_result = watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % regex)
- if watch_result and self.cluster_stable(self.Env["DeadTime"]):
+ if watch_result and self.cluster_stable(self.env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
- if self.stat_cm(node) and self.cluster_stable(self.Env["DeadTime"]):
+ if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
self.fencing_cleanup(node, stonith)
return True
self.logger.log ("Warn: Start failed for node %s" % node)
return False
def start_cm_async(self, node, verbose=False):
""" Start up the cluster manager on a given node without blocking """
if verbose:
self.logger.log("Starting %s on node %s" % (self["Name"], node))
else:
self.debug("Starting %s on node %s" % (self["Name"], node))
self.install_config(node)
self.rsh(node, self.templates["StartCmd"], synchronous=False)
self.expected_status[node] = "up"
def stop_cm(self, node, verbose=False, force=False):
""" Stop the cluster manager on a given node """
if verbose:
self.logger.log("Stopping %s on node %s" % (self["Name"], node))
else:
self.debug("Stopping %s on node %s" % (self["Name"], node))
if self.expected_status[node] != "up" and not force:
return True
(rc, _) = self.rsh(node, self.templates["StopCmd"])
if rc == 0:
# Make sure we can continue even if corosync leaks
self.expected_status[node] = "down"
- self.cluster_stable(self.Env["DeadTime"])
+ self.cluster_stable(self.env["DeadTime"])
return True
self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
return False
def stop_cm_async(self, node):
""" Stop the cluster manager on a given node without blocking """
self.debug("Stopping %s on node %s" % (self["Name"], node))
self.rsh(node, self.templates["StopCmd"], synchronous=False)
self.expected_status[node] = "down"
def startall(self, nodelist=None, verbose=False, quick=False):
""" Start the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
if not nodelist:
- nodelist = self.Env["nodes"]
+ nodelist = self.env["nodes"]
for node in nodelist:
if self.expected_status[node] == "down":
self.ns.wait_for_all_nodes(nodelist, 300)
if not quick:
# This is used for "basic sanity checks", so only start one node ...
return self.start_cm(nodelist[0], verbose=verbose)
# Approximation of SimulStartList for --boot
watchpats = [ self.templates["Pat:DC_IDLE"] ]
for node in nodelist:
watchpats.extend([ self.templates["Pat:InfraUp"] % node,
self.templates["Pat:PacemakerUp"] % node,
self.templates["Pat:Local_started"] % node,
self.templates["Pat:They_up"] % (nodelist[0], node) ])
# Start all the nodes - at about the same time...
- watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10)
+ watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"], self.env["LogWatcher"], "fast-start", self.env["DeadTime"]+10)
watch.set_watch()
if not self.start_cm(nodelist[0], verbose=verbose):
return False
for node in nodelist:
self.start_cm_async(node, verbose=verbose)
watch.look_for_all()
if watch.unmatched:
for regex in watch.unmatched:
self.logger.log ("Warn: Startup pattern not found: %s" % regex)
if not self.cluster_stable():
self.logger.log("Cluster did not stabilize")
return False
return True
def stopall(self, nodelist=None, verbose=False, force=False):
""" Stop the cluster manager on every node in the cluster, or on every
node in nodelist if not None
"""
ret = True
if not nodelist:
- nodelist = self.Env["nodes"]
- for node in self.Env["nodes"]:
+ nodelist = self.env["nodes"]
+ for node in self.env["nodes"]:
if self.expected_status[node] == "up" or force:
if not self.stop_cm(node, verbose=verbose, force=force):
ret = False
return ret
def statall(self, nodelist=None):
'''Return the status of the cluster managers in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
result = {}
if not nodelist:
- nodelist = self.Env["nodes"]
+ nodelist = self.env["nodes"]
for node in nodelist:
if self.stat_cm(node):
result[node] = "up"
else:
result[node] = "down"
return result
def isolate_node(self, target, nodes=None):
'''isolate the communication between the nodes'''
if not nodes:
- nodes = self.Env["nodes"]
+ nodes = self.env["nodes"]
for node in nodes:
if node == target:
continue
(rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node))
if rc != 0:
self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
return False
self.debug("Communication cut between %s and %s" % (target, node))
return True
def unisolate_node(self, target, nodes=None):
'''fix the communication between the nodes'''
if not nodes:
- nodes = self.Env["nodes"]
+ nodes = self.env["nodes"]
for node in nodes:
if node == target:
continue
# Limit the amount of time we have asynchronous connectivity for
# Restore both sides as simultaneously as possible
self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False)
self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False)
self.debug("Communication restored between %s and %s" % (target, node))
def oprofile_start(self, node=None):
if not node:
- for n in self.Env["oprofile"]:
+ for n in self.env["oprofile"]:
self.oprofile_start(n)
- elif node in self.Env["oprofile"]:
+ elif node in self.env["oprofile"]:
self.debug("Enabling oprofile on %s" % node)
self.rsh(node, "opcontrol --init")
self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
self.rsh(node, "opcontrol --start")
self.rsh(node, "opcontrol --reset")
def oprofile_save(self, test, node=None):
if not node:
- for n in self.Env["oprofile"]:
+ for n in self.env["oprofile"]:
self.oprofile_save(test, n)
- elif node in self.Env["oprofile"]:
+ elif node in self.env["oprofile"]:
self.rsh(node, "opcontrol --dump")
self.rsh(node, "opcontrol --save=cts.%d" % test)
# Read back with: opreport -l session:cts.0 image:/c*
self.oprofile_stop(node)
self.oprofile_start(node)
def oprofile_stop(self, node=None):
if not node:
- for n in self.Env["oprofile"]:
+ for n in self.env["oprofile"]:
self.oprofile_stop(n)
- elif node in self.Env["oprofile"]:
+ elif node in self.env["oprofile"]:
self.debug("Stopping oprofile on %s" % node)
self.rsh(node, "opcontrol --reset")
self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
def install_config(self, node):
if not self.ns.wait_for_node(node):
self.log("Node %s is not up." % node)
return
- if node in self.CIBsync or not self.Env["ClobberCIB"]:
+ if node in self.CIBsync or not self.env["ClobberCIB"]:
return
self.CIBsync[node] = True
self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
# Only install the CIB on the first node, all the other ones will pick it up from there
if self.cib_installed:
return
self.cib_installed = True
- if self.Env["CIBfilename"] is None:
+ if self.env["CIBfilename"] is None:
self.log("Installing Generated CIB on node %s" % node)
self.cib.install(node)
else:
- self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node))
+ self.log("Installing CIB (%s) on node %s" % (self.env["CIBfilename"], node))
- rc = self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
+ rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
if rc != 0:
raise ValueError("Can not scp file to %s %d" % (node, rc))
self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR))
def prepare(self):
'''Finish the Initialization process. Prepare to test...'''
self.partitions_expected = 1
- for node in self.Env["nodes"]:
+ for node in self.env["nodes"]:
self.expected_status[node] = ""
- if self.Env["experimental-tests"]:
+ if self.env["experimental-tests"]:
self.unisolate_node(node)
self.stat_cm(node)
def test_node_cm(self, node):
'''Report the status of the cluster manager on a given node'''
watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)",
self.templates["Pat:NonDC_started"] % node,
self.templates["Pat:DC_started"] % node ]
- idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle")
+ idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node], self.env["LogWatcher"], "ClusterIdle")
idle_watch.set_watch()
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if not out:
out = ""
else:
out = out[0].strip()
self.debug("Node %s status: '%s'" % (node, out))
if out.find('ok') < 0:
if self.expected_status[node] == "up":
self.log(
"Node status for %s is %s but we think it should be %s"
% (node, "down", self.expected_status[node]))
self.expected_status[node] = "down"
return 0
if self.expected_status[node] == "down":
self.log(
"Node status for %s is %s but we think it should be %s: %s"
% (node, "up", self.expected_status[node], out))
self.expected_status[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" % (node, out))
return 1
# Up and stable
return 2
def stat_cm(self, node):
""" Report the status of the cluster manager on a given node """
return self.test_node_cm(node) > 0
# Being up and being stable is not the same question...
def node_stable(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_cm(node) == 2:
return True
self.log("Warn: Node %s not stable" % node)
return False
def partition_stable(self, nodes, timeout=None):
watchpats = [ "Current ping state: S_IDLE",
self.templates["Pat:DC_IDLE"] ]
self.debug("Waiting for cluster stability...")
if timeout is None:
- timeout = self.Env["DeadTime"]
+ timeout = self.env["DeadTime"]
if len(nodes) < 3:
self.debug("Cluster is inactive")
return True
- idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout)
+ idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(), self.env["LogWatcher"], "ClusterStable", timeout)
idle_watch.set_watch()
for node in nodes.split():
# have each node dump its current state
self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
ret = idle_watch.look()
while ret:
self.debug(ret)
for node in nodes.split():
if re.search(node, ret):
return True
ret = idle_watch.look()
self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout))
return False
def cluster_stable(self, timeout=None, double_check=False):
partitions = self.find_partitions()
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
if not double_check:
return True
# Make sure we are really stable and that all resources,
# including those that depend on transient node attributes,
# are started if they were going to be
time.sleep(5)
for partition in partitions:
if not self.partition_stable(partition, timeout):
return False
return True
def is_node_dc(self, node, status_line=None):
if not status_line:
(_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
if out:
status_line = out[0].strip()
if not status_line:
return False
if status_line.find('S_IDLE') != -1:
return True
if status_line.find('S_INTEGRATION') != -1:
return True
if status_line.find('S_FINALIZE_JOIN') != -1:
return True
if status_line.find('S_POLICY_ENGINE') != -1:
return True
if status_line.find('S_TRANSITION_ENGINE') != -1:
return True
return False
def active_resources(self, node):
(_, output) = self.rsh(node, "crm_resource -c", verbose=1)
resources = []
for line in output:
if not re.search("^Resource", line):
continue
tmp = AuditResource(self, line)
if tmp.type == "primitive" and tmp.host == node:
resources.append(tmp.id)
return resources
def resource_location(self, rid):
ResourceNodes = []
- for node in self.Env["nodes"]:
+ for node in self.env["nodes"]:
if self.expected_status[node] != "up":
continue
cmd = self.templates["RscRunning"] % rid
(rc, lines) = self.rsh(node, cmd)
if rc == 127:
self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
for line in lines:
self.log("Output: %s " % line)
elif rc == 0:
ResourceNodes.append(node)
return ResourceNodes
def find_partitions(self):
ccm_partitions = []
- for node in self.Env["nodes"]:
+ for node in self.env["nodes"]:
if self.expected_status[node] != "up":
self.debug("Node %s is down... skipping" % node)
continue
(_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
if not out:
self.log("no partition details for %s" % node)
continue
partition = out[0].strip()
if len(partition) <= 2:
self.log("bad partition details for %s" % node)
continue
nodes = partition.split()
nodes.sort()
partition = ' '.join(nodes)
found = 0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug("Adding partition from %s: %s" % (node, partition))
ccm_partitions.append(partition)
else:
self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
self.debug("Found partitions: %r" % ccm_partitions)
return ccm_partitions
def has_quorum(self, node_list):
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
- node_list = self.Env["nodes"]
+ node_list = self.env["nodes"]
for node in node_list:
if self.expected_status[node] != "up":
continue
(_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
quorum = quorum[0].strip()
if quorum.find("1") != -1:
return True
if quorum.find("0") != -1:
return False
self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum))
return False
@property
def components(self):
raise NotImplementedError
def standby_status(self, node):
(_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
if not out:
return "off"
out = out[0].strip()
self.debug("Standby result: %s" % out)
return out
# status == "on" : Enter Standby mode
# status == "off": Enter Active mode
def set_standby_mode(self, node, status):
current_status = self.standby_status(node)
if current_status == status:
return True
cmd = self.templates["StandbyCmd"] % (node, status)
(rc, _) = self.rsh(node, cmd)
return rc == 0
def add_dummy_rsc(self, node, rid):
rsc_xml = """ '
'""" % (rid, rid)
constraint_xml = """ '
'
""" % (rid, node, node, rid)
self.rsh(node, self.templates['CibAddXml'] % rsc_xml)
self.rsh(node, self.templates['CibAddXml'] % constraint_xml)
def remove_dummy_rsc(self, node, rid):
constraint = "\"//rsc_location[@rsc='%s']\"" % rid
rsc = "\"//primitive[@id='%s']\"" % rid
self.rsh(node, self.templates['CibDelXpath'] % constraint)
self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py
index 86466770f4..6c3de3ab40 100644
--- a/python/pacemaker/_cts/scenarios.py
+++ b/python/pacemaker/_cts/scenarios.py
@@ -1,408 +1,408 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence" ]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker._cts.audits import ClusterAudit
from pacemaker._cts.input import should_continue
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.watcher import LogWatcher
class ScenarioComponent:
""" The base class for all scenario components. A scenario component is
one single step in a scenario. Each component is basically just a setup
and teardown method.
"""
def __init__(self, cm, env):
""" Create a new ScenarioComponent instance
Arguments:
cm -- A ClusterManager instance
env -- An Environment instance
"""
# pylint: disable=invalid-name
self._cm = cm
self._env = env
def is_applicable(self):
""" Return True if this component is applicable in the given Environment.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def setup(self):
""" Set up the component, returning True on success. This method must be
provided by all subclasses.
"""
raise NotImplementedError
def teardown(self):
""" Tear down the given component. This method must be provided by all
subclasses.
"""
raise NotImplementedError
class Scenario:
""" The base class for scenario. A scenario is an ordered list of
ScenarioComponent objects. A scenario proceeds by setting up all its
components in sequence, running a list of tests and audits, and then
tearing down its components in reverse.
"""
def __init__(self, cm, components, audits, tests):
""" Create a new Scenario instance
Arguments:
cm -- A ClusterManager instance
components -- A list of ScenarioComponents comprising this Scenario
audits -- A list of ClusterAudits that will be performed as
part of this Scenario
tests -- A list of CTSTests that will be run
"""
# pylint: disable=invalid-name
self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 }
self.tests = tests
self._audits = audits
self._bad_news = None
self._cm = cm
self._components = components
for comp in components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def is_applicable(self):
""" Return True if all ScenarioComponents are applicable """
for comp in self._components:
if not comp.is_applicable():
return False
return True
def setup(self):
""" Set up the scenario, returning True on success. If setup fails at
some point, tear down those components that did successfully set up.
"""
self._cm.prepare()
self.audit() # Also detects remote/local log config
- self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"])
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
self.audit()
self._cm.install_support()
- self._bad_news = LogWatcher(self._cm.Env["LogFileName"],
+ self._bad_news = LogWatcher(self._cm.env["LogFileName"],
self._cm.templates.get_patterns("BadNews"),
- self._cm.Env["nodes"],
- self._cm.Env["LogWatcher"],
+ self._cm.env["nodes"],
+ self._cm.env["LogWatcher"],
"BadNews", 0)
self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self._components):
if not self._components[j].setup():
# OOPS! We failed. Tear partial setups down.
self.audit()
self._cm.log("Tearing down partial setup")
self.teardown(j)
return False
j += 1
self.audit()
return True
def teardown(self, n_components=None):
""" Tear down the scenario in the reverse order it was set up. If
n_components is not None, only tear down that many components.
"""
if not n_components:
n_components = len(self._components)-1
j = n_components
while j >= 0:
self._components[j].teardown()
j -= 1
self.audit()
self._cm.install_support("uninstall")
def incr(self, name):
""" Increment the given stats key """
if not name in self.stats:
self.stats[name] = 0
self.stats[name] += 1
def run(self, iterations):
""" Run all tests in the scenario the given number of times """
self._cm.oprofile_start()
try:
self._run_loop(iterations)
self._cm.oprofile_stop()
except:
self._cm.oprofile_stop()
raise
def _run_loop(self, iterations):
""" Do the hard part of the run method - actually run all the tests the
given number of times.
"""
raise NotImplementedError
def run_test(self, test, testcount):
""" Run the given test. testcount is the number of tests (including
this one) that have been run across all iterations.
"""
- nodechoice = self._cm.Env.random_node()
+ nodechoice = self._cm.env.random_node()
ret = True
did_run = False
self._cm.clear_instance_errors_to_ignore()
choice = "(%s)" % nodechoice
self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount))
starttime = test.set_timer()
if not test.setup(nodechoice):
self._cm.log("Setup failed")
ret = False
elif not test.can_run_now(nodechoice):
self._cm.log("Skipped")
test.skipped()
else:
did_run = True
ret = test(nodechoice)
if not test.teardown(nodechoice):
self._cm.log("Teardown failed")
- if not should_continue(self._cm.Env):
+ if not should_continue(self._cm.env):
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = False
stoptime = time.time()
self._cm.oprofile_save(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if "min_time" not in test.stats:
test.stats["elapsed_time"] = elapsed_time
test.stats["min_time"] = test_time
test.stats["max_time"] = test_time
else:
test.stats["elapsed_time"] += elapsed_time
if test_time < test.stats["min_time"]:
test.stats["min_time"] = test_time
if test_time > test.stats["max_time"]:
test.stats["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self._cm.statall()
did_run = True # Force the test count to be incremented anyway so test extraction works
self.audit(test.errors_to_ignore)
return did_run
def summarize(self):
""" Output scenario results """
self._cm.log("****************")
self._cm.log("Overall Results:%r" % self.stats)
self._cm.log("****************")
stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0 }
self._cm.log("Test Summary")
for test in self.tests:
for key in stat_filter:
stat_filter[key] = test.stats[key]
name = "Test %s:" % test.name
self._cm.log("{:<25} {!r}".format(name, stat_filter))
self._cm.debug("Detailed Results")
for test in self.tests:
name = "Test %s:" % test.name
self._cm.debug("{:<25} {!r}".format(name, stat_filter))
self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, local_ignore=None):
""" Perform all scenario audits and log results. If there are too many
failures, prompt the user to confirm that the scenario should continue
running.
"""
errcount = 0
ignorelist = ["CTS:"]
if local_ignore:
ignorelist.extend(local_ignore)
ignorelist.extend(self._cm.errors_to_ignore)
ignorelist.extend(self._cm.instance_errors_to_ignore)
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self._audits:
if not audit():
self._cm.log("Audit %s FAILED." % audit.name)
failed += 1
else:
self._cm.debug("Audit %s passed." % audit.name)
while errcount < 1000:
match = None
if self._bad_news:
match = self._bad_news.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._cm.log("BadNews: %s" % match)
self.incr("BadNews")
errcount += 1
else:
break
else:
print("Big problems")
- if not should_continue(self._cm.Env):
+ if not should_continue(self._cm.env):
self._cm.log("Shutting down.")
self.summarize()
self.teardown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self._bad_news:
self._bad_news.end()
return failed
class AllOnce(Scenario):
""" Every Test Once """
def _run_loop(self, iterations):
testcount = 1
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
""" Random Test Execution """
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
- test = self._cm.Env.random_gen.choice(self.tests)
+ test = self._cm.env.random_gen.choice(self.tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
""" Named Tests in Sequence """
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
""" Start the Cluster """
def _run_loop(self, iterations):
return
class BootCluster(ScenarioComponent):
""" The BootCluster component simply starts the cluster manager on all
nodes, waiting for each to come up before starting given that a node
might have been rebooted or crashed beforehand.
"""
def is_applicable(self):
""" BootCluster is always applicable """
return True
def setup(self):
""" Set up the component, returning True on success """
self._cm.prepare()
# Clear out the cobwebs ;-)
self._cm.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
self._cm.log("Starting Cluster Manager on all nodes.")
return self._cm.startall(verbose=True, quick=True)
def teardown(self):
""" Tear down the component """
self._cm.log("Stopping Cluster Manager on all nodes")
self._cm.stopall(verbose=True, force=False)
class LeaveBooted(BootCluster):
""" The LeaveBooted component leaves all nodes up when the scenario
is complete.
"""
def teardown(self):
""" Tear down the component """
self._cm.log("Leaving Cluster running on all nodes")