diff --git a/cts/CM_LinuxHAv2.py.in b/cts/CM_LinuxHAv2.py.in index a753bcd17a..c60ccc8fd1 100755 --- a/cts/CM_LinuxHAv2.py.in +++ b/cts/CM_LinuxHAv2.py.in @@ -1,703 +1,703 @@ #!@PYTHON@ '''CTS: Cluster Testing System: LinuxHA v2 dependent modules... ''' __copyright__=''' Author: Huang Zhen Copyright (C) 2004 International Business Machines Additional Audits, Revised Start action, Default Configuration: Copyright (C) 2004 Andrew Beekhof ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import os,sys,CTS,CTSaudits,CTStests, warnings from CTS import * from CM_hb import HeartbeatCM from CTSaudits import ClusterAudit from CTStests import * from CIB import * try: from xml.dom.minidom import * except ImportError: sys.__stdout__.write("Python module xml.dom.minidom not found\n") sys.__stdout__.write("Please install python-xml or similar before continuing\n") sys.__stdout__.flush() sys.exit(1) ####################################################################### # # LinuxHA v2 dependent modules # ####################################################################### class LinuxHAv2(HeartbeatCM): ''' The linux-ha version 2 cluster manager class. It implements the things we need to talk to and manipulate linux-ha version 2 clusters ''' def __init__(self, Environment, randseed=None): HeartbeatCM.__init__(self, Environment, randseed=randseed) self.clear_cache = 0 self.cib_installed = 0 self.config = None self.cluster_monitor = 0 self.use_short_names = 1 self.update({ "Name" : "linux-ha-v2", "DeadTime" : 300, "StartTime" : 300, # Max time to start up "StableTime" : 30, "StartCmd" : "@INITDIR@/heartbeat@INIT_EXT@ start > /dev/null 2>&1", "StopCmd" : "@INITDIR@/heartbeat@INIT_EXT@ stop > /dev/null 2>&1", "ElectionCmd" : "@sbindir@/crmadmin -E %s", "StatusCmd" : "@sbindir@/crmadmin -S %s 2>/dev/null", "EpocheCmd" : "@sbindir@/ccm_tool -e", "QuorumCmd" : "@sbindir@/ccm_tool -q", "CibQuery" : "@sbindir@/cibadmin -Ql", "ParitionCmd" : "@sbindir@/ccm_tool -p", "IsRscRunning" : "@libdir@/heartbeat/lrmadmin -E %s monitor 0 0 EVERYTIME 2>/dev/null|grep return", "ExecuteRscOp" : "@libdir@/heartbeat/lrmadmin -n %s -E %s %s 0 %d EVERYTIME 2>/dev/null", "CIBfile" : "%s:@HA_VARLIBDIR@/heartbeat/crm/cib.xml", "TmpDir" : "/tmp", "BreakCommCmd2" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm break-communication %s>/dev/null 2>&1", "IsIPAddrRscRunning" : "", "StandbyCmd" : "@sbindir@/crm_standby -U %s -v %s 2>/dev/null", "UUIDQueryCmd" : "@sbindir@/crmadmin -N", "StandbyQueryCmd" : "@sbindir@/crm_standby -GQ -U %s 2>/dev/null", # Patterns to look for in the log files for various occasions... "Pat:DC_IDLE" : "crmd.*State transition.*-> S_IDLE", # This wont work if we have multiple partitions # Use: "Pat:They_started" : "%s crmd:.*State transition.*-> S_NOT_DC", "Pat:They_started" : "Updating node state to member for %s", "Pat:We_started" : "%s crmd:.* State transition.*-> S_IDLE", "Pat:We_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete", "Pat:Logd_stopped" : "%s logd:.*Exiting write process", "Pat:They_stopped" : "%s crmd:.*LOST:.* %s ", "Pat:All_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete", "Pat:They_dead" : "node %s.*: is dead", "Pat:TransitionComplete" : "Transition status: Complete: complete", # Bad news Regexes. Should never occur. "BadRegexes" : ( r"ERROR:", r"CRIT:", r"Shutting down\.", r"Forcing shutdown\.", r"Timer I_TERMINATE just popped", r"input=I_ERROR", r"input=I_FAIL", r"input=I_INTEGRATED cause=C_TIMER_POPPED", r"input=I_FINALIZED cause=C_TIMER_POPPED", r"input=I_ERROR", r", exiting\.", r"WARN.*Ignoring HA message.*vote.*not in our membership list", r"pengine.*Attempting recovery of resource", r"tengine.*is taking more than 2x its timeout", r"Confirm not received from", r"Welcome reply not received from", r"Attempting to schedule .* after a stop", r"Resource .* was active at shutdown", r"duplicate entries for call_id", r"Search terminated:", r"No need to invoke the TE", r":global_timer_callback", r"Faking parameter digest creation", r"Parameters to .* action changed:", r"apply_xml_diff: Diff application failed!", r"Parameters to .* changed", ), }) del self["Standby"] if self.Env["DoBSC"]: del self["Pat:They_stopped"] del self["Pat:Logd_stopped"] self.Env["use_logd"] = 0 self.check_transitions = 0 self.check_elections = 0 self.CIBsync = {} self.default_cts_cib=CIB(self).cib() self.debug(self.default_cts_cib) def errorstoignore(self): # At some point implement a more elegant solution that # also produces a report at the end '''Return list of errors which are known and very noisey should be ignored''' if 1: return [ "crmadmin:", "ERROR: Message hist queue is filling up" ] return [] def install_config(self, node): if not self.ns.WaitForNodeToComeUp(node): self.log("Node %s is not up." % node) return None if not self.CIBsync.has_key(node) and self.Env["ClobberCIB"] == 1: self.CIBsync[node] = 1 self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml") self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml.sig") # Only install the CIB on the first node, all the other ones will pick it up from there if self.cib_installed == 1: return None self.cib_installed = 1 if self.Env["CIBfilename"] == None: self.debug("Installing Generated CIB on node %s" %(node)) warnings.filterwarnings("ignore") cib_file=os.tmpnam() warnings.resetwarnings() os.system("rm -f "+cib_file) self.debug("Creating new CIB for " + node + " in: " + cib_file) os.system("echo \'" + self.default_cts_cib + "\' > " + cib_file) if 0!=self.rsh.echo_cp(None, cib_file, node, "@HA_VARLIBDIR@/heartbeat/crm/cib.xml"): raise ValueError("Can not create CIB on %s "%node) os.system("rm -f "+cib_file) else: self.debug("Installing CIB (%s) on node %s" %(self.Env["CIBfilename"], node)) if 0!=self.rsh.cp(self.Env["CIBfilename"], "root@" + (self["CIBfile"]%node)): raise ValueError("Can not scp file to %s "%node) self.rsh.remote_py(node, "os", "system", "chown @HA_CCMUSER@ @HA_VARLIBDIR@/heartbeat/crm/cib.xml") def prepare(self): '''Finish the Initialization process. Prepare to test...''' for node in self.Env["nodes"]: self.ShouldBeStatus[node] = "" self.StataCM(node) def test_node_CM(self, node): '''Report the status of the cluster manager on a given node''' watchpats = [ ] watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") watchpats.append(self["Pat:They_started"]%node) idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats) idle_watch.setwatch() out=self.rsh.readaline(node, self["StatusCmd"]%node) ret= (string.find(out, 'ok') != -1) self.debug("Node %s status: %s" %(node, out)) if not ret: if self.ShouldBeStatus[node] == self["up"]: self.log( "Node status for %s is %s but we think it should be %s" %(node, self["down"], self.ShouldBeStatus[node])) self.ShouldBeStatus[node]=self["down"] return 0 if self.ShouldBeStatus[node] == self["down"]: self.log( "Node status for %s is %s but we think it should be %s: %s" %(node, self["up"], self.ShouldBeStatus[node], out)) self.ShouldBeStatus[node]=self["up"] # check the output first - because syslog-ng looses messages if string.find(out, 'S_NOT_DC') != -1: # Up and stable return 2 if string.find(out, 'S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" %(node, out)) return 1 # Up and stable return 2 # Is the node up or is the node down def StataCM(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) > 0: return 1 return None # Being up and being stable is not the same question... def node_stable(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) == 2: return 1 self.log("Warn: Node %s not stable" %(node)) return None def cluster_stable(self, timeout=None): watchpats = [ ] watchpats.append("Current ping state: S_IDLE") watchpats.append(self["Pat:DC_IDLE"]) if timeout == None: timeout = self["DeadTime"] idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats, timeout) idle_watch.setwatch() any_up = 0 for node in self.Env["nodes"]: # have each node dump its current state if self.ShouldBeStatus[node] == self["up"]: self.rsh.readaline(node, (self["StatusCmd"] %node) ) any_up = 1 if any_up == 0: self.debug("Cluster is inactive") return 1 ret = idle_watch.look() if ret: self.debug(ret) return 1 self.log("Warn: Cluster Master not IDLE after %ds" % timeout) return None def is_node_dc(self, node, status_line=None): rc = 0 if not status_line: status_line = self.rsh.readaline(node, self["StatusCmd"]%node) if not status_line: rc = 0 elif string.find(status_line, 'S_IDLE') != -1: rc = 1 elif string.find(status_line, 'S_INTEGRATION') != -1: rc = 1 elif string.find(status_line, 'S_FINALIZE_JOIN') != -1: rc = 1 elif string.find(status_line, 'S_POLICY_ENGINE') != -1: rc = 1 elif string.find(status_line, 'S_TRANSITION_ENGINE') != -1: rc = 1 if rc == 1: self.debug("%s _is_ the DC" % node) return rc def active_resources(self, node): (rc, output) = self.rsh.remote_py( node, "os", "system", """@sbindir@/crm_mon -1 | grep "Started %s" """ % node) resources = [] for line in output: fields = line.split() resources.append(fields[0]) return resources def ResourceOp(self, resource, op, node, interval=0, app="lrmadmin"): ''' Execute an operation on a resource ''' self.rsh.readaline(node, self["ExecuteRscOp"] % (app, resource, op, interval)) return self.rsh.lastrc def ResourceLocation(self, rid): ResourceNodes = [] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["up"]: if self.ResourceOp(rid, "monitor", node) == 0: ResourceNodes.append(node) return ResourceNodes def isolate_node(self, node, allowlist): '''isolate the communication between the nodes''' rc = self.rsh(node, self["BreakCommCmd2"]%allowlist) if rc == 0: return 1 else: self.log("Could not break the communication from node: %s",node) return None def Configuration(self): if self.config: return self.config.getElementsByTagName('configuration')[0] warnings.filterwarnings("ignore") cib_file=os.tmpnam() warnings.resetwarnings() os.system("rm -f "+cib_file) if self.Env["ClobberCIB"] == 1: if self.Env["CIBfilename"] == None: self.debug("Creating new CIB in: " + cib_file) os.system("echo \'"+ self.default_cts_cib +"\' > "+ cib_file) else: os.system("cp "+self.Env["CIBfilename"]+" "+cib_file) else: if 0 != self.rsh.echo_cp( self.Env["nodes"][0], "@HA_VARLIBDIR@/heartbeat/crm/cib.xml", None, cib_file): raise ValueError("Can not copy file to %s, maybe permission denied"%cib_file) self.config = parse(cib_file) os.remove(cib_file) return self.config.getElementsByTagName('configuration')[0] def Resources(self): ResourceList = [] #read resources in cib configuration = self.Configuration() resources = configuration.getElementsByTagName('resources')[0] rscs = configuration.getElementsByTagName('primitive') incs = configuration.getElementsByTagName('clone') groups = configuration.getElementsByTagName('group') for rsc in rscs: if rsc in resources.childNodes: ResourceList.append(HAResource(self,rsc)) for grp in groups: for rsc in rscs: if rsc in grp.childNodes: if self.use_short_names: resource = HAResource(self,rsc) else: resource = HAResource(self,rsc,grp.getAttribute('id')) ResourceList.append(resource) for inc in incs: max = 0 inc_name = inc.getAttribute("id") instance_attributes = inc.getElementsByTagName('instance_attributes')[0] attributes = instance_attributes.getElementsByTagName('attributes')[0] nvpairs = attributes.getElementsByTagName('nvpair') for nvpair in nvpairs: if nvpair.getAttribute("name") == "clone_max": max = int(nvpair.getAttribute("value")) inc_rsc = inc.getElementsByTagName('primitive')[0] for i in range(0,max): rsc = HAResource(self,inc_rsc) rsc.inc_no = i rsc.inc_name = inc_name rsc.inc_max = max if self.use_short_names: rsc.rid = rsc.rid + ":%d"%i else: rsc.rid = inc_name+":"+rsc.rid + ":%d"%i rsc.Instance = rsc.rid ResourceList.append(rsc) return ResourceList def ResourceGroups(self): GroupList = [] #read resources in cib configuration = self.Configuration() groups = configuration.getElementsByTagName('group') rscs = configuration.getElementsByTagName('primitive') for grp in groups: group = [] GroupList.append(group) for rsc in rscs: if rsc in grp.childNodes: if self.use_short_names: resource = HAResource(self,rsc) else: resource = HAResource(self,rsc,grp.getAttribute('id')) group.append(resource) return GroupList def Dependencies(self): DependencyList = [] #read dependency in cib configuration=self.Configuration() constraints=configuration.getElementsByTagName('constraints')[0] rsc_to_rscs=configuration.getElementsByTagName('rsc_to_rsc') for node in rsc_to_rscs: dependency = {} dependency["id"]=node.getAttribute('id') dependency["from"]=node.getAttribute('from') dependency["to"]=node.getAttribute('to') dependency["type"]=node.getAttribute('type') dependency["strength"]=node.getAttribute('strength') DependencyList.append(dependency) return DependencyList def find_partitions(self): ccm_partitions = [] for node in self.Env["nodes"]: self.debug("Retrieving partition details for %s" %node) if self.ShouldBeStatus[node] == self["up"]: partition = self.rsh.readaline(node, self["ParitionCmd"]) if not partition: self.log("no partition details for %s" %node) elif len(partition) > 2: partition = partition[:-1] found=0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug("Adding partition from %s: %s" %(node, partition)) ccm_partitions.append(partition) else: self.log("bad partition details for %s" %node) return ccm_partitions def HasQuorum(self, node_list): # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.Env["nodes"] for node in node_list: if self.ShouldBeStatus[node] == self["up"]: quorum = self.rsh.readaline(node, self["QuorumCmd"]) if string.find(quorum, "1") != -1: return 1 elif string.find(quorum, "0") != -1: return 0 else: self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum) return 0 def Components(self): complist = [] common_ignore = [ "Pending action:", "ERROR: crm_log_message_adv:", "ERROR: MSG: No message to dump", "pending LRM operations at shutdown", "Lost connection to the CIB service", "Connection to the CIB terminated...", "Sending message to CIB service FAILED", "apply_xml_diff: Diff application failed!", "crmd: .*Action A_RECOVER .* not supported", "send_ipc_message: IPC Channel to .* is not connected", "unconfirmed_actions: Waiting on .* unconfirmed actions", "cib_native_msgready: Message pending on command channel", "crmd:.*do_exit: Performing A_EXIT_1 - forcefully exiting the CRMd", "verify_stopped: Resource .* was active at shutdown. You may ignore this error if it is unmanaged.", ] stonith_ignore = [ "ERROR: stonithd_signon: ", "update_failcount: Updating failcount for child_DoFencing", "ERROR: te_connect_stonith: Sign-in failed: triggered a retry", ] stonith_ignore.extend(common_ignore) complist.append(Process("ccm", 0, [ "State transition S_IDLE", "Respawning client .*cib", "Respawning client .*crmd", "CCM connection appears to have failed", - "Client .*cib exited with return code 2.", - "Client .*crmd exited with return code 2.", + ".*cib .*returned rc 2.", + ".*crmd .*returned rc 2.", "crmd: .*Action A_RECOVER .* not supported", "crmd: .*Input I_TERMINATE from do_recover", "Exiting to recover from CCM connection failure", "crmd:.*do_exit: Could not recover from internal error", "crmd: .*I_ERROR.*(ccm_dispatch|crmd_cib_connection_destroy)", # "WARN: determine_online_status: Node .* is unclean", # "Scheduling Node .* for STONITH", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", "State transition S_STARTING -> S_PENDING", ], [], common_ignore, self)) complist.append(Process("cib", 0, [ "State transition S_IDLE", "Respawning client .*crmd", "Lost connection to the CIB service", "Connection to the CIB terminated...", - "Client .*crmd exited with return code 2.", + ".*crmd .*returned rc 2.", "State transition S_STARTING -> S_PENDING", "crmd: .*Input I_TERMINATE from do_recover", "crmd: .*I_ERROR.*crmd_cib_connection_destroy", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self)) complist.append(Process("lrmd", 0, [ "State transition S_IDLE", "LRM Connection failed", "Respawning client .*crmd", "crmd: .*I_ERROR.*lrm_dispatch", "State transition S_STARTING -> S_PENDING", - "Client .*crmd exited with return code 2.", + ".*crmd .*returned rc 2.", "crmd: .*Input I_TERMINATE from do_recover", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self)) complist.append(Process("crmd", 0, [ # "WARN: determine_online_status: Node .* is unclean", # "Scheduling Node .* for STONITH", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", "State transition S_IDLE", "State transition S_STARTING -> S_PENDING", ], [ "tengine: .*ERROR: subsystem_msg_dispatch: The server .* has left us: Shutting down...NOW", "pengine: .*ERROR: subsystem_msg_dispatch: The server .* has left us: Shutting down...NOW", ], common_ignore, self)) complist.append(Process("pengine", 1, [ "State transition S_IDLE", "Respawning client .*crmd", - "Client .*crmd exited with return code 2.", + ".*crmd .*returned rc 2.", "crmd: .*Input I_TERMINATE from do_recover", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self)) complist.append(Process("tengine", 1, [ "State transition S_IDLE", "Respawning client .*crmd", - "Client .*crmd exited with return code 2.", + ".*crmd .*returned rc 2.", "crmd: .*Input I_TERMINATE from do_recover", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self)) if self.Env["DoFencing"] == 1 : complist.append(Process("stonithd", 0, [], [ "tengine_stonith_connection_destroy: Fencing daemon has left us", "Attempting connection to fencing daemon", "te_connect_stonith: Connected", ], stonith_ignore, self)) # complist.append(Process("heartbeat", 0, [], [], [], self)) return complist def NodeUUID(self, node): lines = self.rsh.readlines(node, self["UUIDQueryCmd"]) for line in lines: self.debug("UUIDLine:"+ line) m = re.search(r'%s.+\((.+)\)' % node, line) if m: return m.group(1) return "" def StandbyStatus(self, node): out=self.rsh.readaline(node, self["StandbyQueryCmd"]%node) if not out: return "off" out = out[:-1] self.debug("Standby result: "+out) return out # status == "on" : Enter Standby mode # status == "off": Enter Active mode def SetStandbyMode(self, node, status): current_status = self.StandbyStatus(node) cmd = self["StandbyCmd"] % (node, status) ret = self.rsh(node, cmd) return True class HAResource(Resource): def __init__(self, cm, node, group=None): ''' Get information from xml node ''' if group == None : self.rid = str(node.getAttribute('id')) else : self.rid = group + ":" + str(node.getAttribute('id')) self.rclass = str(node.getAttribute('class')) self.rtype = str(node.getAttribute('type')) self.inc_name = None self.inc_no = -1 self.inc_max = -1 self.rparameters = {} nvpairs = [] list = node.getElementsByTagName('instance_attributes') if len(list) > 0: attributes = list[0] list = attributes.getElementsByTagName('attributes') if len(list) > 0: parameters = list[0] nvpairs = parameters.getElementsByTagName('nvpair') for nvpair in nvpairs: name=nvpair.getAttribute('name') value=nvpair.getAttribute('value') self.rparameters[name]=value # This should normally be called first... FIXME! Resource.__init__(self, cm, self.rtype, self.rid) # resources that dont need quorum will have: # ops = node.getElementsByTagName('op') for op in ops: if op.getAttribute('name') == "start" and op.getAttribute('prereq') == "nothing": self.needs_quorum = 0 def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. We call the status operation for the resource script. ''' rc = self.CM.ResourceOp(self.rid, "monitor", nodename) return (rc == 0) def RunningNodes(self): return self.CM.ResourceLocation(self.rid) def Start(self, nodename): ''' This member function starts or activates the resource. ''' return self.CM.ResourceOp(self.rid, "start", nodename) def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' return self.CM.ResourceOp(self.rid, "stop", nodename) def IsWorkingCorrectly(self, nodename): return self.IsRunningOn(nodename) ####################################################################### # # A little test code... # # Which you are advised to completely ignore... # ####################################################################### if __name__ == '__main__': pass diff --git a/cts/CTStests.py.in b/cts/CTStests.py.in index 4d8834e0b2..0b7249a273 100644 --- a/cts/CTStests.py.in +++ b/cts/CTStests.py.in @@ -1,2486 +1,2486 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Tests module There are a few things we want to do here: ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. Add RecourceRecover testcase Zhao Kai ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # SPECIAL NOTE: # # Tests may NOT implement any cluster-manager-specific code in them. # EXTEND the ClusterManager object to provide the base capabilities # the test needs if you need to do something that the current CM classes # do not. Otherwise you screw up the whole point of the object structure # in CTS. # # Thank you. # import CTS from CM_hb import HBConfig import CTSaudits import time, os, re, types, string, tempfile, sys from CTSaudits import * from stat import * # List of all class objects for tests which we ought to # consider running. class RandomTests: ''' A collection of tests which are run at random. ''' def __init__(self, scenario, cm, tests, Audits): self.CM = cm self.Env = cm.Env self.Scenario = scenario self.Tests = [] self.Audits = [] self.ns=CTS.NodeStatus(self.Env) for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") if test.is_applicable(): self.Tests.append(test) if not scenario.IsApplicable(): raise ValueError("Scenario not applicable in" " given Environment") self.Stats = {"success":0, "failure":0, "BadNews":0} self.IndividualStats= {} for audit in Audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be a subclass of ClusterAudit") if audit.is_applicable(): self.Audits.append(audit) def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def audit(self, BadNews, test): errcount=0 BadNewsDebug=0 #BadNews.debug=1 ignorelist = [] ignorelist.append(" CTS: ") ignorelist.append("BadNews:") ignorelist.extend(self.CM.errorstoignore()) if test: ignorelist.extend(test.errorstoignore()) while errcount < 1000: if BadNewsDebug: print "Looking for BadNews" match=BadNews.look(0) if match: if BadNewsDebug: print "BadNews found: "+match add_err = 1 for ignore in ignorelist: if add_err == 1 and re.search(ignore, match): if BadNewsDebug: print "Ignoring based on pattern: ("+ignore+")" add_err = 0 if add_err == 1: self.CM.log("BadNews: " + match) self.incr("BadNews") errcount=errcount+1 else: break else: self.CM.log("Big problems. Shutting down.") self.CM.stopall() self.summarize() raise ValueError("Looks like we hit the jackpot! :-)") for audit in self.Audits: if not audit(): self.CM.log("Audit " + audit.name() + " FAILED.") self.incr("auditfail") if test: test.incr("auditfail") def summarize(self): self.CM.log("****************") self.CM.log("Overall Results:" + repr(self.Stats)) self.CM.log("****************") self.CM.log("Detailed Results") for test in self.Tests: self.CM.log("Test %s: \t%s" %(test.name, repr(test.Stats))) self.CM.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def run(self, max=1): ( ''' Set up the given scenario, then run the selected tests at random for the selected number of iterations. ''') BadNews=CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"] , timeout=0) BadNews.setwatch() self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.log("Enabling oprofile on %s" % node) self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init") self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start") if not self.Scenario.SetUp(self.CM): return None for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.rsh.remote_py( node, "os", "system", "opcontrol --save=cts.setup") testcount=1 time.sleep(30) # This makes sure everything is stabilized before starting... self.audit(BadNews, None) while testcount <= max: test = self.Env.RandomGen.choice(self.Tests) # Some tests want a node as an argument. nodechoice = self.Env.RandomNode() #logsize = os.stat(self.CM["LogFileName"])[ST_SIZE] #self.CM.log("Running test %s (%s) \t[%d : %d]" # % (test.name, nodechoice, testcount, logsize)) self.CM.log("Running test %s (%s) \t[%d]" % (test.name, nodechoice, testcount)) testcount = testcount + 1 starttime=time.time() test.starttime=starttime ret=test(nodechoice) for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.rsh.remote_py( node, "os", "system", "opcontrol --save=cts.%d" % (testcount-1)) if ret: self.incr("success") else: self.incr("failure") self.CM.log("Test %s (%s) \t[FAILED]" %(test.name,nodechoice)) # Better get the current info from the cluster... self.CM.statall() # Make sure logging is working and we have enough disk space... if not self.CM.Env["DoBSC"]: if not self.CM.TestLogging(): sys.exit(1) if not self.CM.CheckDf(): sys.exit(1) stoptime=time.time() elapsed_time = stoptime - starttime test_time = stoptime - test.starttime if not test.has_key("min_time"): test["elapsed_time"] = elapsed_time test["min_time"] = test_time test["max_time"] = test_time else: test["elapsed_time"] = test["elapsed_time"] + elapsed_time if test_time < test["min_time"]: test["min_time"] = test_time if test_time > test["max_time"]: test["max_time"] = test_time self.audit(BadNews, test) self.Scenario.TearDown(self.CM) for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.log("Disabling oprofile on %s" % node) self.CM.rsh.remote_py(node, "os", "system", "opcontrol --shutdown") self.audit(BadNews, None) for test in self.Tests: self.IndividualStats[test.name] = test.Stats return self.Stats, self.IndividualStats AllTestClasses = [ ] class CTSTest: ''' A Cluster test. We implement the basic set of properties and behaviors for a generic cluster test. Cluster tests track their own statistics. We keep each of the kinds of counts we track as separate {name,value} pairs. ''' def __init__(self, cm): #self.name="the unnamed test" self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} # if not issubclass(cm.__class__, ClusterManager): # raise ValueError("Must be a ClusterManager object") self.CM = cm self.timeout=120 self.starttime=0 def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def failure(self, reason="none"): '''Increment the failure count''' self.incr("failure") self.CM.log("Test " + self.name + " failed [reason:" + reason + "]") return None def success(self): '''Increment the success count''' self.incr("success") return 1 def skipped(self): '''Increment the skipped count''' self.incr("skipped") return 1 def __call__(self, node): '''Perform the given test''' raise ValueError("Abstract Class member (__call__)") self.incr("calls") return self.failure() def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def canrunnow(self): '''Return TRUE if we can meaningfully run right now''' return 1 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] ################################################################### class StopTest(CTSTest): ################################################################### '''Stop (deactivate) the cluster manager on a node''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="Stop" self.uspat = self.CM["Pat:We_stopped"] self.thempat = self.CM["Pat:They_stopped"] def __call__(self, node): '''Perform the 'stop' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["up"]: return self.skipped() patterns = [] # Technically we should always be able to notice ourselves stopping patterns.append(self.CM["Pat:We_stopped"] % node) if self.CM.Env["use_logd"]: patterns.append(self.CM["Pat:Logd_stopped"] % node) # Any active node needs to notice this one left # NOTE: This wont work if we have multiple partitions for other in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node: patterns.append(self.CM["Pat:They_stopped"] %(other, node)) #self.debug("Checking %s will notice %s left"%(other, node)) watch = CTS.LogWatcher( self.CM["LogFileName"], patterns, self.CM["DeadTime"]) watch.setwatch() if node == self.CM.OurNode: self.incr("us") else: if self.CM.upcount() <= 1: self.incr("all") else: self.incr("them") self.CM.StopaCM(node) watch_result = watch.lookforall() failreason=None UnmatchedList = "||" if watch.unmatched: for regex in watch.unmatched: self.CM.log ("ERROR: Shutdown pattern not found: %s" % (regex)) UnmatchedList += regex + "||"; failreason="Missing shutdown pattern" self.CM.cluster_stable(self.CM["DeadTime"]) if not watch.unmatched or self.CM.upcount() == 0: return self.success() elif len(watch.unmatched) >= self.CM.upcount(): return self.failure("no match against (%s)" % UnmatchedList) if failreason == None: return self.success() else: return self.failure(failreason) # # We don't register StopTest because it's better when called by # another test... # ################################################################### class StartTest(CTSTest): ################################################################### '''Start (activate) the cluster manager on a node''' def __init__(self, cm, debug=None): CTSTest.__init__(self,cm) self.name="start" self.debug = debug self.uspat = self.CM["Pat:We_started"] self.thempat = self.CM["Pat:They_started"] def __call__(self, node): '''Perform the 'start' test. ''' self.incr("calls") if self.CM.upcount() == 0: self.incr("us") else: self.incr("them") if self.CM.ShouldBeStatus[node] != self.CM["down"]: return self.skipped() elif self.CM.StartaCM(node): return self.success() else: return self.failure("Startup %s on node %s failed" %(self.CM["Name"], node)) def is_applicable(self): '''StartTest is always applicable''' return 1 # # We don't register StartTest because it's better when called by # another test... # ################################################################### class FlipTest(CTSTest): ################################################################### '''If it's running, stop it. If it's stopped start it. Overthrow the status quo... ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Flip" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'Flip' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("stopped") ret = self.stop(node) type="up->down" # Give the cluster time to recognize it's gone... time.sleep(self.CM["StableTime"]) elif self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("started") ret = self.start(node) type="down->up" else: return self.skipped() self.incr(type) if ret: return self.success() else: return self.failure("%s failure" % type) def is_applicable(self): '''FlipTest is always applicable''' return 1 # Register FlipTest as a good test to run AllTestClasses.append(FlipTest) ################################################################### class RestartTest(CTSTest): ################################################################### '''Stop and restart a node''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Restart" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'restart' test. ''' self.incr("calls") self.incr("node:" + node) ret1 = 1 if self.CM.StataCM(node): self.incr("WasStopped") if not self.start(node): return self.failure("start (setup) failure: "+node) self.starttime=time.time() if not self.stop(node): return self.failure("stop failure: "+node) if not self.start(node): return self.failure("start failure: "+node) return self.success() def is_applicable(self): '''RestartTest is always applicable''' return 1 # Register RestartTest as a good test to run AllTestClasses.append(RestartTest) ################################################################### class StonithTest(CTSTest): ################################################################### '''Reboot a node by whacking it with stonith.''' def __init__(self, cm, timeout=900): CTSTest.__init__(self,cm) self.name="Stonith" self.theystopped = self.CM["Pat:They_dead"] self.allstopped = self.CM["Pat:All_stopped"] self.usstart = self.CM["Pat:We_started"] self.themstart = self.CM["Pat:They_started"] self.timeout = timeout self.ssherror = False def _reset(self, node): StonithWorked=False for tries in 1,2,3,4,5: if self.CM.Env.ResetNode(node): StonithWorked=True break return StonithWorked def setup(self, target_node): # nothing to do return 1 def __call__(self, node): '''Perform the 'stonith' test. (whack the node)''' self.incr("calls") stopwatch = 0 rc = 0 if not self.setup(node): return self.failure("Setup failed") # Figure out what log message to look for when/if it goes down # # Any active node needs to notice this one left # NOTE: This wont work if we have multiple partitions stop_patterns = [] for other in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node: stop_patterns.append(self.CM["Pat:They_stopped"] %(other, node)) stopwatch = 1 #self.debug("Checking %s will notice %s left"%(other, node)) if self.CM.ShouldBeStatus[node] == self.CM["down"]: # actually no-one will notice this node die since HA isnt running stopwatch = 0 # Figure out what log message to look for when it comes up if self.CM.upcount() == 1 and self.CM.ShouldBeStatus[node] == self.CM["up"]: uppat = (self.usstart % node) else: uppat = (self.themstart % node) upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat] , timeout=self.timeout) if stopwatch == 1: watch = CTS.LogWatcher(self.CM["LogFileName"], stop_patterns , timeout=self.CM["DeadTime"]+10) watch.setwatch() # Reset (stonith) the node self.CM.debug("Resetting: "+node) StonithWorked = self._reset(node) if not StonithWorked: return self.failure("Stonith didn't work") if self.ssherror == True: self.CM.log("NOTE: Stonith command reported success but node %s did not restart (atd, reboot or ssh error)" % node) return self.success() upwatch.setwatch() # Look() and see if the machine went down if stopwatch == 0: # Allow time for the node to die time.sleep(self.CM["DeadTime"]+10) elif not watch.lookforall(): if watch.unmatched: for regex in watch.unmatched: self.CM.log("Warn: STONITH pattern not found: %s"%regex) # !!no-one!! saw this node die if len(watch.unmatched) == len(stop_patterns): return self.failure("No-one saw %s die" %node) # else: syslog* lost a message # Alas I dont think this check is plausable (beekhof) # # Check it really stopped... #self.CM.ShouldBeStatus[node] = self.CM["down"] #if self.CM.StataCM(node) == 1: # ret1=0 # Look() and see if the machine came back up rc=0 if upwatch.look(): self.CM.debug("Startup pattern found: %s" %uppat) rc=1 else: self.CM.log("Warn: Startup pattern not found: %s" %uppat) # Check it really started... self.CM.ShouldBeStatus[node] = self.CM["up"] if rc == 0 and self.CM.StataCM(node) == 1: rc=1 # wait for the cluster to stabilize self.CM.cluster_stable() if node in self.CM.Env["oprofile"]: self.CM.log("Enabling oprofile on %s" % node) self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init") self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start") # return case processing if rc == 0: return self.failure("Node %s did not restart" %node) else: return self.success() def is_applicable(self): '''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE''' # for v2, stonithd test is a better test to run. if self.CM["Name"] == "linux-ha-v2": return None if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 # Register StonithTest as a good test to run AllTestClasses.append(StonithTest) ################################################################### class StonithdTest(StonithTest): ################################################################### def __init__(self, cm, timeout=600): StonithTest.__init__(self, cm, timeout=600) self.name="Stonithd" self.startall = SimulStartLite(cm) self.start = StartTest(cm) self.stop = StopTest(cm) self.init_node = None def _reset(self, target_node): if len(self.CM.Env["nodes"]) < 2: return self.skipped() StonithWorked = False SshNotWork = 0 for tries in range(1,5): # For some unknown reason, every now and then the ssh plugin just # can't kill the target_node - everything works fine with stonithd # and the plugin, but atd, reboot or ssh (or maybe something else) # doesn't do its job and target_node remains alive. So look for # the indicative messages and bubble-up the error via ssherror watchpats = [] watchpats.append("Initiating ssh-reset") watchpats.append("CRIT: still able to ping") watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+30) watch.setwatch() fail_reasons = [] if self.CM.Env.ResetNode2(self.init_node, target_node, fail_reasons): StonithWorked = True break if watch.lookforall(): SshNotWork = SshNotWork + 1 continue for reason in fail_reasons: self.CM.log(reason) if StonithWorked == False and SshNotWork == tries: StonithWorked = True self.ssherror = True return StonithWorked def setup(self, target_node): if len(self.CM.Env["nodes"]) < 2: return 1 self.init_node = self.CM.Env.RandomNode() while self.init_node == target_node: self.init_node = self.CM.Env.RandomNode() if not self.startall(None): return self.failure("Test setup failed") return 1 def is_applicable(self): if not self.CM["Name"] == "linux-ha-v2": return 0 if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 AllTestClasses.append(StonithdTest) ################################################################### class IPaddrtest(CTSTest): ################################################################### '''Find the machine supporting a particular IP address, and knock it down. [Hint: This code isn't finished yet...] ''' def __init__(self, cm, IPaddrs): CTSTest.__init__(self,cm) self.name="IPaddrtest" self.IPaddrs = IPaddrs self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, IPaddr): ''' Perform the IPaddr test... ''' self.incr("calls") node = self.CM.Env.RandomNode() self.incr("node:" + node) if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") self.start(node) ret1 = self.stop(node) # Give the cluster time to recognize we're gone... time.sleep(self.CM["StableTime"]) ret2 = self.start(node) if not ret1: return self.failure("Could not stop") if not ret2: return self.failure("Could not start") return self.success() def is_applicable(self): '''IPaddrtest is always applicable (but shouldn't be)''' return 1 ################################################################### class StartOnebyOne(CTSTest): ################################################################### '''Start all the nodes ~ one by one''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StartOnebyOne" self.stopall = SimulStopLite(cm) self.start = StartTest(cm) self.ns=CTS.NodeStatus(cm.Env) def __call__(self, dummy): '''Perform the 'StartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Test setup failed") failed=[] self.starttime=time.time() for node in self.CM.Env["nodes"]: if not self.start(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to start: " + repr(failed)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''StartOnebyOne is always applicable''' return 1 # Register StartOnebyOne as a good test to run AllTestClasses.append(StartOnebyOne) ################################################################### class SimulStart(CTSTest): ################################################################### '''Start all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStart" self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'SimulStart' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Setup failed") self.CM.clear_all_caches() if not self.startall(None): return self.failure("Startall failed") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''SimulStart is always applicable''' return 1 # Register SimulStart as a good test to run AllTestClasses.append(SimulStart) ################################################################### class SimulStop(CTSTest): ################################################################### '''Stop all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStop" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) def __call__(self, dummy): '''Perform the 'SimulStop' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.stopall(None): return self.failure("Stopall failed") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''SimulStop is always applicable''' return 1 # Register SimulStop as a good test to run AllTestClasses.append(SimulStop) ################################################################### class StopOnebyOne(CTSTest): ################################################################### '''Stop all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StopOnebyOne" self.startall = SimulStartLite(cm) self.stop = StopTest(cm) def __call__(self, dummy): '''Perform the 'StopOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") failed=[] self.starttime=time.time() for node in self.CM.Env["nodes"]: if not self.stop(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to stop: " + repr(failed)) self.CM.clear_all_caches() return self.success() def is_applicable(self): '''StopOnebyOne is always applicable''' return 1 # Register StopOnebyOne as a good test to run AllTestClasses.append(StopOnebyOne) ################################################################### class RestartOnebyOne(CTSTest): ################################################################### '''Restart all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="RestartOnebyOne" self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'RestartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") did_fail=[] self.starttime=time.time() self.restart = RestartTest(self.CM) for node in self.CM.Env["nodes"]: if not self.restart(node): did_fail.append(node) if did_fail: return self.failure("Could not restart %d nodes: %s" %(len(did_fail), repr(did_fail))) return self.success() def is_applicable(self): '''RestartOnebyOne is always applicable''' return 1 # Register StopOnebyOne as a good test to run AllTestClasses.append(RestartOnebyOne) ################################################################### class PartialStart(CTSTest): ################################################################### '''Start a node - but tell it to stop before it finishes starting up''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="PartialStart" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) def __call__(self, node): '''Perform the 'PartialStart' test. ''' self.incr("calls") ret = self.stopall(None) if not ret: return self.failure("Setup failed") # FIXME! This should use the CM class to get the pattern # then it would be applicable in general watchpats = [] watchpats.append("Starting crmd") watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats, timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.StartaCMnoBlock(node) ret = watch.lookforall() if not ret: self.CM.log("Patterns not found: " + repr(watch.unmatched)) return self.failure("Setup of %s failed" % node) ret = self.stopall(None) if not ret: return self.failure("%s did not stop in time" % node) return self.success() def is_applicable(self): '''Partial is always applicable''' if self.CM["Name"] == "linux-ha-v2": return 1 else: return 0 # Register StopOnebyOne as a good test to run AllTestClasses.append(PartialStart) ################################################################### class StandbyTest(CTSTest): ################################################################### '''Put a node in standby mode''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby" self.successpat = self.CM["Pat:StandbyOK"] self.nostandbypat = self.CM["Pat:StandbyNONE"] self.transient = self.CM["Pat:StandbyTRANSIENT"] def __call__(self, node): '''Perform the 'standby' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() if self.CM.upcount() < 2: self.incr("nostandby") pat = self.nostandbypat else: self.incr("standby") pat = self.successpat # # You could make a good argument that the cluster manager # ought to give us good clues on when its a bad time to # switch over to the other side, but heartbeat doesn't... # It could also queue the request. But, heartbeat # doesn't do that either :-) # retrycount=0 while (retrycount < 10): watch = CTS.LogWatcher(self.CM["LogFileName"] , [pat, self.transient] , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.rsh(node, self.CM["Standby"]) match = watch.look() if match: if re.search(self.transient, match): self.incr("retries") time.sleep(2) retrycount=retrycount+1 else: return self.success() else: break # No point in retrying... return self.failure("did not find pattern " + pat) def is_applicable(self): '''StandbyTest is applicable when the CM has a Standby command''' if not self.CM.has_key("Standby"): return None else: #if self.CM.Env.has_key("DoStandby"): #flag=self.CM.Env["DoStandby"] #if type(flag) == types.IntType: #return flag #if not re.match("[yt]", flag, re.I): #return None # # We need to strip off everything after the first blank # cmd=self.CM["Standby"] cmd = cmd.split()[0] if not os.access(cmd, os.X_OK): return None cf = self.CM.cf if not cf.Parameters.has_key("auto_failback"): return None elif cf.Parameters["auto_failback"][0] == "legacy": return None return 1 # Register StandbyTest as a good test to run AllTestClasses.append(StandbyTest) ####################################################################### class StandbyTest2(CTSTest): ####################################################################### '''Standby with CRM of HA release 2''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby2" self.start = StartTest(cm) self.startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources, none resource should be running on the node # set the node to active mode # check resouces, resources should have been migrated back (SHOULD THEY?) def __call__(self, node): self.incr("calls") ret=self.startall(None) if not ret: return self.failure("Start all nodes failed") self.CM.debug("Make sure node %s is active" % node) if self.CM.StandbyStatus(node) != "off": if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.CM.debug("Getting resources running on node %s" % node) rsc_on_node = [] for rsc in self.CM.Resources(): if rsc.IsRunningOn(node): rsc_on_node.append(rsc) self.CM.debug("Setting node %s to standby mode" % node) if not self.CM.SetStandbyMode(node, "on"): return self.failure("can't set node %s to standby mode" % node) time.sleep(30) # Allow time for the update to be applied and cause something self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "on": return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status)) self.CM.debug("Checking resources") for rsc in self.CM.Resources(): if rsc.IsRunningOn(node): return self.failure("%s set to standby, %s is still running on it" % (node, rsc.rid)) self.CM.debug("Setting node %s to active mode" % node) if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) time.sleep(30) # Allow time for the update to be applied and cause something self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.CM.debug("Checking resources") for rsc in rsc_on_node: if not rsc.IsRunningOn(node): return self.failure("%s set to active but %s is NOT back" % (node, rsc.rid)) return self.success() def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllTestClasses.append(StandbyTest2) ####################################################################### class Fastdetection(CTSTest): ####################################################################### '''Test the time which one node find out the other node is killed very quickly''' def __init__(self,cm,timeout=60): CTSTest.__init__(self, cm) self.name = "DetectionTime" self.they_stopped = self.CM["Pat:They_stopped"] self.timeout = timeout self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.standby = StandbyTest(cm) self.__setitem__("min", 0) self.__setitem__("max", 0) self.__setitem__("totaltime", 0) def __call__(self, node): '''Perform the fastfailureDetection test''' self.incr("calls") ret=self.startall(None) if not ret: return self.failure("Test setup failed") if self.CM.upcount() < 2: return self.skipped() # Make sure they're not holding any resources ret = self.standby(node) if not ret: return ret stoppat = (self.they_stopped % ("", node)) stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout) stopwatch.setwatch() # # This test is CM-specific - FIXME!! # if self.CM.rsh(node, "killall -9 heartbeat")==0: Starttime = os.times()[4] if stopwatch.look(): Stoptime = os.times()[4] # This test is CM-specific - FIXME!! self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") Detectiontime = Stoptime-Starttime detectms = int(Detectiontime*1000+0.5) self.CM.log("...failure detection time: %d ms" % detectms) self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime if self.Stats["min"] == 0: self.Stats["min"] = Detectiontime if Detectiontime > self.Stats["max"]: self.Stats["max"] = Detectiontime if Detectiontime < self.Stats["min"]: self.Stats["min"] = Detectiontime self.CM.ShouldBeStatus[node] = self.CM["down"] self.start(node) return self.success() else: # This test is CM-specific - FIXME!! self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") self.CM.ShouldBeStatus[node] = self.CM["down"] ret=self.start(node) return self.failure("Didn't find the log message") else: return self.failure("Couldn't kill cluster manager") def is_applicable(self): '''This test is applicable when auto_failback != legacy''' return self.standby.is_applicable() # This test is CM-specific - FIXME!! def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ "ccm.*ERROR: ccm_control_process:failure to send protoversion request" , "ccm.*ERROR: Lost connection to heartbeat service. Need to bail out" ] AllTestClasses.append(Fastdetection) ############################################################################## class BandwidthTest(CTSTest): ############################################################################## # Tests should not be cluster-manager-specific # If you need to find out cluster manager configuration to do this, then # it should be added to the generic cluster manager API. '''Test the bandwidth which heartbeat uses''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Bandwidth" self.start = StartTest(cm) self.__setitem__("min",0) self.__setitem__("max",0) self.__setitem__("totalbandwidth",0) self.tempfile = tempfile.mktemp(".cts") self.startall = SimulStartLite(cm) def __call__(self, node): '''Perform the Bandwidth test''' self.incr("calls") if self.CM.upcount()<1: return self.skipped() Path = self.CM.InternalCommConfig() if "ip" not in Path["mediatype"]: return self.skipped() port = Path["port"][0] port = int(port) ret = self.startall(None) if not ret: return self.failure("Test setup failed") time.sleep(5) # We get extra messages right after startup. fstmpfile = "/var/run/band_estimate" dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \ % (port, fstmpfile) rc = self.CM.rsh(node, dumpcmd) if rc == 0: farfile = "root@%s:%s" % (node, fstmpfile) self.CM.rsh.cp(farfile, self.tempfile) Bandwidth = self.countbandwidth(self.tempfile) if not Bandwidth: self.CM.log("Could not compute bandwidth.") return self.success() intband = int(Bandwidth + 0.5) self.CM.log("...bandwidth: %d bits/sec" % intband) self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth if self.Stats["min"] == 0: self.Stats["min"] = Bandwidth if Bandwidth > self.Stats["max"]: self.Stats["max"] = Bandwidth if Bandwidth < self.Stats["min"]: self.Stats["min"] = Bandwidth self.CM.rsh(node, "rm -f %s" % fstmpfile) os.unlink(self.tempfile) return self.success() else: return self.failure("no response from tcpdump command [%d]!" % rc) def countbandwidth(self, file): fp = open(file, "r") fp.seek(0) count = 0 sum = 0 while 1: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count=count+1 linesplit = string.split(line," ") for j in range(len(linesplit)-1): if linesplit[j]=="udp": break if linesplit[j]=="length:": break try: sum = sum + int(linesplit[j+1]) except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T1 = linesplit[0] timesplit = string.split(T1,":") time2split = string.split(timesplit[2],".") time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 break while count < 100: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count = count+1 linessplit = string.split(line," ") for j in range(len(linessplit)-1): if linessplit[j] =="udp": break if linesplit[j]=="length:": break try: sum=int(linessplit[j+1])+sum except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T2 = linessplit[0] timesplit = string.split(T2,":") time2split = string.split(timesplit[2],".") time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 time = time2-time1 if (time <= 0): return 0 return (sum*8)/time def is_applicable(self): '''BandwidthTest is always applicable''' return 0 AllTestClasses.append(BandwidthTest) ########################################################################## class RedundantpathTest(CTSTest): ########################################################################## '''In heartbeat, it has redundant path to communicate between the cluster''' # # Tests should not be cluster-manager specific # One needs to isolate what you need from the cluster manager and then # add a (new) API to do it. # def __init__(self,cm,timeout=60): CTSTest.__init__(self,cm) self.name = "RedundantpathTest" self.timeout = timeout def PathCount(self): '''Return number of communication paths''' Path = self.CM.InternalCommConfig() cf = self.CM.cf eths = [] serials = [] num = 0 for interface in Path["interface"]: if re.search("eth",interface): eths.append(interface) num = num + 1 if re.search("/dev",interface): serials.append(interface) num = num + 1 return (num, eths, serials) def __call__(self,node): '''Perform redundant path test''' self.incr("calls") if self.CM.ShouldBeStatus[node]!=self.CM["up"]: return self.skipped() (num, eths, serials) = self.PathCount() for eth in eths: if self.CM.rsh(node,"ifconfig %s down" % eth)==0: PathDown = "OK" break if PathDown != "OK": for serial in serials: if self.CM.rsh(node,"setserial %s uart none" % serial)==0: PathDown = "OK" break if PathDown != "OK": return self.failure("Cannot break the path") time.sleep(self.timeout) for audit in CTSaudits.AuditList(self.CM): if not audit(): for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.failure("Redundant path fail") for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.success() def is_applicable(self): '''It is applicable when you have more than one connection''' return self.PathCount()[0] > 1 # FIXME!! Why is this one commented out? #AllTestClasses.append(RedundantpathTest) ########################################################################## class DRBDTest(CTSTest): ########################################################################## '''In heartbeat, it provides replicated storage.''' def __init__(self,cm, timeout=10): CTSTest.__init__(self,cm) self.name = "DRBD" self.timeout = timeout def __call__(self, dummy): '''Perform the 'DRBD' test.''' self.incr("calls") for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() # Note: All these special cases with Start/Stop/StatusDRBD # should be reworked to use resource objects instead of # being hardwired to bypass the objects here. for node in self.CM.Env["nodes"]: done=time.time()+self.timeout+1 while (time.time()done: return self.failure("Can't start drbd, please check it") device={} for node in self.CM.Env["nodes"]: device[node]=self.getdevice(node) node = self.CM.Env["nodes"][0] done=time.time()+self.timeout+1 while 1: if (time.time()>done): return self.failure("the drbd could't sync") self.CM.rsh(node,"cp /proc/drbd /var/run >/dev/null 2>&1") if self.CM.rsh.cp("%s:/var/run/drbd" % node,"/var/run"): line = open("/tmp/var/run").readlines()[2] p = line.find("Primary") s1 = line.find("Secondary") s2 = line.rfind("Secondary") if s1!=s2: if self.CM.rsh(node,"drbdsetup %s primary" % device[node]): pass if p!=-1: if p/dev/null" % (self.rid, node)) watch.lookforall() self.CM.cluster_stable() recovernode=self.CM.ResourceLocation(self.rid) if len(recovernode)==1: self.CM.debug("Recovered: %s is running on %s" %(self.rid, recovernode[0])) if not watch.unmatched: return self.success() else: return self.failure("Patterns not found: %s" % repr(watch.unmatched)) elif len(recovernode)==0: return self.failure("%s was not recovered and is inactive" % self.rid) else: return self.failure("%s is now active on more than one node: %s" %(self.rid, str(recovernode))) def is_applicable(self): '''ResourceRecover is applicable only when there are resources running on our cluster and environment is linux-ha-v2''' if self.CM["Name"] == "linux-ha-v2": resourcelist=self.CM.Resources() if len(resourcelist)==0: self.CM.log("No resources on this cluster") return 0 else: return 1 return 0 def errorstoignore(self): '''Return list of errors which should be ignored''' return [ """Updating failcount for %s""" % self.rid, """Unknown operation: fail""", """ERROR: sending stonithRA op to stonithd failed.""", """ERROR: process_lrm_event: LRM operation %s_%s_%d""" % (self.rid, self.action, self.interval), """ERROR: process_graph_event: Action %s_%s_%d initiated outside of a transition""" % (self.rid, self.action, self.interval), ] AllTestClasses.append(ResourceRecover) ################################################################### class ComponentFail(CTSTest): ################################################################### def __init__(self, cm): CTSTest.__init__(self,cm) self.name="ComponentFail" self.startall = SimulStartLite(cm) self.complist = cm.Components() self.theystart = cm["Pat:They_started"] self.patterns = [] def __call__(self, node): '''Perform the 'ComponentFail' test. ''' self.incr("calls") self.patterns = [] # start all nodes ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.CM.cluster_stable(self.CM["StableTime"]): return self.failure("Setup failed - unstable") node_is_dc = self.CM.is_node_dc(node, None) # select a component to kill chosen = self.CM.Env.RandomGen.choice(self.complist) while chosen.dc_only == 1 and node_is_dc == 0: chosen = self.CM.Env.RandomGen.choice(self.complist) self.CM.debug("Killing %s (dc=%d)" % (chosen.name, node_is_dc)) self.incr(chosen.name) self.patterns.extend(chosen.pats) if chosen.dc_only: self.patterns.append("%s crmd:.*Process %s:.* exited" %(node, chosen.name)) self.patterns.append("%s crmd:.*I_ERROR.*crmdManagedChildDied" %node) self.patterns.append("%s crmd:.*The %s subsystem terminated unexpectedly" %(node, chosen.name)) else: self.patterns.append("%s heartbeat.*Respawning client.*%s" %(node, chosen.name)) - self.patterns.append("%s heartbeat.*Client.*%s.*killed by signal 9" %(node, chosen.name)) + self.patterns.append("%s heartbeat.*%s.*killed by signal 9" %(node, chosen.name)) if node_is_dc: self.patterns.extend(chosen.dc_pats) # supply a copy so self.patterns doesnt end up empty tmpPats = [] tmpPats.extend(self.patterns) self.patterns.extend(chosen.badnews_ignore) # set the watch for stable watch = CTS.LogWatcher( self.CM["LogFileName"], tmpPats, self.CM["DeadTime"] + self.CM["StableTime"] + self.CM["StartTime"]) watch.setwatch() # kill the component chosen.kill(node) # check to see Heartbeat noticed matched = watch.lookforall() if not matched: self.CM.log("Patterns not found: " + repr(watch.unmatched)) self.CM.cluster_stable(self.CM["StartTime"]) return self.failure("Didn't find all expected patterns") self.CM.debug("Found: "+ repr(matched)) # now watch it recover... for attempt in (1, 2, 3, 4, 5): self.CM.debug("Waiting for the cluster to recover...") if self.CM.cluster_stable(self.CM["StartTime"]): return self.success() return self.failure("Cluster did not become stable") def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 def errorstoignore(self): '''Return list of errors which should be ignored''' return self.patterns AllTestClasses.append(ComponentFail) #################################################################### class Split_brainTest2(CTSTest): #################################################################### '''It is used to test split-brain. when the path between the two nodes break check the two nodes both take over the resource''' def __init__(self,cm): CTSTest.__init__(self,cm) self.name = "Split_brain2" self.start = StartTest(cm) self.startall = SimulStartLite(cm) def __call__(self, node): '''Perform split-brain test''' self.incr("calls") ret = self.startall(None) if not ret: return self.failure("Setup failed") count1 = self.CM.Env.RandomGen.randint(1,len(self.CM.Env["nodes"])-1) partition1 = [] while len(partition1) < count1: select = self.CM.Env.RandomGen.choice(self.CM.Env["nodes"]) if not select in partition1: partition1.append(select) partition2 = [] for member in self.CM.Env["nodes"]: if not member in partition1: partition2.append(member) allownodes1 = "" for member in partition1: allownodes1 += member + " " allownodes2 = "" for member in partition2: allownodes2 += member + " " self.CM.log("Partition1: " + str(partition1)) self.CM.log("Partition2: " + str(partition2)) '''isolate nodes, Look for node is dead message''' watchdeadpats = [ ] deadpat = self.CM["Pat:They_dead"] for member in self.CM.Env["nodes"]: thispat = (deadpat % member) watchdeadpats.append(thispat) watchdead = CTS.LogWatcher(self.CM["LogFileName"], watchdeadpats\ , timeout=self.CM["DeadTime"]+60) watchdead.ReturnOnlyMatch() watchdead.setwatch() for member in partition1: if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 : self.CM.savecomm_node(node) if not self.CM.isolate_node(member,allownodes1): return self.failure("Could not isolate the nodes") for member in partition2: if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 : self.CM.savecomm_node(node) if not self.CM.isolate_node(member,allownodes2): return self.failure("Could not isolate the nodes") if not watchdead.lookforall(): for member in self.CM.Env["nodes"]: self.CM.unisolate_node(member) self.CM.log("Patterns not found: " + repr(watchdead.unmatched)) return self.failure("Didn't find the log 'dead' message") dcnum=0 while dcnum < 2: dcnum = 0 for member in self.CM.Env["nodes"]: if self.CM.is_node_dc(member): dcnum += 1 time.sleep(1) ''' Unisolate the node, look for the return partition message and check whether they restart ''' watchpartitionpats = [self.CM["Pat:DC_IDLE"]] partitionpat = self.CM["Pat:Return_partition"] for member in self.CM.Env["nodes"]: thispat = (partitionpat % member) watchpartitionpats.append(thispat) watchpartition = CTS.LogWatcher(self.CM["LogFileName"], watchpartitionpats\ , timeout=self.CM["DeadTime"]+60) watchpartition.setwatch() for member in self.CM.Env["nodes"]: if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 : self.CM.restorecomm_node(node) self.CM.unisolate_node(member) if not watchpartition.lookforall(): self.CM.log("Patterns not found: " + repr(watchpartition.unmatched)) return self.failure("Didn't find return from partition messages") return self.success() def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ "ERROR:.*Both machines own.*resources" , "ERROR:.*lost a lot of packets!" , "ERROR: Cannot rexmit pkt .*: seqno too low" , "ERROR: Irretrievably lost packet: node" ] #AllTestClasses.append(Split_brainTest2) #################################################################### class MemoryTest(CTSTest): #################################################################### '''Check to see if anyone is leaking memory''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Memory" # self.test = ElectionMemoryTest(cm) self.test = ResourceRecover(cm) self.startall = SimulStartLite(cm) self.before = {} self.after = {} def __call__(self, node): ps_command='''ps -eo ucomm,pid,pmem,tsiz,dsiz,rss,vsize | grep -e ccm -e ha_logd -e cib -e crmd -e lrmd -e tengine -e pengine''' memory_error = [ "", "", "", "Code", "Data", "Resident", "Total" ] ret = self.startall(None) if not ret: return self.failure("Test setup failed") time.sleep(10) for node in self.CM.Env["nodes"]: self.before[node] = {} rsh_pipe = self.CM.rsh.popen(node, ps_command) rsh_pipe.tochild.close() result = rsh_pipe.fromchild.readline() while result: tokens = result.split() self.before[node][tokens[1]] = result result = rsh_pipe.fromchild.readline() rsh_pipe.fromchild.close() self.lastrc = rsh_pipe.wait() # do something... if not self.test(node): return self.failure("Underlying test failed") time.sleep(10) for node in self.CM.Env["nodes"]: self.after[node] = {} rsh_pipe = self.CM.rsh.popen(node, ps_command) rsh_pipe.tochild.close() result = rsh_pipe.fromchild.readline() while result: tokens = result.split() self.after[node][tokens[1]] = result result = rsh_pipe.fromchild.readline() rsh_pipe.fromchild.close() self.lastrc = rsh_pipe.wait() failed_nodes = [] for node in self.CM.Env["nodes"]: failed = 0 for process in self.before[node]: messages = [] before_line = self.before[node][process] after_line = self.after[node][process] if not after_line: self.CM.log("%s %s[%s] exited during the test" %(node, before_tokens[0], before_tokens[1])) continue before_tokens = before_line.split() after_tokens = after_line.split() # 3 : Code size # 4 : Data size # 5 : Resident size # 6 : Total size for index in [ 3, 4, 6 ]: mem_before = int(before_tokens[index]) mem_after = int(after_tokens[index]) mem_diff = mem_after - mem_before mem_allow = mem_before * 0.01 # for now... mem_allow = 0 if mem_diff > mem_allow: failed = 1 messages.append("%s size grew by %dkB (%dkB)" %(memory_error[index], mem_diff, mem_after)) elif mem_diff < 0: messages.append("%s size shrank by %dkB (%dkB)" %(memory_error[index], mem_diff, mem_after)) if len(messages) > 0: self.CM.log("Process %s[%s] on %s: %s" %(before_tokens[0], before_tokens[1], node, repr(messages))) self.CM.debug("%s Before: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB" %(node, before_tokens[0], before_tokens[1], before_tokens[2], before_tokens[3], before_tokens[4], before_tokens[5], before_tokens[6])) self.CM.debug("%s After: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB" %(node, after_tokens[0], after_tokens[1], after_tokens[2], after_tokens[3], after_tokens[4], after_tokens[5], after_tokens[6])) if failed == 1: failed_nodes.append(node) if len(failed_nodes) > 0: return self.failure("Memory leaked on: " + repr(failed_nodes)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [ """ERROR: .* LRM operation.*monitor on .*: not running""", """pengine:.*Handling failed """] def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 #AllTestClasses.append(MemoryTest) #################################################################### class ElectionMemoryTest(CTSTest): #################################################################### '''Check to see if anyone is leaking memory''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Election" def __call__(self, node): self.rsh.readaline(node, self.CM["ElectionCmd"]%node) if self.CM.cluster_stable(): return self.success() return self.failure("Cluster not stable") def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''Never applicable, only for use by the memory test''' return 0 AllTestClasses.append(ElectionMemoryTest) #################################################################### class SpecialTest1(CTSTest): #################################################################### '''Set up a custom test to cause quorum failure issues for Andrew''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SpecialTest1" self.startall = SimulStartLite(cm) self.restart1 = RestartTest(cm) self.stopall = SimulStopLite(cm) def __call__(self, node): '''Perform the 'SpecialTest1' test for Andrew. ''' self.incr("calls") # Shut down all the nodes... ret = self.stopall(None) if not ret: return ret # Start the selected node ret = self.restart1(node) if not ret: return ret # Start all remaining nodes ret = self.startall(None) return ret def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): return 1 AllTestClasses.append(SpecialTest1) ################################################################### class NearQuorumPointTest(CTSTest): ################################################################### ''' This test brings larger clusters near the quorum point (50%). In addition, it will test doing starts and stops at the same time. Here is how I think it should work: - loop over the nodes and decide randomly which will be up and which will be down Use a 50% probability for each of up/down. - figure out what to do to get into that state from the current state - in parallel, bring up those going up and bring those going down. ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="NearQuorumPoint" def __call__(self, dummy): '''Perform the 'NearQuorumPoint' test. ''' self.incr("calls") startset = [] stopset = [] #decide what to do with each node for node in self.CM.Env["nodes"]: action = self.CM.Env.RandomGen.choice(["start","stop"]) #action = self.CM.Env.RandomGen.choice(["start","stop","no change"]) if action == "start" : startset.append(node) elif action == "stop" : stopset.append(node) self.CM.debug("start nodes:" + repr(startset)) self.CM.debug("stop nodes:" + repr(stopset)) #add search patterns watchpats = [ ] for node in stopset: if self.CM.ShouldBeStatus[node] == self.CM["up"]: watchpats.append(self.CM["Pat:We_stopped"] % node) for node in startset: if self.CM.ShouldBeStatus[node] == self.CM["down"]: watchpats.append(self.CM["Pat:They_started"] % node) if len(watchpats) == 0: return self.skipped() watchpats.append(self.CM["Pat:DC_IDLE"]) watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() #begin actions for node in stopset: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.StopaCMnoBlock(node) for node in startset: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.CM.StartaCMnoBlock(node) #get the result if watch.lookforall(): self.CM.cluster_stable() return self.success() self.CM.log("Warn: Patterns not found: " + repr(watch.unmatched)) #get the "bad" nodes upnodes = [] for node in stopset: if self.CM.StataCM(node) == 1: upnodes.append(node) downnodes = [] for node in startset: if self.CM.StataCM(node) == 0: downnodes.append(node) if upnodes == [] and downnodes == []: self.CM.cluster_stable() return self.success() if len(upnodes) > 0: self.CM.log("Warn: Unstoppable nodes: " + repr(upnodes)) if len(downnodes) > 0: self.CM.log("Warn: Unstartable nodes: " + repr(downnodes)) return self.failure() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllTestClasses.append(NearQuorumPointTest) ################################################################### class BSC_AddResource(CTSTest): ################################################################### '''Add a resource to the cluster''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="AddResource" self.resource_offset = 0 self.cib_cmd="""@sbindir@/cibadmin -C -o %s -X '%s' """ def __call__(self, node): self.resource_offset = self.resource_offset + 1 r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset) start_pat = "crmd.*%s_start_0.*complete" patterns = [] patterns.append(start_pat % r_id) watch = CTS.LogWatcher( self.CM["LogFileName"], patterns, self.CM["DeadTime"]) watch.setwatch() fields = string.split(self.CM.Env["IPBase"], '.') fields[3] = str(int(fields[3])+1) ip = string.join(fields, '.') self.CM.Env["IPBase"] = ip if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip): return self.failure("Make resource %s failed" % r_id) failed = 0 watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.CM.log ("Warn: Pattern not found: %s" % (regex)) failed = 1 if failed: return self.failure("Resource pattern(s) not found") if not self.CM.cluster_stable(self.CM["DeadTime"]): return self.failure("Unstable cluster") return self.success() def make_ip_resource(self, node, id, rclass, type, ip): self.CM.log("Creating %s::%s:%s (%s) on %s" % (rclass,type,id,ip,node)) rsc_xml=""" """ % (id, rclass, type, id, id, ip) node_constraint=""" """ % (id, id, id, id, node) rc = 0 (rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("constraints", node_constraint)) if rc != 0: self.CM.log("Constraint creation failed: %d" % rc) return None (rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("resources", rsc_xml)) if rc != 0: self.CM.log("Resource creation failed: %d" % rc) return None return 1 def is_applicable(self): if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["DoBSC"]: return 1 return None def TestList(cm): result = [] for testclass in AllTestClasses: bound_test = testclass(cm) if bound_test.is_applicable(): result.append(bound_test) return result class SimulStopLite(CTSTest): ################################################################### '''Stop any active nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStopLite" def __call__(self, dummy): '''Perform the 'SimulStopLite' setup work. ''' self.incr("calls") self.CM.debug("Setup: " + self.name) # We ignore the "node" parameter... watchpats = [ ] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("WasStarted") watchpats.append(self.CM["Pat:All_stopped"] % node) if self.CM.Env["use_logd"]: watchpats.append(self.CM["Pat:Logd_stopped"] % node) if len(watchpats) == 0: self.CM.clear_all_caches() return self.skipped() # Stop all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.starttime=time.time() for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.StopaCMnoBlock(node) if watch.lookforall(): self.CM.clear_all_caches() return self.success() did_fail=0 up_nodes = [] for node in self.CM.Env["nodes"]: if self.CM.StataCM(node) == 1: did_fail=1 up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: " + repr(up_nodes)) self.CM.log("Warn: All nodes stopped but CTS didnt detect: " + repr(watch.unmatched)) self.CM.clear_all_caches() return self.failure("Missing log message: "+repr(watch.unmatched)) def is_applicable(self): '''SimulStopLite is a setup test and never applicable''' return 0 ################################################################### class SimulStartLite(CTSTest): ################################################################### '''Start any stopped nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStartLite" def __call__(self, dummy): '''Perform the 'SimulStartList' setup work. ''' self.incr("calls") self.CM.debug("Setup: " + self.name) # We ignore the "node" parameter... watchpats = [ ] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") watchpats.append(self.CM["Pat:They_started"] % node) if len(watchpats) == 0: return self.skipped() # Start all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.starttime=time.time() for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.CM.StartaCMnoBlock(node) if watch.lookforall(): for attempt in (1, 2, 3, 4, 5): if self.CM.cluster_stable(): return self.success() return self.failure("Cluster did not stabilize") did_fail=0 unstable = [] for node in self.CM.Env["nodes"]: if self.CM.StataCM(node) == 0: did_fail=1 unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: " + repr(unstable)) unstable = [] for node in self.CM.Env["nodes"]: if not self.CM.node_stable(node): did_fail=1 unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: " + repr(unstable)) self.CM.log("ERROR: All nodes started but CTS didnt detect: " + repr(watch.unmatched)) return self.failure() def is_applicable(self): '''SimulStartLite is a setup test and never applicable''' return 0 ################################################################### class LoggingTest(CTSTest): ################################################################### def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Logging" def __call__(self, dummy): '''Perform the 'Logging' test. ''' self.incr("calls") # Make sure logging is working and we have enough disk space... if not self.CM.TestLogging(): sys.exit(1) if not self.CM.CheckDf(): sys.exit(1) def is_applicable(self): '''ResourceRecover is applicable only when there are resources running on our cluster and environment is linux-ha-v2''' return self.CM.Env["DoBSC"] def errorstoignore(self): '''Return list of errors which should be ignored''' return [] #AllTestClasses.append(LoggingTest)