diff --git a/cts/lab/CTSaudits.py b/cts/lab/CTSaudits.py index 40a65c7d93..eb97d3e039 100755 --- a/cts/lab/CTSaudits.py +++ b/cts/lab/CTSaudits.py @@ -1,875 +1,878 @@ """ Auditing classes for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import time, re, uuid from pacemaker.buildoptions import BuildOptions -from pacemaker._cts.watcher import LogWatcher +from pacemaker._cts.watcher import LogKind, LogWatcher class ClusterAudit(object): def __init__(self, cm): self.CM = cm def __call__(self): raise ValueError("Abstract Class member (__call__)") def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def log(self, args): self.CM.log("audit: %s" % args) def debug(self, args): self.CM.debug("audit: %s" % args) def name(self): raise ValueError("Abstract Class member (name)") AllAuditClasses = [ ] class LogAudit(ClusterAudit): def name(self): return "LogAudit" def __init__(self, cm): self.CM = cm - self.kinds = [ "combined syslog", "journal", "remote" ] def RestartClusterLogging(self, nodes=None): if not nodes: nodes = self.CM.Env["nodes"] self.CM.debug("Restarting logging on: %s" % repr(nodes)) for node in nodes: if self.CM.Env["have_systemd"]: (rc, _) = self.CM.rsh(node, "systemctl stop systemd-journald.socket") if rc != 0: self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) (rc, _) = self.CM.rsh(node, "systemctl start systemd-journald.service") if rc != 0: self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) (rc, _) = self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"]) if rc != 0: self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node)) def TestLogging(self): patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self.CM.Env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) watch_pref = self.CM.Env["LogWatcher"] - if watch_pref == "any": - for k in self.kinds: + if watch_pref == LogKind.ANY: + for k in LogKind: watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, self.CM.Env["nodes"], k, "LogAudit", 5, silent=True) watch[k].set_watch() else: k = watch_pref watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, self.CM.Env["nodes"], k, "LogAudit", 5, silent=True) watch[k].set_watch() - if watch_pref == "any": self.CM.log("Writing log with key: %s" % (suffix)) + if watch_pref == LogKind.ANY: + self.CM.log("Writing log with key: %s" % (suffix)) + for node in self.CM.Env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix) (rc, _) = self.CM.rsh(node, cmd, synchronous=False, verbose=0) if rc != 0: self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) - for k in self.kinds: + for k in LogKind: if k in watch: w = watch[k] - if watch_pref == "any": self.CM.log("Testing for %s logs" % (k)) + if watch_pref == LogKind.ANY: + self.CM.log("Testing for %s logs" % (k)) + w.look_for_all(silent=True) if not w.unmatched: - if watch_pref == "any": + if watch_pref == LogKind.ANY: self.CM.log ("Continuing with %s-based log reader" % (w.kind)) self.CM.Env["LogWatcher"] = w.kind return 1 for k in list(watch.keys()): w = watch[k] if w.unmatched: for regex in w.unmatched: self.CM.log ("Test message [%s] not found in %s logs." % (regex, w.kind)) return 0 def __call__(self): max = 3 attempt = 0 self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) while attempt <= max and self.TestLogging() == 0: attempt = attempt + 1 self.RestartClusterLogging() time.sleep(60*attempt) if attempt > max: self.CM.log("ERROR: Cluster logging unrecoverable.") return 0 return 1 def is_applicable(self): if self.CM.Env["DoBSC"]: return 0 if self.CM.Env["LogAuditDisabled"]: return 0 return 1 class DiskAudit(ClusterAudit): def name(self): return "DiskspaceAudit" def __init__(self, cm): self.CM = cm def __call__(self): result = 1 # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM " + BuildOptions.LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'" self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: (_, dfout) = self.CM.rsh(node, dfcmd, verbose=1) if not dfout: self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) else: dfout = dfout[0].strip() try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self.CM.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = None if self.CM.Env["continue"]: answer = "Y" else: try: answer = input('Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": raise ValueError("Disk full on %s" % (node)) elif remaining_mb < 100 or used_percent > 90: self.CM.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): if self.CM.Env["DoBSC"]: return 0 return 1 class FileAudit(ClusterAudit): def name(self): return "FileAudit" def __init__(self, cm): self.CM = cm self.known = [] def __call__(self): result = 1 self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = 0 self.known.append(line) self.CM.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = 0 self.known.append(line) self.CM.log("Warning: Corosync core file on %s: %s" % (node, line)) if node in self.CM.ShouldBeStatus and self.CM.ShouldBeStatus[node] == "down": clean = 0 (_, lsout) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) for line in lsout: result = 0 clean = 1 self.CM.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (_, lsout) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) for line in lsout: self.CM.debug("ps[%s]: %s" % (node, line)) self.CM.rsh(node, "rm -rf /dev/shm/qb-*") else: self.CM.debug("Skipping %s" % node) return result def is_applicable(self): return 1 class AuditResource(object): def __init__(self, cm, line): fields = line.split() self.CM = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None def unique(self): if self.flags & int("0x00000020", 16): return 1 return 0 def orphan(self): if self.flags & int("0x00000001", 16): return 1 return 0 def managed(self): if self.flags & int("0x00000002", 16): return 1 return 0 class AuditConstraint(object): def __init__(self, cm, line): fields = line.split() self.CM = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): def name(self): return "PrimitiveAudit" def __init__(self, cm): self.CM = cm def doResourceAudit(self, resource, quorum): rc = 1 active = self.CM.ResourceLocation(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %s" % (resource.id, repr(active))) elif resource.needs_quorum == 1: self.CM.log("Resource %s active without quorum: %s" % (resource.id, repr(active))) rc = 0 elif not resource.managed(): self.CM.log("Resource %s not managed. Active on %s" % (resource.id, repr(active))) elif not resource.unique(): # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %s" % (resource.id, repr(active))) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self.CM.log("Resource %s is active multiple times: %s" % (resource.id, repr(active))) rc = 0 elif resource.orphan(): self.debug("Resource %s is an inactive orphan" % resource.id) elif len(self.inactive_nodes) == 0: self.CM.log("WARN: Resource %s not served anywhere" % resource.id) rc = 0 elif self.CM.Env["warn-inactive"]: if quorum or not resource.needs_quorum: self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) else: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) return rc def setup(self): self.target = None self.resources = [] self.constraints = [] self.active_nodes = [] self.inactive_nodes = [] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == "up": self.active_nodes.append(node) else: self.inactive_nodes.append(node) for node in self.CM.Env["nodes"]: if self.target == None and self.CM.ShouldBeStatus[node] == "up": self.target = node if not self.target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name()) return 0 (_, lines) = self.CM.rsh(self.target, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): self.resources.append(AuditResource(self.CM, line)) elif re.search("^Constraint", line): self.constraints.append(AuditConstraint(self.CM, line)) else: self.CM.log("Unknown entry: %s" % line); return 1 def __call__(self): rc = 1 if not self.setup(): return 1 quorum = self.CM.HasQuorum(None) for resource in self.resources: if resource.type == "primitive": if self.doResourceAudit(resource, quorum) == 0: rc = 0 return rc def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 class GroupAudit(PrimitiveAudit): def name(self): return "GroupAudit" def __call__(self): rc = 1 if not self.setup(): return 1 for group in self.resources: if group.type == "group": first_match = 1 group_location = None for child in self.resources: if child.parent == group.id: nodes = self.CM.ResourceLocation(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = 0 if len(nodes) > 1: rc = 0 self.CM.log("Child %s of %s is active more than once: %s" % (child.id, group.id, repr(nodes))) elif len(nodes) == 0: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: rc = 0 self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return rc class CloneAudit(PrimitiveAudit): def name(self): return "CloneAudit" def __call__(self): rc = 1 if not self.setup(): return 1 for clone in self.resources: if clone.type == "clone": for child in self.resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return rc class ColocationAudit(PrimitiveAudit): def name(self): return "ColocationAudit" def crm_location(self, resource): (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q"%resource, verbose=1) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): rc = 1 if not self.setup(): return 1 for coloc in self.constraints: if coloc.type == "rsc_colocation": source = self.crm_location(coloc.rsc) target = self.crm_location(coloc.target) if len(source) == 0: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: rc = 0 self.CM.log("Colocation audit (%s): %s running on %s (not in %s)" % (coloc.id, coloc.rsc, node, repr(target))) else: self.debug("Colocation audit (%s): %s running on %s (in %s)" % (coloc.id, coloc.rsc, node, repr(target))) return rc class ControllerStateAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self.CM.Env["nodes"]: should_be = self.CM.ShouldBeStatus[node] rc = self.CM.test_node_CM(node) if rc > 0: if should_be == "down": down_are_up = down_are_up + 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down = up_are_down + 1 if len(unstable_list) > 0: passed = 0 self.CM.log("Cluster is not stable: %d (of %d): %s" % (len(unstable_list), self.CM.upcount(), repr(unstable_list))) if up_are_down > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self.CM.Env["nodes"]))) if down_are_up > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self.CM.Env["nodes"]))) return passed def name(self): return "ControllerStateAudit" def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 class CIBAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 ccm_partitions = self.CM.find_partitions() if len(ccm_partitions) == 0: self.debug("\tNo partitions to audit") return 1 for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) partition_passed = 0 if self.audit_cib_contents(partition) == 0: passed = 0 return passed def audit_cib_contents(self, hostlist): passed = 1 node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self.store_remote_cib(node, node0) if node_xml == None: self.CM.log("Could not perform audit: No configuration from %s" % node) passed = 0 elif node0 == None: node0 = node node0_xml = node_xml elif node0_xml == None: self.CM.log("Could not perform audit: No configuration from %s" % node0) passed = 0 else: (rc, result) = self.CM.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) if rc != 0: self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = 0 for line in result: if not re.search("", line): passed = 0 self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) # self.CM.rsh(node0, "rm -f %s" % node_xml) # self.CM.rsh(node0, "rm -f %s" % node0_xml) return passed def store_remote_cib(self, node, target): combined = "" filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], verbose=1) if rc != 0: self.CM.log("Could not retrieve configuration") return None self.CM.rsh("localhost", "rm -f %s" % filename) for line in lines: self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) if self.CM.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self.CM.log("Could not store configuration") return None return filename def name(self): return "CibAudit" def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 class PartitionAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} self.NodeEpoch = {} self.NodeState = {} self.NodeQuorum = {} def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 ccm_partitions = self.CM.find_partitions() if ccm_partitions == None or len(ccm_partitions) == 0: return 1 self.CM.cluster_stable(double_check=True) if len(ccm_partitions) != self.CM.partitions_expected: self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) passed = 0 for partition in ccm_partitions: self.CM.log("\t %s" % partition) for partition in ccm_partitions: partition_passed = 0 if self.audit_partition(partition) == 0: passed = 0 return passed def trim_string(self, avalue): if not avalue: return None if len(avalue) > 1: return avalue[:-1] def trim2int(self, avalue): if not avalue: return None if len(avalue) > 1: return int(avalue[:-1]) def audit_partition(self, partition): passed = 1 dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % (partition)) for node in node_list: if self.CM.ShouldBeStatus[node] != "up": self.CM.log("Warn: Node %s appeared out of nowhere" % (node)) self.CM.ShouldBeStatus[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) (_, out) = self.CM.rsh(node, self.CM["StatusCmd"] % node, verbose=1) self.NodeState[node] = out[0].strip() (_, out) = self.CM.rsh(node, self.CM["EpochCmd"], verbose=1) self.NodeEpoch[node] = out[0].strip() (_, out) = self.CM.rsh(node, self.CM["QuorumCmd"], verbose=1) self.NodeQuorum[node] = out[0].strip() self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoch[node], self.NodeQuorum[node])) self.NodeState[node] = self.trim_string(self.NodeState[node]) self.NodeEpoch[node] = self.trim2int(self.NodeEpoch[node]) self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node]) if not self.NodeEpoch[node]: self.CM.log("Warn: Node %s dissappeared: cant determin epoch" % (node)) self.CM.ShouldBeStatus[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch == None or self.NodeEpoch[node] < lowest_epoch: lowest_epoch = self.NodeEpoch[node] if not lowest_epoch: self.CM.log("Lowest epoch not determined in %s" % (partition)) passed = 0 for node in node_list: if self.CM.ShouldBeStatus[node] == "up": if self.CM.is_node_dc(node, self.NodeState[node]): dc_found.append(node) if self.NodeEpoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self.NodeEpoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self.CM.log("DC %s is not the oldest node (%d vs. %d)" % (node, self.NodeEpoch[node], lowest_epoch)) passed = 0 if len(dc_found) == 0: self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self.CM.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = 0 if passed == 0: for node in node_list: if self.CM.ShouldBeStatus[node] == "up": self.CM.log("epoch %s : %s" % (self.NodeEpoch[node], self.NodeState[node])) return passed def name(self): return "PartitionAudit" def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 AllAuditClasses.append(DiskAudit) AllAuditClasses.append(FileAudit) AllAuditClasses.append(LogAudit) AllAuditClasses.append(ControllerStateAudit) AllAuditClasses.append(PartitionAudit) AllAuditClasses.append(PrimitiveAudit) AllAuditClasses.append(GroupAudit) AllAuditClasses.append(CloneAudit) AllAuditClasses.append(ColocationAudit) AllAuditClasses.append(CIBAudit) def AuditList(cm): result = [] for auditclass in AllAuditClasses: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index 6fc902b17e..9ee99f1a92 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,645 +1,646 @@ """ Test environment classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["EnvFactory"] __copyright__ = "Copyright 2014-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse import os import random import socket import sys import time from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory +from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment, consisting largely of processing and storing command line parameters """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). It's possible we could fix this with type annotations, # but those were introduced with python 3.5 and we only support python 3.4. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like has_key, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} self._nodes = [] # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["IPagent"] = "IPaddr2" self["DoFencing"] = True self["ClobberCIB"] = False self["CIBfilename"] = None self["CIBResource"] = False - self["LogWatcher"] = "any" + self["LogWatcher"] = LogKind.ANY self["node-limit"] = 0 self["scenario"] = "random" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._target = "localhost" self._seed_random() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def _seed_random(self, seed=None): """ Initialize the random number generator with the given seed, or use the current time if None """ if not seed: seed = int(time.time()) self["RandSeed"] = seed self.random_gen.seed(str(seed)) def dump(self): """ Print the current environment """ keys = [] for key in list(self.data.keys()): keys.append(key) keys.sort() for key in keys: s = "Environment[%s]" % key self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key]))) def keys(self): """ Return a list of all environment keys stored in this instance """ return list(self.data.keys()) def has_key(self, key): """ Does the given environment key exist? """ if key == "nodes": return True return key in self.data def __getitem__(self, key): """ Return the given environment key, or None if it does not exist """ if str(key) == "0": raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead") if key == "nodes": return self._nodes if key == "Name": return self._get_stack_short() if key in self.data: return self.data[key] return None def __setitem__(self, key, value): """ Set the given environment key to the given value, overriding any previous value """ if key == "Stack": self._set_stack(value) elif key == "node-limit": self.data[key] = value self._filter_nodes() elif key == "nodes": self._nodes = [] for node in value: # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: n = node.strip() socket.gethostbyname_ex(n) self._nodes.append(n) except: self._logger.log("%s not found in DNS... aborting" % node) raise self._filter_nodes() else: self.data[key] = value def random_node(self): """ Choose a random node from the cluster """ return self.random_gen.choice(self["nodes"]) def _set_stack(self, name): """ Normalize the given cluster stack name """ if name in ["corosync", "cs", "mcp"]: self.data["Stack"] = "corosync 2+" else: raise ValueError("Unknown stack: %s" % name) def _get_stack_short(self): """ Return the short name for the currently set cluster stack """ if "Stack" not in self.data: return "unknown" if self.data["Stack"] == "corosync 2+": return "crm-corosync" LogFactory().log("Unknown stack: %s" % self["stack"]) raise ValueError("Unknown stack: %s" % self["stack"]) def _detect_syslog(self): """ Detect the syslog variant in use on the target node """ if "syslogd" not in self.data: if self["have_systemd"]: # Systemd (_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) self["syslogd"] = lines[0].strip() else: # SYS-V (_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) self["syslogd"] = lines[0].strip() if "syslogd" not in self.data or not self["syslogd"]: # default self["syslogd"] = "rsyslog" def disable_service(self, node, service): """ Disable the given service on the given node """ if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl disable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s off" % service) return rc def enable_service(self, node, service): """ Enable the given service on the given node """ if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, "systemctl enable %s" % service) return rc # SYS-V (rc, _) = self._rsh(node, "chkconfig %s on" % service) return rc def service_is_enabled(self, node, service): """ Is the given service enabled on the given node? """ if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service) return rc == 0 # SYS-V (rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service) return rc == 0 def _detect_at_boot(self): """ Detect if the cluster starts at boot """ if "at-boot" not in self.data: self["at-boot"] = self.service_is_enabled(self._target, "corosync") \ or self.service_is_enabled(self._target, "pacemaker") def _detect_ip_offset(self): """ Detect the offset for IPaddr resources """ if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) return # pylint thinks self["IPBase"] is a list, not a string, which causes it # to error out because a list doesn't have split(). # pylint: disable=no-member if int(self["IPBase"].split('.')[3]) >= 240: self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s" % (self["IPBase"], self["IPBase"].split('.')[3])) self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) def _filter_nodes(self): """ If --limit-nodes is given, keep that many nodes from the front of the list of cluster nodes and drop the rest """ if self["node-limit"] > 0: if len(self["nodes"]) > self["node-limit"]: # pylint thinks self["node-limit"] is a list even though we initialize # it as an int in __init__ and treat it as an int everywhere. # pylint: disable=bad-string-format-type self._logger.log("Limiting the number of nodes configured=%d (max=%d)" %(len(self["nodes"]), self["node-limit"])) while len(self["nodes"]) > self["node-limit"]: self["nodes"].pop(len(self["nodes"])-1) def _validate(self): """ Were we given all the required command line parameters? """ if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """ Probe cluster nodes to figure out how to log and manage services """ self._target = random.Random().choice(self["nodes"]) exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser if "have_systemd" not in self.data: (rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 self._detect_syslog() self._detect_at_boot() self._detect_ip_offset() def _parse_args(self, argv): """ Parse and validate command line parameters, setting the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0]) grp1 = parser.add_argument_group("Common options") grp1.add_argument("-g", "--dsh-group", "--group", metavar="GROUP", dest="group", help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)") grp1.add_argument("-l", "--limit-nodes", type=int, default=0, metavar="MAX", help="Only use the first MAX cluster nodes supplied with --nodes") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", metavar="NODES", help="List of cluster nodes separated by whitespace") grp1.add_argument("--stack", default="corosync", metavar="STACK", help="Which cluster stack is installed") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes") grp2.add_argument("--at-boot", "--cluster-starts-at-boot", choices=["1", "0", "yes", "no"], help="Does the cluster software start at boot time?") grp2.add_argument("--facility", "--syslog-facility", default="daemon", metavar="NAME", help="Which syslog facility to log to") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named test") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--bsc", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--container-tests", action="store_true", help="Include pacemaker_remote tests that run in lxc container resources") grp4.add_argument("--experimental-tests", action="store_true", help="Include experimental tests") grp4.add_argument("--loop-minutes", type=int, default=60, help="") grp4.add_argument("--no-loop-tests", action="store_true", help="Don't run looping/time-based tests") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--oprofile", metavar="NODES", help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--qarsh", action="store_true", help="Use QARSH to access nodes instead of SSH") grp4.add_argument("--schema", metavar="SCHEMA", default="pacemaker-3.0", help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--set", action="append", metavar="ARG", default=[], help="Set key=value pairs (can be specified multiple times)") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--valgrind-procs", metavar="PROCS", default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd", help="Run valgrind against the given space-separated list of processes") grp4.add_argument("--valgrind-tests", action="store_true", help="Include tests using valgrind") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") parser.add_argument("iterations", type=int, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. They get a default from the add_argument # calls, only do one thing, and they do not have any side effects. self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["Stack"] = args.stack self["SyslogFacility"] = args.facility self["TruncateLog"] = args.truncate self["at-boot"] = args.at_boot in ["1", "yes"] self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["container-tests"] = args.container_tests self["experimental-tests"] = args.experimental_tests self["iterations"] = args.iterations self["loop-minutes"] = args.loop_minutes self["loop-tests"] = not args.no_loop_tests self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["node-limit"] = args.limit_nodes self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["valgrind-procs"] = args.valgrind_procs self["valgrind-tests"] = args.valgrind_tests self["warn-inactive"] = args.warn_inactive # Nodes and groups are mutually exclusive, so their defaults cannot be # set in their add_argument calls. Additionally, groups does more than # just set a value. Here, set nodes first and then if a group is # specified, override the previous nodes value. if args.nodes: self["nodes"] = args.nodes.split(" ") else: self["nodes"] = [] if args.group: self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group) LogFactory().add_file(self["OutputFile"], "CTS") dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group) if os.path.isfile(dsh_file): self["nodes"] = [] with open(dsh_file, "r", encoding="utf-8") as f: for line in f: l = line.strip() if not l.startswith('#'): self["nodes"].append(l) else: print("Unknown DSH group: %s" % args.dsh_group) # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.bsc: self["DoBSC"] = True self["scenario"] = "basic-sanity" if args.cib_filename: self["CIBfilename"] = args.cib_filename else: self["CIBfilename"] = None if args.choose: self["scenario"] = "sequence" self["tests"].append(args.choose) if args.fencing: if args.fencing in ["0", "no"]: self["DoFencing"] = False else: self["DoFencing"] = True if args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % ( os.environ['OS_REGION_NAME'], os.environ['OS_TENANT_NAME'], os.environ['OS_AUTH_URL'], os.environ['OS_USERNAME'], os.environ['OS_PASSWORD'] ) elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile - self["LogWatcher"] = "remote" + self["LogWatcher"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.oprofile: self["oprofile"] = args.oprofile.split(" ") else: self["oprofile"] = [] if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True if args.qarsh: self._rsh.enable_qarsh() for kv in args.set: (name, value) = kv.split("=") self[name] = value print("Setting %s = %s" % (name, value)) class EnvFactory: """ A class for constructing a singleton instance of an Environment object """ instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Returns the previously created instance of Environment, or creates a new instance if one does not already exist. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance diff --git a/python/pacemaker/_cts/watcher.py b/python/pacemaker/_cts/watcher.py index a0fc356c5c..9f528f176f 100644 --- a/python/pacemaker/_cts/watcher.py +++ b/python/pacemaker/_cts/watcher.py @@ -1,388 +1,405 @@ """ Log searching classes for Pacemaker's Cluster Test Suite (CTS) """ -__all__ = ["LogWatcher"] +__all__ = ["LogKind", "LogWatcher"] __copyright__ = "Copyright 2014-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" +from enum import Enum, unique import re import time import threading from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory LOG_WATCHER_BIN = BuildOptions.DAEMON_DIR + "/cts-log-watcher" +@unique +class LogKind(Enum): + """ The various kinds of log files that can be watched """ + + ANY = 0 + FILE = 1 + REMOTE_FILE = 2 + JOURNAL = 3 + + def __str__(self): + if self.value == 0: + return "any" + if self.value == 1: + return "combined syslog" + if self.value == 2: + return "remote" + + return "journal" + class SearchObj: def __init__(self, filename, host=None, name=None): self.cache = [] self.filename = filename self.limit = None self.logger = LogFactory() self.name = name self.offset = "EOF" self.rsh = RemoteFactory().getInstance() if host: self.host = host else: self.host = "localhost" def __str__(self): if self.host: return "%s:%s" % (self.host, self.filename) return self.filename def log(self, args): message = "lw: %s: %s" % (self, args) self.logger.log(message) def debug(self, args): message = "lw: %s: %s" % (self, args) self.logger.debug(message) def harvest(self, delegate=None): async_task = self.harvest_async(delegate) async_task.join() def harvest_async(self, delegate=None): raise NotImplementedError def end(self): self.debug("Unsetting the limit") self.limit = None class FileObj(SearchObj): def __init__(self, filename, host=None, name=None): SearchObj.__init__(self, filename, host, name) self._delegate = None self.harvest() def async_complete(self, pid, returncode, out, err): for line in out: match = re.search(r"^CTSwatcher:Last read: (\d+)", line) if match: self.offset = match.group(1) self.debug("Got %d lines, new offset: %s %s" % (len(out), self.offset, repr(self._delegate))) elif re.search(r"^CTSwatcher:.*truncated", line): self.log(line) elif re.search(r"^CTSwatcher:", line): self.debug("Got control line: %s" % line) else: self.cache.append(line) if self._delegate: self._delegate.async_complete(pid, returncode, self.cache, err) def harvest_async(self, delegate=None): self._delegate = delegate self.cache = [] if self.limit and (self.offset == "EOF" or int(self.offset) > self.limit): if self._delegate: self._delegate.async_complete(-1, -1, [], []) return None return self.rsh.call_async(self.host, "%s -t %s -p CTSwatcher: -l 200 -f %s -o %s" % (LOG_WATCHER_BIN, self.name, self.filename, self.offset), delegate=self) def set_end(self): if self.limit: return # pylint: disable=not-callable (_, lines) = self.rsh(self.host, "%s -t %s -p CTSwatcher: -l 2 -f %s -o %s" % (LOG_WATCHER_BIN, self.name, self.filename, "EOF"), verbose=0) for line in lines: match = re.search(r"^CTSwatcher:Last read: (\d+)", line) if match: self.limit = int(match.group(1)) self.debug("Set limit to: %d" % self.limit) class JournalObj(SearchObj): def __init__(self, host=None, name=None): SearchObj.__init__(self, name, host, name) self._delegate = None self._hit_limit = False self.harvest() def async_complete(self, pid, returncode, out, err): found_cursor = False for line in out: match = re.search(r"^-- cursor: ([^.]+)", line) if match: found_cursor = True self.offset = match.group(1).strip() self.debug("Got %d lines, new cursor: %s" % (len(out), self.offset)) else: self.cache.append(line) if self.limit and not found_cursor: self._hit_limit = True self.debug("Got %d lines but no cursor: %s" % (len(out), self.offset)) # Get the current cursor # pylint: disable=not-callable (_, out) = self.rsh(self.host, "journalctl -q -n 0 --show-cursor", verbose=0) for line in out: match = re.search(r"^-- cursor: ([^.]+)", line) if match: self.offset = match.group(1).strip() self.debug("Got %d lines, new cursor: %s" % (len(out), self.offset)) else: self.log("Not a new cursor: %s" % line) self.cache.append(line) if self._delegate: self._delegate.async_complete(pid, returncode, self.cache, err) def harvest_async(self, delegate=None): self._delegate = delegate self.cache = [] # Use --lines to prevent journalctl from overflowing the Popen input buffer if self.limit and self._hit_limit: return None if self.offset == "EOF": command = "journalctl -q -n 0 --show-cursor" elif self.limit: command = "journalctl -q --after-cursor='%s' --until '%s' --lines=200 --show-cursor" % (self.offset, self.limit) else: command = "journalctl -q --after-cursor='%s' --lines=200 --show-cursor" % (self.offset) return self.rsh.call_async(self.host, command, delegate=self) def set_end(self): if self.limit: return self._hit_limit = False # pylint: disable=not-callable (rc, lines) = self.rsh(self.host, "date +'%Y-%m-%d %H:%M:%S'", verbose=0) if rc == 0 and len(lines) == 1: self.limit = lines[0].strip() self.debug("Set limit to: %s" % self.limit) else: self.debug("Unable to set limit for %s because date returned %d lines with status %d" % (self.host, len(lines), rc)) class LogWatcher: '''This class watches logs for messages that fit certain regular expressions. Watching logs for events isn't the ideal way to do business, but it's better than nothing :-) On the other hand, this class is really pretty cool ;-) The way you use this class is as follows: Construct a LogWatcher object Call set_watch() when you want to start watching the log Call look() to scan the log looking for the patterns ''' - def __init__(self, log, regexes, hosts, kind, name="Anon", timeout=10, silent=False): + def __init__(self, log, regexes, hosts, kind=LogKind.ANY, name="Anon", timeout=10, silent=False): '''This is the constructor for the LogWatcher class. It takes a log name to watch, and a list of regular expressions to watch for." ''' self.filename = log self.hosts = hosts self.kind = kind self.name = name self.regexes = regexes self.unmatched = None self.whichmatch = -1 self._cache_lock = threading.Lock() self._file_list = [] self._line_cache = [] self._logger = LogFactory() self._timeout = int(timeout) # Validate our arguments. Better sooner than later ;-) for regex in regexes: re.compile(regex) if not self.hosts: raise ValueError("LogWatcher requires hosts argument") - if not self.kind: - raise ValueError("LogWatcher requires kind argument") - if not self.filename: raise ValueError("LogWatcher requires log argument") if not silent: for regex in self.regexes: self._debug("Looking for regex: %s" % regex) def _debug(self, args): message = "lw: %s: %s" % (self.name, args) self._logger.debug(message) def set_watch(self): '''Mark the place to start watching the log from. ''' - if self.kind == "remote": + if self.kind == LogKind.REMOTE_FILE: for node in self.hosts: self._file_list.append(FileObj(self.filename, node, self.name)) - elif self.kind == "journal": + elif self.kind == LogKind.JOURNAL: for node in self.hosts: self._file_list.append(JournalObj(node, self.name)) else: self._file_list.append(FileObj(self.filename)) def async_complete(self, pid, returncode, out, err): # It's not clear to me whether this function ever gets called as # delegate somewhere, which is what would pass returncode and err # as parameters. Just disable the warning for now. # pylint: disable=unused-argument # TODO: Probably need a lock for updating self._line_cache self._logger.debug("%s: Got %d lines from %d (total %d)" % (self.name, len(out), pid, len(self._line_cache))) if out: with self._cache_lock: self._line_cache.extend(out) def __get_lines(self): if not self._file_list: raise ValueError("No sources to read from") pending = [] for f in self._file_list: t = f.harvest_async(self) if t: pending.append(t) for t in pending: t.join(60.0) if t.is_alive(): self._logger.log("%s: Aborting after 20s waiting for %s logging commands" % (self.name, repr(t))) return def end(self): for f in self._file_list: f.end() def look(self, timeout=None): '''Examine the log looking for the given patterns. It starts looking from the place marked by set_watch(). This function looks in the file in the fashion of tail -f. It properly recovers from log file truncation, but not from removing and recreating the log. It would be nice if it recovered from this as well :-) We return the first line which matches any of our patterns. ''' if not timeout: timeout = self._timeout lines = 0 begin = time.time() end = begin + timeout + 1 if not self.regexes: self._debug("Nothing to look for") return None if timeout == 0: for f in self._file_list: f.set_end() while True: if self._line_cache: lines += 1 with self._cache_lock: line = self._line_cache[0] self._line_cache.remove(line) which = -1 if re.search("CTS:", line): continue for regex in self.regexes: which += 1 matchobj = re.search(regex, line) if matchobj: self.whichmatch = which self._debug("Matched: %s" % line) return line elif timeout > 0 and end < time.time(): timeout = 0 for f in self._file_list: f.set_end() else: self.__get_lines() if not self._line_cache and end < time.time(): self._debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines)) return None self._debug("Waiting: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), len(self._line_cache))) time.sleep(1) self._debug("How did we get here") return None def look_for_all(self, allow_multiple_matches=False, silent=False): '''Examine the log looking for ALL of the given patterns. It starts looking from the place marked by set_watch(). We return when the timeout is reached, or when we have found ALL of the regexes that were part of the watch ''' save_regexes = self.regexes result = [] if not silent: self._debug("starting search: timeout=%d" % self._timeout) while self.regexes: one_result = self.look(self._timeout) if not one_result: self.unmatched = self.regexes self.regexes = save_regexes self.end() return None result.append(one_result) if not allow_multiple_matches: del self.regexes[self.whichmatch] else: # Allow multiple regexes to match a single line tmp_regexes = self.regexes self.regexes = [] for regex in tmp_regexes: matchobj = re.search(regex, one_result) if not matchobj: self.regexes.append(regex) self.unmatched = None self.regexes = save_regexes return result