diff --git a/cts/CTSaudits.py.in b/cts/CTSaudits.py.in
index 2ee9056c33..badd7e5d94 100755
--- a/cts/CTSaudits.py.in
+++ b/cts/CTSaudits.py.in
@@ -1,863 +1,863 @@
 #!@PYTHON@
 '''CTS: Cluster Testing System: Audit module
 '''
 __copyright__='''
 Copyright (C) 2000, 2001,2005 Alan Robertson
 Licensed under the GNU GPL.
 '''
 
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
 # as published by the Free Software Foundation; either version 2
 # of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 
 import time, os, popen2, string, re
 import CTS
 
 class ClusterAudit:
 
     def __init__(self, cm):
         self.CM = cm
 
     def __call__(self):
         raise ValueError("Abstract Class member (__call__)")
 
     def is_applicable(self):
         '''Return TRUE if we are applicable in the current test configuration'''
         raise ValueError("Abstract Class member (is_applicable)")
         return 1
 
     def name(self):
         raise ValueError("Abstract Class member (name)")
 
 AllAuditClasses = [ ]
 
 class LogAudit(ClusterAudit):
 
     def name(self):
         return "LogAudit"
 
     def __init__(self, cm):
         self.CM = cm
 
     def RestartClusterLogging(self):
         self.CM.log("WARN: Restarting logging on cluster nodes")
         for node in self.CM.Env["nodes"]:
             cmd = self.CM.Env["logrestartcmd"]
             if self.CM.rsh.noBlock(node, cmd) != 0:
                 self.CM.log("ERROR: Cannot restart logging on %s [%s failed]" % (node, cmd))
 
     def TestLogging(self):
         patterns = []
         prefix = "Test message from "
         for node in self.CM.Env["nodes"]:
             patterns.append(prefix + node)
 
         watch = CTS.LogWatcher(self.CM.Env["LogFileName"], patterns,
                                30 + len(self.CM.Env["nodes"]))
         watch.setwatch()
 
         for node in self.CM.Env["nodes"]:
             cmd = "logger -p %s.info %s%s" % (self.CM.Env["logfacility"], prefix, node)
             if self.CM.rsh.noBlock(node, cmd) != 0:
                 self.CM.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
 
         watch_result = watch.lookforall()
         if watch.unmatched:
             self.CM.log("ERROR: Remote logging is not working.")
             for regex in watch.unmatched:
                 self.CM.log("ERROR: Test message [%s] not found in logs." % (regex))
             return 0
 
         return 1
 
     def __call__(self):
         max = 3
         attempt = 0
         while attempt <= max and self.TestLogging() == 0:
             attempt = attempt + 1
             self.RestartClusterLogging()
             time.sleep(60 * attempt)
 
         if attempt > max:
             self.CM.log("Cluster logging unrecoverable.")
             return 0
 
         if attempt > 0:
             self.CM.log("NOTE: Cluster logging now working.")
 
         return 1
 
     def is_applicable(self):
         if self.CM.Env["DoBSC"]:
             return 0
         return 1
 
 class DiskAudit(ClusterAudit):
 
     def name(self):
         return "DiskspaceAudit"
 
     def __init__(self, cm):
         self.CM = cm
 
     def __call__(self):
         result = 1
         dfcmd = "df -k /var/log | tail -1 | tr -s ' ' | cut -d' ' -f2"
         for node in self.CM.Env["nodes"]:
             dfout = self.CM.rsh.readaline(node, dfcmd)
             if not dfout:
                 self.CM.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
             else:
                 try:
                     idfout = int(dfout)
                 except (ValueError, TypeError):
                     self.CM.log("Warning: df output from %s was invalid [%s]" % (node, dfout))
                 else:
                     if idfout == 0:
                         self.CM.log("CRIT: Completely out of log disk space on %s" % node)
                         result = None
                     elif idfout <= 1000:
                         self.CM.log("WARN: Low on log disk space (%d Mbytes) on %s" % (idfout, node))
         return result
 
     def is_applicable(self):
         if self.CM.Env["DoBSC"]:
             return 0
         return 1
 
 class AuditResource:
     def __init__(self, cm, line):
         fields = line.split()
         self.CM = cm
         self.line = line
         self.type = fields[1]
         self.id = fields[2]
         self.parent = fields[3]
         self.managed = fields[4]
         self.needs_quorum = fields[5]
 
         if self.parent == "NA":
             self.parent = None
 
 class AuditConstraint:
     def __init__(self, cm, line):
         fields = line.split()
         self.CM = cm
         self.line = line
         self.type = fields[1]
         self.id = fields[2]
         self.rsc = fields[3]
         self.target = fields[4]
         self.score = fields[5]
         self.rsc_role = fields[6]
         self.target_role = fields[7]
 
         if self.rsc_role == "NA":
             self.rsc_role = None
         if self.target_role == "NA":
             self.target_role = None
 
 class PrimitiveAudit(ClusterAudit):
     def name(self):
         return "PrimitiveAudit"
 
     def __init__(self, cm):
         self.CM = cm
 
     def doResourceAudit(self, resource):
         rc = 1
         active = self.CM.ResourceLocation(resource.id)
 
         if len(active) > 1:
             rc = 0
             self.CM.log("Resource %s is active multiple times: %s"
                         % (resource.id, repr(active)))
 
         elif len(active) == 1:
             if self.CM.HasQuorum(None):
                 self.CM.debug("Resource %s active on %s" % (resource.id, repr(active)))
 
             elif resource.needs_quorum == 1:
                 rc = 0
                 self.CM.log("Resource %s active without quorum: %s (%s)"
                             % (resource.id, repr(active), resource.line))
 
-        elif self.CM.HasQuorum(None) or not resource.needs_quorum:
-            self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)"
-                        % (resource.id, repr(self.inactive_nodes)))
-
-        else:
-            self.CM.debug("Resource %s not served anywhere (Inactive nodes: %s)"
-                          % (resource.id, repr(self.inactive_nodes)))
+        elif self.CM.Env["warn-inactive"] == 1:
+            if self.CM.HasQuorum(None) or not resource.needs_quorum:
+                self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)"
+                            % (resource.id, repr(self.inactive_nodes)))
+        else:
+            self.CM.debug("Resource %s not served anywhere (Inactive nodes: %s)"
+                          % (resource.id, repr(self.inactive_nodes)))
 
         if resource.managed == "0":
             self.CM.log("Resource %s not managed: faking success" % resource.id)
             rc = 1
 
         return rc
 
     def setup(self):
         self.target = None
         self.resources = []
         self.constraints = []
         self.active_nodes = []
         self.inactive_nodes = []
         self.CM.debug("Do Audit %s" % self.name())
 
         for node in self.CM.Env["nodes"]:
             if self.CM.ShouldBeStatus[node] == self.CM["up"]:
                 self.active_nodes.append(node)
             else:
                 self.inactive_nodes.append(node)
 
         for node in self.CM.Env["nodes"]:
             if self.target == None and self.CM.ShouldBeStatus[node] == self.CM["up"]:
                 self.target = node
 
         if not self.target:
             # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
             # with CIB_file=/path/to/cib.xml even when the cluster isn't running
             self.CM.debug("No nodes active - skipping %s" % self.name())
             return 0
 
         (rc, lines) = self.CM.rsh.remote_py(self.target, "os", "system",
                                             "@sbindir@/crm_resource -c")
 
         for line in lines:
             if re.search("^Resource", line):
                 self.resources.append(AuditResource(self.CM, line))
             elif re.search("^Constraint", line):
                 self.constraints.append(AuditConstraint(self.CM, line))
             else:
                 self.CM.log("Unknown entry: %s" % line)
 
         return 1
 
     def __call__(self):
         rc = 1
         if not self.setup():
             return 1
 
         for resource in self.resources:
             if resource.type == "primitive":
                 if self.doResourceAudit(resource) == 0:
                     rc = 0
         return rc
 
     def is_applicable(self):
         if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["ResCanStop"] == 0:
             return 1
         if self.CM["Name"] == "crm-ais" and self.CM.Env["ResCanStop"] == 0:
             return 1
         return 0
 
 class GroupAudit(PrimitiveAudit):
     def name(self):
         return "GroupAudit"
 
     def __call__(self):
         rc = 1
         if not self.setup():
             return 1
 
         for group in self.resources:
             if group.type == "group":
                 first_match = 1
                 group_location = None
                 for child in self.resources:
                     if child.parent == group.id:
                         nodes = self.CM.ResourceLocation(child.id)
 
                         if first_match and len(nodes) > 0:
                             group_location = nodes[0]
 
                         first_match = 0
 
                         if len(nodes) > 1:
                             rc = 0
                             self.CM.log("Child %s of %s is active more than once: %s"
                                         % (child.id, group.id, repr(nodes)))
 
                         elif len(nodes) == 0:
                             # Groups are allowed to be partially active.
                             # However, we do need to make sure later children aren't running.
                             group_location = None
                             self.CM.debug("Child %s of %s is stopped" % (child.id, group.id))
 
                         elif nodes[0] != group_location:
                             rc = 0
                             self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s"
                                         % (child.id, group.id, nodes[0], group_location))
                         else:
                             self.CM.debug("Child %s of %s is active on %s"
                                           % (child.id, group.id, nodes[0]))
 
         return rc
 
 class CloneAudit(PrimitiveAudit):
     def name(self):
         return "CloneAudit"
 
     def __call__(self):
         rc = 1
         if not self.setup():
             return 1
 
         for clone in self.resources:
             if clone.type == "clone":
                 for child in self.resources:
                     if child.parent == clone.id and child.type == "primitive":
                         self.CM.debug("Checking child %s of %s..." % (child.id, clone.id))
                         # Check max and node_max
                         # Obtain with:
                         #    crm_resource -g clone_max --meta -r child.id
                         #    crm_resource -g clone_node_max --meta -r child.id
 
         return rc
 
 class ColocationAudit(PrimitiveAudit):
     def name(self):
         return "ColocationAudit"
 
     def crm_location(self, resource):
         (rc, lines) = self.CM.rsh.remote_py(
             self.target, "os", "system", "@sbindir@/crm_resource -W -r %s -Q" % resource)
 
         hosts = []
         if rc == 0:
             for line in lines:
                 fields = line.split()
                 hosts.append(fields[0])
 
         return hosts
 
     def __call__(self):
         rc = 1
         if not self.setup():
             return 1
 
         for coloc in self.constraints:
             if coloc.type == "rsc_colocation":
                 source = self.crm_location(coloc.rsc)
                 target = self.crm_location(coloc.target)
                 if len(source) == 0:
                     self.CM.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
                 else:
                     for node in source:
                         if not node in target:
                             rc = 0
                             self.CM.log("Colocation audit (%s): %s running on %s (not in %s)"
                                         % (coloc.id, coloc.rsc, node, repr(target)))
                         else:
                             self.CM.debug("Colocation audit (%s): %s running on %s (in %s)"
                                           % (coloc.id, coloc.rsc, node, repr(target)))
 
         return rc
 
 class ResourceAudit(ClusterAudit):
 
     def name(self):
         return "ResourceAudit"
 
     def _doauditRsc(self, resource):
         ResourceNodes = []
         for node in self.CM.Env["nodes"]:
             if self.CM.ShouldBeStatus[node] == self.CM["up"]:
                 if resource.IsRunningOn(node):
                     ResourceNodes.append(node)
         return ResourceNodes
 
     def _doaudit(self):
         '''Check to see if all resources are running in exactly one place
         in the cluster.
         We also verify that the members of a resource group are all
         running on the same node in the cluster,
         and we monitor that they are all running "properly".
         '''
         Fatal = 0
         result = []
 
         # Thought: use self.CM.find_partitions() and make this audit
         # aware of partitions.  Since in a split cluster one
         # partition may have quorum (and permission to run resources)
         # and the other not.
 
         Groups = self.CM.ResourceGroups()
         for group in Groups:
             GrpServedBy = None
             lastResource = None
             for resource in group:
 
                 #
                 # _doauditRsc returns the set of nodes serving
                 # the given resource.  This is normally a single node.
                 #
 
                 ResourceNodes = self._doauditRsc(resource)
 
                 # Is the resource served without quorum present?
 
                 if not self.CM.HasQuorum(None) and len(ResourceNodes) != 0 and resource.needs_quorum:
                     result.append("Resource " + repr(resource)
                                   + " active without Quorum: "
                                   + repr(ResourceNodes))
 
                 # Is the resource served at all?
 
                 elif len(ResourceNodes) == 0 and self.CM.HasQuorum(None):
                     result.append("Resource " + repr(resource)
                                   + " not served anywhere.")
 
                 # Is the resource served too many times?
 
                 elif len(ResourceNodes) > 1:
                     result.append("Resource " + repr(resource)
                                   + " served too many times: "
                                   + repr(ResourceNodes))
                     self.CM.log("Resource " + repr(resource)
                                 + " served too many times: "
                                 + repr(ResourceNodes))
                     Fatal = 1
 
                 elif GrpServedBy == None:
                     GrpServedBy = ResourceNodes
 
                 # Are all the members of the Rsc Grp served by the same node?
 
                 elif GrpServedBy != ResourceNodes:
                     result.append("Resource group resources " + repr(resource)
                                   + " running on different nodes: "
                                   + repr(ResourceNodes) + " vs " + repr(GrpServedBy)
                                   + " (otherRsc = " + repr(lastResource) + ")")
                     self.CM.log("Resource group resources " + repr(resource)
                                 + " running on different nodes: "
                                 + repr(ResourceNodes) + " vs " + repr(GrpServedBy)
                                 + " (otherRsc = " + repr(lastResource) + ")")
                     Fatal = 1
 
                 if self.CM.Env.has_key("SuppressMonitoring") and \
                         self.CM.Env["SuppressMonitoring"]:
                     continue
 
                 # Is the resource working correctly?
 
                 if not Fatal and len(ResourceNodes) == 1:
                     beforearpchild = popen2.Popen3("date; /sbin/arp -n | cut -c1-15,26-50,75-", None)
                     beforearpchild.tochild.close()  # /dev/null
 
                     if not resource.IsWorkingCorrectly(ResourceNodes[0]):
                         afterarpchild = popen2.Popen3("/sbin/arp -n | cut -c1-15,26-50,75-", None)
                         afterarpchild.tochild.close()  # /dev/null
 
                         result.append("Resource " + repr(resource)
                                       + " not operating properly."
                                       + " Resource is running on " + ResourceNodes[0])
                         Fatal = 1
                         self.CM.log("ARP table before failure ========")
                         for line in beforearpchild.fromchild.readlines():
                             self.CM.log(line)
                         self.CM.log("ARP table after failure ========")
                         for line in afterarpchild.fromchild.readlines():
                             self.CM.log(line)
                         self.CM.log("End of ARP tables ========")
                         try:
                             beforearpchild.wait()
                             afterarpchild.wait()
                         except OSError:
                             pass
                         afterarpchild.fromchild.close()
                     beforearpchild.fromchild.close()
 
                 lastResource = resource
 
         if (Fatal):
             result.insert(0, "FATAL")  # Kludgy.
 
         return result
 
     def __call__(self):
         #
         # Audit the resources.  Since heartbeat doesn't really know
         # when resource acquisition is complete, we will poll until
         # things get stable.
         #
         # A resource active in more than one place is a fatal error,
         # with no tolerance granted.
         #
         audresult = self._doaudit()
         #
         # Probably the constant below should be a CM parameter.
         # Then it could be 0 for FailSafe.
         # Of course, it really depends on what resources
         # you have in the test suite, and how long it takes
         # for them to settle.
         # Recently, we've changed heartbeat so we know better when
         # resource acquisition is done.
         #
         audcount = 5
         while (audcount > 0):
             audresult = self._doaudit()
             if (len(audresult) <= 0 or audresult[0] == "FATAL"):
                 audcount = 0
             else:
                 audcount = audcount - 1
             if (audcount > 0):
                 time.sleep(1)
         if (len(audresult) > 0):
             self.CM.log("Fatal Audit error: " + repr(audresult))
         return (len(audresult) == 0)
 
     def is_applicable(self):
         if self.CM["Name"] == "heartbeat":
             return 1
         return 0
 
 class CrmdStateAudit(ClusterAudit):
     def __init__(self, cm):
         self.CM = cm
         self.Stats = {"calls":0, "success":0, "failure":0,
                       "skipped":0, "auditfail":0}
 
     def has_key(self, key):
         return self.Stats.has_key(key)
 
     def __setitem__(self, key, value):
         self.Stats[key] = value
 
     def __getitem__(self, key):
         return self.Stats[key]
 
     def incr(self, name):
         '''Increment (or initialize) the value associated with the given name'''
         if not self.Stats.has_key(name):
             self.Stats[name] = 0
         self.Stats[name] = self.Stats[name] + 1
 
     def __call__(self):
         passed = 1
         up_are_down = 0
         down_are_up = 0
         unstable_list = []
 
         self.CM.debug("Do Audit %s" % self.name())
 
         for node in self.CM.Env["nodes"]:
             should_be = self.CM.ShouldBeStatus[node]
             rc = self.CM.test_node_CM(node)
             if rc > 0:
                 if should_be == self.CM["down"]:
                     down_are_up = down_are_up + 1
                 if rc == 1:
                     unstable_list.append(node)
             elif should_be == self.CM["up"]:
                 up_are_down = up_are_down + 1
 
         if len(unstable_list) > 0:
             passed = 0
             self.CM.log("Cluster is not stable: %d (of %d): %s"
                         % (len(unstable_list), self.CM.upcount(), repr(unstable_list)))
 
         if up_are_down > 0:
             passed = 0
             self.CM.log("%d (of %d) nodes expected to be up were down."
                         % (up_are_down, len(self.CM.Env["nodes"])))
 
         if down_are_up > 0:
             passed = 0
             self.CM.log("%d (of %d) nodes expected to be down were up."
                         % (down_are_up, len(self.CM.Env["nodes"])))
 
         return passed
 
     def name(self):
         return "CrmdStateAudit"
 
     def is_applicable(self):
         if self.CM["Name"] == "linux-ha-v2":
             return 1
         if self.CM["Name"] == "crm-ais":
             return 1
         return 0
 
 class CIBAudit(ClusterAudit):
     def __init__(self, cm):
         self.CM = cm
         self.Stats = {"calls":0, "success":0, "failure":0,
                       "skipped":0, "auditfail":0}
 
     def has_key(self, key):
         return self.Stats.has_key(key)
 
     def __setitem__(self, key, value):
         self.Stats[key] = value
 
     def __getitem__(self, key):
         return self.Stats[key]
 
     def incr(self, name):
         '''Increment (or initialize) the value associated with the given name'''
         if not self.Stats.has_key(name):
             self.Stats[name] = 0
         self.Stats[name] = self.Stats[name] + 1
 
     def __call__(self):
         self.CM.debug("Do Audit %s" % self.name())
         passed = 1
         ccm_partitions = self.CM.find_partitions()
         if len(ccm_partitions) == 0:
             self.CM.debug("\tNo partitions to audit")
             return 1
 
         for partition in ccm_partitions:
             self.CM.debug("\tAuditing CIB consistency for: %s" % partition)
             partition_passed = 0
             if self.audit_cib_contents(partition) == 0:
                 passed = 0
 
         return passed
 
     def audit_cib_contents(self, hostlist):
         passed = 1
         first_host = None
         first_host_xml = ""
         partition_hosts = hostlist.split()
         for a_host in partition_hosts:
             if first_host == None:
                 first_host = a_host
                 first_host_xml = self.store_remote_cib(a_host)
                 #self.CM.debug("Retrieved CIB: %s" % first_host_xml)
             else:
                 a_host_xml = self.store_remote_cib(a_host)
                 diff_cmd = "@sbindir@/crm_diff -c -VV -f -N \'%s\' -O '%s'" % (
                     a_host_xml, first_host_xml)
 
                 infile, outfile, errfile = os.popen3(diff_cmd)
                 diff_lines = outfile.readlines()
                 for line in diff_lines:
                     if not re.search("<diff/>", line):
                         passed = 0
                         self.CM.log("CibDiff[%s-%s]: %s" % (first_host, a_host, line))
                     else:
                         self.CM.debug("CibDiff[%s-%s] Ignoring: %s" % (first_host, a_host, line))
 
                 diff_lines = errfile.readlines()
                 for line in diff_lines:
                     passed = 0
                     self.CM.log("CibDiff[%s-%s] ERROR: %s" % (first_host, a_host, line))
 
         return passed
 
     def store_remote_cib(self, node):
         combined = ""
         first_line = 1
         extra_debug = 0
         #self.CM.debug("\tRetrieving CIB from: %s" % node)
         lines = self.CM.rsh.readlines(node, self.CM["CibQuery"])
 
         if extra_debug:
             self.CM.debug("Start Cib[%s]" % node)
         for line in lines:
             combined = combined + line[:-1]
             if first_line:
                 self.CM.debug("[Cib]" + line)
                 first_line = 0
             elif extra_debug:
                 self.CM.debug("[Cib]" + line)
 
         if extra_debug:
             self.CM.debug("End Cib[%s]" % node)
 
         #self.CM.debug("Complete CIB: %s" % combined)
         return combined
 
     def name(self):
         return "CibAudit"
 
     def is_applicable(self):
         if self.CM["Name"] == "linux-ha-v2":
             return 1
         if self.CM["Name"] == "crm-ais":
             return 1
         return 0
 
 class PartitionAudit(ClusterAudit):
     def __init__(self, cm):
         self.CM = cm
         self.Stats = {"calls":0, "success":0, "failure":0,
                       "skipped":0, "auditfail":0}
         self.NodeEpoche = {}
         self.NodeState = {}
         self.NodeQuorum = {}
 
     def has_key(self, key):
         return self.Stats.has_key(key)
 
     def __setitem__(self, key, value):
         self.Stats[key] = value
 
     def __getitem__(self, key):
         return self.Stats[key]
 
     def incr(self, name):
         '''Increment (or initialize) the value associated with the given name'''
         if not self.Stats.has_key(name):
             self.Stats[name] = 0
         self.Stats[name] = self.Stats[name] + 1
 
     def __call__(self):
         self.CM.debug("Do Audit %s" % self.name())
         passed = 1
         ccm_partitions = self.CM.find_partitions()
         if ccm_partitions == None or len(ccm_partitions) == 0:
             return 1
 
         if len(ccm_partitions) > 1:
             self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
             passed = 0
             for partition in ccm_partitions:
                 self.CM.log("\t %s" % partition)
 
         for partition in ccm_partitions:
             partition_passed = 0
             if self.audit_partition(partition) == 0:
                 passed = 0
 
         return passed
 
     def trim_string(self, avalue):
         if not avalue:
             return None
         if len(avalue) > 1:
             return avalue[:-1]
 
     def trim2int(self, avalue):
         if not avalue:
             return None
         if len(avalue) > 1:
             return int(avalue[:-1])
 
     def audit_partition(self, partition):
         passed = 1
         dc_found = []
         dc_allowed_list = []
         lowest_epoche = None
         node_list = partition.split()
 
         self.CM.debug("Auditing partition: %s" % (partition))
         for node in node_list:
             if self.CM.ShouldBeStatus[node] != self.CM["up"]:
                 self.CM.log("Warn: Node %s appeared out of nowhere" % (node))
                 self.CM.ShouldBeStatus[node] = self.CM["up"]
                 # not in itself a reason to fail the audit (not what we're
                 # checking for in this audit)
 
             self.NodeState[node] = self.CM.rsh.readaline(
                 node, self.CM["StatusCmd"] % node)
             self.NodeEpoche[node] = self.CM.rsh.readaline(
                 node, self.CM["EpocheCmd"])
             self.NodeQuorum[node] = self.CM.rsh.readaline(
                 node, self.CM["QuorumCmd"])
 
             self.CM.debug("Node %s: %s - %s - %s." % (
                 node, self.NodeState[node], self.NodeEpoche[node], self.NodeQuorum[node]))
             self.NodeState[node] = self.trim_string(self.NodeState[node])
             self.NodeEpoche[node] = self.trim2int(self.NodeEpoche[node])
             self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node])
 
             if not self.NodeEpoche[node]:
                 self.CM.log("Warn: Node %s disappeared: cannot determine epoch" % (node))
                 self.CM.ShouldBeStatus[node] = self.CM["down"]
                 # not in itself a reason to fail the audit (not what we're
                 # checking for in this audit)
             elif lowest_epoche == None or self.NodeEpoche[node] < lowest_epoche:
                 lowest_epoche = self.NodeEpoche[node]
 
         if not lowest_epoche:
             self.CM.log("Lowest epoch not determined in %s" % (partition))
             passed = 0
 
         for node in node_list:
             if self.CM.ShouldBeStatus[node] == self.CM["up"]:
                 if self.CM.is_node_dc(node, self.NodeState[node]):
                     dc_found.append(node)
                     if self.NodeEpoche[node] == lowest_epoche:
                         self.CM.debug("%s: OK" % node)
                     elif not self.NodeEpoche[node]:
                         self.CM.debug("Check on %s ignored: no node epoch" % node)
                     elif not lowest_epoche:
                         self.CM.debug("Check on %s ignored: no lowest epoch" % node)
                     else:
                         self.CM.log("DC %s is not the oldest node (%d vs. %d)"
                                     % (node, self.NodeEpoche[node], lowest_epoche))
                         passed = 0
 
         if len(dc_found) == 0:
             self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)"
                         % (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
 
         elif len(dc_found) > 1:
             self.CM.log("%d DCs (%s) found in cluster partition: %s"
                         % (len(dc_found), str(dc_found), str(node_list)))
             passed = 0
 
         if passed == 0:
             for node in node_list:
                 if self.CM.ShouldBeStatus[node] == self.CM["up"]:
                     self.CM.log("epoch %s : %s"
                                 % (self.NodeEpoche[node], self.NodeState[node]))
 
         return passed
 
     def name(self):
         return "PartitionAudit"
 
     def is_applicable(self):
         if self.CM["Name"] == "linux-ha-v2":
             return 1
         if self.CM["Name"] == "crm-ais":
             return 1
         return 0
 
 AllAuditClasses.append(DiskAudit)
 AllAuditClasses.append(LogAudit)
 AllAuditClasses.append(CrmdStateAudit)
 AllAuditClasses.append(PartitionAudit)
 AllAuditClasses.append(ResourceAudit)
 AllAuditClasses.append(PrimitiveAudit)
 AllAuditClasses.append(GroupAudit)
 AllAuditClasses.append(CloneAudit)
 AllAuditClasses.append(ColocationAudit)
 AllAuditClasses.append(CIBAudit)
 
 def AuditList(cm):
     result = []
     for auditclass in AllAuditClasses:
         result.append(auditclass(cm))
     return result
diff --git a/cts/CTSlab.py.in b/cts/CTSlab.py.in
index 40bcb0c0aa..46a1d2b59c 100755
--- a/cts/CTSlab.py.in
+++ b/cts/CTSlab.py.in
@@ -1,837 +1,841 @@
 #!@PYTHON@
 '''CTS: Cluster Testing System: Lab environment module
 '''
 __copyright__='''
 Copyright (C) 2001,2005 Alan Robertson
 Licensed under the GNU GPL.
 '''
 
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
 # as published by the Free Software Foundation; either version 2
 # of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 
 from UserDict import UserDict
 import sys, time, types, string, syslog, random, os, signal, traceback
 from CTS import ClusterManager
 from CM_hb import HeartbeatCM
 from CTStests import BSC_AddResource
 from socket import gethostbyname_ex
 
 tests = None
 cm = None
 old_handler = None
 DefaultFacility = "daemon"
 
 def sig_handler(signum, frame):
     if cm != None:
         cm.log("Interrupted by signal %d" % signum)
     if signum == 10 and tests != None:
         tests.summarize()
     if signum == 15:
         sys.exit(1)
 
 class ResetMechanism:
     def reset(self, node):
         raise ValueError("Abstract class member (reset)")
 
 class Stonith(ResetMechanism):
     def __init__(self, sttype="external/ssh", pName=None, pValue=None,
                  path="@sbindir@/stonith"):
         self.pathname = path
         self.configName = pName
         self.configValue = pValue
         self.stonithtype = sttype
 
     def reset(self, node):
         if self.configValue == None:
             config = node
         else:
             config = self.configValue
         cmdstring = "%s -t '%s' -p '%s' '%s' 2>/dev/null" % (
             self.pathname, self.stonithtype, config, node)
         return (os.system(cmdstring) == 0)
 
 class Stonithd(ResetMechanism):
     def __init__(self, nodes, sttype='external/ssh'):
         self.sttype = sttype
         self.nodes = nodes
 
         self.query_cmd_pat = '@libdir@/heartbeat/stonithdtest/apitest 0 %s 20000 0'
         self.reset_cmd_pat = '@libdir@/heartbeat/stonithdtest/apitest 1 %s 20000 0'
         self.poweron_cmd_pat = '@libdir@/heartbeat/stonithdtest/apitest 2 %s 20000 0'
         self.poweroff_cmd_pat= '@libdir@/heartbeat/stonithdtest/apitest 3 %s 20000 0'
         self.lrmd_add_pat = '@libdir@/heartbeat/lrmadmin -A %s stonith ' + sttype + ' NULL hostlist=%s'
         self.lrmd_start_pat = '@libdir@/heartbeat/lrmadmin -E %s start 0 0 EVERYTIME'
         self.lrmd_stop_pat = '@libdir@/heartbeat/lrmadmin -E %s stop 0 0 EVERYTIME'
         self.lrmd_del_pat = '@libdir@/heartbeat/lrmadmin -D %s'
 
         self.rsc_id = 'my_stonithd_id'
 
         self.command = "@SSH@ -l root -n -x"
         self.command_noblock = "@SSH@ -f -l root -n -x"
 
         self.stonithd_started_nodes = []
         self.fail_reason = ''
 
     def _remote_exec(self, node, cmnd):
         return (os.system("%s %s %s > /dev/null" % (self.command, node, cmnd)) == 0)
 
     def _remote_readlines(self, node, cmnd):
         f = os.popen("%s %s %s" % (self.command, node, cmnd))
         return f.readlines()
 
     def _stonithd_started(self, node):
         return node in self.stonithd_started_nodes
 
     def _start_stonithd(self, node, hosts):
         hostlist = string.join(hosts, ',')
 
         lrmd_add_cmd = self.lrmd_add_pat % (self.rsc_id, hostlist)
         ret = self._remote_exec(node, lrmd_add_cmd)
         if not ret: return ret
 
         lrmd_start_cmd = self.lrmd_start_pat % self.rsc_id
         ret = self._remote_exec(node, lrmd_start_cmd)
         if not ret: return ret
 
         self.stonithd_started_nodes.append(node)
         return 1
 
     def _stop_stonithd(self, node):
         lrmd_stop_cmd = self.lrmd_stop_pat % self.rsc_id
         ret = self._remote_exec(node, lrmd_stop_cmd)
         if not ret: return ret
 
         lrmd_del_cmd = self.lrmd_del_pat % self.rsc_id
         ret = self._remote_exec(node, lrmd_del_cmd)
         if not ret: return ret
 
         self.stonithd_started_nodes.remove(node)
         return 1
 
     def _do_stonith(self, init_node, target_node, action):
         stonithd_started = self._stonithd_started(init_node)
         if not stonithd_started:
             ret = self._start_stonithd(init_node, [target_node])
             if not ret:
                 self.fail_reason = "failed to start stonithd on node %s" % init_node
                 return ret
 
         command = ""
         if action == "RESET":
             command = self.reset_cmd_pat % target_node
         elif action == "POWEROFF":
             command = self.poweroff_cmd_pat % target_node
         elif action == "POWERON":
             command = self.poweron_cmd_pat % target_node
         else:
             self.fail_reason = "unknown operation type %s" % action
             return 0
 
         lines = self._remote_readlines(init_node, command)
         result = "".join(lines)
 
         if not stonithd_started:
             self._stop_stonithd(init_node)
 
         index = result.find("result=0")
         if index == -1:
             self.fail_reason = "unexpected stonithd status: %s" % result
             return 0
         return 1
 
     # Should we randomly choose a node as init_node here if init_node not specified?
     def reset(self, init_node, target_node):
         return self._do_stonith(init_node, target_node, "RESET")
 
     def poweron(self, init_node, target_node):
         return self._do_stonith(init_node, target_node, "POWERON")
 
     def poweroff(self, init_node, target_node):
         return self._do_stonith(init_node, target_node, "POWEROFF")
 
 class Logger:
     TimeFormat = "%b %d %H:%M:%S\t"
 
     def __call__(self, lines):
         raise ValueError("Abstract class member (__call__)")
 
     def write(self, line):
         return self(line.rstrip())
 
     def writelines(self, lines):
         for s in lines:
             self.write(s)
         return 1
 
     def flush(self):
         return 1
 
     def isatty(self):
         return None
 
 class SysLog(Logger):
     # http://docs.python.org/lib/module-syslog.html
     defaultsource = "CTS"
     map = {
         "kernel":  syslog.LOG_KERN,
         "user":    syslog.LOG_USER,
         "mail":    syslog.LOG_MAIL,
         "daemon":  syslog.LOG_DAEMON,
         "auth":    syslog.LOG_AUTH,
         "lpr":     syslog.LOG_LPR,
         "news":    syslog.LOG_NEWS,
         "uucp":    syslog.LOG_UUCP,
         "cron":    syslog.LOG_CRON,
         "local0":  syslog.LOG_LOCAL0,
         "local1":  syslog.LOG_LOCAL1,
         "local2":  syslog.LOG_LOCAL2,
         "local3":  syslog.LOG_LOCAL3,
         "local4":  syslog.LOG_LOCAL4,
         "local5":  syslog.LOG_LOCAL5,
         "local6":  syslog.LOG_LOCAL6,
         "local7":  syslog.LOG_LOCAL7,
     }
 
     def __init__(self, labinfo):
         if labinfo.has_key("syslogsource"):
             self.source = labinfo["syslogsource"]
         else:
             self.source = SysLog.defaultsource
 
         if labinfo.has_key("SyslogFacility"):
             self.facility = labinfo["SyslogFacility"]
         else:
             self.facility = DefaultFacility
 
         if SysLog.map.has_key(self.facility):
             self.facility = SysLog.map[self.facility]
 
         syslog.openlog(self.source, 0, self.facility)
 
     def setfacility(self, facility):
         self.facility = facility
         if SysLog.map.has_key(self.facility):
             self.facility = SysLog.map[self.facility]
         syslog.closelog()
         syslog.openlog(self.source, 0, self.facility)
 
     def __call__(self, lines):
         if isinstance(lines, types.StringType):
             syslog.syslog(lines)
         else:
             for line in lines:
                 syslog.syslog(line)
 
     def name(self):
         return "Syslog"
 
 class StdErrLog(Logger):
     def __init__(self, labinfo):
         pass
 
     def __call__(self, lines):
         t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
         if isinstance(lines, types.StringType):
             sys.__stderr__.writelines([t, lines, "\n"])
         else:
             for line in lines:
                 sys.__stderr__.writelines([t, line, "\n"])
         sys.__stderr__.flush()
 
     def name(self):
         return "StdErrLog"
 
 class FileLog(Logger):
     def __init__(self, labinfo, filename=None):
         if filename == None:
             filename = labinfo["LogFileName"]
         self.logfile = filename
         import os
         self.hostname = os.uname()[1] + " "
         self.source = "CTS: "
 
     def __call__(self, lines):
         fd = open(self.logfile, "a")
         t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
         if isinstance(lines, types.StringType):
             fd.writelines([t, self.hostname, self.source, lines, "\n"])
         else:
             for line in lines:
                 fd.writelines([t, self.hostname, self.source, line, "\n"])
         fd.close()
 
     def name(self):
         return "FileLog"
 
 class CtsLab(UserDict):
     '''This class defines the Lab Environment for the Cluster Test System.
     It defines those things which are expected to change from test
     environment to test environment for the same cluster manager.
 
     It is where you define the set of nodes that are in your test lab,
     what kind of reset mechanism you use, etc.
 
     This class is derived from a UserDict because we hold many
     different parameters of different kinds, and this provides
     a uniform and extensible interface useful for any kind of
     communication between the user/administrator/tester and CTS.
 
     At this point in time, it is the intent of this class to model static
     configuration and/or environmental data about the environment which
     doesn't change as the tests proceed.
 
     Well-known names (keys) are an important concept in this class.
     The HasMinimalKeys member function knows the minimal set of
     well-known names for the class.
 
     The following names are standard (well-known) at this time:
 
         nodes       An array of the nodes in the cluster
         reset       A ResetMechanism object
         logger      An array of objects that log strings...
         CMclass     The type of ClusterManager we are running
                     (This is a class object, not a class instance)
         RandSeed    Random seed.  It is a triple of bytes. (optional)
         HAdir       Base directory for HA installation
 
     The CTS code ignores names it doesn't know about/need.
     The individual tests have access to this information, and it is
     perfectly acceptable to provide hints, tweaks, fine-tuning
     directions, or other information to the tests through this mechanism.
     '''
 
     def __init__(self, nodes):
         self.data = {}
         self["nodes"] = nodes
         self.MinimalKeys = ["nodes", "reset", "logger", "CMclass", "HAdir"]
 
     def HasMinimalKeys(self):
         'Return TRUE if our object has the minimal set of keys/values in it'
         result = 1
         for key in self.MinimalKeys:
             if not self.has_key(key):
                 result = None
         return result
 
     def SupplyDefaults(self):
         if not self.has_key("logger"):
             self["logger"] = (SysLog(self), StdErrLog(self))
         if not self.has_key("reset"):
             self["reset"] = Stonith()
         if not self.has_key("CMclass"):
             self["CMclass"] = HeartbeatCM
         if not self.has_key("LogFileName"):
             self["LogFileName"] = "@HA_VARLOGDIR@/ha-log"
         if not self.has_key("logrestartcmd"):
             self["logrestartcmd"] = "@INITDIR@/syslog restart"
         if not self.has_key("logfacility"):
             LogFacility = DefaultFacility
 
         #
         # Now set up our random number generator...
         #
         self.RandomGen = random.Random()
 
         # Get a random seed for the random number generator.
         if self.has_key("RandSeed"):
             randseed = self["RandSeed"]
             self.log("Random seed is: " + str(randseed))
             self.RandomGen.seed(str(randseed))
         else:
             randseed = int(time.time())
             self.log("Random seed is: " + str(randseed))
             self.RandomGen.seed(str(randseed))
 
     def log(self, args):
         "Log using each of the supplied logging methods"
         for logfcn in self._logfunctions:
             logfcn(string.strip(args))
 
     def debug(self, args):
         "Log using each of the supplied logging methods"
         for logfcn in self._logfunctions:
             if logfcn.name() != "StdErrLog":
                 logfcn("debug: %s" % string.strip(args))
 
     def __setitem__(self, key, value):
         '''Since this function gets called whenever we modify the
         dictionary (object), we can (and do) validate those keys that
         we know how to validate.  For the most part, we know how to
         validate the "MinimalKeys" elements.
         '''
 
         #
         # List of nodes in the system
         #
         if key == "nodes":
             self.Nodes = {}
             for node in value:
                 # I don't think I need the IP address, etc. but this validates
                 # the node name against /etc/hosts and/or DNS, so it's a
                 # GoodThing(tm).
                 try:
                     self.Nodes[node] = gethostbyname_ex(node)
                 except:
                     print node + " not found in DNS... aborting"
                     raise
         #
         # Reset Mechanism
         #
         elif key == "reset":
             if not issubclass(value.__class__, ResetMechanism):
                 raise ValueError("'reset' Value must be a subclass"
                                  " of ResetMechanism")
         #
         # List of Logging Mechanism(s)
         #
         elif key == "logger":
             if len(value) < 1:
                 raise ValueError("Must have at least one logging mechanism")
             for logger in value:
                 if not callable(logger):
                     raise ValueError("'logger' elements must be callable")
             self._logfunctions = value
         #
         # Cluster Manager Class
         #
         elif key == "CMclass":
             if not issubclass(value, ClusterManager):
                 raise ValueError("'CMclass' must be a subclass of"
                                  " ClusterManager")
         #
         # Initial Random seed...
         #
         #elif key == "RandSeed":
         #    if len(value) != 3:
         #        raise ValueError("'Randseed' must be a 3-element list/tuple")
         #    for elem in value:
         #        if not isinstance(elem, types.IntType):
         #            raise ValueError("'Randseed' list must all be ints")
 
         self.data[key] = value
 
     def IsValidNode(self, node):
         'Return TRUE if the given node is valid'
         return self.Nodes.has_key(node)
 
     def __CheckNode(self, node):
         "Raise a ValueError if the given node isn't valid"
         if not self.IsValidNode(node):
             raise ValueError("Invalid node [%s] in CheckNode" % node)
 
     def RandomNode(self):
         '''Choose a random node from the cluster'''
         return self.RandomGen.choice(self["nodes"])
 
     def ResetNode(self, node):
         "Reset a node, (normally) using a hardware mechanism"
         self.__CheckNode(node)
         return self["reset"].reset(node)
 
     def ResetNode2(self, init_node, target_node, reasons):
         self.__CheckNode(target_node)
         stonithd = Stonithd(self["nodes"])
         ret = stonithd.reset(init_node, target_node)
         if not ret:
             reasons.append(stonithd.fail_reason)
         return ret
 
 def usage(arg):
     print "Illegal argument " + arg
     print "usage: " + sys.argv[0] \
     +  " --directory config-directory" \
     +  " -D config-directory" \
     +  " --logfile system-logfile-name" \
     +  " --trunc (truncate logfile before starting)" \
     +  " -L system-logfile-name" \
     +  " --limit-nodes maxnumnodes" \
     +  " --xmit-loss lost-rate(0.0-1.0)" \
     +  " --recv-loss lost-rate(0.0-1.0)" \
     +  " --suppressmonitoring" \
     +  " --syslog-facility syslog-facility" \
     +  " --facility syslog-facility" \
     +  " --choose testcase-name" \
     +  " --test-ip-base ip" \
     +  " --oprofile \"whitespace separated list of nodes to oprofile\"" \
     +  " (-2 |" \
     +  " -v2 |" \
     +  " --crm |" \
     +  " --classic)" \
     +  " (--populate-resources | -r)" \
     +  " --resource-can-stop" \
     +  " --stonith (1 | 0 | yes | no)" \
     +  " --stonith-type type" \
     +  " --stonith-args name=value" \
     +  " --standby (1 | 0 | yes | no)" \
     +  " --fencing (1 | 0 | yes | no)" \
     +  " --suppress-cib-writes (1 | 0 | yes | no)" \
     +  " -lstests" \
     +  " --seed" \
     +  " [number-of-iterations]"
     sys.exit(1)
 
 #
 # A little test code...
 #
 if __name__ == '__main__':
 
     from CTSaudits import AuditList
     from CTStests import TestList, RandomTests
     from CTS import Scenario, InitClusterManager, PingFest, PacketLoss, BasicSanityCheck
 
     import CM_hb
 
     HAdir = "@sysconfdir@/ha.d"
     LogFile = "@HA_VARLOGDIR@/ha-log-" + DefaultFacility
     DoStonith = 1
     DoStandby = 1
     DoFencing = 1
     NumIter = 500
     SuppressMonitoring = None
     Version = 1
     CIBfilename = None
     CIBResource = 0
     ClobberCIB = 0
     LimitNodes = 0
     TestCase = None
     LogFacility = None
     TruncateLog = 0
     ResCanStop = 0
     XmitLoss = "0.0"
     RecvLoss = "0.0"
     IPBase = "127.0.0.10"
     SuppressCib = 1
     DoBSC = 0
     ListTests = 0
     HaveSeed = 0
     oprofile = None
+    warn_inactive = 1
     StonithType = "ssh"
     StonithParams = None
     StonithParams = "hostlist=dynamic".split('=')
 
     #
     # The values of the rest of the parameters are now properly derived from
     # the configuration files.
     #
     # Stonith is configurable because it's slow, I have a few machines which
     # don't reboot very reliably, and it can do mild damage to your machine if
     # you're using a real power switch.
     #
     # Standby is configurable because the test is very heartbeat specific
     # and I haven't written the code to set it properly yet.  Patches are
     # being accepted...
 
     # Set the signal handler
     signal.signal(15, sig_handler)
     signal.signal(10, sig_handler)
 
     # Process arguments...
 
     skipthis = None
     args = sys.argv[1:]
     for i in range(0, len(args)):
         if skipthis:
             skipthis = None
             continue
 
         elif args[i] == "-D" or args[i] == "--directory":
             skipthis = 1
             HAdir = args[i+1]
 
         elif args[i] == "-l" or args[i] == "--limit-nodes":
             skipthis = 1
             LimitNodes = int(args[i+1])
 
         elif args[i] == "-r" or args[i] == "--populate-resources":
             CIBResource = 1
 
         elif args[i] == "-L" or args[i] == "--logfile":
             skipthis = 1
             LogFile = args[i+1]
 
         elif args[i] == "--test-ip-base":
             skipthis = 1
             IPBase = args[i+1]
 
         elif args[i] == "--oprofile":
             skipthis = 1
             oprofile = args[i+1].split(' ')
 
         elif args[i] == "--trunc":
             TruncateLog = 1
 
         elif args[i] == "-v2":
             Version = 2
 
         elif args[i] == "-lstests":
             ListTests = 1
 
         elif args[i] == "--stonith":
             skipthis = 1
             if args[i+1] == "1" or args[i+1] == "yes":
                 DoStonith = 1
             elif args[i+1] == "0" or args[i+1] == "no":
                 DoStonith = 0
             else:
                 usage(args[i+1])
 
         elif args[i] == "--stonith-type":
             StonithType = args[i+1]
             skipthis = 1
 
         elif args[i] == "--stonith-args":
             StonithParams = args[i+1].split('=')
             skipthis = 1
 
         elif args[i] == "--suppress-cib-writes":
             skipthis = 1
             if args[i+1] == "1" or args[i+1] == "yes":
                 SuppressCib = 1
             elif args[i+1] == "0" or args[i+1] == "no":
                 SuppressCib = 0
             else:
                 usage(args[i+1])
 
         elif args[i] == "--bsc":
             DoBSC = 1
 
         elif args[i] == "--standby":
             skipthis = 1
             if args[i+1] == "1" or args[i+1] == "yes":
                 DoStandby = 1
             elif args[i+1] == "0" or args[i+1] == "no":
                 DoStandby = 0
             else:
                 usage(args[i+1])
 
         elif args[i] == "--fencing":
             skipthis = 1
             if args[i+1] == "1" or args[i+1] == "yes":
                 DoFencing = 1
             elif args[i+1] == "0" or args[i+1] == "no":
                 DoFencing = 0
             else:
                 usage(args[i+1])
 
         elif args[i] == "--suppressmonitoring":
             SuppressMonitoring = 1
 
         elif args[i] == "--resource-can-stop":
             ResCanStop = 1
 
         elif args[i] == "-3" or args[i] == "--ais":
             Version = 3
 
         elif args[i] == "-2" or args[i] == "--crm":
             Version = 2
 
         elif args[i] == "-1" or args[i] == "--classic":
             Version = 1
 
         elif args[i] == "--clobber-cib" or args[i] == "-c":
             ClobberCIB = 1
 
         elif args[i] == "--cib-filename":
             skipthis = 1
             CIBfilename = args[i+1]
 
         elif args[i] == "--xmit-loss":
             try:
                 float(args[i+1])
             except ValueError:
                 print ("--xmit-loss parameter should be float")
                 usage(args[i+1])
             skipthis = 1
             XmitLoss = args[i+1]
 
         elif args[i] == "--recv-loss":
             try:
                 float(args[i+1])
             except ValueError:
                 print ("--recv-loss parameter should be float")
                 usage(args[i+1])
             skipthis = 1
             RecvLoss = args[i+1]
 
         elif args[i] == "--choose":
             skipthis = 1
             TestCase = args[i+1]
 
         elif args[i] == "--syslog-facility" or args[i] == "--facility":
             skipthis = 1
             LogFacility = args[i+1]
 
         elif args[i] == "--seed":
             skipthis = 1
             Seed = args[i+1]
             HaveSeed = 1
 
+        elif args[i] == "--no-inactive":
+            warn_inactive = 0
+
         else:
             NumIter = int(args[i])
 
     if not oprofile:
         oprofile = []
 
     #
     # This reading of HBconfig here is ugly, and I suppose ought to
     # be done by the Cluster manager.  This would probably mean moving the
     # list of cluster nodes into the ClusterManager class.  A good thought
     # for our Copious Spare Time in the future...
     #
     config = CM_hb.HBConfig(HAdir)
     node_list = config.Parameters["node"]
 
     if DoBSC:
         NumIter = 2
         Version = 2
         while len(node_list) > 1:
             node_list.pop(len(node_list)-1)
 
     if LogFacility == None:
         if config.Parameters.has_key("logfacility"):
             LogFacility = config.Parameters["logfacility"][0]
         else:
             LogFacility = DefaultFacility
 
     if LimitNodes > 0:
         if len(node_list) > LimitNodes:
             print("Limiting the number of nodes configured=%d (max=%d)"
                   % (len(node_list), LimitNodes))
             while len(node_list) > LimitNodes:
                 node_list.pop(len(node_list)-1)
 
     if StonithParams[0] == "hostlist":
         StonithParams[1] = string.join(node_list, " ")
 
 #    alt_list = []
 #    for node in node_list:
 #        alt_list.append(string.lower(node))
 #    node_list = alt_list
 
     Environment = CtsLab(node_list)
     Environment["HAdir"] = HAdir
     Environment["ClobberCIB"] = ClobberCIB
     Environment["CIBfilename"] = CIBfilename
     Environment["CIBResource"] = CIBResource
     Environment["LogFileName"] = LogFile
     Environment["DoStonith"] = DoStonith
     Environment["SyslogFacility"] = LogFacility
     Environment["DoStandby"] = DoStandby
     Environment["DoFencing"] = DoFencing
     Environment["ResCanStop"] = ResCanStop
     Environment["SuppressMonitoring"] = SuppressMonitoring
     Environment["XmitLoss"] = XmitLoss
     Environment["RecvLoss"] = RecvLoss
     Environment["IPBase"] = IPBase
     Environment["SuppressCib"] = SuppressCib
     Environment["DoBSC"] = 0
     Environment["use_logd"] = 0
     Environment["logfacility"] = LogFacility
     Environment["oprofile"] = oprofile
+    Environment["warn-inactive"] = warn_inactive
 
     if config.Parameters.has_key("use_logd"):
         Environment["use_logd"] = 1
 
     if Version == 2:
         from CM_LinuxHAv2 import LinuxHAv2
         Environment['CMclass'] = LinuxHAv2
 
     if Version == 3:
         from CM_ais import crm_ais
         Environment['CMclass'] = crm_ais
         Environment["DoStonith"] = 0
         Environment["DoFencing"] = 0
         Environment["use_logd"] = 0
 
     if HaveSeed:
         Environment["RandSeed"] = Seed
 
     Environment["reset"] = Stonith(sttype=StonithType, pName=StonithParams[0],
                                    pValue=StonithParams[1])
 
     if DoBSC:
         Environment["DoBSC"] = 1
         Environment["ClobberCIB"] = 1
         Environment["CIBResource"] = 0
         Environment["logger"] = (FileLog(Environment), StdErrLog(Environment))
         scenario = Scenario([ BasicSanityCheck(Environment) ])
     else:
         scenario = Scenario(
             [ InitClusterManager(Environment), PacketLoss(Environment)])
 
     Environment.SupplyDefaults()
 
     # Your basic start up the world type of test scenario...
 
     #scenario = Scenario(
     #    [ InitClusterManager(Environment)
     #    , PingFest(Environment)])
 
     # Create the Cluster Manager object
     cm = Environment['CMclass'](Environment)
     if TruncateLog:
         cm.log("Truncating %s" % LogFile)
         lf = open(LogFile, "w")
         if lf != None:
             lf.truncate(0)
         lf.close()
 
     cm.log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TESTS ")
     cm.log("Version:          %d" % Version)
     cm.log("HA configuration directory: " + Environment["HAdir"])
     cm.log("System log files: " + Environment["LogFileName"])
     cm.log("Enable Stonith:   %d" % Environment["DoStonith"])
     cm.log("Enable Fencing:   %d" % Environment["DoFencing"])
     cm.log("Enable Standby:   %d" % Environment["DoStandby"])
     cm.log("Enable Resources: %d" % Environment["CIBResource"])
 
     if Environment.has_key("SuppressMonitoring") \
             and Environment["SuppressMonitoring"]:
         cm.log("Resource Monitoring is disabled")
 
     cm.ns.WaitForAllNodesToComeUp(config.Parameters["node"])
     cm.log("Cluster nodes: ")
     for node in config.Parameters["node"]:
         (rc, lines) = cm.rsh.remote_py(node, "os", "system", "@sbindir@/crm_uuid")
         if not lines:
             cm.log("    * %s: __undefined_uuid__" % node)
         else:
             out = lines[0]
             out = out[:-1]
             cm.log("    * %s: %s" % (node, out))
 
     Audits = AuditList(cm)
     Tests = []
 
     if Environment["DoBSC"]:
         test = BSC_AddResource(cm)
         Tests.append(test)
     elif TestCase != None:
         for test in TestList(cm):
             if test.name == TestCase:
                 Tests.append(test)
         if Tests == []:
             usage("--choose: No applicable/valid tests chosen")
     else:
         Tests = TestList(cm)
 
     if ListTests == 1:
         cm.log("Total %d tests" % len(Tests))
         for test in Tests:
             cm.log(str(test.name))
         sys.exit(0)
 
     tests = RandomTests(scenario, cm, Tests, Audits)
     Environment.RandomTests = tests
     try:
         overall, detailed = tests.run(NumIter)
     except:
         cm.Env.log("Exception by %s" % sys.exc_info()[0])
         for logmethod in Environment["logger"]:
             traceback.print_exc(50, logmethod)
 
     tests.summarize()
     if tests.Stats["failure"] > 0:
         sys.exit(tests.Stats["failure"])
 
     elif tests.Stats["success"] != NumIter:
         cm.Env.log("No failure count but success != requested iterations")
         sys.exit(1)
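The new flag is wired through in three places: the `warn_inactive = 1` default in the main block, the `--no-inactive` case in the argument scanner, and the `Environment["warn-inactive"]` entry that `PrimitiveAudit.doResourceAudit` reads. CTSlab's scanner is an index loop with a `skipthis` latch for options that consume a value; a self-contained sketch of that pattern, including the new flag (the `parse` helper is illustrative only, not part of CTS):

    # Sketch only: a cut-down version of the CTSlab option-scanning loop.
    def parse(args):
        warn_inactive = 1   # default: audits WARN about inactive resources
        limit_nodes = 0
        num_iter = 500
        skipthis = None
        for i in range(0, len(args)):
            if skipthis:
                skipthis = None           # value already consumed below
                continue
            elif args[i] == "-l" or args[i] == "--limit-nodes":
                skipthis = 1              # this option takes a value
                limit_nodes = int(args[i+1])
            elif args[i] == "--no-inactive":
                warn_inactive = 0         # audits debug() instead of log()
            else:
                num_iter = int(args[i])   # bare argument: iteration count
        return {"warn-inactive": warn_inactive,
                "LimitNodes": limit_nodes, "NumIter": num_iter}

    print(parse(["--no-inactive", "-l", "4", "200"]))
    # -> warn-inactive 0, LimitNodes 4, NumIter 200

Running with `--no-inactive` therefore silences only the "not served anywhere" WARNs from PrimitiveAudit; every other audit message is unaffected.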