diff --git a/cts/CM_LinuxHAv2.py.in b/cts/CM_LinuxHAv2.py.in index e70bf43ba8..929c68703e 100755 --- a/cts/CM_LinuxHAv2.py.in +++ b/cts/CM_LinuxHAv2.py.in @@ -1,624 +1,577 @@ #!@PYTHON@ '''CTS: Cluster Testing System: LinuxHA v2 dependent modules... ''' __copyright__=''' Author: Huang Zhen Copyright (C) 2004 International Business Machines Additional Audits, Revised Start action, Default Configuration: Copyright (C) 2004 Andrew Beekhof ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import os,sys,CTS,CTSaudits,CTStests, warnings from CTS import * from CM_hb import HeartbeatCM from xml.dom.minidom import * from CTSaudits import ClusterAudit from CTStests import * from CIB import * ####################################################################### # # LinuxHA v2 dependent modules # ####################################################################### class LinuxHAv2(HeartbeatCM): ''' The linux-ha version 2 cluster manager class. It implements the things we need to talk to and manipulate linux-ha version 2 clusters ''' def __init__(self, Environment, randseed=None): HeartbeatCM.__init__(self, Environment, randseed=randseed) self.clear_cache = 0 self.cib_installed = 0 self.config = None self.cluster_monitor = 0 self.use_short_names = 1 self.update({ "Name" : "linux-ha-v2", "DeadTime" : 300, "StartTime" : 300, # Max time to start up "StableTime" : 30, "StartCmd" : "@INITDIR@/heartbeat@INIT_EXT@ start > /dev/null 2>&1", "StopCmd" : "@INITDIR@/heartbeat@INIT_EXT@ stop > /dev/null 2>&1", "ElectionCmd" : "@sbindir@/crmadmin -E %s", "StatusCmd" : "@sbindir@/crmadmin -S %s 2>/dev/null", "EpocheCmd" : "@sbindir@/ccm_tool -e", "QuorumCmd" : "@sbindir@/ccm_tool -q", "CibQuery" : "@sbindir@/cibadmin -Ql", "ParitionCmd" : "@sbindir@/ccm_tool -p", "IsRscRunning" : "@libdir@/heartbeat/lrmadmin -E %s monitor 0 0 EVERYTIME 2>/dev/null|grep return", "ExecuteRscOp" : "@libdir@/heartbeat/lrmadmin -E %s %s 0 0 EVERYTIME 2>/dev/null", "CIBfile" : "%s:@HA_VARLIBDIR@/heartbeat/crm/cib.xml", "TmpDir" : "/tmp", "BreakCommCmd2" : "@libdir@/heartbeat/TestHeartbeatComm break-communication %s>/dev/null 2>&1", "IsIPAddrRscRunning" : "", "StandbyCmd" : "@sbindir@/crm_standby -U %s -v %s 2>/dev/null", "UUIDQueryCmd" : "@sbindir@/crmadmin -N", "StandbyQueryCmd" : "@sbindir@/crm_standby -GQ -U %s 2>/dev/null", # Patterns to look for in the log files for various occasions... "Pat:DC_IDLE" : "crmd.*State transition.*-> S_IDLE", # This wont work if we have multiple partitions # Use: "Pat:They_started" : "%s crmd:.*State transition.*-> S_NOT_DC", "Pat:They_started" : "Updating node state to member for %s", "Pat:We_started" : "crmd.*%s: State transition.*-> S_IDLE", "Pat:We_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete", "Pat:Logd_stopped" : "%s logd:.*Exiting write process", "Pat:They_stopped" : "%s crmd:.*LOST:.* %s ", "Pat:All_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete", "Pat:They_dead" : "node %s.*: is dead", "Pat:TransitionComplete" : "Transition status: Complete: complete", # Bad news Regexes. Should never occur. "BadRegexes" : ( r"ERROR:", r"CRIT:", r"Shutting down\.", r"Forcing shutdown\.", r"Timer I_TERMINATE just popped", r"input=I_ERROR", r"input=I_FAIL", r"input=I_INTEGRATED cause=C_TIMER_POPPED", r"input=I_FINALIZED cause=C_TIMER_POPPED", r"input=I_ERROR", r", exiting\.", r"WARN.*Ignoring HA message.*vote.*not in our membership list", r"pengine.*Attempting recovery of resource", r"pengine.*Handling failed ", r"tengine.*is taking more than 2x its timeout", r"Confirm not received from", r"Welcome reply not received from", r"Attempting to schedule .* after a stop", r"Resource .* was active at shutdown", r"duplicate entries for call_id", r"Search terminated:", r"No need to invoke the TE", r":global_timer_callback", r"Updating failcount for ", r"Faking parameter digest creation", r"Parameters to .* action changed:", ), }) del self["Standby"] if self.Env["DoBSC"]: del self["Pat:They_stopped"] del self["Pat:Logd_stopped"] self.check_transitions = 0 self.check_elections = 0 self.CIBsync = {} self.default_cts_cib=CIB(self).cib() self.debug(self.default_cts_cib) def errorstoignore(self): # At some point implement a more elegant solution that # also produces a report at the end '''Return list of errors which are known and very noisey should be ignored''' if 1: return [ "crmadmin:" ] return [] def install_config(self, node): + if not self.ns.WaitForNodeToComeUp(node): + self.log("Node %s is not up." % node) + return None + if not self.CIBsync.has_key(node) and self.Env["ClobberCIB"] == 1: self.CIBsync[node] = 1 self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml") self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml.sig") # Only install the CIB on the first node, all the other ones will pick it up from there if self.cib_installed == 1: return None self.cib_installed = 1 if self.Env["CIBfilename"] == None: self.debug("Installing Generated CIB on node %s" %(node)) warnings.filterwarnings("ignore") cib_file=os.tmpnam() warnings.resetwarnings() os.system("rm -f "+cib_file) self.debug("Creating new CIB for " + node + " in: " + cib_file) os.system("echo \'" + self.default_cts_cib + "\' > " + cib_file) if 0!=self.rsh.echo_cp(None, cib_file, node, "@HA_VARLIBDIR@/heartbeat/crm/cib.xml"): raise ValueError("Can not create CIB on %s "%node) os.system("rm -f "+cib_file) else: self.debug("Installing CIB (%s) on node %s" %(self.Env["CIBfilename"], node)) if 0!=self.rsh.cp(self.Env["CIBfilename"], "root@" + (self["CIBfile"]%node)): raise ValueError("Can not scp file to %s "%node) self.rsh.remote_py(node, "os", "system", "chown @HA_CCMUSER@ @HA_VARLIBDIR@/heartbeat/crm/cib.xml") def prepare(self): '''Finish the Initialization process. Prepare to test...''' for node in self.Env["nodes"]: self.ShouldBeStatus[node] = "" self.StataCM(node) def test_node_CM(self, node): '''Report the status of the cluster manager on a given node''' - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp failed, node %s set to down" % node) - self.ShouldBeStatus[node] == self["down"] - return 3 - watchpats = [ ] watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") watchpats.append(self["Pat:They_started"]%node) idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats) idle_watch.setwatch() out=self.rsh.readaline(node, self["StatusCmd"]%node) ret= (string.find(out, 'ok') != -1) self.debug("Node %s status: %s" %(node, out)) if not ret: if self.ShouldBeStatus[node] == self["up"]: self.log( "Node status for %s is %s but we think it should be %s" %(node, self["down"], self.ShouldBeStatus[node])) self.ShouldBeStatus[node]=self["down"] return 0 if self.ShouldBeStatus[node] == self["down"]: self.log( "Node status for %s is %s but we think it should be %s: %s" %(node, self["up"], self.ShouldBeStatus[node], out)) self.ShouldBeStatus[node]=self["up"] # check the output first - because syslog-ng looses messages if string.find(out, 'S_NOT_DC') != -1: # Up and stable return 2 if string.find(out, 'S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" %(node, out)) return 1 # Up and stable return 2 # Is the node up or is the node down def StataCM(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) > 0: return 1 return None # Being up and being stable is not the same question... def node_stable(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) == 2: return 1 self.log("Warn: Node %s not stable" %(node)) return None def cluster_stable(self, timeout=None): watchpats = [ ] watchpats.append("Current ping state: S_IDLE") watchpats.append(self["Pat:DC_IDLE"]) if timeout == None: timeout = self["DeadTime"] idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats, timeout) idle_watch.setwatch() - if not self.ns.WaitForAllNodesToComeUp(self.Env["nodes"]): - self.log("WaitForAllNodesToComeUp failed.") - return None - any_up = 0 for node in self.Env["nodes"]: # have each node dump its current state if self.ShouldBeStatus[node] == self["up"]: self.rsh.readaline(node, (self["StatusCmd"] %node) ) any_up = 1 if any_up == 0 or idle_watch.look(): return 1 self.log("Warn: Cluster Master not IDLE") return None def is_node_dc(self, node, status_line=None): rc = 0 - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s failed." % node) - return rc - if not status_line: status_line = self.rsh.readaline(node, self["StatusCmd"]%node) if not status_line: rc = 0 elif string.find(status_line, 'S_IDLE') != -1: rc = 1 elif string.find(status_line, 'S_INTEGRATION') != -1: rc = 1 elif string.find(status_line, 'S_FINALIZE_JOIN') != -1: rc = 1 elif string.find(status_line, 'S_POLICY_ENGINE') != -1: rc = 1 elif string.find(status_line, 'S_TRANSITION_ENGINE') != -1: rc = 1 if rc == 1: self.debug("%s _is_ the DC" % node) return rc def isolate_node(self, node, allowlist): '''isolate the communication between the nodes''' - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s failed." % node) - return None - rc = self.rsh(node, self["BreakCommCmd2"]%allowlist) if rc == 0: return 1 else: self.log("Could not break the communication from node: %s",node) return None def Configuration(self): if self.config: return self.config.getElementsByTagName('configuration')[0] warnings.filterwarnings("ignore") cib_file=os.tmpnam() warnings.resetwarnings() os.system("rm -f "+cib_file) if self.Env["ClobberCIB"] == 1: if self.Env["CIBfilename"] == None: self.debug("Creating new CIB in: " + cib_file) os.system("echo \'"+ self.default_cts_cib +"\' > "+ cib_file) else: os.system("cp "+self.Env["CIBfilename"]+" "+cib_file) else: if 0 != self.rsh.echo_cp( self.Env["nodes"][0], "@HA_VARLIBDIR@/heartbeat/crm/cib.xml", None, cib_file): raise ValueError("Can not copy file to %s, maybe permission denied"%cib_file) self.config = parse(cib_file) os.remove(cib_file) return self.config.getElementsByTagName('configuration')[0] def Resources(self): ResourceList = [] #read resources in cib configuration = self.Configuration() resources = configuration.getElementsByTagName('resources')[0] rscs = configuration.getElementsByTagName('primitive') incs = configuration.getElementsByTagName('clone') groups = configuration.getElementsByTagName('group') for rsc in rscs: if rsc in resources.childNodes: ResourceList.append(HAResource(self,rsc)) for grp in groups: for rsc in rscs: if rsc in grp.childNodes: if self.use_short_names: resource = HAResource(self,rsc) else: resource = HAResource(self,rsc,grp.getAttribute('id')) ResourceList.append(resource) for inc in incs: max = 0 inc_name = inc.getAttribute("id") instance_attributes = inc.getElementsByTagName('instance_attributes')[0] attributes = instance_attributes.getElementsByTagName('attributes')[0] nvpairs = attributes.getElementsByTagName('nvpair') for nvpair in nvpairs: if nvpair.getAttribute("name") == "clone_max": max = int(nvpair.getAttribute("value")) inc_rsc = inc.getElementsByTagName('primitive')[0] for i in range(0,max): rsc = HAResource(self,inc_rsc) rsc.inc_no = i rsc.inc_name = inc_name rsc.inc_max = max if self.use_short_names: rsc.rid = rsc.rid + ":%d"%i else: rsc.rid = inc_name+":"+rsc.rid + ":%d"%i rsc.Instance = rsc.rid ResourceList.append(rsc) return ResourceList def ResourceGroups(self): GroupList = [] #read resources in cib configuration = self.Configuration() groups = configuration.getElementsByTagName('group') rscs = configuration.getElementsByTagName('primitive') for grp in groups: group = [] GroupList.append(group) for rsc in rscs: if rsc in grp.childNodes: if self.use_short_names: resource = HAResource(self,rsc) else: resource = HAResource(self,rsc,grp.getAttribute('id')) group.append(resource) return GroupList def Dependencies(self): DependencyList = [] #read dependency in cib configuration=self.Configuration() constraints=configuration.getElementsByTagName('constraints')[0] rsc_to_rscs=configuration.getElementsByTagName('rsc_to_rsc') for node in rsc_to_rscs: dependency = {} dependency["id"]=node.getAttribute('id') dependency["from"]=node.getAttribute('from') dependency["to"]=node.getAttribute('to') dependency["type"]=node.getAttribute('type') dependency["strength"]=node.getAttribute('strength') DependencyList.append(dependency) return DependencyList def find_partitions(self): ccm_partitions = [] - if not self.ns.WaitForAllNodesToComeUp(self.Env["nodes"]): - self.log("WaitForAllNodesToComeUp failed.") - return None - for node in self.Env["nodes"]: self.debug("Retrieving partition details for %s" %node) if self.ShouldBeStatus[node] == self["up"]: partition = self.rsh.readaline(node, self["ParitionCmd"]) if not partition: self.log("no partition details for %s" %node) elif len(partition) > 2: partition = partition[:-1] - self.debug("partition details for %s: %s" %(node, partition)) found=0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: - self.debug("Adding partition") + self.debug("Adding partition from %s: %s" %(node, partition)) ccm_partitions.append(partition) else: self.log("bad partition details for %s" %node) return ccm_partitions def HasQuorum(self, node_list): # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.Env["nodes"] - if not self.ns.WaitForAllNodesToComeUp(node_list): - self.log("WaitForAllNodesToComeUp failed in HasQuorum.") - return 0 - for node in node_list: if self.ShouldBeStatus[node] == self["up"]: quorum = self.rsh.readaline(node, self["QuorumCmd"]) if string.find(quorum, "1") != -1: return 1 elif string.find(quorum, "0") != -1: return 0 else: self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum) return 0 def Components(self): complist = [Process("lrmd",self),Process("crmd",self)] if self.Env["DoFencing"] == 1 : complist.append(Process("stonithd",self)) complist.append(Process("heartbeat",self)) return complist def NodeUUID(self, node): - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s failed in NodeUUID." % node) - return "" - lines = self.rsh.readlines(node, self["UUIDQueryCmd"]) for line in lines: self.debug("UUIDLine:"+ line) m = re.search(r'%s.+\((.+)\)' % node, line) if m: return m.group(1) return "" def StandbyStatus(self, node): - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s failed in StandbyStatus." % node) - return "" - out=self.rsh.readaline(node, self["StandbyQueryCmd"]%node) if not out: return "off" out = out[:-1] self.debug("Standby result: "+out) return out # status == "on" : Enter Standby mode # status == "off": Enter Active mode def SetStandbyMode(self, node, status): - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s failed in SetStandbyMode." % node) - return True - current_status = self.StandbyStatus(node) cmd = self["StandbyCmd"] % (node, status) ret = self.rsh(node, cmd) return True class HAResource(Resource): def __init__(self, cm, node, group=None): ''' Get information from xml node ''' if group == None : self.rid = str(node.getAttribute('id')) else : self.rid = group + ":" + str(node.getAttribute('id')) self.rclass = str(node.getAttribute('class')) self.rtype = str(node.getAttribute('type')) self.inc_name = None self.inc_no = -1 self.inc_max = -1 self.rparameters = {} nvpairs = [] list = node.getElementsByTagName('instance_attributes') if len(list) > 0: attributes = list[0] list = attributes.getElementsByTagName('attributes') if len(list) > 0: parameters = list[0] nvpairs = parameters.getElementsByTagName('nvpair') for nvpair in nvpairs: name=nvpair.getAttribute('name') value=nvpair.getAttribute('value') self.rparameters[name]=value # This should normally be called first... FIXME! Resource.__init__(self, cm, self.rtype, self.rid) # resources that dont need quorum will have: # ops = node.getElementsByTagName('op') for op in ops: if op.getAttribute('name') == "start" and op.getAttribute('prereq') == "nothing": self.needs_quorum = 0 - if self.needs_quorum == 0: - self.CM.debug("Resource %s does not need quorum" % self.rid) -# else: -# self.CM.debug("Resource %s DOES need quorum" % self.rid) - def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. We call the status operation for the resource script. ''' - if not self.CM.ns.WaitForNodeToComeUp(nodename): - self.CM.log("WaitForNodeToComeUp %s failed in IsRunningOn." % node) - return 0 - out=self.CM.rsh.readaline(nodename, self.CM["IsRscRunning"]%self.rid) return re.search("0",out) def RunningNodes(self): ResourceNodes = [] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: if self.IsRunningOn(node): ResourceNodes.append(node) return ResourceNodes def _ResourceOperation(self, operation, nodename): ''' Execute an operation on the resource ''' - if not self.CM.ns.WaitForNodeToComeUp(nodename): - self.CM.log("WaitForNodeToComeUp %s failed in _ResourceOperation." % node) - return 0 - self.CM.rsh.readaline(nodename, self.CM["ExecuteRscOp"]%(self.rid,operation)) return self.CM.rsh.lastrc == 0 def Start(self, nodename): ''' This member function starts or activates the resource. ''' return self._ResourceOperation("start", nodename) def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' return self._ResourceOperation("stop", nodename) def IsWorkingCorrectly(self, nodename): return self._ResourceOperation("monitor", nodename) ####################################################################### # # A little test code... # # Which you are advised to completely ignore... # ####################################################################### if __name__ == '__main__': pass diff --git a/cts/CTS.py.in b/cts/CTS.py.in index 97219b53bd..9d044b65f2 100755 --- a/cts/CTS.py.in +++ b/cts/CTS.py.in @@ -1,1236 +1,1185 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Main module Classes related to testing high-availability clusters... Lots of things are implemented. Lots of things are not implemented. We have many more ideas of what to do than we've implemented. ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import types, string, select, sys, time, re, os, struct, os, signal import base64, pickle, binascii from UserDict import UserDict from syslog import * from popen2 import Popen3 class RemoteExec: '''This is an abstract remote execution class. It runs a command on another machine - somehow. The somehow is up to us. This particular class uses ssh. Most of the work is done by fork/exec of ssh or scp. ''' def __init__(self): # -n: no stdin, -x: no X11 self.Command = "@SSH@ -l root -n -x" # -f: ssh to background self.CommandnoBlock = "@SSH@ -f -l root -n -x" # -B: batch mode, -q: no stats (quiet) self.CpCommand = "@SCP@ -B -q" self.OurNode=string.lower(os.uname()[1]) def setcmd(self, rshcommand): '''Set the name of the remote shell command''' self.Command = rshcommand def _fixcmd(self, cmd): return re.sub("\'", "'\\''", cmd) def _cmd(self, *args): '''Compute the string that will run the given command on the given remote system''' args= args[0] sysname = args[0] command = args[1] #print "sysname: %s, us: %s" % (sysname, self.OurNode) if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost": ret = command else: ret = self.Command + " " + sysname + " '" + self._fixcmd(command) + "'" #print ("About to run %s\n" % ret) return ret def _cmd_noblock(self, *args): '''Compute the string that will run the given command on the given remote system''' args= args[0] sysname = args[0] command = args[1] #print "sysname: %s, us: %s" % (sysname, self.OurNode) if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost": ret = command + " &" else: ret = self.CommandnoBlock + " " + sysname + " '" + self._fixcmd(command) + "'" #print ("About to run %s\n" % ret) return ret def __call__(self, *args): '''Run the given command on the given remote system If you call this class like a function, this is the function that gets called. It just runs it roughly as though it were a system() call on the remote machine. The first argument is name of the machine to run it on. ''' count=0; rc = 0; while count < 3: rc = os.system(self._cmd(args)) if rc == 0: return rc print "Retrying command %s" % self._cmd(args) count=count+1 return rc def popen(self, *args): '''popen the given remote command on the remote system. As in __call__, the first argument is name of the machine to run it on. ''' #print "Now running %s\n" % self._cmd(args) return Popen3(self._cmd(args), None) def readaline(self, *args): '''Run a command on the remote machine and capture 1 line of stdout from the given remote command As in __call__, the first argument is name of the machine to run it on. ''' p = self.popen(args[0], args[1]) p.tochild.close() result = p.fromchild.readline() p.fromchild.close() self.lastrc = p.wait() return result def readlines(self, *args): p = self.popen(args[0], args[1]) p.tochild.close() result = p.fromchild.readlines() p.fromchild.close() self.lastrc = p.wait() return result def cp(self, *args): '''Perform a remote copy''' cpstring=self.CpCommand for arg in args: cpstring = cpstring + " \'" + arg + "\'" count=0; rc = 0; for i in range(3): rc = os.system(cpstring) if rc == 0: return rc print "Retrying command %s" % cpstring return rc def echo_cp(self, src_host, src_file, dest_host, dest_file): '''Perform a remote copy via echo''' (rc, lines) = self.remote_py(src_host, "os", "system", "cat %s" % src_file) if rc != 0: print "Copy of %s:%s failed" % (src_host, src_file) elif dest_host == None: fd = open(dest_file, "w") fd.writelines(lines) fd.close() else: big_line="" for line in lines: big_line = big_line + line (rc, lines) = self.remote_py(dest_host, "os", "system", "echo '%s' > %s" % (big_line, dest_file)) return rc def noBlock(self, *args): '''Perform a remote execution without waiting for it to finish''' sshnoBlock = self._cmd_noblock(args) count=0; rc = 0; for i in range(3): rc = os.system(sshnoBlock) if rc == 0: return rc print "Retrying command %s" % sshnoBlock return rc def remote_py(self, node, module, func, *args): '''Execute a remote python function If the call success, lastrc == 0 and return result. If the call fail, lastrc == 1 and return the reason (string) ''' encode_args = binascii.b2a_base64(pickle.dumps(args)) encode_cmd = string.join(["@hb_libdir@/cts/CTSproxy.py",module,func,encode_args]) #print "%s: %s.%s %s" % (node, module, func, repr(args)) result = self.readlines(node, encode_cmd) if result != None: result.pop() if self.lastrc == 0: last_line="" if result != None: array_len = len(result) if array_len > 0: last_line=result.pop() #print "result: %s" % repr(last_line) return pickle.loads(binascii.a2b_base64(last_line)), result return -1, result class LogWatcher: '''This class watches logs for messages that fit certain regular expressions. Watching logs for events isn't the ideal way to do business, but it's better than nothing :-) On the other hand, this class is really pretty cool ;-) The way you use this class is as follows: Construct a LogWatcher object Call setwatch() when you want to start watching the log Call look() to scan the log looking for the patterns ''' def __init__(self, log, regexes, timeout=10, debug=None): '''This is the constructor for the LogWatcher class. It takes a log name to watch, and a list of regular expressions to watch for." ''' # Validate our arguments. Better sooner than later ;-) for regex in regexes: assert re.compile(regex) self.regexes = regexes self.filename = log self.debug=debug self.whichmatch = -1 self.unmatched = None if self.debug: print "Debug now on for for log", log self.Timeout = int(timeout) self.returnonlymatch = None if not os.access(log, os.R_OK): raise ValueError("File [" + log + "] not accessible (r)") def setwatch(self, frombeginning=None): '''Mark the place to start watching the log from. ''' self.file = open(self.filename, "r") self.size = os.path.getsize(self.filename) if not frombeginning: self.file.seek(0,2) def ReturnOnlyMatch(self, onlymatch=1): '''Mark the place to start watching the log from. ''' self.returnonlymatch = onlymatch def look(self, timeout=None): '''Examine the log looking for the given patterns. It starts looking from the place marked by setwatch(). This function looks in the file in the fashion of tail -f. It properly recovers from log file truncation, but not from removing and recreating the log. It would be nice if it recovered from this as well :-) We return the first line which matches any of our patterns. ''' last_line=None first_line=None if timeout == None: timeout = self.Timeout done=time.time()+timeout+1 if self.debug: print "starting search: timeout=%d" % timeout for regex in self.regexes: print "Looking for regex: ", regex while (timeout <= 0 or time.time() <= done): newsize=os.path.getsize(self.filename) if self.debug > 4: print "newsize = %d" % newsize if newsize < self.size: # Somebody truncated the log! if self.debug: print "Log truncated!" self.setwatch(frombeginning=1) continue if newsize > self.file.tell(): line=self.file.readline() if self.debug > 2: print "Looking at line:", line if line: last_line=line if not first_line: first_line=line if self.debug: print "First line: "+ line which=-1 for regex in self.regexes: which=which+1 if self.debug > 3: print "Comparing line to ", regex #matchobj = re.search(string.lower(regex), string.lower(line)) matchobj = re.search(regex, line) if matchobj: self.whichmatch=which if self.returnonlymatch: return matchobj.group(self.returnonlymatch) else: if self.debug: print "Returning line" return line newsize=os.path.getsize(self.filename) if self.file.tell() == newsize: if timeout > 0: time.sleep(0.025) else: if self.debug: print "End of file" if self.debug: print "Last line: "+last_line return None if self.debug: print "Timeout" if self.debug: print "Last line: "+last_line return None def lookforall(self, timeout=None): '''Examine the log looking for ALL of the given patterns. It starts looking from the place marked by setwatch(). We return when the timeout is reached, or when we have found ALL of the regexes that were part of the watch ''' if timeout == None: timeout = self.Timeout save_regexes = self.regexes returnresult = [] while (len(self.regexes) > 0): oneresult = self.look(timeout) if not oneresult: self.unmatched = self.regexes self.regexes = save_regexes return None returnresult.append(oneresult) del self.regexes[self.whichmatch] self.unmatched = None self.regexes = save_regexes return returnresult # In case we ever want multiple regexes to match a single line... #- del self.regexes[self.whichmatch] #+ tmp_regexes = self.regexes #+ self.regexes = [] #+ which = 0 #+ for regex in tmp_regexes: #+ matchobj = re.search(regex, oneresult) #+ if not matchobj: #+ self.regexes.append(regex) class NodeStatus: def __init__(self, Env): self.Env = Env self.rsh = RemoteExec() def IsNodeBooted(self, node): - '''Return TRUE if the given node is booted (responds to pings''' + '''Return TRUE if the given node is booted (responds to pings)''' return os.system("@PING@ -nq -c1 @PING_TIMEOUT_OPT@ %s >/dev/null 2>&1" % node) == 0 def IsSshdUp(self, node): return self.rsh(node, "true") == 0; def WaitForNodeToComeUp(self, node, Timeout=300): '''Return TRUE when given node comes up, or None/FALSE if timeout''' timeout=Timeout anytimeouts=0 while timeout > 0: if self.IsNodeBooted(node) and self.IsSshdUp(node): if anytimeouts: # Fudge to wait for the system to finish coming up time.sleep(30) self.Env.log("Node %s now up" % node) return 1 time.sleep(1) if (not anytimeouts): self.Env.log("Waiting for node %s to come up" % node) anytimeouts=1 timeout = timeout - 1 self.Env.log("%s did not come up within %d tries" % (node, Timeout)) def WaitForAllNodesToComeUp(self, nodes, timeout=300): '''Return TRUE when all nodes come up, or FALSE if timeout''' for node in nodes: if not self.WaitForNodeToComeUp(node, timeout): return None return 1 class ClusterManager(UserDict): '''The Cluster Manager class. This is an subclass of the Python dictionary class. (this is because it contains lots of {name,value} pairs, not because it's behavior is that terribly similar to a dictionary in other ways.) This is an abstract class which class implements high-level operations on the cluster and/or its cluster managers. Actual cluster managers classes are subclassed from this type. One of the things we do is track the state we think every node should be in. ''' def __InitialConditions(self): #if os.geteuid() != 0: # raise ValueError("Must Be Root!") None def _finalConditions(self): for key in self.keys(): if self[key] == None: raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") def __init__(self, Environment, randseed=None): self.Env = Environment self.__InitialConditions() self.clear_cache = 0 self.data = { "up" : "up", # Status meaning up "down" : "down", # Status meaning down "StonithCmd" : "@sbindir@/stonith -t baytech -p '10.10.10.100 admin admin' %s", "DeadTime" : 30, # Max time to detect dead node... "StartTime" : 90, # Max time to start up # # These next values need to be overridden in the derived class. # "Name" : None, "StartCmd" : None, "StopCmd" : None, "StatusCmd" : None, "RereadCmd" : None, "StartDRBDCmd" : None, "StopDRBDCmd" : None, "StatusDRBDCmd" : None, "DRBDCheckconf" : None, "BreakCommCmd" : None, "FixCommCmd" : None, "TestConfigDir" : None, "LogFileName" : None, "Pat:We_started" : None, "Pat:They_started" : None, "Pat:We_stopped" : None, "Pat:They_stopped" : None, "BadRegexes" : None, # A set of "bad news" regexes # to apply to the log } self.rsh = RemoteExec() self.ShouldBeStatus={} self.OurNode=string.lower(os.uname()[1]) self.ShouldBeStatus={} self.ns = NodeStatus(self.Env) def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] def log(self, args): self.Env.log(args) def debug(self, args): self.Env.debug(args) def prepare(self): '''Finish the Initialization process. Prepare to test...''' for node in self.Env["nodes"]: if self.StataCM(node): self.ShouldBeStatus[node]=self["up"] else: self.ShouldBeStatus[node]=self["down"] def upcount(self): '''How many nodes are up?''' count=0 for node in self.Env["nodes"]: if self.ShouldBeStatus[node]==self["up"]: count=count+1 return count def TruncLogs(self): '''Truncate the log for the cluster manager so we can start clean''' if self["LogFileName"] != None: os.system("cp /dev/null " + self["LogFileName"]) def install_config(self, node): return None def clear_all_caches(self): if self.clear_cache: for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["down"]: self.debug("Removing cache file on: "+node) self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/hostcache") else: self.debug("NOT Removing cache file on: "+node) def StartaCM(self, node): '''Start up the cluster manager on a given node''' self.debug("Starting %s on node %s" %(self["Name"], node)) if not self.ShouldBeStatus.has_key(node): self.ShouldBeStatus[node] = self["down"] if self.ShouldBeStatus[node] != self["down"]: return 1 - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s failed" % node) - return 0 - patterns = [] # Technically we should always be able to notice ourselves starting if self.upcount() == 0: patterns.append(self["Pat:We_started"] % node) else: patterns.append(self["Pat:They_started"] % node) watch = LogWatcher( self["LogFileName"], patterns, timeout=self["StartTime"]+10) watch.setwatch() self.install_config(node) - if not self.ns.WaitForNodeToComeUp(node): - self.log("Second WaitForNodeToComeUp %s failed" % node) - return 0 - self.ShouldBeStatus[node] = "any" if self.StataCM(node) and self.cluster_stable(self["DeadTime"]): self.log ("%s was already started" %(node)) return 1 # Clear out the host cache so autojoin can be exercised if self.clear_cache: self.debug("Removing cache file on: "+node) self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/hostcache") if self.rsh(node, self["StartCmd"]) != 0: self.log ("Warn: Start command failed on node %s" %(node)) return None self.ShouldBeStatus[node]=self["up"] watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.log ("Warn: Startup pattern not found: %s" %(regex)) if watch_result: #self.debug("Found match: "+ repr(watch_result)) self.cluster_stable(self["DeadTime"]) return 1 if self.StataCM(node) and self.cluster_stable(self["DeadTime"]): return 1 self.log ("Warn: Start failed for node %s" %(node)) return None def StartaCMnoBlock(self, node): '''Start up the cluster manager on a given node with none-block mode''' self.debug("Starting %s on node %s" %(self["Name"], node)) - if not self.ns.IsNodeBooted(node): - self.log("node %s not booted, StartaCMnoBlock failed." % node) - return 0 - # Clear out the host cache so autojoin can be exercised if self.clear_cache: self.debug("Removing cache file on: "+node) self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/hostcache") self.rsh.noBlock(node, self["StartCmd"]) self.ShouldBeStatus[node]=self["up"] return 1 def StopaCM(self, node): '''Stop the cluster manager on a given node''' self.debug("Stopping %s on node %s" %(self["Name"], node)) if self.ShouldBeStatus[node] != self["up"]: return 1 - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed" % node) - return 0 - if self.rsh(node, self["StopCmd"]) == 0: self.ShouldBeStatus[node]=self["down"] self.cluster_stable(self["DeadTime"]) return 1 else: self.log ("Could not stop %s on node %s" %(self["Name"], node)) return None def StopaCMnoBlock(self, node): '''Stop the cluster manager on a given node with none-block mode''' self.debug("Stopping %s on node %s" %(self["Name"], node)) - if not self.ns.IsNodeBooted(node): - self.log("node %s not booted, StopaCNnoBlock failed." % node) - return 0 - self.rsh.noBlock(node, self["StopCmd"]) self.ShouldBeStatus[node]=self["down"] return 1 def cluster_stable(self, timeout = None): time.sleep(self["StableTime"]) return 1 def node_stable(self, node): return 1 def RereadCM(self, node): '''Force the cluster manager on a given node to reread its config This may be a no-op on certain cluster managers. ''' - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed" % node) - return 0 - rc=self.rsh(node, self["RereadCmd"]) if rc == 0: return 1 else: self.log ("Could not force %s on node %s to reread its config" % (self["Name"], node)) return None def StataCM(self, node): '''Report the status of the cluster manager on a given node''' out=self.rsh.readaline(node, self["StatusCmd"]) ret= (string.find(out, 'stopped') == -1) try: if ret: if self.ShouldBeStatus[node] == self["down"]: self.log( "Node status for %s is %s but we think it should be %s" % (node, self["up"], self.ShouldBeStatus[node])) else: if self.ShouldBeStatus[node] == self["up"]: self.log( "Node status for %s is %s but we think it should be %s" % (node, self["down"], self.ShouldBeStatus[node])) except KeyError: pass if ret: self.ShouldBeStatus[node]=self["up"] else: self.ShouldBeStatus[node]=self["down"] return ret def startall(self, nodelist=None): '''Start the cluster manager on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist=self.Env["nodes"] for node in nodelist: if self.ShouldBeStatus[node] == self["down"]: if not self.StartaCM(node): ret = 0 return ret def stopall(self, nodelist=None): '''Stop the cluster managers on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist=self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["up"]: if not self.StopaCM(node): ret = 0 return ret def rereadall(self, nodelist=None): '''Force the cluster managers on every node in the cluster to reread their config files. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist=self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["up"]: self.RereadCM(node) def statall(self, nodelist=None): '''Return the status of the cluster managers in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' result={} if not nodelist: nodelist=self.Env["nodes"] for node in nodelist: if self.StataCM(node): result[node] = self["up"] else: result[node] = self["down"] return result def isolate_node(self, node): - '''isolate the communication between the nodes''' - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed in isolate_node" % node) - return None - rc = self.rsh(node, self["BreakCommCmd"]) if rc == 0: return 1 else: self.log("Could not break the communication between the nodes frome node: %s" % node) return None def unisolate_node(self, node): - '''fix the communication between the nodes''' - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed in unisolate_node" % node) - return None - rc = self.rsh(node, self["FixCommCmd"]) if rc == 0: return 1 else: self.log("Could not fix the communication between the nodes from node: %s" % node) return None def reducecomm_node(self,node): '''reduce the communication between the nodes''' - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed in reducecomm_node" % node) - return None - rc = self.rsh(node, self["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"])) if rc == 0: return 1 else: self.log("Could not reduce the communication between the nodes from node: %s" % node) return None def savecomm_node(self,node): '''save current the communication between the nodes''' rc = 0 - - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed in savecomm_node" % node) - return None - if float(self.Env["XmitLoss"])!=0 or float(self.Env["RecvLoss"])!=0 : rc = self.rsh(node, self["SaveFileCmd"]); if rc == 0: return 1 else: self.log("Could not save the communication between the nodes from node: %s" % node) return None def restorecomm_node(self,node): '''restore the saved communication between the nodes''' rc = 0 - if not self.ns.WaitForNodeToComeUp(node): - self.log("WaitForNodeToComeUp %s filed in restoreomm_node" % node) - return None - if float(self.Env["XmitLoss"])!=0 or float(self.Env["RecvLoss"])!=0 : rc = self.rsh(node, self["RestoreCommCmd"]); if rc == 0: return 1 else: self.log("Could not restore the communication between the nodes from node: %s" % node) return None def SyncTestConfigs(self): '''Synchronize test configurations throughout the cluster. This one's a no-op for FailSafe, since it does that by itself. ''' fromdir=self["TestConfigDir"] if not os.access(fromdir, os.F_OK | os.R_OK | os.W_OK): raise ValueError("Directory [" + fromdir + "] not accessible (rwx)") for node in self.Env["nodes"]: if node == self.OurNode: continue self.log("Syncing test configurations on " + node) # Perhaps I ought to use rsync... self.rsh.cp("-r", fromdir, node + ":" + fromdir) def SetClusterConfig(self, configpath="default", nodelist=None): '''Activate the named test configuration throughout the cluster. It would be useful to implement this :-) ''' pass return 1 def ResourceGroups(self): "Return a list of resource type/instance pairs for the cluster" raise ValueError("Abstract Class member (ResourceGroups)") def InternalCommConfig(self): "Return a list of paths: each patch consists of a tuple" raise ValueError("Abstract Class member (InternalCommConfig)") def HasQuorum(self, node_list): "Return TRUE if the cluster currently has quorum" # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes raise ValueError("Abstract Class member (HasQuorum)") def Components(self): raise ValueError("Abstract Class member (Components)") def TestLogging(self): patterns= [] prefix="Test message from " for node in self.Env["nodes"]: patterns.append(prefix + node) watch = LogWatcher(self["LogFileName"], patterns, 30 + len(self.Env["nodes"])) watch.setwatch() logpri = self.Env["logfacility"] + ".info" for node in self.Env["nodes"]: cmd="logger -p %s %s%s" % (logpri, prefix, node) if self.rsh.noBlock(node, cmd) != 0: self.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) watch_result = watch.lookforall() if watch.unmatched: self.log("ERROR: Remote logging is not working. Please fix before continuing.") for regex in watch.unmatched: self.log ("ERROR: Test message [%s] not found in logs." % (regex)) return None return 1 def CheckDf(self): dfcmd="df -k /var/log | tail -1 | tr -s ' ' | cut -d' ' -f2" dfmin=500000 result=1 for node in self.Env["nodes"]: dfout=self.rsh.readaline(node, dfcmd) if not dfout: self.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) else: try: idfout = int(dfout) except (ValueError, TypeError): self.log("Warning: df output from %s was invalid [%s]" % (node, dfout)) else: if idfout == 0: self.log("CRIT: Completely out of log disk space on %s" % node) result=None elif idfout <= 500: self.log("WARN: Low on log disk space (%d Mbytes) on %s" % (idfout, node)) return result class Resource: ''' This is an HA resource (not a resource group). A resource group is just an ordered list of Resource objects. ''' def __init__(self, cm, rsctype=None, instance=None): self.CM = cm self.ResourceType = rsctype self.Instance = instance self.needs_quorum = 1 def Type(self): return self.ResourceType def Instance(self, nodename): return self.Instance def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. It is analagous to the "status" operation on SystemV init scripts and heartbeat scripts. FailSafe calls it the "exclusive" operation. ''' raise ValueError("Abstract Class member (IsRunningOn)") return None def IsWorkingCorrectly(self, nodename): ''' This member function returns true if our resource is operating correctly on the given node in the cluster. Heartbeat does not require this operation, but it might be called the Monitor operation, which is what FailSafe calls it. For remotely monitorable resources (like IP addresses), they *should* be monitored remotely for testing. ''' raise ValueError("Abstract Class member (IsWorkingCorrectly)") return None def Start(self, nodename): ''' This member function starts or activates the resource. ''' raise ValueError("Abstract Class member (Start)") return None def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' raise ValueError("Abstract Class member (Stop)") return None def __repr__(self): if (self.Instance and len(self.Instance) > 1): return "{" + self.ResourceType + "::" + self.Instance + "}" else: return "{" + self.ResourceType + "}" class Component: def kill(self, node): None class Process(Component): def __init__(self, name, cm): self.name = str(name) self.CM = cm self.KillCmd = "killall -9 " + self.name def kill(self, node): - if not self.CM.ns.WaitForNodeToComeUp(node): - self.CM.log("WaitForNodeToComeUp %s filed in kill" % node) - return None - if self.CM.rsh(node, self.KillCmd) != 0: self.log ("Warn: Kill %s failed on node %s" %(name,node)) return None return 1 class ScenarioComponent: def __init__(self, Env): self.Env = Env def IsApplicable(self): '''Return TRUE if the current ScenarioComponent is applicable in the given LabEnvironment given to the constructor. ''' raise ValueError("Abstract Class member (IsApplicable)") def SetUp(self, CM): '''Set up the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") def TearDown(self, CM): '''Tear down (undo) the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") class Scenario: ( '''The basic idea of a scenario is that of an ordered list of ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn, and then after the tests have been run, they are torn down using TearDown() (in reverse order). A Scenario is applicable to a particular cluster manager iff each ScenarioComponent is applicable. A partially set up scenario is torn down if it fails during setup. ''') def __init__(self, Components): "Initialize the Scenario from the list of ScenarioComponents" for comp in Components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of" " ScenarioComponent") self.Components = Components def IsApplicable(self): ( '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable() ''' ) for comp in self.Components: if not comp.IsApplicable(): return None return 1 def SetUp(self, CM): '''Set up the Scenario. Return TRUE on success.''' j=0 while j < len(self.Components): if not self.Components[j].SetUp(CM): # OOPS! We failed. Tear partial setups down. CM.log("Tearing down partial setup") self.TearDown(CM, j) return None j=j+1 return 1 def TearDown(self, CM, max=None): '''Tear Down the Scenario - in reverse order.''' if max == None: max = len(self.Components)-1 j=max while j >= 0: self.Components[j].TearDown(CM) j=j-1 class InitClusterManager(ScenarioComponent): ( '''InitClusterManager is the most basic of ScenarioComponents. This ScenarioComponent simply starts the cluster manager on all the nodes. It is fairly robust as it waits for all nodes to come up before starting as they might have been rebooted or crashed for some reason beforehand. ''') def __init__(self, Env): pass def IsApplicable(self): '''InitClusterManager is so generic it is always Applicable''' return 1 def SetUp(self, CM): '''Basic Cluster Manager startup. Start everything''' CM.prepare() # Clear out the cobwebs ;-) self.TearDown(CM) for node in CM.Env["nodes"]: CM.rsh(node, CM["DelFileCommCmd"]+ "; true") # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on all nodes.") return CM.startall() def TearDown(self, CM): '''Set up the given ScenarioComponent''' # Stop the cluster manager everywhere CM.log("Stopping Cluster Manager on all nodes") return CM.stopall() class PingFest(ScenarioComponent): ( '''PingFest does a flood ping to each node in the cluster from the test machine. If the LabEnvironment Parameter PingSize is set, it will be used as the size of ping packet requested (via the -s option). If it is not set, it defaults to 1024 bytes. According to the manual page for ping: Outputs packets as fast as they come back or one hundred times per second, whichever is more. For every ECHO_REQUEST sent a period ``.'' is printed, while for every ECHO_REPLY received a backspace is printed. This provides a rapid display of how many packets are being dropped. Only the super-user may use this option. This can be very hard on a net- work and should be used with caution. ''' ) def __init__(self, Env): self.Env = Env def IsApplicable(self): '''PingFests are always applicable ;-) ''' return 1 def SetUp(self, CM): '''Start the PingFest!''' self.PingSize=1024 if CM.Env.has_key("PingSize"): self.PingSize=CM.Env["PingSize"] CM.log("Starting %d byte flood pings" % self.PingSize) self.PingPids=[] for node in CM.Env["nodes"]: self.PingPids.append(self._pingchild(node)) CM.log("Ping PIDs: " + repr(self.PingPids)) return 1 def TearDown(self, CM): '''Stop it right now! My ears are pinging!!''' for pid in self.PingPids: if pid != None: CM.log("Stopping ping process %d" % pid) os.kill(pid, signal.SIGKILL) def _pingchild(self, node): Args = ["ping", "-qfn", "-s", str(self.PingSize), node] sys.stdin.flush() sys.stdout.flush() sys.stderr.flush() pid = os.fork() if pid < 0: self.Env.log("Cannot fork ping child") return None if pid > 0: return pid # Otherwise, we're the child process. os.execvp("ping", Args) self.Env.log("Cannot execvp ping: " + repr(Args)) sys.exit(1) class PacketLoss(ScenarioComponent): ( ''' It would be useful to do some testing of CTS with a modest amount of packet loss enabled - so we could see that everything runs like it should with a certain amount of packet loss present. ''') def IsApplicable(self): '''always Applicable''' return 1 def SetUp(self, CM): '''Reduce the reliability of communications''' if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 : return 1 for node in CM.Env["nodes"]: CM.reducecomm_node(node) CM.log("Reduce the reliability of communications") return 1 def TearDown(self, CM): '''Fix the reliability of communications''' if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 : return 1 for node in CM.Env["nodes"]: CM.unisolate_node(node) CM.log("Fix the reliability of communications") class BasicSanityCheck(ScenarioComponent): ( ''' ''') def IsApplicable(self): return self.Env["DoBSC"] def SetUp(self, CM): CM.prepare() # Clear out the cobwebs self.TearDown(CM) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on BSC node(s).") return CM.startall() def TearDown(self, CM): CM.log("Stopping Cluster Manager on BSC node(s).") return CM.stopall() diff --git a/cts/CTSaudits.py.in b/cts/CTSaudits.py.in index abfaab8497..8cc78ce40f 100755 --- a/cts/CTSaudits.py.in +++ b/cts/CTSaudits.py.in @@ -1,698 +1,698 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Audit module ''' __copyright__=''' Copyright (C) 2000, 2001,2005 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import time, os, popen2, string, re import CTS import os import popen2 class ClusterAudit: def __init__(self, cm): self.CM = cm def __call__(self): raise ValueError("Abstract Class member (__call__)") def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def name(self): raise ValueError("Abstract Class member (name)") AllAuditClasses = [ ] class ResourceAudit(ClusterAudit): def name(self): return "ResourceAudit" def _doauditRsc(self, resource): ResourceNodes = [] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: if resource.IsRunningOn(node): ResourceNodes.append(node) return ResourceNodes def _doaudit(self): '''Check to see if all resources are running in exactly one place in the cluster. We also verify that the members of a resource group are all running on the same node in the cluster, and we monitor that they are all running "properly". ''' Fatal = 0 result = [] # Thought: use self.CM.find_partitions() and make this audit # aware of partitions. Since in a split cluster one # partition may have quorum (and permission to run resources) # and the other not. Groups = self.CM.ResourceGroups() for group in Groups: GrpServedBy = None lastResource = None for resource in group: # # _doauditRsc returns the set of nodes serving # the given resource. This is normally a single node. # ResourceNodes = self._doauditRsc(resource) # Is the resource served without quorum present? if not self.CM.HasQuorum(None) and len(ResourceNodes) != 0 and resource.needs_quorum: result.append("Resource " + repr(resource) + " active without Quorum: " + repr(ResourceNodes)) # Is the resource served at all? elif len(ResourceNodes) == 0 and self.CM.HasQuorum(None): result.append("Resource " + repr(resource) + " not served anywhere.") # Is the resource served too many times? elif len(ResourceNodes) > 1: result.append("Resource " + repr(resource) + " served too many times: " + repr(ResourceNodes)) self.CM.log("Resource " + repr(resource) + " served too many times: " + repr(ResourceNodes)) Fatal = 1 elif GrpServedBy == None: GrpServedBy = ResourceNodes # Are all the members of the Rsc Grp served by the same node? elif GrpServedBy != ResourceNodes: result.append("Resource group resources" + repr(resource) + " running on different nodes: " + repr(ResourceNodes)+" vs "+repr(GrpServedBy) + "(otherRsc = " + repr(lastResource) + ")") self.CM.log("Resource group resources" + repr(resource) + " running on different nodes: " + repr(ResourceNodes)+" vs "+repr(GrpServedBy) + "(otherRsc = " + repr(lastResource) + ")") Fatal = 1 if self.CM.Env.has_key("SuppressMonitoring") and \ self.CM.Env["SuppressMonitoring"]: continue # Is the resource working correctly ? if not Fatal and len(ResourceNodes) == 1: beforearpchild = popen2.Popen3("date;/sbin/arp -n|cut -c1-15,26-50,75-" , None) beforearpchild.tochild.close() # /dev/null if not resource.IsWorkingCorrectly(ResourceNodes[0]): afterarpchild = popen2.Popen3("/sbin/arp -n|cut -c1-15,26-50,75-" , None) afterarpchild.tochild.close() # /dev/null result.append("Resource " + repr(resource) + " not operating properly." + " Resource is running on " + ResourceNodes[0]); Fatal = 1 self.CM.log("ARP table before failure ========"); for line in beforearpchild.fromchild.readlines(): self.CM.log(line) self.CM.log("ARP table after failure ========"); for line in afterarpchild.fromchild.readlines(): self.CM.log(line) self.CM.log("End of ARP tables ========"); try: beforearpchild.wait() afterarpchild.wait() except OSError: pass afterarpchild.fromchild.close() beforearpchild.fromchild.close() lastResource = resource if (Fatal): result.insert(0, "FATAL") # Kludgy. return result def __call__(self): # # Audit the resources. Since heartbeat doesn't really # know when resource acquisition is complete, we will # poll until things get stable. # # Having a resource duplicately implemented is a Fatal Error # with no tolerance granted. # audresult = self._doaudit() # # Probably the constant below should be a CM parameter. # Then it could be 0 for FailSafe. # Of course, it really depends on what resources # you have in the test suite, and how long it takes # for them to settle. # Recently, we've changed heartbeat so we know better when # resource acquisition is done. # audcount=5; while(audcount > 0): audresult = self._doaudit() if (len(audresult) <= 0 or audresult[0] == "FATAL"): audcount=0 else: audcount = audcount - 1 if (audcount > 0): time.sleep(1) if (len(audresult) > 0): self.CM.log("Fatal Audit error: " + repr(audresult)) return (len(audresult) == 0) def is_applicable(self): if self.CM["Name"] == "heartbeat": return 1 return 0 class HAResourceAudit(ClusterAudit): def __init__(self, cm): self.CM = cm def _RscRunningNodes(self, resource): ResourceNodes = [] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: if resource.IsRunningOn(node): ResourceNodes.append(node) return ResourceNodes def __call__(self): passed = 1 NodeofRsc = {} NumofInc = {} MaxofInc = {} self.CM.debug("Do Audit HAResourceAudit") #Calculate the count of active nodes up_count = 0; for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: up_count += 1 #Make sure the resouces are running on one and only one node Resources = self.CM.Resources() for resource in Resources : RunningNodes = self._RscRunningNodes(resource) NodeofRsc[resource.rid]=RunningNodes if resource.inc_name == None: #Is the resource served without quorum present? if not self.CM.HasQuorum(None) and len(RunningNodes) != 0 and resource.needs_quorum: self.CM.log("Resource " + repr(resource) + " active without Quorum: " + repr(RunningNodes)) passed = 0 #Is the resource served at all? elif len(RunningNodes) == 0 and self.CM.HasQuorum(None): self.CM.log("Resource " + repr(resource) + " not served anywhere.") passed = 0 # Is the resource served too many times? elif len(RunningNodes) > 1: self.CM.log("Resource " + repr(resource) + " served too many times: " + repr(RunningNodes)) passed = 0 else: if not NumofInc.has_key(resource.inc_name): NumofInc[resource.inc_name]=0 MaxofInc[resource.inc_name]=resource.inc_max running = 1 #Is the resource served without quorum present? if not self.CM.HasQuorum(None) and len(RunningNodes) != 0 and resource.needs_quorum == 1: self.CM.log("Resource " + repr(resource) + " active without Quorum: " + repr(RunningNodes)) passed = 0 #Is the resource served at all? elif len(RunningNodes) == 0 : running = 0 # Is the resource served too many times? elif len(RunningNodes) > 1: self.CM.log("Resource " + repr(resource) + " served too many times: " + repr(RunningNodes)) passed = 0 if running: NumofInc[resource.inc_name] += 1 if self.CM.HasQuorum(None): for inc_name in NumofInc.keys(): if NumofInc[inc_name] != min(up_count, MaxofInc[inc_name]): passed = 0 self.CM.log("Cloned resource "+ str(inc_name) +" has "+ str(NumofInc[inc_name]) +" active instances (max: " + str(MaxofInc[inc_name]) +", active nodes: "+ str(up_count) + ")") Groups = self.CM.ResourceGroups() for group in Groups : group_printed = 0 first_rsc = group[0].rid RunningNodes = NodeofRsc[first_rsc] for rsc in group : if RunningNodes != NodeofRsc[rsc.rid]: passed = 0 if group_printed == 0: group_printed = 1 self.CM.log("Group audit failed for: %s" % repr(group)) if not NodeofRsc[first_rsc] or len(NodeofRsc[first_rsc]) == 0: self.CM.log("* %s not running" % first_rsc) else: self.CM.log("* %s running on %s" %(first_rsc, repr(NodeofRsc[first_rsc]))) if not NodeofRsc[rsc.rid] or len(NodeofRsc[rsc.rid]) == 0: self.CM.log("* %s not running" % rsc.rid) else: self.CM.log("* %s running on %s" %(rsc.rid, repr(NodeofRsc[rsc.rid]))) # Make sure the resouces with "must","placement" constraint # are running on the same node Dependancies = self.CM.Dependencies() for dependency in Dependancies: if dependency["type"] == "placement" and dependency["strength"] == "must": if NodeofRsc[dependency["from"]] != NodeofRsc[dependency["to"]]: print dependency["from"] + " and " + dependency["to"] + " should be run on same node" passed = 0 return passed def is_applicable(self): if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["ResCanStop"] == 0: return 1 return 0 def name(self): return "HAResourceAudit" class CrmdStateAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 up_are_down = 0 down_are_up = 0 unstable_list = [] self.CM.debug("Do Audit %s"%self.name()) for node in self.CM.Env["nodes"]: should_be = self.CM.ShouldBeStatus[node] - rc = self.CM.StataCM(node) - if rc: + rc = self.CM.test_node_CM(node) + if rc > 0: if should_be == self.CM["down"]: down_are_up = down_are_up + 1 - if not self.CM.node_stable(node): + if rc == 1: unstable_list.append(node) elif should_be == self.CM["up"]: up_are_down = up_are_down + 1 if len(unstable_list) > 0: passed = 0 self.CM.log("Cluster is not stable: %d (of %d): %s" %(len(unstable_list), self.CM.upcount(), repr(unstable_list))) if up_are_down > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be up were down." %(up_are_down, len(self.CM.Env["nodes"]))) if down_are_up > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be down were up." %(down_are_up, len(self.CM.Env["nodes"]))) return passed def name(self): return "CrmdStateAudit" def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 class CIBAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def __call__(self): self.CM.debug("Do Audit %s"%self.name()) passed = 1 ccm_partitions = self.CM.find_partitions() if len(ccm_partitions) == 0: self.CM.debug("\tNo partitions to audit") return 1 for partition in ccm_partitions: self.CM.debug("\tAuditing CIB consistency for: %s" %partition) partition_passed = 0 if self.audit_cib_contents(partition) == 0: passed = 0 return passed def audit_cib_contents(self, hostlist): passed = 1 first_host = None first_host_xml = "" partition_hosts = hostlist.split() for a_host in partition_hosts: if first_host == None: first_host = a_host first_host_xml = self.store_remote_cib(a_host) #self.CM.debug("Retrieved CIB: %s" % first_host_xml) else: a_host_xml = self.store_remote_cib(a_host) diff_cmd="@sbindir@/crm_diff -c -VV -f -N \'%s\' -O '%s'" % (a_host_xml, first_host_xml) infile, outfile, errfile = os.popen3(diff_cmd) diff_lines = outfile.readlines() for line in diff_lines: if not re.search("", line): passed = 0 self.CM.log("CibDiff[%s-%s]: %s" % (first_host, a_host, line)) else: self.CM.debug("CibDiff[%s-%s] Ignoring: %s" % (first_host, a_host, line)) diff_lines = errfile.readlines() for line in diff_lines: passed = 0 self.CM.log("CibDiff[%s-%s] ERROR: %s" % (first_host, a_host, line)) return passed def store_remote_cib(self, node): combined = "" first_line = 1 extra_debug = 0 #self.CM.debug("\tRetrieving CIB from: %s" % node) lines = self.CM.rsh.readlines(node, self.CM["CibQuery"]) if extra_debug: self.CM.debug("Start Cib[%s]" % node) for line in lines: combined = combined + line[:-1] if first_line: self.CM.debug("[Cib]" + line) first_line = 0 elif extra_debug: self.CM.debug("[Cib]" + line) if extra_debug: self.CM.debug("End Cib[%s]" % node) #self.CM.debug("Complete CIB: %s" % combined) return combined def name(self): return "CibAudit" def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 class PartitionAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} self.NodeEpoche={} self.NodeState={} self.NodeQuorum={} def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def __call__(self): self.CM.debug("Do Audit %s"%self.name()) passed = 1 ccm_partitions = self.CM.find_partitions() if ccm_partitions == None or len(ccm_partitions) == 0: return 1 if len(ccm_partitions) > 1: self.CM.log("Warn: %d cluster partitions detected:" %len(ccm_partitions)) for partition in ccm_partitions: self.CM.log("\t %s" %partition) for partition in ccm_partitions: partition_passed = 0 if self.audit_partition(partition) == 0: passed = 0 return passed def trim_string(self, avalue): if not avalue: return None if len(avalue) > 1: return avalue[:-1] def trim2int(self, avalue): if not avalue: return None if len(avalue) > 1: return int(avalue[:-1]) def audit_partition(self, partition): passed = 1 dc_found = [] dc_allowed_list = [] lowest_epoche = None node_list = partition.split() self.CM.debug("Auditing partition: %s" %(partition)) for node in node_list: if self.CM.ShouldBeStatus[node] != self.CM["up"]: self.CM.log("Warn: Node %s appeared out of nowhere" %(node)) self.CM.ShouldBeStatus[node] = self.CM["up"] # not in itself a reason to fail the audit (not what we're # checking for in this audit) self.NodeState[node] = self.CM.rsh.readaline( node, self.CM["StatusCmd"]%node) self.NodeEpoche[node] = self.CM.rsh.readaline( node, self.CM["EpocheCmd"]) self.NodeQuorum[node] = self.CM.rsh.readaline( node, self.CM["QuorumCmd"]) self.NodeState[node] = self.trim_string(self.NodeState[node]) self.NodeEpoche[node] = self.trim2int(self.NodeEpoche[node]) self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node]) if not self.NodeEpoche[node]: self.CM.log("Warn: Node %s dissappeared: cant determin epoche" %(node)) self.CM.ShouldBeStatus[node] = self.CM["down"] # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoche == None or self.NodeEpoche[node] < lowest_epoche: lowest_epoche = self.NodeEpoche[node] if not lowest_epoche: self.CM.log("Lowest epoche not determined in %s" % (partition)) passed = 0 for node in node_list: if self.CM.ShouldBeStatus[node] == self.CM["up"]: if self.CM.is_node_dc(node, self.NodeState[node]): dc_found.append(node) if self.NodeEpoche[node] == lowest_epoche: self.CM.debug("%s: OK" % node) elif not self.NodeEpoche[node]: self.CM.debug("Check on %s ignored: no node epoche" % node) elif not lowest_epoche: self.CM.debug("Check on %s ignored: no lowest epoche" % node) else: self.CM.log("DC %s is not the oldest node (%d vs. %d)" %(node, self.NodeEpoche[node], lowest_epoche)) passed = 0 if len(dc_found) == 0: self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)" %(len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self.CM.log("%d DCs (%s) found in cluster partition: %s" %(len(dc_found), str(dc_found), str(node_list))) passed = 0 elif self.CM.Env["CIBResource"] == 1 and self.NodeQuorum[dc_found[0]]: if self.audit_dc_resources(node_list, dc_found) == 0: passed = 0 Resources = self.CM.Resources() for node in node_list: if self.CM.ShouldBeStatus[node] == self.CM["up"]: for resource in Resources: if resource.rid == "rsc_"+node: if resource.IsRunningOn(node) == 0: self.CM.log("Node %s is not running its own resource" %(node)) passed = 0 elif self.CM.Env["CIBResource"] == 1: # no quorum means no resource management self.CM.debug("Not auditing resources - no quorum") if passed == 0: for node in node_list: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.log("epoche %s : %s" %(self.NodeEpoche[node], self.NodeState[node])) return passed def audit_dc_resources(self, node_list, dc_list): passed = 1 Resources = self.CM.Resources() for resource in Resources: if resource.rid == "DcIPaddr": self.CM.debug("Auditing resource: %s" %(resource)) # All DCs are running the resource for dc in dc_list: if self.NodeQuorum[dc]: if resource.IsRunningOn(dc) == 0: self.CM.log("Resource %s not running on DC: %s" %(resource, dc)) passed = 0 # All nodes running the resource are DCs for node in node_list: if resource.IsRunningOn(node): if self.CM.is_node_dc(node, self.NodeState[node]) == 0: self.CM.log("Resource %s is running on non-DC node %s" %("DcIPaddr", node)) passed = 0 return passed def name(self): return "PartitionAudit" def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllAuditClasses.append(CrmdStateAudit) AllAuditClasses.append(PartitionAudit) AllAuditClasses.append(ResourceAudit) AllAuditClasses.append(HAResourceAudit) AllAuditClasses.append(CIBAudit) def AuditList(cm): result = [] for auditclass in AllAuditClasses: result.append(auditclass(cm)) return result diff --git a/cts/CTStests.py.in b/cts/CTStests.py.in index e2542b737e..e01f2e7b81 100644 --- a/cts/CTStests.py.in +++ b/cts/CTStests.py.in @@ -1,2364 +1,2349 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Tests module There are a few things we want to do here: ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. Add RecourceRecover testcase Zhao Kai ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # SPECIAL NOTE: # # Tests may NOT implement any cluster-manager-specific code in them. # EXTEND the ClusterManager object to provide the base capabilities # the test needs if you need to do something that the current CM classes # do not. Otherwise you screw up the whole point of the object structure # in CTS. # # Thank you. # import CTS from CM_hb import HBConfig import CTSaudits import time, os, re, types, string, tempfile, sys from CTSaudits import * from stat import * from xml.dom.minidom import * # List of all class objects for tests which we ought to # consider running. class RandomTests: ''' A collection of tests which are run at random. ''' def __init__(self, scenario, cm, tests, Audits): self.CM = cm self.Env = cm.Env self.Scenario = scenario self.Tests = [] self.Audits = [] self.ns=CTS.NodeStatus(self.Env) for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") if test.is_applicable(): self.Tests.append(test) if not scenario.IsApplicable(): raise ValueError("Scenario not applicable in" " given Environment") self.Stats = {"success":0, "failure":0, "BadNews":0} self.IndividualStats= {} for audit in Audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be a subclass of ClusterAudit") if audit.is_applicable(): self.Audits.append(audit) def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def audit(self, BadNews, test): errcount=0 BadNewsDebug=0 #BadNews.debug=1 while errcount < 1000: if BadNewsDebug: print "Looking for BadNews" match=BadNews.look(0) if match: if BadNewsDebug: print "BadNews found: "+match add_err = 1 ignorelist = [] if test: ignorelist=test.errorstoignore() ignorelist.append(" CTS: ") ignorelist.append("BadNews:") for ignore in ignorelist: if re.search(ignore, match): if BadNewsDebug: print "Ignoring:"+match+" (pattern: "+ignore+")" add_err = 0 if add_err == 1: ignorelist=self.CM.errorstoignore() for ignore in ignorelist: if re.search(ignore, match): if BadNewsDebug: print "Ignoring:"+match+" (pattern: "+ignore+")" add_err = 0 if add_err == 1: self.CM.log("BadNews: " + match) self.incr("BadNews") errcount=errcount+1 else: break else: self.CM.log("Big problems. Shutting down.") self.CM.stopall() self.summarize() raise ValueError("Looks like we hit the jackpot! :-)") for audit in self.Audits: if not audit(): self.CM.log("Audit " + audit.name() + " FAILED.") self.incr("auditfail") if test: test.incr("auditfail") def summarize(self): self.CM.log("****************") self.CM.log("Overall Results:" + repr(self.Stats)) self.CM.log("****************") self.CM.log("Detailed Results") for test in self.Tests: self.CM.log("Test %s: \t%s" %(test.name, repr(test.Stats))) self.CM.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def run(self, max=1): ( ''' Set up the given scenario, then run the selected tests at random for the selected number of iterations. ''') BadNews=CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"] , timeout=0) BadNews.setwatch() if not self.Scenario.SetUp(self.CM): return None self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) testcount=1 time.sleep(30) # This makes sure everything is stabilized before starting... self.audit(BadNews, None) while testcount <= max: test = self.Env.RandomGen.choice(self.Tests) # Some tests want a node as an argument. nodechoice = self.Env.RandomNode() #logsize = os.stat(self.CM["LogFileName"])[ST_SIZE] #self.CM.log("Running test %s (%s) \t[%d : %d]" # % (test.name, nodechoice, testcount, logsize)) self.CM.log("Running test %s (%s) \t[%d]" % (test.name, nodechoice, testcount)) testcount = testcount + 1 starttime=time.time() test.starttime=starttime ret=test(nodechoice) if ret: self.incr("success") else: self.incr("failure") self.CM.log("Test %s (%s) \t[FAILED]" %(test.name,nodechoice)) # Better get the current info from the cluster... self.CM.statall() # Make sure logging is working and we have enough disk space... if not self.CM.Env["DoBSC"]: - self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) if not self.CM.TestLogging(): sys.exit(1) if not self.CM.CheckDf(): sys.exit(1) stoptime=time.time() elapsed_time = stoptime - starttime test_time = stoptime - test.starttime if not test.has_key("min_time"): test["elapsed_time"] = elapsed_time test["min_time"] = test_time test["max_time"] = test_time else: test["elapsed_time"] = test["elapsed_time"] + elapsed_time if test_time < test["min_time"]: test["min_time"] = test_time if test_time > test["max_time"]: test["max_time"] = test_time self.audit(BadNews, test) self.Scenario.TearDown(self.CM) self.audit(BadNews, None) for test in self.Tests: self.IndividualStats[test.name] = test.Stats return self.Stats, self.IndividualStats AllTestClasses = [ ] class CTSTest: ''' A Cluster test. We implement the basic set of properties and behaviors for a generic cluster test. Cluster tests track their own statistics. We keep each of the kinds of counts we track as separate {name,value} pairs. ''' def __init__(self, cm): #self.name="the unnamed test" self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} # if not issubclass(cm.__class__, ClusterManager): # raise ValueError("Must be a ClusterManager object") self.CM = cm self.timeout=120 self.starttime=0 def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def failure(self, reason="none"): '''Increment the failure count''' self.incr("failure") self.CM.log("Test " + self.name + " failed [reason:" + reason + "]") return None def success(self): '''Increment the success count''' self.incr("success") return 1 def skipped(self): '''Increment the skipped count''' self.incr("skipped") return 1 def __call__(self, node): '''Perform the given test''' raise ValueError("Abstract Class member (__call__)") self.incr("calls") return self.failure() def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def canrunnow(self): '''Return TRUE if we can meaningfully run right now''' return 1 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] ################################################################### class StopTest(CTSTest): ################################################################### '''Stop (deactivate) the cluster manager on a node''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="Stop" self.uspat = self.CM["Pat:We_stopped"] self.thempat = self.CM["Pat:They_stopped"] def __call__(self, node): '''Perform the 'stop' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["up"]: return self.skipped() patterns = [] # Technically we should always be able to notice ourselves stopping patterns.append(self.CM["Pat:We_stopped"] % node) if self.CM.Env["use_logd"]: patterns.append(self.CM["Pat:Logd_stopped"] % node) # Any active node needs to notice this one left # NOTE: This wont work if we have multiple partitions for other in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node: patterns.append(self.CM["Pat:They_stopped"] %(other, node)) #self.debug("Checking %s will notice %s left"%(other, node)) watch = CTS.LogWatcher( self.CM["LogFileName"], patterns, self.CM["DeadTime"]) watch.setwatch() if node == self.CM.OurNode: self.incr("us") else: if self.CM.upcount() <= 1: self.incr("all") else: self.incr("them") self.CM.StopaCM(node) watch_result = watch.lookforall() UnmatchedList = "||" if watch.unmatched: for regex in watch.unmatched: self.CM.log ("Warn: Shutdown pattern not found: %s" % (regex)) UnmatchedList += regex + "||"; self.CM.cluster_stable(self.CM["DeadTime"]) # because syslog looses so many messages we can only really fail # the stop if _none_ of the CCM peers notice the node leave if not watch.unmatched or self.CM.upcount() == 0: return self.success() elif len(watch.unmatched) >= self.CM.upcount(): return self.failure("no match against (%s)" % UnmatchedList) return self.success() # # We don't register StopTest because it's better when called by # another test... # ################################################################### class StartTest(CTSTest): ################################################################### '''Start (activate) the cluster manager on a node''' def __init__(self, cm, debug=None): CTSTest.__init__(self,cm) self.name="start" self.debug = debug self.uspat = self.CM["Pat:We_started"] self.thempat = self.CM["Pat:They_started"] def __call__(self, node): '''Perform the 'start' test. ''' self.incr("calls") if self.CM.upcount() == 0: self.incr("us") else: self.incr("them") if self.CM.ShouldBeStatus[node] != self.CM["down"]: return self.skipped() elif self.CM.StartaCM(node): return self.success() else: return self.failure("Startup %s on node %s failed" %(self.CM["Name"], node)) def is_applicable(self): '''StartTest is always applicable''' return 1 # # We don't register StartTest because it's better when called by # another test... # ################################################################### class FlipTest(CTSTest): ################################################################### '''If it's running, stop it. If it's stopped start it. Overthrow the status quo... ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Flip" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'Flip' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("stopped") ret = self.stop(node) type="up->down" # Give the cluster time to recognize it's gone... time.sleep(self.CM["StableTime"]) elif self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("started") ret = self.start(node) type="down->up" else: return self.skipped() self.incr(type) if ret: return self.success() else: return self.failure("%s failure" % type) def is_applicable(self): '''FlipTest is always applicable''' return 1 # Register FlipTest as a good test to run AllTestClasses.append(FlipTest) ################################################################### class RestartTest(CTSTest): ################################################################### '''Stop and restart a node''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Restart" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'restart' test. ''' self.incr("calls") self.incr("node:" + node) ret1 = 1 if self.CM.StataCM(node): self.incr("WasStopped") if not self.start(node): return self.failure("start (setup) failure: "+node) self.starttime=time.time() if not self.stop(node): return self.failure("stop failure: "+node) if not self.start(node): return self.failure("start failure: "+node) return self.success() def is_applicable(self): '''RestartTest is always applicable''' return 1 # Register RestartTest as a good test to run AllTestClasses.append(RestartTest) ################################################################### class StonithTest(CTSTest): ################################################################### '''Reboot a node by whacking it with stonith.''' def __init__(self, cm, timeout=900): CTSTest.__init__(self,cm) self.name="Stonith" self.theystopped = self.CM["Pat:They_dead"] self.allstopped = self.CM["Pat:All_stopped"] self.usstart = self.CM["Pat:We_started"] self.themstart = self.CM["Pat:They_started"] self.timeout = timeout def _reset(self, node): StonithWorked=False for tries in 1,2,3,4,5: if self.CM.Env.ResetNode(node): StonithWorked=True break return StonithWorked def setup(self, target_node): # nothing to do return 1 def __call__(self, node): '''Perform the 'stonith' test. (whack the node)''' self.incr("calls") stopwatch = 0 rc = 0 if not self.setup(node): return self.failure("Setup failed") # Figure out what log message to look for when/if it goes down # # Any active node needs to notice this one left # NOTE: This wont work if we have multiple partitions stop_patterns = [] for other in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node: stop_patterns.append(self.CM["Pat:They_stopped"] %(other, node)) stopwatch = 1 #self.debug("Checking %s will notice %s left"%(other, node)) if self.CM.ShouldBeStatus[node] == self.CM["down"]: # actually no-one will notice this node die since HA isnt running stopwatch = 0 # Figure out what log message to look for when it comes up if self.CM.upcount() == 1 and self.CM.ShouldBeStatus[node] == self.CM["up"]: uppat = (self.usstart % node) else: uppat = (self.themstart % node) upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat] , timeout=self.timeout) if stopwatch == 1: watch = CTS.LogWatcher(self.CM["LogFileName"], stop_patterns , timeout=self.CM["DeadTime"]+10) watch.setwatch() # Reset (stonith) the node StonithWorked = self._reset(node) if not StonithWorked: return self.failure("Stonith didn't work") upwatch.setwatch() # Look() and see if the machine went down if stopwatch == 0: # Allow time for the node to die time.sleep(self.CM["DeadTime"]+10) elif not watch.lookforall(): if watch.unmatched: for regex in watch.unmatched: self.CM.log("Warn: STONITH pattern not found: %s"%regex) # !!no-one!! saw this node die if len(watch.unmatched) == len(stop_patterns): return self.failure("No-one saw %s die" %node) # else: syslog* lost a message # Alas I dont think this check is plausable (beekhof) # # Check it really stopped... #self.CM.ShouldBeStatus[node] = self.CM["down"] #if self.CM.StataCM(node) == 1: # ret1=0 # Look() and see if the machine came back up rc=0 if upwatch.look(): self.CM.debug("Startup pattern found: %s" %uppat) rc=1 else: self.CM.log("Warn: Startup pattern not found: %s" %uppat) # Check it really started... self.CM.ShouldBeStatus[node] = self.CM["up"] if rc == 0 and self.CM.StataCM(node) == 1: rc=1 # wait for the cluster to stabilize self.CM.cluster_stable() # return case processing if rc == 0: return self.failure("Node %s did not restart" %node) else: return self.success() def is_applicable(self): '''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE''' # for v2, stonithd test is a better test to run. if self.CM["Name"] == "linux-ha-v2": return None if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 # Register StonithTest as a good test to run AllTestClasses.append(StonithTest) ################################################################### class StonithdTest(StonithTest): ################################################################### def __init__(self, cm, timeout=600): StonithTest.__init__(self, cm, timeout=600) self.name="Stonithd" self.startall = SimulStartLite(cm) self.start = StartTest(cm) self.stop = StopTest(cm) self.init_node = None def _reset(self, target_node): if len(self.CM.Env["nodes"]) < 2: return self.skipped() StonithWorked = False for tries in range(1,5): fail_reasons = [] if self.CM.Env.ResetNode2(self.init_node, target_node, fail_reasons): StonithWorked = True break for reason in fail_reasons: self.CM.log(reason) return StonithWorked def setup(self, target_node): if len(self.CM.Env["nodes"]) < 2: return 1 self.init_node = self.CM.Env.RandomNode() while self.init_node == target_node: self.init_node = self.CM.Env.RandomNode() if not self.startall(None): return self.failure("Test setup failed") return 1 def is_applicable(self): if not self.CM["Name"] == "linux-ha-v2": return 0 if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 AllTestClasses.append(StonithdTest) ################################################################### class IPaddrtest(CTSTest): ################################################################### '''Find the machine supporting a particular IP address, and knock it down. [Hint: This code isn't finished yet...] ''' def __init__(self, cm, IPaddrs): CTSTest.__init__(self,cm) self.name="IPaddrtest" self.IPaddrs = IPaddrs self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, IPaddr): ''' Perform the IPaddr test... ''' self.incr("calls") node = self.CM.Env.RandomNode() self.incr("node:" + node) if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") self.start(node) ret1 = self.stop(node) # Give the cluster time to recognize we're gone... time.sleep(self.CM["StableTime"]) ret2 = self.start(node) if not ret1: return self.failure("Could not stop") if not ret2: return self.failure("Could not start") return self.success() def is_applicable(self): '''IPaddrtest is always applicable (but shouldn't be)''' return 1 ################################################################### class StartOnebyOne(CTSTest): ################################################################### '''Start all the nodes ~ one by one''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StartOnebyOne" self.stopall = SimulStopLite(cm) self.start = StartTest(cm) self.ns=CTS.NodeStatus(cm.Env) def __call__(self, dummy): '''Perform the 'StartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Test setup failed") failed=[] self.starttime=time.time() for node in self.CM.Env["nodes"]: if not self.start(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to start: " + repr(failed)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''StartOnebyOne is always applicable''' return 1 # Register StartOnebyOne as a good test to run AllTestClasses.append(StartOnebyOne) ################################################################### class SimulStart(CTSTest): ################################################################### '''Start all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStart" self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'SimulStart' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Setup failed") self.CM.clear_all_caches() if not self.startall(None): return self.failure("Startall failed") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''SimulStart is always applicable''' return 1 # Register SimulStart as a good test to run AllTestClasses.append(SimulStart) class SimulStop(CTSTest): ################################################################### '''Stop all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStop" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) def __call__(self, dummy): '''Perform the 'SimulStop' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.stopall(None): return self.failure("Stopall failed") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''SimulStop is always applicable''' return 1 # Register SimulStop as a good test to run AllTestClasses.append(SimulStop) class StopOnebyOne(CTSTest): ################################################################### '''Stop all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StopOnebyOne" self.startall = SimulStartLite(cm) self.stop = StopTest(cm) def __call__(self, dummy): '''Perform the 'StopOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") failed=[] self.starttime=time.time() for node in self.CM.Env["nodes"]: if not self.stop(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to stop: " + repr(failed)) self.CM.clear_all_caches() return self.success() def is_applicable(self): '''StopOnebyOne is always applicable''' return 1 # Register StopOnebyOne as a good test to run AllTestClasses.append(StopOnebyOne) class RestartOnebyOne(CTSTest): ################################################################### '''Stop all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="RestartOnebyOne" self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'RestartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") did_fail=[] self.starttime=time.time() self.restart = RestartTest(self.CM) for node in self.CM.Env["nodes"]: if not self.restart(node): did_fail.append(node) if did_fail: return self.failure("Could not restart %d nodes: %s" %(len(did_fail), repr(did_fail))) return self.success() def is_applicable(self): '''RestartOnebyOne is always applicable''' return 1 # Register StopOnebyOne as a good test to run AllTestClasses.append(RestartOnebyOne) ################################################################### class StandbyTest(CTSTest): ################################################################### '''Put a node in standby mode''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby" self.successpat = self.CM["Pat:StandbyOK"] self.nostandbypat = self.CM["Pat:StandbyNONE"] self.transient = self.CM["Pat:StandbyTRANSIENT"] def __call__(self, node): '''Perform the 'standby' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() if self.CM.upcount() < 2: self.incr("nostandby") pat = self.nostandbypat else: self.incr("standby") pat = self.successpat # # You could make a good argument that the cluster manager # ought to give us good clues on when its a bad time to # switch over to the other side, but heartbeat doesn't... # It could also queue the request. But, heartbeat # doesn't do that either :-) # retrycount=0 while (retrycount < 10): watch = CTS.LogWatcher(self.CM["LogFileName"] , [pat, self.transient] , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.rsh(node, self.CM["Standby"]) match = watch.look() if match: if re.search(self.transient, match): self.incr("retries") time.sleep(2) retrycount=retrycount+1 else: return self.success() else: break # No point in retrying... return self.failure("did not find pattern " + pat) def is_applicable(self): '''StandbyTest is applicable when the CM has a Standby command''' if not self.CM.has_key("Standby"): return None else: #if self.CM.Env.has_key("DoStandby"): #flag=self.CM.Env["DoStandby"] #if type(flag) == types.IntType: #return flag #if not re.match("[yt]", flag, re.I): #return None # # We need to strip off everything after the first blank # cmd=self.CM["Standby"] cmd = cmd.split()[0] if not os.access(cmd, os.X_OK): return None cf = self.CM.cf if not cf.Parameters.has_key("auto_failback"): return None elif cf.Parameters["auto_failback"][0] == "legacy": return None return 1 # Register StandbyTest as a good test to run AllTestClasses.append(StandbyTest) ####################################################################### class StandbyTest2(CTSTest): ####################################################################### '''Standby with CRM of HA release 2''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby2" self.start = StartTest(cm) self.startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources, none resource should be running on the node # set the node to active mode # check resouces, resources should have been migrated back (SHOULD THEY?) def __call__(self, node): self.incr("calls") ret=self.startall(None) if not ret: return self.failure("Start all nodes failed") self.CM.debug("Make sure node %s is active" % node) if self.CM.StandbyStatus(node) != "off": if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.CM.debug("Getting resources running on node %s" % node) rsc_on_node = [] for rsc in self.CM.Resources(): if rsc.IsRunningOn(node): rsc_on_node.append(rsc) self.CM.debug("Setting node %s to standby mode" % node) if not self.CM.SetStandbyMode(node, "on"): return self.failure("can't set node %s to standby mode" % node) time.sleep(30) # Allow time for the update to be applied and cause something self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "on": return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status)) self.CM.debug("Checking resources") for rsc in self.CM.Resources(): if rsc.IsRunningOn(node): return self.failure("%s set to standby, %s is still running on it" % (node, rsc.rid)) self.CM.debug("Setting node %s to active mode" % node) if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) time.sleep(30) # Allow time for the update to be applied and cause something self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.CM.debug("Checking resources") for rsc in rsc_on_node: if not rsc.IsRunningOn(node): return self.failure("%s set to active but %s is NOT back" % (node, rsc.rid)) return self.success() def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllTestClasses.append(StandbyTest2) ####################################################################### class Fastdetection(CTSTest): ####################################################################### '''Test the time which one node find out the other node is killed very quickly''' def __init__(self,cm,timeout=60): CTSTest.__init__(self, cm) self.name = "DetectionTime" self.they_stopped = self.CM["Pat:They_stopped"] self.timeout = timeout self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.standby = StandbyTest(cm) self.__setitem__("min", 0) self.__setitem__("max", 0) self.__setitem__("totaltime", 0) def __call__(self, node): '''Perform the fastfailureDetection test''' self.incr("calls") ret=self.startall(None) if not ret: return self.failure("Test setup failed") if self.CM.upcount() < 2: return self.skipped() # Make sure they're not holding any resources ret = self.standby(node) if not ret: return ret stoppat = (self.they_stopped % ("", node)) stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout) stopwatch.setwatch() # # This test is CM-specific - FIXME!! # if self.CM.rsh(node, "killall -9 heartbeat")==0: Starttime = os.times()[4] if stopwatch.look(): Stoptime = os.times()[4] # This test is CM-specific - FIXME!! self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") Detectiontime = Stoptime-Starttime detectms = int(Detectiontime*1000+0.5) self.CM.log("...failure detection time: %d ms" % detectms) self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime if self.Stats["min"] == 0: self.Stats["min"] = Detectiontime if Detectiontime > self.Stats["max"]: self.Stats["max"] = Detectiontime if Detectiontime < self.Stats["min"]: self.Stats["min"] = Detectiontime self.CM.ShouldBeStatus[node] = self.CM["down"] self.start(node) return self.success() else: # This test is CM-specific - FIXME!! self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") self.CM.ShouldBeStatus[node] = self.CM["down"] ret=self.start(node) return self.failure("Didn't find the log message") else: return self.failure("Couldn't kill cluster manager") def is_applicable(self): '''This test is applicable when auto_failback != legacy''' return self.standby.is_applicable() # This test is CM-specific - FIXME!! def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ "ccm.*ERROR: ccm_control_process:failure to send protoversion request" , "ccm.*ERROR: Lost connection to heartbeat service. Need to bail out" ] AllTestClasses.append(Fastdetection) ############################################################################## class BandwidthTest(CTSTest): ############################################################################## # Tests should not be cluster-manager-specific # If you need to find out cluster manager configuration to do this, then # it should be added to the generic cluster manager API. '''Test the bandwidth which heartbeat uses''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Bandwidth" self.start = StartTest(cm) self.__setitem__("min",0) self.__setitem__("max",0) self.__setitem__("totalbandwidth",0) self.tempfile = tempfile.mktemp(".cts") self.startall = SimulStartLite(cm) def __call__(self, node): '''Perform the Bandwidth test''' self.incr("calls") if self.CM.upcount()<1: return self.skipped() Path = self.CM.InternalCommConfig() if "ip" not in Path["mediatype"]: return self.skipped() port = Path["port"][0] port = int(port) ret = self.startall(None) if not ret: return self.failure("Test setup failed") time.sleep(5) # We get extra messages right after startup. fstmpfile = "/var/run/band_estimate" dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \ % (port, fstmpfile) rc = self.CM.rsh(node, dumpcmd) if rc == 0: farfile = "root@%s:%s" % (node, fstmpfile) self.CM.rsh.cp(farfile, self.tempfile) Bandwidth = self.countbandwidth(self.tempfile) if not Bandwidth: self.CM.log("Could not compute bandwidth.") return self.success() intband = int(Bandwidth + 0.5) self.CM.log("...bandwidth: %d bits/sec" % intband) self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth if self.Stats["min"] == 0: self.Stats["min"] = Bandwidth if Bandwidth > self.Stats["max"]: self.Stats["max"] = Bandwidth if Bandwidth < self.Stats["min"]: self.Stats["min"] = Bandwidth self.CM.rsh(node, "rm -f %s" % fstmpfile) os.unlink(self.tempfile) return self.success() else: return self.failure("no response from tcpdump command [%d]!" % rc) def countbandwidth(self, file): fp = open(file, "r") fp.seek(0) count = 0 sum = 0 while 1: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count=count+1 linesplit = string.split(line," ") for j in range(len(linesplit)-1): if linesplit[j]=="udp": break if linesplit[j]=="length:": break try: sum = sum + int(linesplit[j+1]) except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T1 = linesplit[0] timesplit = string.split(T1,":") time2split = string.split(timesplit[2],".") time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 break while count < 100: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count = count+1 linessplit = string.split(line," ") for j in range(len(linessplit)-1): if linessplit[j] =="udp": break if linesplit[j]=="length:": break try: sum=int(linessplit[j+1])+sum except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T2 = linessplit[0] timesplit = string.split(T2,":") time2split = string.split(timesplit[2],".") time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 time = time2-time1 if (time <= 0): return 0 return (sum*8)/time def is_applicable(self): '''BandwidthTest is always applicable''' return 0 AllTestClasses.append(BandwidthTest) ########################################################################## class RedundantpathTest(CTSTest): ########################################################################## '''In heartbeat, it has redundant path to communicate between the cluster''' # # Tests should not be cluster-manager specific # One needs to isolate what you need from the cluster manager and then # add a (new) API to do it. # def __init__(self,cm,timeout=60): CTSTest.__init__(self,cm) self.name = "RedundantpathTest" self.timeout = timeout def PathCount(self): '''Return number of communication paths''' Path = self.CM.InternalCommConfig() cf = self.CM.cf eths = [] serials = [] num = 0 for interface in Path["interface"]: if re.search("eth",interface): eths.append(interface) num = num + 1 if re.search("/dev",interface): serials.append(interface) num = num + 1 return (num, eths, serials) def __call__(self,node): '''Perform redundant path test''' self.incr("calls") if self.CM.ShouldBeStatus[node]!=self.CM["up"]: return self.skipped() (num, eths, serials) = self.PathCount() for eth in eths: if self.CM.rsh(node,"ifconfig %s down" % eth)==0: PathDown = "OK" break if PathDown != "OK": for serial in serials: if self.CM.rsh(node,"setserial %s uart none" % serial)==0: PathDown = "OK" break if PathDown != "OK": return self.failure("Cannot break the path") time.sleep(self.timeout) for audit in CTSaudits.AuditList(self.CM): if not audit(): for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.failure("Redundant path fail") for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.success() def is_applicable(self): '''It is applicable when you have more than one connection''' return self.PathCount()[0] > 1 # FIXME!! Why is this one commented out? #AllTestClasses.append(RedundantpathTest) ########################################################################## class DRBDTest(CTSTest): ########################################################################## '''In heartbeat, it provides replicated storage.''' def __init__(self,cm, timeout=10): CTSTest.__init__(self,cm) self.name = "DRBD" self.timeout = timeout def __call__(self, dummy): '''Perform the 'DRBD' test.''' self.incr("calls") for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() # Note: All these special cases with Start/Stop/StatusDRBD # should be reworked to use resource objects instead of # being hardwired to bypass the objects here. for node in self.CM.Env["nodes"]: done=time.time()+self.timeout+1 while (time.time()done: return self.failure("Can't start drbd, please check it") device={} for node in self.CM.Env["nodes"]: device[node]=self.getdevice(node) node = self.CM.Env["nodes"][0] done=time.time()+self.timeout+1 while 1: if (time.time()>done): return self.failure("the drbd could't sync") self.CM.rsh(node,"cp /proc/drbd /var/run >/dev/null 2>&1") if self.CM.rsh.cp("%s:/var/run/drbd" % node,"/var/run"): line = open("/tmp/var/run").readlines()[2] p = line.find("Primary") s1 = line.find("Secondary") s2 = line.rfind("Secondary") if s1!=s2: if self.CM.rsh(node,"drbdsetup %s primary" % device[node]): pass if p!=-1: if p mem_allow: failed = 1 messages.append("%s size grew by %dkB (%dkB)" %(memory_error[index], mem_diff, mem_after)) elif mem_diff < 0: messages.append("%s size shrank by %dkB (%dkB)" %(memory_error[index], mem_diff, mem_after)) if len(messages) > 0: self.CM.log("Process %s[%s] on %s: %s" %(before_tokens[0], before_tokens[1], node, repr(messages))) self.CM.debug("%s Before: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB" %(node, before_tokens[0], before_tokens[1], before_tokens[2], before_tokens[3], before_tokens[4], before_tokens[5], before_tokens[6])) self.CM.debug("%s After: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB" %(node, after_tokens[0], after_tokens[1], after_tokens[2], after_tokens[3], after_tokens[4], after_tokens[5], after_tokens[6])) if failed == 1: failed_nodes.append(node) if len(failed_nodes) > 0: return self.failure("Memory leaked on: " + repr(failed_nodes)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [ """ERROR: .* LRM operation.*monitor on .*: not running""", """pengine:.*Handling failed """] def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 #AllTestClasses.append(MemoryTest) #################################################################### class ElectionMemoryTest(CTSTest): #################################################################### '''Check to see if anyone is leaking memory''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Election" def __call__(self, node): self.rsh.readaline(node, self.CM["ElectionCmd"]%node) if self.CM.cluster_stable(): return self.success() return self.failure("Cluster not stable") def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''Never applicable, only for use by the memory test''' return 0 AllTestClasses.append(ElectionMemoryTest) #################################################################### class SpecialTest1(CTSTest): #################################################################### '''Set up a custom test to cause quorum failure issues for Andrew''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SpecialTest1" self.startall = SimulStartLite(cm) self.restart1 = RestartTest(cm) self.stopall = SimulStopLite(cm) def __call__(self, node): '''Perform the 'SpecialTest1' test for Andrew. ''' self.incr("calls") # Shut down all the nodes... ret = self.stopall(None) if not ret: return ret # Start the selected node ret = self.restart1(node) if not ret: return ret # Start all remaining nodes ret = self.startall(None) return ret def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): return 1 AllTestClasses.append(SpecialTest1) ################################################################### class NearQuorumPointTest(CTSTest): ################################################################### ''' This test brings larger clusters near the quorum point (50%). In addition, it will test doing starts and stops at the same time. Here is how I think it should work: - loop over the nodes and decide randomly which will be up and which will be down Use a 50% probability for each of up/down. - figure out what to do to get into that state from the current state - in parallel, bring up those going up and bring those going down. ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="NearQuorumPoint" def __call__(self, dummy): '''Perform the 'NearQuorumPoint' test. ''' self.incr("calls") startset = [] stopset = [] #decide what to do with each node for node in self.CM.Env["nodes"]: action = self.CM.Env.RandomGen.choice(["start","stop"]) #action = self.CM.Env.RandomGen.choice(["start","stop","no change"]) if action == "start" : startset.append(node) elif action == "stop" : stopset.append(node) self.CM.debug("start nodes:" + repr(startset)) self.CM.debug("stop nodes:" + repr(stopset)) - if not self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]): - self.log("WaitForAllNodesToComeUp filed") - return self.skipped() - #add search patterns watchpats = [ ] for node in stopset: if self.CM.ShouldBeStatus[node] == self.CM["up"]: watchpats.append(self.CM["Pat:We_stopped"] % node) for node in startset: if self.CM.ShouldBeStatus[node] == self.CM["down"]: watchpats.append(self.CM["Pat:They_started"] % node) if len(watchpats) == 0: return self.skipped() watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() #begin actions for node in stopset: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.StopaCMnoBlock(node) for node in startset: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.CM.StartaCMnoBlock(node) #get the result if watch.lookforall(): self.CM.cluster_stable() return self.success() self.CM.log("Warn: Patterns not found: " + repr(watch.unmatched)) #get the "bad" nodes upnodes = [] for node in stopset: if self.CM.StataCM(node) == 1: upnodes.append(node) downnodes = [] for node in startset: if self.CM.StataCM(node) == 0: downnodes.append(node) if upnodes == [] and downnodes == []: self.CM.cluster_stable() return self.success() if len(upnodes) > 0: self.CM.log("Warn: Unstoppable nodes: " + repr(upnodes)) if len(downnodes) > 0: self.CM.log("Warn: Unstartable nodes: " + repr(downnodes)) return self.failure() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllTestClasses.append(NearQuorumPointTest) ################################################################### class BSC_AddResource(CTSTest): ################################################################### '''Add a resource to the cluster''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="AddResource" self.resource_offset = 0 self.cib_cmd="""@sbindir@/cibadmin -C -o %s -X '%s' """ def __call__(self, node): self.resource_offset = self.resource_offset + 1 r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset) start_pat = "crmd.*start_0 on %s complete" patterns = [] patterns.append(start_pat % r_id) watch = CTS.LogWatcher( self.CM["LogFileName"], patterns, self.CM["DeadTime"]) watch.setwatch() fields = string.split(self.CM.Env["IPBase"], '.') fields[3] = str(int(fields[3])+1) ip = string.join(fields, '.') self.CM.Env["IPBase"] = ip if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip): return self.failure("Make resource %s failed" % r_id) failed = 0 watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.CM.log ("Warn: Pattern not found: %s" % (regex)) failed = 1 if failed: return self.failure("Resource pattern(s) not found") if not self.CM.cluster_stable(self.CM["DeadTime"]): return self.failure("Unstable cluster") return self.success() def make_ip_resource(self, node, id, rclass, type, ip): self.CM.log("Creating %s::%s:%s (%s) on %s" % (rclass,type,id,ip,node)) rsc_xml=""" """ % (id, rclass, type, id, id, ip) node_constraint=""" """ % (id, id, id, id, node) rc = 0 (rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("constraints", node_constraint)) if rc != 0: self.CM.log("Constraint creation failed: %d" % rc) return None (rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("resources", rsc_xml)) if rc != 0: self.CM.log("Resource creation failed: %d" % rc) return None return 1 def is_applicable(self): if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["DoBSC"]: return 1 return None def TestList(cm): result = [] for testclass in AllTestClasses: bound_test = testclass(cm) if bound_test.is_applicable(): result.append(bound_test) return result class SimulStopLite(CTSTest): ################################################################### '''Stop any active nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStopLite" def __call__(self, dummy): '''Perform the 'SimulStopLite' setup work. ''' self.incr("calls") - if not self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]): - self.log("WaitForAllNodesToComeUp filed") - return self.skipped() - self.CM.debug("Setup: " + self.name) # We ignore the "node" parameter... watchpats = [ ] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("WasStarted") watchpats.append(self.CM["Pat:All_stopped"] % node) if self.CM.Env["use_logd"]: watchpats.append(self.CM["Pat:Logd_stopped"] % node) if len(watchpats) == 0: self.CM.clear_all_caches() return self.skipped() # Stop all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.starttime=time.time() for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.StopaCMnoBlock(node) if watch.lookforall(): self.CM.clear_all_caches() return self.success() did_fail=0 up_nodes = [] for node in self.CM.Env["nodes"]: if self.CM.StataCM(node) == 1: did_fail=1 up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: " + repr(up_nodes)) self.CM.log("Warn: All nodes stopped but CTS didnt detect: " + repr(watch.unmatched)) self.CM.clear_all_caches() return self.success() def is_applicable(self): '''SimulStopLite is a setup test and never applicable''' return 0 ################################################################### class SimulStartLite(CTSTest): ################################################################### '''Start any stopped nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStartLite" def __call__(self, dummy): '''Perform the 'SimulStartList' setup work. ''' self.incr("calls") self.CM.debug("Setup: " + self.name) - if not self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]): - self.log("WaitForAllNodesToComeUp filed") - return self.skipped() - # We ignore the "node" parameter... watchpats = [ ] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") watchpats.append(self.CM["Pat:They_started"] % node) if len(watchpats) == 0: return self.skipped() # Start all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.starttime=time.time() for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.CM.StartaCMnoBlock(node) if watch.lookforall(): for attempt in (1, 2, 3, 4, 5): if self.CM.cluster_stable(): return self.success() return self.failure("Cluster did not stabilize") did_fail=0 unstable = [] for node in self.CM.Env["nodes"]: if self.CM.StataCM(node) == 0: did_fail=1 unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: " + repr(unstable)) unstable = [] for node in self.CM.Env["nodes"]: if not self.CM.node_stable(node): did_fail=1 unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: " + repr(unstable)) self.CM.log("Warn: All nodes started but CTS didnt detect: " + repr(watch.unmatched)) return self.success() def is_applicable(self): '''SimulStartLite is a setup test and never applicable''' return 0 ################################################################### class LoggingTest(CTSTest): ################################################################### def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Logging" def __call__(self, dummy): '''Perform the 'Logging' test. ''' self.incr("calls") - self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) - # Make sure logging is working and we have enough disk space... if not self.CM.TestLogging(): sys.exit(1) if not self.CM.CheckDf(): sys.exit(1) def is_applicable(self): '''ResourceRecover is applicable only when there are resources running on our cluster and environment is linux-ha-v2''' return self.CM.Env["DoBSC"] def errorstoignore(self): '''Return list of errors which should be ignored''' return [] #AllTestClasses.append(LoggingTest)