
diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index c3eb402c51..dc66f964a1 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1029 +1,1029 @@
""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
""" The base class for various kinds of auditors. Specific audit implementations
should be built on top of this one. Audits can do all kinds of checks on the
system. The basic interface for callers is the `__call__` method, which
returns True if the audit passes and False if it fails.
"""
def __init__(self, cm):
""" Create a new ClusterAudit instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.name = None
def __call__(self):
raise NotImplementedError
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration.
This method must be implemented by all subclasses.
"""
raise NotImplementedError
def log(self, args):
""" Log a message """
self._cm.log("audit: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
""" Audit each cluster node to verify that some logging system is usable.
This is done by logging a unique test message and then verifying that
we can read back that test message using logging tools.
"""
def __init__(self, cm):
""" Create a new LogAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "LogAudit"
def _restart_cluster_logging(self, nodes=None):
""" Restart logging on the given nodes, or all if none are given """
if not nodes:
nodes = self._cm.env["nodes"]
self._cm.debug("Restarting logging on: %r" % nodes)
for node in nodes:
if self._cm.env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log("ERROR: Cannot stop 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log("ERROR: Cannot start 'systemd-journald' on %s" % node)
(rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"])
if rc != 0:
self._cm.log("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node))
def _create_watcher(self, patterns, kind):
""" Create a new LogWatcher instance for the given patterns """
watch = LogWatcher(self._cm.env["LogFileName"], patterns,
self._cm.env["nodes"], kind, "LogAudit", 5,
silent=True)
watch.set_watch()
return watch
def _test_logging(self):
""" Perform the log audit """
patterns = []
prefix = "Test message from"
suffix = str(uuid.uuid4())
watch = {}
for node in self._cm.env["nodes"]:
# Look for the node name in two places to make sure
# that syslog is logging with the correct hostname
m = re.search("^([^.]+).*", node)
if m:
simple = m.group(1)
else:
simple = node
patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
watch_pref = self._cm.env["LogWatcher"]
if watch_pref == LogKind.ANY:
kinds = [LogKind.FILE]
if self._cm.env["have_systemd"]:
kinds += [LogKind.JOURNAL]
kinds += [LogKind.REMOTE_FILE]
for k in kinds:
watch[k] = self._create_watcher(patterns, k)
self._cm.log("Logging test message with identifier %s" % suffix)
else:
watch[watch_pref] = self._create_watcher(patterns, watch_pref)
for node in self._cm.env["nodes"]:
cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix)
(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
for k in list(watch.keys()):
w = watch[k]
if watch_pref == LogKind.ANY:
self._cm.log("Checking for test message in %s logs" % k)
w.look_for_all(silent=True)
if w.unmatched:
for regex in w.unmatched:
self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
else:
if watch_pref == LogKind.ANY:
self._cm.log("Found test message in %s logs" % k)
self._cm.env["LogWatcher"] = k
return True
return False
def __call__(self):
max_attempts = 3
attempt = 0
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
while attempt <= max_attempts and not self._test_logging():
attempt += 1
self._restart_cluster_logging()
time.sleep(60*attempt)
if attempt > max_attempts:
self._cm.log("ERROR: Cluster logging unrecoverable.")
return False
return True
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
if self._cm.env["LogAuditDisabled"]:
return False
return True
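# A rough standalone sketch of the round trip this audit performs, reduced to
# one node and a plain log file (host name, facility, and path are
# hypothetical; in the audit itself, LogWatcher does the matching across all
# nodes, files, and the journal):
#
#     import subprocess
#     import uuid
#
#     marker = str(uuid.uuid4())
#     subprocess.run(["logger", "-p", "daemon.info",
#                     "Test message from node1 %s" % marker], check=False)
#     with open("/var/log/messages", errors="replace") as f:
#         found = any(marker in line for line in f)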
class DiskAudit(ClusterAudit):
""" Audit disk usage on cluster nodes to verify that there is enough free
space left on whichever mounted file system holds the logs.
Warn on: less than 100 MB or 10% of free space
Error on: less than 10 MB or 5% of free space
"""
def __init__(self, cm):
""" Create a new DiskAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "DiskspaceAudit"
def __call__(self):
result = True
# @TODO Use directory of PCMK_logfile if set on host
dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
if not dfout:
self._cm.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
continue
dfout = dfout[0].strip()
try:
(used, remain) = dfout.split()
used_percent = int(used)
remaining_mb = int(remain)
except (ValueError, TypeError):
self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
- % (dfout, node, used, remain))
+ % (dfout, node, used, remain))
else:
if remaining_mb < 10 or used_percent > 95:
self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
- % (node, used_percent, remaining_mb))
+ % (node, used_percent, remaining_mb))
result = False
if not should_continue(self._cm.env):
raise ValueError("Disk full on %s" % node)
elif remaining_mb < 100 or used_percent > 90:
self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
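# Worked example of the df pipeline above (numbers hypothetical). If
# "df -BM /var/log" ends with "... 4520M 91% /var", the awk/tr steps yield
# "91 4520", which splits into used_percent=91 and remaining_mb=4520 --
# enough to trigger the WARN branch (used_percent > 90) but not the CRIT one:
#
#     >>> (used, remain) = "91 4520".split()
#     >>> int(remain) < 10 or int(used) > 95     # CRIT?
#     False
#     >>> int(remain) < 100 or int(used) > 90    # WARN?
#     True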
class FileAudit(ClusterAudit):
""" Audit the filesystem looking for various failure conditions:
* The presence of core dumps from corosync or Pacemaker daemons
* Stale IPC files
"""
def __init__(self, cm):
""" Create a new FileAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.known = []
self.name = "FileAudit"
def __call__(self):
result = True
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
(_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
for line in lsout:
line = line.strip()
if line not in self.known:
result = False
self.known.append(line)
self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
for line in lsout:
result = False
clean = True
self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
for line in lsout:
self._cm.debug("ps[%s]: %s" % (node, line))
self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
else:
self._cm.debug("Skipping %s" % node)
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
return True
class AuditResource:
""" A base class for storing information about a cluster resource """
def __init__(self, cm, line):
""" Create a new AuditResource instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
resource
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.clone_id = fields[3]
self.parent = fields[4]
self.rprovider = fields[5]
self.rclass = fields[6]
self.rtype = fields[7]
self.host = fields[8]
self.needs_quorum = fields[9]
self.flags = int(fields[10])
self.flags_s = fields[11]
if self.parent == "NA":
self.parent = None
@property
def unique(self):
""" Is this resource unique? """
return self.flags & 0x20
@property
def orphan(self):
""" Is this resource an orphan? """
return self.flags & 0x01
@property
def managed(self):
""" Is this resource managed by the cluster? """
return self.flags & 0x02
class AuditConstraint:
""" A base class for storing information about a cluster constraint """
def __init__(self, cm, line):
""" Create a new AuditConstraint instance
Arguments:
cm -- A ClusterManager instance
line -- One line of output from `crm_resource` describing a single
constraint
"""
# pylint: disable=invalid-name
fields = line.split()
self._cm = cm
self.line = line
self.type = fields[1]
self.id = fields[2]
self.rsc = fields[3]
self.target = fields[4]
self.score = fields[5]
self.rsc_role = fields[6]
self.target_role = fields[7]
if self.rsc_role == "NA":
self.rsc_role = None
if self.target_role == "NA":
self.target_role = None
class PrimitiveAudit(ClusterAudit):
""" Audit primitive resources to verify a variety of conditions, including that
they are active and managed only when expected; they are active on the
expected clusted node; and that they are not orphaned.
"""
def __init__(self, cm):
""" Create a new PrimitiveAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PrimitiveAudit"
self._active_nodes = []
self._constraints = []
self._inactive_nodes = []
self._resources = []
self._target = None
def _audit_resource(self, resource, quorum):
""" Perform the audit of a single resource """
rc = True
active = self._cm.resource_location(resource.id)
if len(active) == 1:
if quorum:
self.debug("Resource %s active on %r" % (resource.id, active))
elif resource.needs_quorum == 1:
self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
rc = False
elif not resource.managed:
self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
elif not resource.unique:
# TODO: Figure out a clever way to actually audit these resource types
if len(active) > 1:
self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
else:
self.debug("Non-unique resource %s is not active" % resource.id)
elif len(active) > 1:
self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
rc = False
elif resource.orphan:
self.debug("Resource %s is an inactive orphan" % resource.id)
elif not self._inactive_nodes:
self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
rc = False
elif self._cm.env["warn-inactive"]:
if quorum or not resource.needs_quorum:
self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
- % (resource.id, self._inactive_nodes))
+ % (resource.id, self._inactive_nodes))
else:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
- % (resource.id, self._inactive_nodes))
+ % (resource.id, self._inactive_nodes))
elif quorum or not resource.needs_quorum:
self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
- % (resource.id, self._inactive_nodes))
+ % (resource.id, self._inactive_nodes))
return rc
def _setup(self):
""" Verify cluster nodes are active, and collect resource and colocation
information used for performing the audit.
"""
for node in self._cm.env["nodes"]:
if self._cm.expected_status[node] == "up":
self._active_nodes.append(node)
else:
self._inactive_nodes.append(node)
for node in self._cm.env["nodes"]:
if self._target is None and self._cm.expected_status[node] == "up":
self._target = node
if not self._target:
# TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
# with CIB_file=/path/to/cib.xml even when the cluster isn't running
self.debug("No nodes active - skipping %s" % self.name)
return False
(_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
for line in lines:
if re.search("^Resource", line):
self._resources.append(AuditResource(self._cm, line))
elif re.search("^Constraint", line):
self._constraints.append(AuditConstraint(self._cm, line))
else:
self._cm.log("Unknown entry: %s" % line)
return True
def __call__(self):
result = True
if not self._setup():
return result
quorum = self._cm.has_quorum(None)
for resource in self._resources:
if resource.type == "primitive" and not self._audit_resource(resource, quorum):
result = False
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class GroupAudit(PrimitiveAudit):
""" Audit group resources to verify that each of its child primitive
resources is active on the expected cluster node.
"""
def __init__(self, cm):
""" Create a new GroupAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "GroupAudit"
def __call__(self):
result = True
if not self._setup():
return result
for group in self._resources:
if group.type != "group":
continue
first_match = True
group_location = None
for child in self._resources:
if child.parent != group.id:
continue
nodes = self._cm.resource_location(child.id)
if first_match and len(nodes) > 0:
group_location = nodes[0]
first_match = False
if len(nodes) > 1:
result = False
self._cm.log("Child %s of %s is active more than once: %r"
- % (child.id, group.id, nodes))
+ % (child.id, group.id, nodes))
elif not nodes:
# Groups are allowed to be partially active
# However we do need to make sure later children aren't running
group_location = None
self.debug("Child %s of %s is stopped" % (child.id, group.id))
elif nodes[0] != group_location:
result = False
self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
- % (child.id, group.id, nodes[0], group_location))
+ % (child.id, group.id, nodes[0], group_location))
else:
self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
return result
class CloneAudit(PrimitiveAudit):
""" Audit clone resources. NOTE: Currently, this class does not perform
any actual audit functions.
"""
def __init__(self, cm):
""" Create a new CloneAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "CloneAudit"
def __call__(self):
result = True
if not self._setup():
return result
for clone in self._resources:
if clone.type != "clone":
continue
for child in self._resources:
if child.parent == clone.id and child.type == "primitive":
self.debug("Checking child %s of %s..." % (child.id, clone.id))
# Check max and node_max
# Obtain with:
# crm_resource -g clone_max --meta -r child.id
# crm_resource -g clone_node_max --meta -r child.id
return result
class ColocationAudit(PrimitiveAudit):
""" Audit cluster resources to verify that those that should be colocated
with each other actually are.
"""
def __init__(self, cm):
""" Create a new ColocationAudit instance
Arguments:
cm -- A ClusterManager instance
"""
PrimitiveAudit.__init__(self, cm)
self.name = "ColocationAudit"
def _crm_location(self, resource):
""" Return a list of cluster nodes where a given resource is running """
(rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
hosts = []
if rc == 0:
for line in lines:
fields = line.split()
hosts.append(fields[0])
return hosts
def __call__(self):
result = True
if not self._setup():
return result
for coloc in self._constraints:
if coloc.type != "rsc_colocation":
continue
source = self._crm_location(coloc.rsc)
target = self._crm_location(coloc.target)
if not source:
self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
else:
for node in source:
if node not in target:
result = False
self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
- % (coloc.id, coloc.rsc, node, target))
+ % (coloc.id, coloc.rsc, node, target))
else:
self.debug("Colocation audit (%s): %s running on %s (in %r)"
- % (coloc.id, coloc.rsc, node, target))
+ % (coloc.id, coloc.rsc, node, target))
return result
class ControllerStateAudit(ClusterAudit):
""" Audit cluster nodes to verify that those we expect to be active are
active, and those that are expected to be inactive are inactive.
"""
def __init__(self, cm):
""" Create a new ControllerStateAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "ControllerStateAudit"
def __call__(self):
result = True
up_are_down = 0
down_are_up = 0
unstable_list = []
for node in self._cm.env["nodes"]:
should_be = self._cm.expected_status[node]
rc = self._cm.test_node_cm(node)
if rc > 0:
if should_be == "down":
down_are_up += 1
if rc == 1:
unstable_list.append(node)
elif should_be == "up":
up_are_down += 1
if len(unstable_list) > 0:
result = False
self._cm.log("Cluster is not stable: %d (of %d): %r"
- % (len(unstable_list), self._cm.upcount(), unstable_list))
+ % (len(unstable_list), self._cm.upcount(), unstable_list))
if up_are_down > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be up were down."
- % (up_are_down, len(self._cm.env["nodes"])))
+ % (up_are_down, len(self._cm.env["nodes"])))
if down_are_up > 0:
result = False
self._cm.log("%d (of %d) nodes expected to be down were up."
- % (down_are_up, len(self._cm.env["nodes"])))
+ % (down_are_up, len(self._cm.env["nodes"])))
return result
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class CIBAudit(ClusterAudit):
""" Audit the CIB by verifying that it is identical across cluster nodes """
def __init__(self, cm):
""" Create a new CIBAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "CibAudit"
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
self.debug("\tNo partitions to audit")
return result
for partition in ccm_partitions:
self.debug("\tAuditing CIB consistency for: %s" % partition)
if self._audit_cib_contents(partition) == 0:
result = False
return result
def _audit_cib_contents(self, hostlist):
""" Perform the CIB audit on the given hosts """
passed = True
node0 = None
node0_xml = None
partition_hosts = hostlist.split()
for node in partition_hosts:
node_xml = self._store_remote_cib(node, node0)
if node_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node)
passed = False
elif node0 is None:
node0 = node
node0_xml = node_xml
elif node0_xml is None:
self._cm.log("Could not perform audit: No configuration from %s" % node0)
passed = False
else:
(rc, result) = self._cm.rsh(
node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
if rc != 0:
self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
passed = False
for line in result:
if not re.search("<diff/>", line):
passed = False
self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
else:
self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
return passed
def _store_remote_cib(self, node, target):
""" Store a copy of the given node's CIB on the given target node. If
no target is given, store the CIB on the given node.
"""
filename = "/tmp/ctsaudit.%s.xml" % node
if not target:
target = node
(rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None
self._cm.rsh("localhost", "rm -f %s" % filename)
for line in lines:
self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
self._cm.log("Could not store configuration")
return None
return filename
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
class PartitionAudit(ClusterAudit):
""" Audit each partition in a cluster to verify a variety of conditions:
* The number of partitions and the nodes in each is as expected
* Each node is active when it should be active and inactive when it
should be inactive
* The status and epoch of each node is as expected
* A partition has quorum
* A partition has a DC when expected
"""
def __init__(self, cm):
""" Create a new PartitionAudit instance
Arguments:
cm -- A ClusterManager instance
"""
ClusterAudit.__init__(self, cm)
self.name = "PartitionAudit"
self._node_epoch = {}
self._node_state = {}
self._node_quorum = {}
def __call__(self):
result = True
ccm_partitions = self._cm.find_partitions()
if not ccm_partitions:
return result
self._cm.cluster_stable(double_check=True)
if len(ccm_partitions) != self._cm.partitions_expected:
self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
result = False
for partition in ccm_partitions:
self._cm.log("\t %s" % partition)
for partition in ccm_partitions:
if self._audit_partition(partition) == 0:
result = False
return result
def _trim_string(self, avalue):
""" Remove the last character from a multi-character string """
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
return avalue
def _trim2int(self, avalue):
""" Remove the last character from a multi-character string and convert
the result to an int.
"""
trimmed = self._trim_string(avalue)
if trimmed:
return int(trimmed)
return None
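# Doctest-style illustration of the trim helpers: single-line command output
# arrives with its trailing newline still attached, so:
#
#     >>> s = "1234\n"
#     >>> (s[:-1], int(s[:-1]))      # what _trim_string/_trim2int produce
#     ('1234', 1234)
#
# An empty value yields None from both helpers.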
def _audit_partition(self, partition):
""" Perform the audit of a single partition """
passed = True
dc_found = []
dc_allowed_list = []
lowest_epoch = None
node_list = partition.split()
self.debug("Auditing partition: %s" % partition)
for node in node_list:
if self._cm.expected_status[node] != "up":
self._cm.log("Warn: Node %s appeared out of nowhere" % node)
self._cm.expected_status[node] = "up"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
(_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()
(_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()
self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
self._node_state[node] = self._trim_string(self._node_state[node])
self._node_epoch[node] = self._trim2int(self._node_epoch[node])
self._node_quorum[node] = self._trim_string(self._node_quorum[node])
if not self._node_epoch[node]:
self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
self._cm.expected_status[node] = "down"
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)
elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
lowest_epoch = self._node_epoch[node]
if not lowest_epoch:
self._cm.log("Lowest epoch not determined in %s" % partition)
passed = False
for node in node_list:
if self._cm.expected_status[node] != "up":
continue
if self._cm.is_node_dc(node, self._node_state[node]):
dc_found.append(node)
if self._node_epoch[node] == lowest_epoch:
self.debug("%s: OK" % node)
elif not self._node_epoch[node]:
self.debug("Check on %s ignored: no node epoch" % node)
elif not lowest_epoch:
self.debug("Check on %s ignored: no lowest epoch" % node)
else:
self._cm.log("DC %s is not the oldest node (%d vs. %d)"
- % (node, self._node_epoch[node], lowest_epoch))
+ % (node, self._node_epoch[node], lowest_epoch))
passed = False
if not dc_found:
self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
- % (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
+ % (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
self._cm.log("%d DCs (%s) found in cluster partition: %s"
- % (len(dc_found), str(dc_found), str(node_list)))
+ % (len(dc_found), str(dc_found), str(node_list)))
passed = False
if not passed:
for node in node_list:
if self._cm.expected_status[node] == "up":
self._cm.log("epoch %s : %s"
- % (self._node_epoch[node], self._node_state[node]))
+ % (self._node_epoch[node], self._node_state[node]))
return passed
def is_applicable(self):
""" Return True if this audit is applicable in the current test configuration. """
# @TODO Due to long-ago refactoring, this name test would never match,
# so this audit (and those derived from it) would never run.
# Uncommenting the next lines fixes the name test, but that then
# exposes pre-existing bugs that need to be fixed.
#if self._cm["Name"] == "crm-corosync":
# return True
return False
# pylint: disable=invalid-name
def audit_list(cm):
""" Return a list of instances of applicable audits that can be performed
for the given ClusterManager.
"""
result = []
for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
ColocationAudit, CIBAudit]:
a = auditclass(cm)
if a.is_applicable():
result.append(a)
return result
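# Typical call-site sketch (the cm argument is a ClusterManager; CTS itself
# wires this up, so the surrounding code here is hypothetical):
#
#     audits = audit_list(cm)
#     failed = [a.name for a in audits if not a()]
#     if failed:
#         cm.log("Audits failed: %s" % ", ".join(failed))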
diff --git a/python/pacemaker/_cts/cib.py b/python/pacemaker/_cts/cib.py
index fba76bfbb3..b8b5d5d3b7 100644
--- a/python/pacemaker/_cts/cib.py
+++ b/python/pacemaker/_cts/cib.py
@@ -1,424 +1,425 @@
""" CIB generator for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["ConfigFactory"]
__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import warnings
import tempfile
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.cibxml import Alerts, Clone, Expression, FencingTopology, Group, Nodes, OpDefaults, Option, Resource, Rule
from pacemaker._cts.network import next_ip
class CIB:
""" A class for generating, representing, and installing a CIB file onto
cluster nodes
"""
def __init__(self, cm, version, factory, tmpfile=None):
""" Create a new CIB instance
Arguments:
cm -- A ClusterManager instance
version -- The schema syntax version
factory -- A ConfigFactory instance
tmpfile -- Where to store the CIB, or None to use a new tempfile
"""
# pylint: disable=invalid-name
self._cib = None
self._cm = cm
self._counter = 1
self._factory = factory
self._num_nodes = 0
self.version = version
if not tmpfile:
warnings.filterwarnings("ignore")
# pylint: disable=consider-using-with
f = tempfile.NamedTemporaryFile(delete=True)
f.close()
tmpfile = f.name
warnings.resetwarnings()
self._factory.tmpfile = tmpfile
def _show(self):
""" Query a cluster node for its generated CIB; log and return the result """
output = ""
(_, result) = self._factory.rsh(self._factory.target, "HOME=/root CIB_file=%s cibadmin -Ql" % self._factory.tmpfile, verbose=1)
for line in result:
output += line
self._factory.debug("Generated Config: %s" % line)
return output
def new_ip(self, name=None):
""" Generate an IP resource for the next available IP address, optionally
specifying the resource's name.
"""
if self._cm.env["IPagent"] == "IPaddr2":
ip = next_ip(self._cm.env["IPBase"])
if not name:
if ":" in ip:
(_, _, suffix) = ip.rpartition(":")
name = "r%s" % suffix
else:
name = "r%s" % ip
r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
r["ip"] = ip
if ":" in ip:
r["cidr_netmask"] = "64"
r["nic"] = "eth0"
else:
r["cidr_netmask"] = "32"
else:
if not name:
name = "r%s%d" % (self._cm.env["IPagent"], self._counter)
self._counter += 1
r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
r.add_op("monitor", "5s")
return r
def get_node_id(self, node_name):
""" Check the cluster configuration for the node ID for the given node_name """
# We can't account for every possible configuration,
# so we only return a node ID if:
# * The node is specified in /etc/corosync/corosync.conf
# with "ring0_addr:" equal to node_name and "nodeid:"
# explicitly specified.
# In all other cases, we return 0.
node_id = 0
# awkward command: use } as record separator
# so each corosync.conf "object" is one record;
# match the "node {" record that has "ring0_addr: node_name";
# then print the substring of that record after "nodeid:"
- (rc, output) = self._factory.rsh(self._factory.target,
- r"""awk -v RS="}" """
- r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/"""
- r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s"""
- % (node_name, BuildOptions.COROSYNC_CONFIG_FILE), verbose=1)
+ awk = r"""awk -v RS="}" """ \
+ r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/""" \
+ r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s""" \
+ % (node_name, BuildOptions.COROSYNC_CONFIG_FILE)
+
+ (rc, output) = self._factory.rsh(self._factory.target, awk, verbose=1)
if rc == 0 and len(output) == 1:
try:
node_id = int(output[0])
except ValueError:
node_id = 0
return node_id
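# Worked example for the awk program above (corosync.conf fragment
# hypothetical): given
#
#     nodelist {
#         node {
#             ring0_addr: node1
#             nodeid: 101
#         }
#     }
#
# splitting on "}" makes the inner node stanza one awk record; the record
# matching "ring0_addr: node1" has everything up to "nodeid:" and everything
# after the id stripped, so awk prints "101" and node_id becomes 101.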
def install(self, target):
""" Generate a CIB file and install it to the given cluster node """
old = self._factory.tmpfile
# Force a rebuild
self._cib = None
self._factory.tmpfile = "%s/cib.xml" % BuildOptions.CIB_DIR
self.contents(target)
self._factory.rsh(self._factory.target, "chown %s %s" % (BuildOptions.DAEMON_USER, self._factory.tmpfile))
self._factory.tmpfile = old
def contents(self, target):
""" Generate a complete CIB file """
# fencing resource
if self._cib:
return self._cib
if target:
self._factory.target = target
self._factory.rsh(self._factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self._factory.tmpfile))
self._num_nodes = len(self._cm.env["nodes"])
no_quorum = "stop"
if self._num_nodes < 3:
no_quorum = "ignore"
self._factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self._num_nodes)
# We don't need a nodes section unless we add attributes
stn = None
# Fencing resource
# Define first so that the shell doesn't reject every update
if self._cm.env["DoFencing"]:
# Define the "real" fencing device
st = Resource(self._factory, "Fencing", self._cm.env["stonith-type"], "stonith")
# Set a threshold for unreliable stonith devices such as the vmware one
st.add_meta("migration-threshold", "5")
st.add_op("monitor", "120s", timeout="120s")
st.add_op("stop", "0", timeout="60s")
st.add_op("start", "0", timeout="60s")
# For remote node tests, a cluster node is stopped and brought back up
# as a remote node with the name "remote-OLDNAME". To allow fencing
# devices to fence these nodes, create a list of all possible node names.
all_node_names = [prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-')]
# Add all parameters specified by user
entries = self._cm.env["stonith-params"].split(',')
for entry in entries:
try:
(name, value) = entry.split('=', 1)
except ValueError:
print("Warning: skipping invalid fencing parameter: %s" % entry)
continue
# Allow user to specify "all" as the node list, and expand it here
if name in ["hostlist", "pcmk_host_list"] and value == "all":
value = ' '.join(all_node_names)
st[name] = value
st.commit()
# Test advanced fencing logic
stf_nodes = []
stt_nodes = []
attr_nodes = {}
# Create the levels
stl = FencingTopology(self._factory)
for node in self._cm.env["nodes"]:
# Remote node tests will rename the node
remote_node = "remote-%s" % node
# Randomly assign node to a fencing method
ftype = self._cm.env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
# For levels-and, randomly choose targeting by node name or attribute
by = ""
if ftype == "levels-and":
node_id = self.get_node_id(node)
if node_id == 0 or self._cm.env.random_gen.choice([True, False]):
by = " (by name)"
else:
attr_nodes[node] = node_id
by = " (by attribute)"
self._cm.log(" - Using %s fencing for node: %s%s" % (ftype, node, by))
if ftype == "levels-and":
# If targeting by name, add a topology level for this node
if node not in attr_nodes:
stl.level(1, node, "FencingPass,Fencing")
# Always target remote nodes by name, otherwise we would need to add
# an attribute to the remote node only during remote tests (we don't
# want nonexistent remote nodes showing up in the non-remote tests).
# That complexity is not worth the effort.
stl.level(1, remote_node, "FencingPass,Fencing")
# Add the node (and its remote equivalent) to the list of levels-and nodes.
stt_nodes.extend([node, remote_node])
elif ftype == "levels-or ":
for n in [node, remote_node]:
stl.level(1, n, "FencingFail")
stl.level(2, n, "Fencing")
stf_nodes.extend([node, remote_node])
# If any levels-and nodes were targeted by attribute,
# create the attributes and a level for the attribute.
if attr_nodes:
stn = Nodes(self._factory)
for (node_name, node_id) in attr_nodes.items():
stn.add_node(node_name, node_id, {"cts-fencing": "levels-and"})
stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and")
# Create a Dummy agent that always passes for levels-and
if stt_nodes:
stt = Resource(self._factory, "FencingPass", "fence_dummy", "stonith")
stt["pcmk_host_list"] = " ".join(stt_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stt["random_sleep_range"] = "30"
stt["mode"] = "pass"
stt.commit()
# Create a Dummy agent that always fails for levels-or
if stf_nodes:
stf = Resource(self._factory, "FencingFail", "fence_dummy", "stonith")
stf["pcmk_host_list"] = " ".join(stf_nodes)
# Wait this many seconds before doing anything, handy for letting disks get flushed too
stf["random_sleep_range"] = "30"
stf["mode"] = "fail"
stf.commit()
# Now commit the levels themselves
stl.commit()
o = Option(self._factory)
o["stonith-enabled"] = self._cm.env["DoFencing"]
o["start-failure-is-fatal"] = "false"
o["pe-input-series-max"] = "5000"
o["shutdown-escalation"] = "5min"
o["batch-limit"] = "10"
o["dc-deadtime"] = "5s"
o["no-quorum-policy"] = no_quorum
o.commit()
o = OpDefaults(self._factory)
o["timeout"] = "90s"
o.commit()
# Commit the nodes section if we defined one
if stn is not None:
stn.commit()
# Add an alerts section if possible
if self._factory.rsh.exists_on_all(self._cm.env["notification-agent"], self._cm.env["nodes"]):
alerts = Alerts(self._factory)
alerts.add_alert(self._cm.env["notification-agent"],
self._cm.env["notification-recipient"])
alerts.commit()
# Add resources?
if self._cm.env["CIBResource"]:
self.add_resources()
# generate cib
self._cib = self._show()
if self._factory.tmpfile != "%s/cib.xml" % BuildOptions.CIB_DIR:
self._factory.rsh(self._factory.target, "rm -f %s" % self._factory.tmpfile)
return self._cib
def add_resources(self):
""" Add various resources and their constraints to the CIB """
# Per-node resources
for node in self._cm.env["nodes"]:
name = "rsc_%s" % node
r = self.new_ip(name)
r.prefer(node, "100")
r.commit()
# Migrator
# Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach
m = Resource(self._factory, "migrator", "Dummy", "ocf", "pacemaker")
m["passwd"] = "whatever"
m.add_meta("resource-stickiness", "1")
m.add_meta("allow-migrate", "1")
m.add_op("monitor", "P10S")
m.commit()
# Ping the test exerciser
p = Resource(self._factory, "ping-1", "ping", "ocf", "pacemaker")
p.add_op("monitor", "60s")
p["host_list"] = self._cm.env["cts-exerciser"]
p["name"] = "connected"
p["debug"] = "true"
c = Clone(self._factory, "Connectivity", p)
c["globally-unique"] = "false"
c.commit()
# promotable clone resource
s = Resource(self._factory, "stateful-1", "Stateful", "ocf", "pacemaker")
s.add_op("monitor", "15s", timeout="60s")
s.add_op("monitor", "16s", timeout="60s", role="Promoted")
ms = Clone(self._factory, "promotable-1", s)
ms["promotable"] = "true"
ms["clone-max"] = self._num_nodes
ms["clone-node-max"] = 1
ms["promoted-max"] = 1
ms["promoted-node-max"] = 1
# Require connectivity to run the promotable clone
r = Rule(self._factory, "connected", "-INFINITY", op="or")
r.add_child(Expression(self._factory, "m1-connected-1", "connected", "lt", "1"))
r.add_child(Expression(self._factory, "m1-connected-2", "connected", "not_defined", None))
ms.prefer("connected", rule=r)
ms.commit()
# Group Resource
g = Group(self._factory, "group-1")
g.add_child(self.new_ip())
if self._cm.env["have_systemd"]:
sysd = Resource(self._factory, "petulant", "pacemaker-cts-dummyd@10", "service")
sysd.add_op("monitor", "P10S")
g.add_child(sysd)
else:
g.add_child(self.new_ip())
g.add_child(self.new_ip())
# Make group depend on the promotable clone
g.after("promotable-1", first="promote", then="start")
g.colocate("promotable-1", "INFINITY", withrole="Promoted")
g.commit()
# LSB resource
lsb = Resource(self._factory, "lsb-dummy", "LSBDummy", "lsb")
lsb.add_op("monitor", "5s")
# LSB with group
lsb.after("group-1")
lsb.colocate("group-1")
lsb.commit()
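# Sketch of the resulting test topology (a summary, not additional code):
# one rsc_NODE IP per node pinned to that node, a sticky "migrator" Dummy,
# a "Connectivity" clone of ping, a promotable "promotable-1" clone that
# requires connectivity, "group-1" ordered and colocated after the promoted
# role, and "lsb-dummy" ordered and colocated after group-1.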
class ConfigFactory:
""" Singleton to generate a CIB file for the environment's schema version """
def __init__(self, cm):
""" Create a new ConfigFactory instance
Arguments:
cm -- A ClusterManager instance
"""
# pylint: disable=invalid-name
self._cm = cm
self.rsh = self._cm.rsh
if not self._cm.env["ListTests"]:
self.target = self._cm.env["nodes"][0]
self.tmpfile = None
def log(self, args):
""" Log a message """
self._cm.log("cib: %s" % args)
def debug(self, args):
""" Log a debug message """
self._cm.debug("cib: %s" % args)
def create_config(self, name="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION):
""" Return a CIB object for the given schema version """
return CIB(self._cm, name, self)
diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
index 56246abea4..732ab24cec 100644
--- a/python/pacemaker/_cts/environment.py
+++ b/python/pacemaker/_cts/environment.py
@@ -1,646 +1,646 @@
""" Test environment classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["EnvFactory"]
__copyright__ = "Copyright 2014-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import random
import socket
import sys
import time
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogKind
class Environment:
""" A class for managing the CTS environment, consisting largely of processing
and storing command line parameters
"""
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
def __init__(self, args):
""" Create a new Environment instance. This class can be treated kind
of like a dictionary due to the presence of typical dict functions
like __contains__, __getitem__, and __setitem__. However, it is not a
dictionary so do not rely on standard dictionary behavior.
Arguments:
args -- A list of command line parameters, minus the program name.
If None, sys.argv will be used.
"""
self.data = {}
self._nodes = []
# Set some defaults before processing command line arguments. These are
# either not set by any command line parameter, or they need a default
# that can't be set in add_argument.
self["DeadTime"] = 300
self["StartTime"] = 300
self["StableTime"] = 30
self["tests"] = []
self["IPagent"] = "IPaddr2"
self["DoFencing"] = True
self["ClobberCIB"] = False
self["CIBfilename"] = None
self["CIBResource"] = False
self["LogWatcher"] = LogKind.ANY
self["node-limit"] = 0
self["scenario"] = "random"
self.random_gen = random.Random()
self._logger = LogFactory()
self._rsh = RemoteFactory().getInstance()
self._target = "localhost"
self._seed_random()
self._parse_args(args)
if not self["ListTests"]:
self._validate()
self._discover()
def _seed_random(self, seed=None):
""" Initialize the random number generator with the given seed, or use
the current time if None
"""
if not seed:
seed = int(time.time())
self["RandSeed"] = seed
self.random_gen.seed(str(seed))
def dump(self):
""" Print the current environment """
keys = []
for key in list(self.data.keys()):
keys.append(key)
keys.sort()
for key in keys:
s = "Environment[%s]" % key
self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key])))
def keys(self):
""" Return a list of all environment keys stored in this instance """
return list(self.data.keys())
def __contains__(self, key):
""" Does the given environment key exist? """
if key == "nodes":
return True
return key in self.data
def __getitem__(self, key):
""" Return the given environment key, or None if it does not exist """
if str(key) == "0":
raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead")
if key == "nodes":
return self._nodes
if key == "Name":
return self._get_stack_short()
return self.data.get(key)
def __setitem__(self, key, value):
""" Set the given environment key to the given value, overriding any
previous value
"""
if key == "Stack":
self._set_stack(value)
elif key == "node-limit":
self.data[key] = value
self._filter_nodes()
elif key == "nodes":
self._nodes = []
for node in value:
# I don't think I need the IP address, etc. but this validates
# the node name against /etc/hosts and/or DNS, so it's a
# GoodThing(tm).
try:
n = node.strip()
socket.gethostbyname_ex(n)
self._nodes.append(n)
except:
self._logger.log("%s not found in DNS... aborting" % node)
raise
self._filter_nodes()
else:
self.data[key] = value
def random_node(self):
""" Choose a random node from the cluster """
return self.random_gen.choice(self["nodes"])
def get(self, key, default=None):
""" Return the value for key if key is in the environment, else default """
if key == "nodes":
return self._nodes
return self.data.get(key, default)
def _set_stack(self, name):
""" Normalize the given cluster stack name """
if name in ["corosync", "cs", "mcp"]:
self.data["Stack"] = "corosync 2+"
else:
raise ValueError("Unknown stack: %s" % name)
def _get_stack_short(self):
""" Return the short name for the currently set cluster stack """
if "Stack" not in self.data:
return "unknown"
if self.data["Stack"] == "corosync 2+":
return "crm-corosync"
LogFactory().log("Unknown stack: %s" % self["stack"])
raise ValueError("Unknown stack: %s" % self["stack"])
def _detect_systemd(self):
""" Detect whether systemd is in use on the target node """
if "have_systemd" not in self.data:
(rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0)
self["have_systemd"] = rc == 0
def _detect_syslog(self):
""" Detect the syslog variant in use on the target node """
if "syslogd" not in self.data:
if self["have_systemd"]:
# Systemd
(_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1)
self["syslogd"] = lines[0].strip()
else:
# SYS-V
(_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1)
self["syslogd"] = lines[0].strip()
if "syslogd" not in self.data or not self["syslogd"]:
# default
self["syslogd"] = "rsyslog"
def disable_service(self, node, service):
""" Disable the given service on the given node """
if self["have_systemd"]:
# Systemd
(rc, _) = self._rsh(node, "systemctl disable %s" % service)
return rc
# SYS-V
(rc, _) = self._rsh(node, "chkconfig %s off" % service)
return rc
def enable_service(self, node, service):
""" Enable the given service on the given node """
if self["have_systemd"]:
# Systemd
(rc, _) = self._rsh(node, "systemctl enable %s" % service)
return rc
# SYS-V
(rc, _) = self._rsh(node, "chkconfig %s on" % service)
return rc
def service_is_enabled(self, node, service):
""" Is the given service enabled on the given node? """
if self["have_systemd"]:
# Systemd
# With "systemctl is-enabled", we should check if the service is
# explicitly "enabled" instead of the return code. For example it returns
# 0 if the service is "static" or "indirect", but they don't really count
# as "enabled".
(rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service)
return rc == 0
# SYS-V
(rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service)
return rc == 0
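# Illustration of the is-enabled nuance above (shell transcript; unit name
# hypothetical):
#
#     $ systemctl is-enabled systemd-journald.service; echo rc=$?
#     static
#     rc=0
#
# The exit status alone would count "static" as enabled, hence the
# "| grep enabled" filter on the output.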
def _detect_at_boot(self):
""" Detect if the cluster starts at boot """
if "at-boot" not in self.data:
self["at-boot"] = self.service_is_enabled(self._target, "corosync") \
or self.service_is_enabled(self._target, "pacemaker")
def _detect_ip_offset(self):
""" Detect the offset for IPaddr resources """
if self["CIBResource"] and "IPBase" not in self.data:
(_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
network = lines[0].strip()
(_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)
try:
self["IPBase"] = lines[0].strip()
except (IndexError, TypeError):
self["IPBase"] = None
if not self["IPBase"]:
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.")
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
return
# pylint thinks self["IPBase"] is a list, not a string, which causes it
# to error out because a list doesn't have split().
# pylint: disable=no-member
if int(self["IPBase"].split('.')[3]) >= 240:
self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
- % (self["IPBase"], self["IPBase"].split('.')[3]))
+ % (self["IPBase"], self["IPBase"].split('.')[3]))
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
def _filter_nodes(self):
""" If --limit-nodes is given, keep that many nodes from the front of the
list of cluster nodes and drop the rest
"""
if self["node-limit"] > 0:
if len(self["nodes"]) > self["node-limit"]:
# pylint thinks self["node-limit"] is a list even though we initialize
# it as an int in __init__ and treat it as an int everywhere.
# pylint: disable=bad-string-format-type
self._logger.log("Limiting the number of nodes configured=%d (max=%d)"
- %(len(self["nodes"]), self["node-limit"]))
+ % (len(self["nodes"]), self["node-limit"]))
while len(self["nodes"]) > self["node-limit"]:
self["nodes"].pop(len(self["nodes"])-1)
def _validate(self):
""" Were we given all the required command line parameters? """
if not self["nodes"]:
raise ValueError("No nodes specified!")
def _discover(self):
""" Probe cluster nodes to figure out how to log and manage services """
self._target = random.Random().choice(self["nodes"])
exerciser = socket.gethostname()
# Use the IP where possible to avoid name lookup failures
for ip in socket.gethostbyname_ex(exerciser)[2]:
if ip != "127.0.0.1":
exerciser = ip
break
self["cts-exerciser"] = exerciser
self._detect_systemd()
self._detect_syslog()
self._detect_at_boot()
self._detect_ip_offset()
def _parse_args(self, argv):
""" Parse and validate command line parameters, setting the appropriate
values in the environment dictionary. If argv is None, use sys.argv
instead.
"""
if not argv:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0])
grp1 = parser.add_argument_group("Common options")
grp1.add_argument("-g", "--dsh-group", "--group",
metavar="GROUP", dest="group",
help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)")
grp1.add_argument("-l", "--limit-nodes",
type=int, default=0,
metavar="MAX",
help="Only use the first MAX cluster nodes supplied with --nodes")
grp1.add_argument("--benchmark",
action="store_true",
help="Add timing information")
grp1.add_argument("--list", "--list-tests",
action="store_true", dest="list_tests",
help="List the valid tests")
grp1.add_argument("--nodes",
metavar="NODES",
help="List of cluster nodes separated by whitespace")
grp1.add_argument("--stack",
default="corosync",
metavar="STACK",
help="Which cluster stack is installed")
grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
grp2.add_argument("-L", "--logfile",
metavar="PATH",
help="Where to look for logs from cluster nodes")
grp2.add_argument("--at-boot", "--cluster-starts-at-boot",
choices=["1", "0", "yes", "no"],
help="Does the cluster software start at boot time?")
grp2.add_argument("--facility", "--syslog-facility",
default="daemon",
metavar="NAME",
help="Which syslog facility to log to")
grp2.add_argument("--ip", "--test-ip-base",
metavar="IP",
help="Offset for generated IP address resources")
grp3 = parser.add_argument_group("Options for release testing")
grp3.add_argument("-r", "--populate-resources",
action="store_true",
help="Generate a sample configuration")
grp3.add_argument("--choose",
metavar="NAME",
help="Run only the named test")
grp3.add_argument("--fencing", "--stonith",
choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
default="1",
help="What fencing agent to use")
grp3.add_argument("--once",
action="store_true",
help="Run all valid tests once")
grp4 = parser.add_argument_group("Additional (less common) options")
grp4.add_argument("-c", "--clobber-cib",
action="store_true",
help="Erase any existing configuration")
grp4.add_argument("-y", "--yes",
action="store_true", dest="always_continue",
help="Continue to run whenever prompted")
grp4.add_argument("--boot",
action="store_true",
help="")
grp4.add_argument("--cib-filename",
metavar="PATH",
help="Install the given CIB file to the cluster")
grp4.add_argument("--experimental-tests",
action="store_true",
help="Include experimental tests")
grp4.add_argument("--loop-minutes",
type=int, default=60,
help="")
grp4.add_argument("--no-loop-tests",
action="store_true",
help="Don't run looping/time-based tests")
grp4.add_argument("--no-unsafe-tests",
action="store_true",
help="Don't run tests that are unsafe for use with ocfs2/drbd")
grp4.add_argument("--notification-agent",
metavar="PATH",
default="/var/lib/pacemaker/notify.sh",
help="Script to configure for Pacemaker alerts")
grp4.add_argument("--notification-recipient",
metavar="R",
default="/var/lib/pacemaker/notify.log",
help="Recipient to pass to alert script")
grp4.add_argument("--oprofile",
metavar="NODES",
help="List of cluster nodes to run oprofile on")
grp4.add_argument("--outputfile",
metavar="PATH",
help="Location to write logs to")
grp4.add_argument("--qarsh",
action="store_true",
help="Use QARSH to access nodes instead of SSH")
grp4.add_argument("--schema",
metavar="SCHEMA",
default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION,
help="Create a CIB conforming to the given schema")
grp4.add_argument("--seed",
metavar="SEED",
help="Use the given string as the random number seed")
grp4.add_argument("--set",
action="append",
metavar="ARG",
default=[],
help="Set key=value pairs (can be specified multiple times)")
grp4.add_argument("--stonith-args",
metavar="ARGS",
default="hostlist=all,livedangerously=yes",
help="")
grp4.add_argument("--stonith-type",
metavar="TYPE",
default="external/ssh",
help="")
grp4.add_argument("--trunc",
action="store_true", dest="truncate",
help="Truncate log file before starting")
grp4.add_argument("--valgrind-procs",
metavar="PROCS",
default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd",
help="Run valgrind against the given space-separated list of processes")
grp4.add_argument("--valgrind-tests",
action="store_true",
help="Include tests using valgrind")
grp4.add_argument("--warn-inactive",
action="store_true",
help="Warn if a resource is assigned to an inactive node")
parser.add_argument("iterations",
nargs='?',
type=int, default=1,
help="Number of tests to run")
args = parser.parse_args(args=argv)
# Set values on this object based on what happened with command line
# processing. This has to be done in several blocks.
# These values can always be set. They get a default from the add_argument
# calls, only do one thing, and they do not have any side effects.
self["ClobberCIB"] = args.clobber_cib
self["ListTests"] = args.list_tests
self["Schema"] = args.schema
self["Stack"] = args.stack
self["SyslogFacility"] = args.facility
self["TruncateLog"] = args.truncate
self["at-boot"] = args.at_boot in ["1", "yes"]
self["benchmark"] = args.benchmark
self["continue"] = args.always_continue
self["experimental-tests"] = args.experimental_tests
self["iterations"] = args.iterations
self["loop-minutes"] = args.loop_minutes
self["loop-tests"] = not args.no_loop_tests
self["notification-agent"] = args.notification_agent
self["notification-recipient"] = args.notification_recipient
self["node-limit"] = args.limit_nodes
self["stonith-params"] = args.stonith_args
self["stonith-type"] = args.stonith_type
self["unsafe-tests"] = not args.no_unsafe_tests
self["valgrind-procs"] = args.valgrind_procs
self["valgrind-tests"] = args.valgrind_tests
self["warn-inactive"] = args.warn_inactive
# Nodes and groups are mutually exclusive, so their defaults cannot be
# set in their add_argument calls. Additionally, groups does more than
# just set a value. Here, set nodes first and then if a group is
# specified, override the previous nodes value.
if args.nodes:
self["nodes"] = args.nodes.split(" ")
else:
self["nodes"] = []
if args.dsh_group:
self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group)
LogFactory().add_file(self["OutputFile"], "CTS")
dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group)
if os.path.isfile(dsh_file):
self["nodes"] = []
with open(dsh_file, "r", encoding="utf-8") as f:
for line in f:
l = line.strip()
if not l.startswith('#'):
self["nodes"].append(l)
else:
print("Unknown DSH group: %s" % args.dsh_group)
# Everything else either can't have a default set in an add_argument
# call (likely because we don't want to always have a value set for it)
# or it does something fancier than just set a single value. However,
# order does not matter for these as long as the user doesn't provide
# conflicting arguments on the command line. So just do Everything
# alphabetically.
if args.boot:
self["scenario"] = "boot"
if args.cib_filename:
self["CIBfilename"] = args.cib_filename
else:
self["CIBfilename"] = None
if args.choose:
self["scenario"] = "sequence"
self["tests"].append(args.choose)
if args.fencing:
if args.fencing in ["0", "no"]:
self["DoFencing"] = False
else:
self["DoFencing"] = True
if args.fencing in ["rhcs", "virt", "xvm"]:
self["stonith-type"] = "fence_xvm"
elif args.fencing == "scsi":
self["stonith-type"] = "fence_scsi"
elif args.fencing in ["lha", "ssh"]:
self["stonith-params"] = "hostlist=all,livedangerously=yes"
self["stonith-type"] = "external/ssh"
elif args.fencing == "openstack":
self["stonith-type"] = "fence_openstack"
print("Obtaining OpenStack credentials from the current environment")
self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % (
os.environ['OS_REGION_NAME'],
os.environ['OS_TENANT_NAME'],
os.environ['OS_AUTH_URL'],
os.environ['OS_USERNAME'],
os.environ['OS_PASSWORD']
)
elif args.fencing == "rhevm":
self["stonith-type"] = "fence_rhevm"
print("Obtaining RHEV-M credentials from the current environment")
self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
os.environ['RHEVM_USERNAME'],
os.environ['RHEVM_PASSWORD'],
os.environ['RHEVM_SERVER'],
os.environ['RHEVM_PORT'],
)
if args.ip:
self["CIBResource"] = True
self["ClobberCIB"] = True
self["IPBase"] = args.ip
if args.logfile:
self["LogAuditDisabled"] = True
self["LogFileName"] = args.logfile
self["LogWatcher"] = LogKind.REMOTE_FILE
else:
# We can't set this as the default on the parser.add_argument call
# for this option because then args.logfile will be set, which means
# the above branch will be taken and those other values will also be
# set.
self["LogFileName"] = "/var/log/messages"
if args.once:
self["scenario"] = "all-once"
if args.oprofile:
self["oprofile"] = args.oprofile.split(" ")
else:
self["oprofile"] = []
if args.outputfile:
self["OutputFile"] = args.outputfile
LogFactory().add_file(self["OutputFile"])
if args.populate_resources:
self["CIBResource"] = True
self["ClobberCIB"] = True
if args.qarsh:
self._rsh.enable_qarsh()
for kv in args.set:
            (name, value) = kv.split("=", 1)  # allow '=' within the value
self[name] = value
print("Setting %s = %s" % (name, value))
class EnvFactory:
""" A class for constructing a singleton instance of an Environment object """
instance = None
# pylint: disable=invalid-name
def getInstance(self, args=None):
""" Returns the previously created instance of Environment, or creates a
new instance if one does not already exist.
"""
if not EnvFactory.instance:
EnvFactory.instance = Environment(args)
return EnvFactory.instance
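# Illustrative usage sketch (not part of the patch): how a caller might obtain
# the shared Environment. getInstance() builds the object on first use and
# returns the cached instance afterwards; the flags shown are hypothetical.
#
#   env = EnvFactory().getInstance(["--nodes", "node1 node2"])
#   assert env is EnvFactory().getInstance()   # later calls reuse the singleton
#   print(env["nodes"])                        # -> ["node1", "node2"]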
diff --git a/python/pacemaker/_cts/tests/resourcerecover.py b/python/pacemaker/_cts/tests/resourcerecover.py
index 9fd07e2b4c..252eb1f5c2 100644
--- a/python/pacemaker/_cts/tests/resourcerecover.py
+++ b/python/pacemaker/_cts/tests/resourcerecover.py
@@ -1,175 +1,175 @@
""" Fail a random resource and verify its fail count increases """
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from pacemaker._cts.audits import AuditResource
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.tests.simulstartlite import SimulStartLite
from pacemaker._cts.tests.starttest import StartTest
from pacemaker._cts.timer import Timer
# Disable various pylint warnings that occur in so many places throughout this
# file that it's easiest to just take care of them globally. This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.
# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
class ResourceRecover(CTSTest):
""" A concrete test that fails a random resource """
def __init__(self, cm):
""" Create a new ResourceRecover instance
Arguments:
cm -- A ClusterManager instance
"""
CTSTest.__init__(self, cm)
self.benchmark = True
self.name = "ResourceRecover"
self._action = "asyncmon"
self._interval = 0
self._rid = None
self._rid_alt = None
self._start = StartTest(cm)
self._startall = SimulStartLite(cm)
def __call__(self, node):
""" Perform this test """
self.incr("calls")
if not self._startall(None):
return self.failure("Setup failed")
# List all resources active on the node (skip test if none)
resourcelist = self._cm.active_resources(node)
if not resourcelist:
self._logger.log("No active resources on %s" % node)
return self.skipped()
# Choose one resource at random
rsc = self._choose_resource(node, resourcelist)
if rsc is None:
return self.failure("Could not get details of resource '%s'" % self._rid)
if rsc.id == rsc.clone_id:
self.debug("Failing %s" % rsc.id)
else:
self.debug("Failing %s (also known as %s)" % (rsc.id, rsc.clone_id))
# Log patterns to watch for (failure, plus restart if managed)
pats = [
self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id)
]
if rsc.managed:
pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._rid))
if rsc.unique:
pats.append(self.templates["Pat:RscOpOK"] % ("start", self._rid))
else:
# Anonymous clones may get restarted with a different clone number
pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
# Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
# is incrementing properly, but it might restart on a different node.
# We'd have to temporarily ban it from all other nodes and ensure the
# migration-threshold hasn't been reached.)
if self._fail_resource(rsc, node, pats) is None:
# self.failure() already called
return None
return self.success()
def _choose_resource(self, node, resourcelist):
""" Choose a random resource to target """
self._rid = self._env.random_gen.choice(resourcelist)
self._rid_alt = self._rid
(_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
for line in lines:
if line.startswith("Resource: "):
rsc = AuditResource(self._cm, line)
if rsc.id == self._rid:
# Handle anonymous clones that get renamed
self._rid = rsc.clone_id
return rsc
return None
    def _get_failcount(self, node):
        """ Check the fail count of the targeted resource on the given node """
cmd = "crm_failcount --quiet --query --resource %s --operation %s --interval %d --node %s"
(rc, lines) = self._rsh(node, cmd % (self._rid, self._action, self._interval, node),
verbose=1)
if rc != 0 or len(lines) != 1:
lines = [l.strip() for l in lines]
self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(lines)))
return -1
try:
failcount = int(lines[0])
except (IndexError, ValueError):
self._logger.log("crm_failcount output on %s unparseable: %s" % (node, " ".join(lines)))
return -1
return failcount
    def _fail_resource(self, rsc, node, pats):
        """ Fail the targeted resource and verify that it recovers as expected """
orig_failcount = self._get_failcount(node)
watch = self.create_watch(pats, 60)
watch.set_watch()
self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
with Timer(self._logger, self.name, "recover"):
watch.look_for_all()
self._cm.cluster_stable()
recovered = self._cm.resource_location(self._rid)
if watch.unmatched:
return self.failure("Patterns not found: %r" % watch.unmatched)
if rsc.unique and len(recovered) > 1:
return self.failure("%s is now active on more than one node: %r" % (self._rid, recovered))
if recovered:
self.debug("%s is running on: %r" % (self._rid, recovered))
elif rsc.managed:
return self.failure("%s was not recovered and is inactive" % self._rid)
new_failcount = self._get_failcount(node)
if new_failcount != orig_failcount + 1:
- return self.failure("%s fail count is %d not %d" % (self._rid,
- new_failcount, orig_failcount + 1))
+ return self.failure("%s fail count is %d not %d"
+ % (self._rid, new_failcount, orig_failcount + 1))
return 0 # Anything but None is success
@property
def errors_to_ignore(self):
""" Return list of errors which should be ignored """
return [
r"Updating failcount for %s" % self._rid,
r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt),
r"Unknown operation: fail",
self.templates["Pat:RscOpOK"] % (self._action, self._rid),
r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval)
]
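# Illustrative usage sketch (not part of the patch): how a scenario might drive
# this test directly. The ClusterManager instance "cm" and the node name are
# assumptions; return values follow the usual CTSTest success/failure protocol.
#
#   test = ResourceRecover(cm)
#   result = test("node1")   # fail one active resource on node1, watch recovery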
diff --git a/python/pacemaker/_cts/watcher.py b/python/pacemaker/_cts/watcher.py
index 5c07e88066..3e6d70204b 100644
--- a/python/pacemaker/_cts/watcher.py
+++ b/python/pacemaker/_cts/watcher.py
@@ -1,551 +1,551 @@
""" Log searching classes for Pacemaker's Cluster Test Suite (CTS) """
__all__ = ["LogKind", "LogWatcher"]
__copyright__ = "Copyright 2014-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
from enum import Enum, unique
import re
import time
import threading
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
LOG_WATCHER_BIN = "%s/cts-log-watcher" % BuildOptions.DAEMON_DIR
@unique
class LogKind(Enum):
""" The various kinds of log files that can be watched """
ANY = 0
FILE = 1
REMOTE_FILE = 2
JOURNAL = 3
def __str__(self):
if self.value == 0:
return "any"
if self.value == 1:
return "combined syslog"
if self.value == 2:
return "remote"
return "journal"
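# Illustrative sketch (not part of the patch): __str__ above gives each kind a
# human-readable form for log messages.
#
#   str(LogKind.ANY)          # -> "any"
#   str(LogKind.FILE)         # -> "combined syslog"
#   str(LogKind.REMOTE_FILE)  # -> "remote"
#   str(LogKind.JOURNAL)      # -> "journal"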
class SearchObj:
""" The base class for various kinds of log watchers. Log-specific watchers
need to be built on top of this one.
"""
def __init__(self, filename, host=None, name=None):
""" Create a new SearchObj instance
Arguments:
filename -- The log to watch
host -- The cluster node on which to watch the log
name -- A unique name to use when logging about this watch
"""
self.cache = []
self.filename = filename
self.limit = None
self.logger = LogFactory()
self.name = name
self.offset = "EOF"
self.rsh = RemoteFactory().getInstance()
if host:
self.host = host
else:
self.host = "localhost"
def __str__(self):
if self.host:
return "%s:%s" % (self.host, self.filename)
return self.filename
def log(self, args):
""" Log a message """
message = "lw: %s: %s" % (self, args)
self.logger.log(message)
def debug(self, args):
""" Log a debug message """
message = "lw: %s: %s" % (self, args)
self.logger.debug(message)
def harvest(self, delegate=None):
""" Collect lines from a log, optionally calling delegate when complete """
async_task = self.harvest_async(delegate)
async_task.join()
def harvest_async(self, delegate=None):
""" Collect lines from a log asynchronously, optionally calling delegate
when complete. This method must be implemented by all subclasses.
"""
raise NotImplementedError
def end(self):
""" Mark that a log is done being watched, resetting internal data structures
to the beginning of the file. Subsequent watches will therefore start
from the beginning again.
"""
self.debug("Unsetting the limit")
self.limit = None
class FileObj(SearchObj):
""" A specialized SearchObj subclass for watching log files """
def __init__(self, filename, host=None, name=None):
""" Create a new FileObj instance
Arguments:
filename -- The file to watch
host -- The cluster node on which to watch the file
name -- A unique name to use when logging about this watch
"""
SearchObj.__init__(self, filename, host, name)
self._delegate = None
self.harvest()
def async_complete(self, pid, returncode, out, err):
""" Called when an asynchronous log file read is complete. This function
saves the output from that read for look()/look_for_all() to process
            and records the current position in the file. Future reads will
pick back up from that spot.
Arguments:
pid -- The ID of the process that did the read
returncode -- The return code of the process that did the read
out -- stdout from the file read
err -- stderr from the file read
"""
for line in out:
match = re.search(r"^CTSwatcher:Last read: (\d+)", line)
if match:
self.offset = match.group(1)
self.debug("Got %d lines, new offset: %s %r" % (len(out), self.offset, self._delegate))
elif re.search(r"^CTSwatcher:.*truncated", line):
self.log(line)
elif re.search(r"^CTSwatcher:", line):
self.debug("Got control line: %s" % line)
else:
self.cache.append(line)
if self._delegate:
self._delegate.async_complete(pid, returncode, self.cache, err)
def harvest_async(self, delegate=None):
""" Collect lines from the log file on a single host asynchronously,
optionally calling delegate when complete. This can be called
repeatedly, reading a chunk each time or until the end of the log
file is hit.
"""
self._delegate = delegate
self.cache = []
if self.limit and (self.offset == "EOF" or int(self.offset) > self.limit):
if self._delegate:
self._delegate.async_complete(-1, -1, [], [])
return None
return self.rsh.call_async(self.host,
"%s -t %s -p CTSwatcher: -l 200 -f %s -o %s" % (LOG_WATCHER_BIN, self.name, self.filename, self.offset),
delegate=self)
def set_end(self):
""" Internally record where we expect to find the end of a log file,
which is just the number of lines in the file. Calls to harvest
from the log file will not go any farther than what this function
records.
"""
if self.limit:
return
# pylint: disable=not-callable
(_, lines) = self.rsh(self.host,
"%s -t %s -p CTSwatcher: -l 2 -f %s -o %s" % (LOG_WATCHER_BIN, self.name, self.filename, "EOF"),
verbose=0)
for line in lines:
match = re.search(r"^CTSwatcher:Last read: (\d+)", line)
if match:
self.limit = int(match.group(1))
self.debug("Set limit to: %d" % self.limit)
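# Illustrative sketch (not part of the patch): the control-line protocol FileObj
# parses from cts-log-watcher output. A "Last read" line advances the offset for
# the next harvest; lines without the CTSwatcher: prefix are cached as log text.
#
#   line = "CTSwatcher:Last read: 1234"
#   match = re.search(r"^CTSwatcher:Last read: (\d+)", line)
#   assert match and match.group(1) == "1234"   # becomes the next -o offset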
class JournalObj(SearchObj):
""" A specialized SearchObj subclass for watching systemd journals """
def __init__(self, host=None, name=None):
""" Create a new JournalObj instance
Arguments:
host -- The cluster node on which to watch the journal
name -- A unique name to use when logging about this watch
"""
SearchObj.__init__(self, name, host, name)
self._delegate = None
self._hit_limit = False
self.harvest()
def async_complete(self, pid, returncode, out, err):
""" Called when an asynchronous journal read is complete. This function
saves the output from that read for look()/look_for_all() to process
and records the current position in the journal. Future reads will
pick back up from that spot.
Arguments:
pid -- The ID of the process that did the journal read
returncode -- The return code of the process that did the journal read
out -- stdout from the journal read
err -- stderr from the journal read
"""
found_cursor = False
for line in out:
match = re.search(r"^-- cursor: ([^.]+)", line)
if match:
found_cursor = True
self.offset = match.group(1).strip()
self.debug("Got %d lines, new cursor: %s" % (len(out), self.offset))
else:
self.cache.append(line)
if self.limit and not found_cursor:
self._hit_limit = True
self.debug("Got %d lines but no cursor: %s" % (len(out), self.offset))
# Get the current cursor
# pylint: disable=not-callable
(_, out) = self.rsh(self.host, "journalctl -q -n 0 --show-cursor", verbose=0)
for line in out:
match = re.search(r"^-- cursor: ([^.]+)", line)
if match:
self.offset = match.group(1).strip()
self.debug("Got %d lines, new cursor: %s" % (len(out), self.offset))
else:
self.log("Not a new cursor: %s" % line)
self.cache.append(line)
if self._delegate:
self._delegate.async_complete(pid, returncode, self.cache, err)
def harvest_async(self, delegate=None):
""" Collect lines from the journal on a single host asynchronously,
optionally calling delegate when complete. This can be called
repeatedly, reading a chunk each time or until the end of the
journal is hit.
"""
self._delegate = delegate
self.cache = []
# Use --lines to prevent journalctl from overflowing the Popen input buffer
if self.limit and self._hit_limit:
return None
if self.offset == "EOF":
command = "journalctl -q -n 0 --show-cursor"
elif self.limit:
command = "journalctl -q --after-cursor='%s' --until '%s' --lines=200 --show-cursor" % (self.offset, self.limit)
else:
command = "journalctl -q --after-cursor='%s' --lines=200 --show-cursor" % (self.offset)
return self.rsh.call_async(self.host, command, delegate=self)
def set_end(self):
""" Internally record where we expect to find the end of a host's journal,
which is just the current time. Calls to harvest from the journal will
not go any farther than what this function records.
"""
if self.limit:
return
self._hit_limit = False
# pylint: disable=not-callable
(rc, lines) = self.rsh(self.host, "date +'%Y-%m-%d %H:%M:%S'", verbose=0)
if rc == 0 and len(lines) == 1:
self.limit = lines[0].strip()
self.debug("Set limit to: %s" % self.limit)
else:
- self.debug("Unable to set limit for %s because date returned %d lines with status %d" % (self.host,
- len(lines), rc))
+ self.debug("Unable to set limit for %s because date returned %d lines with status %d"
+ % (self.host, len(lines), rc))
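# Illustrative sketch (not part of the patch): the journalctl invocations built
# above, shown with a hypothetical cursor and limit timestamp.
#
#   journalctl -q -n 0 --show-cursor                       # first read: cursor only
#   journalctl -q --after-cursor='<cursor>' --lines=200 --show-cursor
#   journalctl -q --after-cursor='<cursor>' --until '<limit>' --lines=200 --show-cursor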
class LogWatcher:
""" A class for watching a single log file or journal across multiple hosts,
looking for lines that match given regular expressions.
The way you use this class is as follows:
- Construct a LogWatcher object
- Call set_watch() when you want to start watching the log
- Call look() to scan the log looking for the patterns
"""
def __init__(self, log, regexes, hosts, kind=LogKind.ANY, name="Anon", timeout=10, silent=False):
""" Create a new LogWatcher instance.
Arguments:
log -- The log file to watch
regexes -- A list of regular expressions to match against the log
hosts -- A list of cluster nodes on which to watch the log
kind -- What type of log is this object watching?
name -- A unique name to use when logging about this watch
timeout -- Default number of seconds to watch a log file at a time;
this can be overridden by the timeout= parameter to
self.look on an as-needed basis
silent -- If False, log extra information
"""
self.filename = log
self.hosts = hosts
self.kind = kind
self.name = name
self.regexes = regexes
self.unmatched = None
self.whichmatch = -1
self._cache_lock = threading.Lock()
self._file_list = []
self._line_cache = []
self._logger = LogFactory()
self._timeout = int(timeout)
# Validate our arguments. Better sooner than later ;-)
for regex in regexes:
re.compile(regex)
if not self.hosts:
raise ValueError("LogWatcher requires hosts argument")
if not self.filename:
raise ValueError("LogWatcher requires log argument")
if not silent:
for regex in self.regexes:
self._debug("Looking for regex: %s" % regex)
def _debug(self, args):
""" Log a debug message """
message = "lw: %s: %s" % (self.name, args)
self._logger.debug(message)
def set_watch(self):
""" Mark the place to start watching the log from """
if self.kind == LogKind.REMOTE_FILE:
for node in self.hosts:
self._file_list.append(FileObj(self.filename, node, self.name))
elif self.kind == LogKind.JOURNAL:
for node in self.hosts:
self._file_list.append(JournalObj(node, self.name))
else:
self._file_list.append(FileObj(self.filename))
def async_complete(self, pid, returncode, out, err):
""" Called when an asynchronous log file read is complete. This function
saves the output from that read for look()/look_for_all() to process
and records the current position. Future reads will pick back up
from that spot.
Arguments:
pid -- The ID of the process that did the read
returncode -- The return code of the process that did the read
out -- stdout from the file read
err -- stderr from the file read
"""
        # It's not clear whether this function is ever registered as a delegate
        # anywhere, which is what would pass returncode and err
# as parameters. Just disable the warning for now.
# pylint: disable=unused-argument
# TODO: Probably need a lock for updating self._line_cache
self._logger.debug("%s: Got %d lines from %d (total %d)" % (self.name, len(out), pid, len(self._line_cache)))
if out:
with self._cache_lock:
self._line_cache.extend(out)
def __get_lines(self):
""" Iterate over all watched log files and collect new lines from each """
if not self._file_list:
raise ValueError("No sources to read from")
pending = []
for f in self._file_list:
t = f.harvest_async(self)
if t:
pending.append(t)
for t in pending:
t.join(60.0)
if t.is_alive():
                self._logger.log("%s: Aborting after 60s waiting for %r logging commands" % (self.name, t))
return
def end(self):
""" Mark that a log is done being watched, resetting internal data structures
to the beginning of the file. Subsequent watches will therefore start
from the beginning again.
"""
for f in self._file_list:
f.end()
def look(self, timeout=None):
""" Examine the log looking for the regexes that were given when this
object was created. It starts looking from the place marked by
set_watch(), continuing through the file in the fashion of
`tail -f`. It properly recovers from log file truncation but not
from removing and recreating the log.
Arguments:
timeout -- Number of seconds to watch the log file; defaults to
                   the timeout argument passed when this object was created
Returns:
The first line which matches any regex
"""
if not timeout:
timeout = self._timeout
lines = 0
begin = time.time()
end = begin + timeout + 1
if not self.regexes:
self._debug("Nothing to look for")
return None
if timeout == 0:
for f in self._file_list:
f.set_end()
while True:
if self._line_cache:
lines += 1
with self._cache_lock:
line = self._line_cache[0]
self._line_cache.remove(line)
which = -1
if re.search("CTS:", line):
continue
for regex in self.regexes:
which += 1
matchobj = re.search(regex, line)
if matchobj:
self.whichmatch = which
self._debug("Matched: %s" % line)
return line
elif timeout > 0 and end < time.time():
timeout = 0
for f in self._file_list:
f.set_end()
else:
self.__get_lines()
if not self._line_cache and end < time.time():
self._debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines))
return None
self._debug("Waiting: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), len(self._line_cache)))
time.sleep(1)
self._debug("How did we get here")
return None
def look_for_all(self, allow_multiple_matches=False, silent=False):
""" Like look(), but looks for matches for multiple regexes. This function
returns when the timeout is reached or all regexes were matched. As a
side effect, self.unmatched will contain regexes that were not matched.
This can be inspected by the caller.
Arguments:
allow_multiple_matches -- If True, allow each regex to match more than
once. If False (the default), once a regex
matches a line, it will no longer be searched
for.
silent -- If False, log extra information
Returns:
If all regexes are matched, return the matching lines. Otherwise, return
None.
"""
save_regexes = self.regexes
result = []
if not silent:
self._debug("starting search: timeout=%d" % self._timeout)
while self.regexes:
one_result = self.look(self._timeout)
if not one_result:
self.unmatched = self.regexes
self.regexes = save_regexes
self.end()
return None
result.append(one_result)
if not allow_multiple_matches:
del self.regexes[self.whichmatch]
else:
# Allow multiple regexes to match a single line
tmp_regexes = self.regexes
self.regexes = []
for regex in tmp_regexes:
matchobj = re.search(regex, one_result)
if not matchobj:
self.regexes.append(regex)
self.unmatched = None
self.regexes = save_regexes
return result
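# Illustrative usage sketch (not part of the patch): typical LogWatcher use per
# the class docstring. The log path, hosts, and pattern are assumptions.
#
#   watch = LogWatcher("/var/log/messages", [r"pacemakerd.*Starting"],
#                      ["node1", "node2"], kind=LogKind.FILE, name="demo",
#                      timeout=30)
#   watch.set_watch()                  # mark the starting point in each log
#   matches = watch.look_for_all()     # block until all match or timeout
#   if matches is None:
#       print("Unmatched patterns: %r" % watch.unmatched)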
