diff --git a/cts/CTSaudits.py b/cts/CTSaudits.py index d13416eb5e..902558b194 100755 --- a/cts/CTSaudits.py +++ b/cts/CTSaudits.py @@ -1,862 +1,864 @@ '''CTS: Cluster Testing System: Audit module ''' __copyright__ = ''' Copyright (C) 2000, 2001,2005 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. import time, os, string, re, uuid import CTS from watcher import LogWatcher class ClusterAudit: def __init__(self, cm): self.CM = cm def __call__(self): raise ValueError("Abstract Class member (__call__)") def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def log(self, args): self.CM.log("audit: %s" % args) def debug(self, args): self.CM.debug("audit: %s" % args) def name(self): raise ValueError("Abstract Class member (name)") AllAuditClasses = [ ] class LogAudit(ClusterAudit): def name(self): return "LogAudit" def __init__(self, cm): self.CM = cm self.kinds = [ "combined syslog", "journal", "remote" ] def RestartClusterLogging(self, nodes=None): if not nodes: nodes = self.CM.Env["nodes"] self.CM.debug("Restarting logging on: %s" % repr(nodes)) for node in nodes: if self.CM.Env["have_systemd"]: if self.CM.rsh(node, "systemctl stop systemd-journald.socket") != 0: self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) if self.CM.rsh(node, "systemctl start systemd-journald.service") != 0: self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) if self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"]) != 0: self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node)) def TestLogging(self): patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self.CM.Env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) watch_pref = self.CM.Env["LogWatcher"] if watch_pref == "any": for k in self.kinds: watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, "LogAudit", 5, silent=True, hosts=self.CM.Env["nodes"], kind=k) watch[k].setwatch() else: k = watch_pref watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, "LogAudit", 5, silent=True, hosts=self.CM.Env["nodes"], kind=k) watch[k].setwatch() if watch_pref == "any": self.CM.log("Writing log with key: %s" % (suffix)) for node in self.CM.Env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix) if self.CM.rsh(node, cmd, synchronous=0, silent=True) != 0: self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in self.kinds: if watch.has_key(k): w = watch[k] if watch_pref == "any": self.CM.log("Testing for %s logs" % (k)) w.lookforall(silent=True) if not w.unmatched: if watch_pref == "any": self.CM.log ("Continuing with %s-based log reader" % (w.kind)) self.CM.Env["LogWatcher"] = w.kind return 1 for k in watch.keys(): w = watch[k] if w.unmatched: for regex in w.unmatched: self.CM.log ("Test message [%s] not found in %s logs." % (regex, w.kind)) return 0 def __call__(self): max = 3 attempt = 0 self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) while attempt <= max and self.TestLogging() == 0: attempt = attempt + 1 self.RestartClusterLogging() time.sleep(60*attempt) if attempt > max: self.CM.log("ERROR: Cluster logging unrecoverable.") return 0 return 1 def is_applicable(self): if self.CM.Env["DoBSC"]: return 0 + if self.CM.Env["LogAuditDisabled"]: + return 0 return 1 class DiskAudit(ClusterAudit): def name(self): return "DiskspaceAudit" def __init__(self, cm): self.CM = cm def __call__(self): result = 1 dfcmd = "df -BM /var/log | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'" self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: dfout = self.CM.rsh(node, dfcmd, 1) if not dfout: self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) else: try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self.CM.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMb)" % (node, used_percent, remaining_mb)) result = None answer = raw_input('Continue? [nY] ') if answer and answer == "n": raise ValueError("Disk full on %s" % (node)) ret = 0 elif remaining_mb < 100 or used_percent > 90: self.CM.log("WARN: Low on log disk space (%d Mbytes) on %s" % (remaining_mb, node)) return result def is_applicable(self): if self.CM.Env["DoBSC"]: return 0 return 1 class FileAudit(ClusterAudit): def name(self): return "FileAudit" def __init__(self, cm): self.CM = cm self.known = [] def __call__(self): result = 1 self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: (rc, lsout) = self.CM.rsh(node, "ls -al /var/lib/heartbeat/cores/* | grep core.[0-9]", None) for line in lsout: line = line.strip() if line not in self.known: result = 0 self.known.append(line) self.CM.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (rc, lsout) = self.CM.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", None) for line in lsout: line = line.strip() if line not in self.known: result = 0 self.known.append(line) self.CM.log("Warning: Corosync core file on %s: %s" % (node, line)) if self.CM.ShouldBeStatus.has_key(node) and self.CM.ShouldBeStatus[node] == "down": clean = 0 (rc, lsout) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", None) for line in lsout: result = 0 clean = 1 self.CM.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (rc, lsout) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", None) for line in lsout: self.CM.debug("ps[%s]: %s" % (node, line)) self.CM.rsh(node, "rm -f /dev/shm/qb-*") else: self.CM.debug("Skipping %s" % node) return result def is_applicable(self): return 1 class AuditResource: def __init__(self, cm, line): fields = line.split() self.CM = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None def unique(self): if self.flags & int("0x00000020", 16): return 1 return 0 def orphan(self): if self.flags & int("0x00000001", 16): return 1 return 0 def managed(self): if self.flags & int("0x00000002", 16): return 1 return 0 class AuditConstraint: def __init__(self, cm, line): fields = line.split() self.CM = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): def name(self): return "PrimitiveAudit" def __init__(self, cm): self.CM = cm def doResourceAudit(self, resource, quorum): rc = 1 active = self.CM.ResourceLocation(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %s" % (resource.id, repr(active))) elif resource.needs_quorum == 1: self.CM.log("Resource %s active without quorum: %s" % (resource.id, repr(active))) rc = 0 elif not resource.managed(): self.CM.log("Resource %s not managed. Active on %s" % (resource.id, repr(active))) elif not resource.unique(): # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %s" % (resource.id, repr(active))) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self.CM.log("Resource %s is active multiple times: %s" % (resource.id, repr(active))) rc = 0 elif resource.orphan(): self.debug("Resource %s is an inactive orphan" % resource.id) elif len(self.inactive_nodes) == 0: self.CM.log("WARN: Resource %s not served anywhere" % resource.id) rc = 0 elif self.CM.Env["warn-inactive"] == 1: if quorum or not resource.needs_quorum: self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) else: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) return rc def setup(self): self.target = None self.resources = [] self.constraints = [] self.active_nodes = [] self.inactive_nodes = [] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == "up": self.active_nodes.append(node) else: self.inactive_nodes.append(node) for node in self.CM.Env["nodes"]: if self.target == None and self.CM.ShouldBeStatus[node] == "up": self.target = node if not self.target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name()) return 0 (rc, lines) = self.CM.rsh(self.target, "crm_resource -c", None) for line in lines: if re.search("^Resource", line): self.resources.append(AuditResource(self.CM, line)) elif re.search("^Constraint", line): self.constraints.append(AuditConstraint(self.CM, line)) else: self.CM.log("Unknown entry: %s" % line); return 1 def __call__(self): rc = 1 if not self.setup(): return 1 quorum = self.CM.HasQuorum(None) for resource in self.resources: if resource.type == "primitive": if self.doResourceAudit(resource, quorum) == 0: rc = 0 return rc def is_applicable(self): if self.CM["Name"] == "crm-lha": return 1 if self.CM["Name"] == "crm-ais": return 1 return 0 class GroupAudit(PrimitiveAudit): def name(self): return "GroupAudit" def __call__(self): rc = 1 if not self.setup(): return 1 for group in self.resources: if group.type == "group": first_match = 1 group_location = None for child in self.resources: if child.parent == group.id: nodes = self.CM.ResourceLocation(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = 0 if len(nodes) > 1: rc = 0 self.CM.log("Child %s of %s is active more than once: %s" % (child.id, group.id, repr(nodes))) elif len(nodes) == 0: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: rc = 0 self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return rc class CloneAudit(PrimitiveAudit): def name(self): return "CloneAudit" def __call__(self): rc = 1 if not self.setup(): return 1 for clone in self.resources: if clone.type == "clone": for child in self.resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return rc class ColocationAudit(PrimitiveAudit): def name(self): return "ColocationAudit" def crm_location(self, resource): (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q"%resource, None) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): rc = 1 if not self.setup(): return 1 for coloc in self.constraints: if coloc.type == "rsc_colocation": source = self.crm_location(coloc.rsc) target = self.crm_location(coloc.target) if len(source) == 0: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: rc = 0 self.CM.log("Colocation audit (%s): %s running on %s (not in %s)" % (coloc.id, coloc.rsc, node, repr(target))) else: self.debug("Colocation audit (%s): %s running on %s (in %s)" % (coloc.id, coloc.rsc, node, repr(target))) return rc class CrmdStateAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self.CM.Env["nodes"]: should_be = self.CM.ShouldBeStatus[node] rc = self.CM.test_node_CM(node) if rc > 0: if should_be == "down": down_are_up = down_are_up + 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down = up_are_down + 1 if len(unstable_list) > 0: passed = 0 self.CM.log("Cluster is not stable: %d (of %d): %s" % (len(unstable_list), self.CM.upcount(), repr(unstable_list))) if up_are_down > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self.CM.Env["nodes"]))) if down_are_up > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self.CM.Env["nodes"]))) return passed def name(self): return "CrmdStateAudit" def is_applicable(self): if self.CM["Name"] == "crm-lha": return 1 if self.CM["Name"] == "crm-ais": return 1 return 0 class CIBAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 ccm_partitions = self.CM.find_partitions() if len(ccm_partitions) == 0: self.debug("\tNo partitions to audit") return 1 for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) partition_passed = 0 if self.audit_cib_contents(partition) == 0: passed = 0 return passed def audit_cib_contents(self, hostlist): passed = 1 node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self.store_remote_cib(node, node0) if node_xml == None: self.CM.log("Could not perform audit: No configuration from %s" % node) passed = 0 elif node0 == None: node0 = node node0_xml = node_xml elif node0_xml == None: self.CM.log("Could not perform audit: No configuration from %s" % node0) passed = 0 else: (rc, result) = self.CM.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), None) if rc != 0: self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = 0 for line in result: if not re.search("", line): passed = 0 self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) # self.CM.rsh(node0, "rm -f %s" % node_xml) # self.CM.rsh(node0, "rm -f %s" % node0_xml) return passed def store_remote_cib(self, node, target): combined = "" filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], None) if rc != 0: self.CM.log("Could not retrieve configuration") return None self.CM.rsh("localhost", "rm -f %s" % filename) for line in lines: self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), silent=True) if self.CM.rsh.cp(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self.CM.log("Could not store configuration") return None return filename def name(self): return "CibAudit" def is_applicable(self): if self.CM["Name"] == "crm-lha": return 1 if self.CM["Name"] == "crm-ais": return 1 return 0 class PartitionAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} self.NodeEpoche = {} self.NodeState = {} self.NodeQuorum = {} def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 ccm_partitions = self.CM.find_partitions() if ccm_partitions == None or len(ccm_partitions) == 0: return 1 self.CM.cluster_stable(double_check=True) if len(ccm_partitions) != self.CM.partitions_expected: self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) passed = 0 for partition in ccm_partitions: self.CM.log("\t %s" % partition) for partition in ccm_partitions: partition_passed = 0 if self.audit_partition(partition) == 0: passed = 0 return passed def trim_string(self, avalue): if not avalue: return None if len(avalue) > 1: return avalue[:-1] def trim2int(self, avalue): if not avalue: return None if len(avalue) > 1: return int(avalue[:-1]) def audit_partition(self, partition): passed = 1 dc_found = [] dc_allowed_list = [] lowest_epoche = None node_list = partition.split() self.debug("Auditing partition: %s" % (partition)) for node in node_list: if self.CM.ShouldBeStatus[node] != "up": self.CM.log("Warn: Node %s appeared out of nowhere" % (node)) self.CM.ShouldBeStatus[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) self.NodeState[node] = self.CM.rsh(node, self.CM["StatusCmd"] % node, 1) self.NodeEpoche[node] = self.CM.rsh(node, self.CM["EpocheCmd"], 1) self.NodeQuorum[node] = self.CM.rsh(node, self.CM["QuorumCmd"], 1) self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoche[node], self.NodeQuorum[node])) self.NodeState[node] = self.trim_string(self.NodeState[node]) self.NodeEpoche[node] = self.trim2int(self.NodeEpoche[node]) self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node]) if not self.NodeEpoche[node]: self.CM.log("Warn: Node %s dissappeared: cant determin epoche" % (node)) self.CM.ShouldBeStatus[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoche == None or self.NodeEpoche[node] < lowest_epoche: lowest_epoche = self.NodeEpoche[node] if not lowest_epoche: self.CM.log("Lowest epoche not determined in %s" % (partition)) passed = 0 for node in node_list: if self.CM.ShouldBeStatus[node] == "up": if self.CM.is_node_dc(node, self.NodeState[node]): dc_found.append(node) if self.NodeEpoche[node] == lowest_epoche: self.debug("%s: OK" % node) elif not self.NodeEpoche[node]: self.debug("Check on %s ignored: no node epoche" % node) elif not lowest_epoche: self.debug("Check on %s ignored: no lowest epoche" % node) else: self.CM.log("DC %s is not the oldest node (%d vs. %d)" % (node, self.NodeEpoche[node], lowest_epoche)) passed = 0 if len(dc_found) == 0: self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self.CM.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = 0 if passed == 0: for node in node_list: if self.CM.ShouldBeStatus[node] == "up": self.CM.log("epoche %s : %s" % (self.NodeEpoche[node], self.NodeState[node])) return passed def name(self): return "PartitionAudit" def is_applicable(self): if self.CM["Name"] == "crm-lha": return 1 if self.CM["Name"] == "crm-ais": return 1 return 0 AllAuditClasses.append(DiskAudit) AllAuditClasses.append(FileAudit) AllAuditClasses.append(LogAudit) AllAuditClasses.append(CrmdStateAudit) AllAuditClasses.append(PartitionAudit) AllAuditClasses.append(PrimitiveAudit) AllAuditClasses.append(GroupAudit) AllAuditClasses.append(CloneAudit) AllAuditClasses.append(ColocationAudit) AllAuditClasses.append(CIBAudit) def AuditList(cm): result = [] for auditclass in AllAuditClasses: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/cts/environment.py b/cts/environment.py index 0e505ee537..00b7502cd1 100644 --- a/cts/environment.py +++ b/cts/environment.py @@ -1,656 +1,662 @@ ''' Classes related to producing and searching logs ''' __copyright__=''' Copyright (C) 2014 Andrew Beekhof Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. import types, string, select, sys, time, re, os, struct, signal, socket import time, syslog, random, traceback, base64, pickle, binascii, fcntl from cts.remote import * class Environment: def __init__(self, args): print repr(self) self.data = {} self.Nodes = [] self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["IPagent"] = "IPaddr2" self["DoStandby"] = 1 self["DoFencing"] = 1 self["XmitLoss"] = "0.0" self["RecvLoss"] = "0.0" self["ClobberCIB"] = 0 self["CIBfilename"] = None self["CIBResource"] = 0 self["DoBSC"] = 0 self["use_logd"] = 0 self["oprofile"] = [] self["warn-inactive"] = 0 self["ListTests"] = 0 self["benchmark"] = 0 self["LogWatcher"] = "any" self["SyslogFacility"] = "daemon" self["LogFileName"] = "/var/log/messages" self["Schema"] = "pacemaker-2.0" self["Stack"] = "openais" self["stonith-type"] = "external/ssh" self["stonith-params"] = "hostlist=all,livedangerously=yes" self["loop-minutes"] = 60 self["valgrind-prefix"] = None self["valgrind-procs"] = "cib crmd attrd pengine stonith-ng" self["valgrind-opts"] = """--leak-check=full --show-reachable=yes --trace-children=no --num-callers=25 --gen-suppressions=all --suppressions="""+CTSvars.CTS_home+"""/cts.supp""" self["experimental-tests"] = 0 self["container-tests"] = 0 self["valgrind-tests"] = 0 self["unsafe-tests"] = 1 self["loop-tests"] = 1 self["scenario"] = "random" self["stats"] = 0 self.RandomGen = random.Random() self.logger = LogFactory() self.SeedRandom() self.rsh = RemoteFactory().getInstance() self.target = "localhost" self.parse_args(args) self.discover() self.validate() def SeedRandom(self, seed=None): if not seed: seed = int(time.time()) if self.has_key("RandSeed"): self.logger.log("New random seed is: " + str(seed)) else: self.logger.log("Random seed is: " + str(seed)) self["RandSeed"] = seed self.RandomGen.seed(str(seed)) def dump(self): keys = [] for key in self.data.keys(): keys.append(key) keys.sort() for key in keys: self.logger.debug("Environment["+key+"]:\t"+str(self[key])) def keys(self): return self.data.keys() def has_key(self, key): if key == "nodes": return True return self.data.has_key(key) def __getitem__(self, key): if key == "nodes": return self.Nodes elif key == "Name": return self.get_stack_short() elif self.data.has_key(key): return self.data[key] else: return None def __setitem__(self, key, value): if key == "Stack": self.set_stack(value) elif key == "node-limit": self.data[key] = value self.filter_nodes() elif key == "nodes": self.Nodes = [] for node in value: # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: n = node.strip() gethostbyname_ex(n) self.Nodes.append(n) except: self.logger.log(node+" not found in DNS... aborting") raise self.filter_nodes() else: self.data[key] = value def RandomNode(self): '''Choose a random node from the cluster''' return self.RandomGen.choice(self["nodes"]) def set_stack(self, name): # Normalize stack names if name == "heartbeat" or name == "lha": self.data["Stack"] = "heartbeat" elif name == "openais" or name == "ais" or name == "whitetank": self.data["Stack"] = "openais (whitetank)" elif name == "corosync" or name == "cs" or name == "mcp": self.data["Stack"] = "corosync 2.x" elif name == "cman": self.data["Stack"] = "corosync (cman)" elif name == "v1": self.data["Stack"] = "corosync (plugin v1)" elif name == "v0": self.data["Stack"] = "corosync (plugin v0)" else: print "Unknown stack: "+name sys.exit(1) def get_stack_short(self): # Create the Cluster Manager object if not self.data.has_key("Stack"): return "unknown" elif self.data["Stack"] == "heartbeat": return "crm-lha" elif self.data["Stack"] == "corosync 2.x": return "crm-mcp" elif self.data["Stack"] == "corosync (cman)": return "crm-cman" elif self.data["Stack"] == "corosync (plugin v1)": return "crm-plugin-v1" elif self.data["Stack"] == "corosync (plugin v0)": return "crm-plugin-v0" else: LogFactory().log("Unknown stack: "+self.data["stack"]) sys.exit(1) def detect_syslog(self): # Detect syslog variant if not self.has_key("syslogd"): if self["have_systemd"]: # Systemd self["syslogd"] = self.rsh(self.target, "systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", stdout=1).strip() else: # SYS-V self["syslogd"] = self.rsh(self.target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", stdout=1).strip() if not self.has_key("syslogd") or not self["syslogd"]: # default self["syslogd"] = "rsyslog" def detect_at_boot(self): # Detect if the cluster starts at boot if not self.has_key("at-boot"): atboot = 0 if self["have_systemd"]: # Systemd atboot = atboot or not self.rsh(self.target, "systemctl is-enabled heartbeat.service") atboot = atboot or not self.rsh(self.target, "systemctl is-enabled corosync.service") atboot = atboot or not self.rsh(self.target, "systemctl is-enabled pacemaker.service") else: # SYS-V atboot = atboot or not self.rsh(self.target, "chkconfig --list | grep -e corosync.*on -e heartbeat.*on -e pacemaker.*on") self["at-boot"] = atboot def detect_ip_offset(self): # Try to determin an offset for IPaddr resources if self["CIBResource"] and not self.has_key("IPBase"): network=self.rsh(self.target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", stdout=1).strip() self["IPBase"] = self.rsh(self.target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, stdout=1).strip() if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self.logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self.logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) elif int(self["IPBase"].split('.')[3]) >= 240: self.logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s" % (self["IPBase"], self["IPBase"].split('.')[3])) self["IPBase"] = " fe80::1234:56:7890:1000" self.logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) def filter_nodes(self): if self["node-limit"] > 0: if len(self["nodes"]) > self["node-limit"]: self.logger.log("Limiting the number of nodes configured=%d (max=%d)" %(len(self["nodes"]), self["node-limit"])) while len(self["nodes"]) > self["node-limit"]: self["nodes"].pop(len(self["nodes"])-1) def validate(self): if len(self["nodes"]) < 1: print "No nodes specified!" sys.exit(1) def discover(self): self.target = random.Random().choice(self["nodes"]) master = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(master)[2]: if ip != "127.0.0.1": master = ip break; self["cts-master"] = master if self.has_key("have_systemd"): self["have_systemd"] = not rsh(discover, "systemctl list-units") self.detect_syslog() self.detect_at_boot() self.detect_ip_offset() self.validate() def parse_args(self, args): skipthis=None if not args: args=sys.argv[1:] for i in range(0, len(args)): if skipthis: skipthis=None continue elif args[i] == "-l" or args[i] == "--limit-nodes": skipthis=1 self["node-limit"] = int(args[i+1]) elif args[i] == "-r" or args[i] == "--populate-resources": self["CIBResource"] = 1 self["ClobberCIB"] = 1 - elif args[i] == "-L" or args[i] == "--logfile" or args[i] == "--outputfile": + elif args[i] == "--outputfile": skipthis=1 LogFactory().add_file(args[i+1]) + elif args[i] == "-L" or args[i] == "--logfile": + skipthis=1 + self["LogWatcher"] = "remote" + self["LogAuditDisabled"] = 1 + self["LogFileName"] = args[i+1] + elif args[i] == "--ip" or args[i] == "--test-ip-base": skipthis=1 self["IPBase"] = args[i+1] self["CIBResource"] = 1 self["ClobberCIB"] = 1 elif args[i] == "--oprofile": skipthis=1 self["oprofile"] = args[i+1].split(' ') elif args[i] == "--trunc": self["TruncateLog"]=1 elif args[i] == "--list-tests" or args[i] == "--list" : self["ListTests"]=1 elif args[i] == "--benchmark": self["benchmark"]=1 elif args[i] == "--bsc": self["DoBSC"] = 1 self["scenario"] = "basic-sanity" elif args[i] == "--qarsh": RemoteFactory().enable_qarsh() elif args[i] == "--stonith" or args[i] == "--fencing": skipthis=1 if args[i+1] == "1" or args[i+1] == "yes": self["DoFencing"]=1 elif args[i+1] == "0" or args[i+1] == "no": self["DoFencing"]=0 elif args[i+1] == "rhcs" or args[i+1] == "xvm" or args[i+1] == "virt": self["DoStonith"]=1 self["stonith-type"] = "fence_xvm" self["stonith-params"] = "pcmk_arg_map=domain:uname,delay=0" elif args[i+1] == "scsi": self["DoStonith"]=1 self["stonith-type"] = "fence_scsi" self["stonith-params"] = "delay=0" elif args[i+1] == "ssh" or args[i+1] == "lha": self["DoStonith"]=1 self["stonith-type"] = "external/ssh" self["stonith-params"] = "hostlist=all,livedangerously=yes" elif args[i+1] == "north": self["DoStonith"]=1 self["stonith-type"] = "fence_apc" self["stonith-params"] = "ipaddr=north-apc,login=apc,passwd=apc,pcmk_host_map=north-01:2;north-02:3;north-03:4;north-04:5;north-05:6;north-06:7;north-07:9;north-08:10;north-09:11;north-10:12;north-11:13;north-12:14;north-13:15;north-14:18;north-15:17;north-16:19;" elif args[i+1] == "south": self["DoStonith"]=1 self["stonith-type"] = "fence_apc" self["stonith-params"] = "ipaddr=south-apc,login=apc,passwd=apc,pcmk_host_map=south-01:2;south-02:3;south-03:4;south-04:5;south-05:6;south-06:7;south-07:9;south-08:10;south-09:11;south-10:12;south-11:13;south-12:14;south-13:15;south-14:18;south-15:17;south-16:19;" elif args[i+1] == "east": self["DoStonith"]=1 self["stonith-type"] = "fence_apc" self["stonith-params"] = "ipaddr=east-apc,login=apc,passwd=apc,pcmk_host_map=east-01:2;east-02:3;east-03:4;east-04:5;east-05:6;east-06:7;east-07:9;east-08:10;east-09:11;east-10:12;east-11:13;east-12:14;east-13:15;east-14:18;east-15:17;east-16:19;" elif args[i+1] == "west": self["DoStonith"]=1 self["stonith-type"] = "fence_apc" self["stonith-params"] = "ipaddr=west-apc,login=apc,passwd=apc,pcmk_host_map=west-01:2;west-02:3;west-03:4;west-04:5;west-05:6;west-06:7;west-07:9;west-08:10;west-09:11;west-10:12;west-11:13;west-12:14;west-13:15;west-14:18;west-15:17;west-16:19;" elif args[i+1] == "openstack": self["DoStonith"]=1 self["stonith-type"] = "fence_openstack" print "Obtaining OpenStack credentials from the current environment" self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % ( os.environ['OS_REGION_NAME'], os.environ['OS_TENANT_NAME'], os.environ['OS_AUTH_URL'], os.environ['OS_USERNAME'], os.environ['OS_PASSWORD'] ) elif args[i+1] == "rhevm": self["DoStonith"]=1 self["stonith-type"] = "fence_rhevm" print "Obtaining RHEV-M credentials from the current environment" self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) else: self.usage(args[i+1]) elif args[i] == "--stonith-type": self["stonith-type"] = args[i+1] skipthis=1 elif args[i] == "--stonith-args": self["stonith-params"] = args[i+1] skipthis=1 elif args[i] == "--standby": skipthis=1 if args[i+1] == "1" or args[i+1] == "yes": self["DoStandby"] = 1 elif args[i+1] == "0" or args[i+1] == "no": self["DoStandby"] = 0 else: self.usage(args[i+1]) elif args[i] == "--clobber-cib" or args[i] == "-c": self["ClobberCIB"] = 1 elif args[i] == "--cib-filename": skipthis=1 self["CIBfilename"] = args[i+1] elif args[i] == "--xmit-loss": try: float(args[i+1]) except ValueError: print ("--xmit-loss parameter should be float") self.usage(args[i+1]) skipthis=1 self["XmitLoss"] = args[i+1] elif args[i] == "--recv-loss": try: float(args[i+1]) except ValueError: print ("--recv-loss parameter should be float") self.usage(args[i+1]) skipthis=1 self["RecvLoss"] = args[i+1] elif args[i] == "--choose": skipthis=1 self["tests"].append(args[i+1]) self["scenario"] = "sequence" elif args[i] == "--nodes": skipthis=1 self["nodes"] = args[i+1].split(' ') elif args[i] == "-g" or args[i] == "--group" or args[i] == "--dsh-group": skipthis=1 self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args[i+1]) LogFactory().add_file(self["OutputFile"], "CTS") dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args[i+1]) # Hacks to make my life easier if args[i+1] == "r6": self["Stack"] = "cman" self["DoStonith"]=1 self["stonith-type"] = "fence_xvm" self["stonith-params"] = "delay=0" self["IPBase"] = " fe80::1234:56:7890:4000" elif args[i+1] == "virt1": self["Stack"] = "corosync" self["DoStonith"]=1 self["stonith-type"] = "fence_xvm" self["stonith-params"] = "delay=0" self["IPBase"] = " fe80::1234:56:7890:1000" elif args[i+1] == "east16" or args[i+1] == "nsew": self["Stack"] = "corosync" self["DoStonith"]=1 self["stonith-type"] = "fence_apc" self["stonith-params"] = "ipaddr=east-apc,login=apc,passwd=apc,pcmk_host_map=east-01:2;east-02:3;east-03:4;east-04:5;east-05:6;east-06:7;east-07:9;east-08:10;east-09:11;east-10:12;east-11:13;east-12:14;east-13:15;east-14:18;east-15:17;east-16:19;" self["IPBase"] = " fe80::1234:56:7890:2000" if args[i+1] == "east16": # Requires newer python than available via nsew self["IPagent"] = "Dummy" elif args[i+1] == "corosync8": self["Stack"] = "corosync" self["DoStonith"]=1 self["stonith-type"] = "fence_rhevm" print "Obtaining RHEV-M credentials from the current environment" self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) self["IPBase"] = " fe80::1234:56:7890:3000" if os.path.isfile(dsh_file): self["nodes"] = [] f = open(dsh_file, 'r') for line in f: l = line.strip().rstrip() if not l.startswith('#'): self["nodes"].append(l) f.close() else: print("Unknown DSH group: %s" % args[i+1]) elif args[i] == "--syslog-facility" or args[i] == "--facility": skipthis=1 self["SyslogFacility"] = args[i+1] elif args[i] == "--seed": skipthis=1 self.SeedRandom(args[i+1]) elif args[i] == "--warn-inactive": self["warn-inactive"] = 1 elif args[i] == "--schema": skipthis=1 self["Schema"] = args[i+1] elif args[i] == "--ais": self["Stack"] = "openais" elif args[i] == "--at-boot" or args[i] == "--cluster-starts-at-boot": skipthis=1 if args[i+1] == "1" or args[i+1] == "yes": self["at-boot"] = 1 elif args[i+1] == "0" or args[i+1] == "no": self["at-boot"] = 0 else: self.usage(args[i+1]) elif args[i] == "--heartbeat" or args[i] == "--lha": self["Stack"] = "heartbeat" elif args[i] == "--hae": self["Stack"] = "openais" self["Schema"] = "hae" elif args[i] == "--stack": if args[i+1] == "fedora" or args[i+1] == "fedora-17" or args[i+1] == "fedora-18": self["Stack"] = "corosync" elif args[i+1] == "rhel-6": self["Stack"] = "cman" elif args[i+1] == "rhel-7": self["Stack"] = "corosync" else: self["Stack"] = args[i+1] skipthis=1 elif args[i] == "--once": self["scenario"] = "all-once" elif args[i] == "--boot": self["scenario"] = "boot" elif args[i] == "--valgrind-tests": self["valgrind-tests"] = 1 elif args[i] == "--no-loop-tests": self["loop-tests"] = 0 elif args[i] == "--loop-minutes": skipthis=1 try: self["loop-minutes"]=int(args[i+1]) except ValueError: self.usage(args[i]) elif args[i] == "--no-unsafe-tests": self["unsafe-tests"] = 0 elif args[i] == "--experimental-tests": self["experimental-tests"] = 1 elif args[i] == "--container-tests": self["container-tests"] = 1 elif args[i] == "--set": skipthis=1 (name, value) = args[i+1].split('=') self[name] = value print "Setting %s = %s" % (name, value) elif args[i] == "--": break else: try: NumIter=int(args[i]) self["iterations"] = NumIter except ValueError: self.usage(args[i]) def usage(arg, status=1): print "Illegal argument " + arg print "usage: " + sys.argv[0] +" [options] number-of-iterations" print "\nCommon options: " print "\t [--nodes 'node list'] list of cluster nodes separated by whitespace" print "\t [--group | -g 'name'] use the nodes listed in the named DSH group (~/.dsh/groups/$name)" print "\t [--limit-nodes max] only use the first 'max' cluster nodes supplied with --nodes" print "\t [--stack (v0|v1|cman|corosync|heartbeat|openais)] which cluster stack is installed" print "\t [--list-tests] list the valid tests" print "\t [--benchmark] add the timing information" print "\t " print "Options that CTS will usually auto-detect correctly: " print "\t [--logfile path] where should the test software look for logs from cluster nodes" print "\t [--syslog-facility name] which syslog facility should the test software log to" print "\t [--at-boot (1|0)] does the cluster software start at boot time" print "\t [--test-ip-base ip] offset for generated IP address resources" print "\t " print "Options for release testing: " print "\t [--populate-resources | -r] generate a sample configuration" print "\t [--choose name] run only the named test" print "\t [--stonith (1 | 0 | yes | no | rhcs | ssh)]" print "\t [--once] run all valid tests once" print "\t " print "Additional (less common) options: " print "\t [--clobber-cib | -c ] erase any existing configuration" print "\t [--outputfile path] optional location for the test software to write logs to" print "\t [--trunc] truncate logfile before starting" print "\t [--xmit-loss lost-rate(0.0-1.0)]" print "\t [--recv-loss lost-rate(0.0-1.0)]" print "\t [--standby (1 | 0 | yes | no)]" print "\t [--fencing (1 | 0 | yes | no | rhcs | lha | openstack )]" print "\t [--stonith-type type]" print "\t [--stonith-args name=value]" print "\t [--bsc]" print "\t [--no-loop-tests] dont run looping/time-based tests" print "\t [--no-unsafe-tests] dont run tests that are unsafe for use with ocfs2/drbd" print "\t [--valgrind-tests] include tests using valgrind" print "\t [--experimental-tests] include experimental tests" print "\t [--container-tests] include pacemaker_remote tests that run in lxc container resources" print "\t [--oprofile 'node list'] list of cluster nodes to run oprofile on]" print "\t [--qarsh] use the QARSH backdoor to access nodes instead of SSH" print "\t [--seed random_seed]" print "\t [--set option=value]" print "\t " print "\t Example: " print "\t python sys.argv[0] -g virt1 --stack cs -r --stonith ssh --schema pacemaker-1.0 500" sys.exit(status) class EnvFactory: instance = None def __init__(self): pass def getInstance(self, args=None): if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance