diff --git a/cts/CM_hb.py.in b/cts/CM_hb.py.in
index b6054d6467..a57755ee93 100755
--- a/cts/CM_hb.py.in
+++ b/cts/CM_hb.py.in
@@ -1,645 +1,648 @@
#!@PYTHON@
'''CTS: Cluster Testing System: heartbeat dependent modules...

Classes related to testing high-availability clusters...

Lots of things are implemented.
Lots of things are not implemented.
We have many more ideas of what to do than we've implemented.
'''

__copyright__='''
Copyright (C) 2000,2001 Alan Robertson
Licensed under the GNU GPL.
'''

#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

from CTS import *


class HeartbeatCM(ClusterManager):
    '''
    The heartbeat cluster manager class.
    It implements the things we need to talk to and manipulate
    heartbeat clusters.
    '''

    def __init__(self, Environment, randseed=None):
        self.ResourceDirs = ["@sysconfdir@/ha.d/resource.d",
                             "@sysconfdir@/rc.d/init.d",
                             "@sysconfdir@/rc.d/"]
        self.ResourceFile = Environment["HAdir"] + "/haresources"
        self.ConfigFile = Environment["HAdir"] + "/ha.cf"
        ClusterManager.__init__(self, Environment, randseed=randseed)
        self.update({
            "Name"           : "heartbeat",
            "DeadTime"       : 30,
            "StableTime"     : 30,
            "StartCmd"       : "ulimit -c unlimited; MALLOC_CHECK_=2; export MALLOC_CHECK_; @libdir@/heartbeat/heartbeat -d 2>/dev/null",
            "StopCmd"        : "@libdir@/heartbeat/heartbeat -k",
            "StatusCmd"      : "@libdir@/heartbeat/heartbeat -s",
            "RereadCmd"      : "@libdir@/heartbeat/heartbeat -r",
            "StartDRBDCmd"   : "@sysconfdir@/init.d/drbd start >/dev/null 2>&1",
            "StopDRBDCmd"    : "@sysconfdir@/init.d/drbd stop",
            "StatusDRBDCmd"  : "@sysconfdir@/init.d/drbd status",
            "DRBDCheckconf"  : "@sysconfdir@/init.d/drbd checkconfig >/tmp/drbdconf 2>&1",
            "BreakCommCmd"   : "@libdir@/heartbeat/TestHeartbeatComm break-communication >/dev/null 2>&1",
            "FixCommCmd"     : "@libdir@/heartbeat/TestHeartbeatComm fix-communication >/dev/null 2>&1",
            "DelFileCommCmd" : "/usr/lib/heartbeat/TestHeartbeatComm delete-testingfile >/dev/null 2>&1",
            "SaveFileCmd"    : "/usr/lib/heartbeat/TestHeartbeatComm save-testingfile /tmp/OnlyForTesting >/dev/null 2>&1",
            "ReduceCommCmd"  : "/usr/lib/heartbeat/TestHeartbeatComm reduce-communication %s %s>/dev/null 2>&1",
            "RestoreCommCmd" : "/usr/lib/heartbeat/TestHeartbeatComm restore-communication /tmp/OnlyForTesting >/dev/null 2>&1",
            "IPaddrCmd"      : "@sysconfdir@/ha.d/resource.d/IPaddr %s status",
            "Standby"        : "@libdir@/heartbeat/hb_standby >/dev/null 2>&1",
            "TestConfigDir"  : "@sysconfdir@/ha.d/testconfigs",
            "LogFileName"    : Environment["LogFileName"],

            # Patterns to look for in the log files for various occasions...
"Pat:We_started" : " (%s) .* Initial resource acquisition complete", "Pat:They_started" : " (%s) .* Initial resource acquisition complete", "Pat:We_stopped" : "Heartbeat shutdown complete", "Pat:They_stopped" : "node (%s).*: is dead", "Pat:They_dead" : "node (%s).*: is dead", "Pat:All_stopped" : " (%s).*heartbeat.*Heartbeat shutdown complete", "Pat:StandbyOK" : "Standby resource acquisition done", "Pat:StandbyNONE" : "No reply to standby request", "Pat:StandbyTRANSIENT" : "standby message.*ignored.*in flux", "Pat:Return_partition" : "Cluster node %s returning after partition", # Bad news Regexes. Should never occur. "BadRegexes" : ( r"Shutting down\.", r"Forcing shutdown\.", r"Both machines own .* resources!", r"No one owns .* resources!", r", exiting\.", r"ERROR:", r"CRIT.*:", ), }) self.cf=HBConfig(Environment["HAdir"]) self._finalConditions() def SetClusterConfig(self, configpath="default", nodelist=None): '''Activate the named test configuration throughout the cluster. This code is specialized to heartbeat. ''' rc=1 Command=''' cd %s%s%s; : cd to test configuration directory for j in * do if [ -f "@sysconfdir@/ha.d/$j" ]; then if cmp $j @sysconfdir@/ha.d/$j >/dev/null 2>&1; then : Config file $j is already up to correct. else echo "Touching $j" cp $j @sysconfdir@/ha.d/$j fi fi done ''' % (self["TestConfigDir"], os.sep, configpath) if nodelist == None: nodelist=self.Env["nodes"] for node in nodelist: if not self.rsh(node, Command): rc=None self.rereadall() return rc def ResourceGroups(self): ''' Return the list of resources groups defined in this configuration. This code is specialized to heartbeat. We make the assumption that the resource file on the local machine is the same as that of a cluster member. We aren't necessarily a member of the cluster (In fact, we usually aren't). ''' RscGroups=[] file = open(self.ResourceFile, "r") while (1): line = file.readline() if line == "": break idx=string.find(line, '#') if idx >= 0: line=line[:idx] if line == "": continue line = string.strip(line) # Is this wrong? tokens = re.split("[ \t]+", line) # Ignore the default server for this resource group del tokens[0] Group=[] for token in tokens: if token != "": idx=string.find(token, "::") if idx > 0: tuple=string.split(token, "::") else: # # Is this an IPaddr default resource type? # if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$" , token): tuple=["IPaddr", token] else: tuple = [token, None] Resource = self.hbResource(tuple[0], tuple[1]) Group.append(Resource) RscGroups.append(Group) file.close() return RscGroups def InternalCommConfig(self): ''' Return a list of communication paths. Each path consists of a tuple like this: mediatype serial | ip interface/dev name eth0 | /dev/ttyS0... protocol tcp?? 
    def InternalCommConfig(self):
        '''
        Return a list of communication paths.
        Each path consists of a tuple like this:
            mediatype           serial | ip
            interface/dev name  eth0 | /dev/ttyS0...
            protocol            tcp?? | udp | None
            port                Number | None
        '''
        Path = {"mediatype" : None, "interface": None, "protocol" : None, "port": None}
        cf = self.cf

        for cfp in cf.Parameters:
            if cfp == "serial":
                if Path["mediatype"] == None:
                    Path["mediatype"] = ["serial"]
                else:
                    Path["mediatype"].append("serial")

                if Path["interface"] == None:
                    Path["interface"] = cf.Parameters["serial"]
                else:
                    for serial in cf.Parameters["serial"]:
                        Path["interface"].append(serial)

            if cfp == "bcast" or cfp == "mcast" or cfp == "ucast":
                if Path["mediatype"] == None:
                    Path["mediatype"] = ["ip"]
                else:
                    Path["mediatype"].append("ip")

                if cfp == "bcast":
                    interfaces = cf.Parameters[cfp]
                if cfp == "ucast":
                    interfaces = [cf.Parameters[cfp][0]]
                if cfp == "mcast":
                    Path["port"] = [cf.Parameters[cfp][0][2]]
                    Path["protocol"] = "udp"
                    interfaces = [cf.Parameters[cfp][0][0]]

                if Path["interface"] == None:
                    Path["interface"] = interfaces
                else:
                    for interface in interfaces:
                        if interface not in Path["interface"]:
                            Path["interface"].append(interface)

            if cfp == "udpport":
                Path["port"] = cf.Parameters["udpport"]
                Path["protocol"] = ["udp"]

+        if Path["port"] == None:
+            Path["port"] = [694]
+
        return Path

    def HasQuorum(self, node_list):
        (
'''Return TRUE if the cluster currently has quorum.  According to
current heartbeat code this means one node is up.
''')
        return self.upcount() >= 1

    def hbResource(self, type, instance):
        '''
        Our job is to create the right kind of resource.  For most
        resources, we just create an HBResource object, but for
        IP addresses, we create an HBipResource instead.
        Some other types of resources may also be added as special
        cases.
        '''
        if type == "IPaddr":
            return HBipResource(self, type, instance)
        return HBResource(self, type, instance)
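# Illustrative sketch, not part of the patch: the hunk above makes
# InternalCommConfig() fall back to heartbeat's registered UDP port (694)
# whenever neither "mcast" nor "udpport" supplied one, so callers such as
# BandwidthTest no longer need their own fallback.  The helper below
# (hypothetical name) condenses that rule; the dict shape mirrors Path.
def _example_comm_path(parameters):
    '''Reduce an ha.cf parameter dict to a Path dict with a default port.'''
    path = {"mediatype": None, "interface": None, "protocol": None, "port": None}
    if parameters.has_key("udpport"):
        path["port"] = parameters["udpport"]
        path["protocol"] = ["udp"]
    if path["port"] == None:
        path["port"] = [694]    # heartbeat's IANA-registered port (ha-cluster)
    return path

# _example_comm_path({}) yields a Path with "port" == [694]; with "udpport"
# configured, the configured value wins and the default never applies.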
class HBResource(Resource):

    def IsRunningOn(self, nodename):
        '''
        This member function returns true if our resource is running
        on the given node in the cluster.
        We call the status operation for the resource script.
        '''
        return self._ResourceOperation("status", "OK|running", nodename)

    def _ResourceOperation(self, operation, pattern, nodename):
        '''
        We call the requested operation for the resource script.
        We don't particularly care what kind of operation we were
        called to perform.
        When we were created, we were bound to a cluster manager, which
        has its own remote execution method (which we use here).
        '''
        if self.Instance == None:
            instance = ""
        else:
            instance = self.Instance

        Rlist = 'LIST="'
        for dir in self.CM.ResourceDirs:
            Rlist = Rlist + " " + dir
        Rlist = Rlist + '"; '

        Script = Rlist + '''
T="''' + self.ResourceType + '''";
I="''' + instance + '''";
for dir in $LIST;
do
  if [ -f "$dir/$T" -a -x "$dir/$T" ]
  then
    "$dir/$T" $I ''' + operation + '''
    exit $?
  fi
done 2>&1;
exit 1;'''

        #print "Running " + Script + "\n"
        line = self.CM.rsh.readaline(nodename, Script)
        if operation == "status":
            if re.search(pattern, line):
                return 1
        return self.CM.rsh.lastrc == 0

    def Start(self, nodename):
        '''
        This member function starts or activates the resource.
        '''
        return self._ResourceOperation("start", None, nodename)

    def Stop(self, nodename):
        '''
        This member function stops or deactivates the resource.
        '''
        return self._ResourceOperation("stop", None, nodename)

#    def IsWorkingCorrectly(self, nodename):
#        "We default to returning TRUE for this one..."
#        if self.Instance == None:
#            self.CM.log("Faking out: " + self.ResourceType)
#        else:
#            self.CM.log("Faking out: " + self.ResourceType + self.Instance)
#        return 1

    def IsWorkingCorrectly(self, nodename):
        return self._ResourceOperation("monitor", "OK", nodename)


class HBipResource(HBResource):
    '''
    We are a specialized IP address resource which knows how to test
    whether our resource type is actually being served.
    We cheat and run the IPaddr resource script on the current machine,
    because it's a more interesting case.
    '''

    def IsWorkingCorrectly(self, nodename):
        return self._ResourceOperation("monitor", "OK", self.CM.OurNode)
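# Illustrative sketch, not part of the patch: _ResourceOperation() above
# ships a generated shell script to the remote node that walks the resource
# directories, runs the first executable agent it finds as
# "<agent> <instance> <operation>", and greps the output for a health
# pattern.  The local equivalent below (hypothetical helper, subprocess in
# place of self.CM.rsh) shows the same search-then-execute shape.
import os, re, subprocess

def _example_run_agent(dirs, agent, instance, operation, pattern):
    for dir in dirs:
        script = os.path.join(dir, agent)
        if os.path.isfile(script) and os.access(script, os.X_OK):
            proc = subprocess.Popen([script, instance, operation],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            output = proc.communicate()[0]
            if operation == "status" and re.search(pattern, str(output)):
                return 1                    # output looks healthy
            return proc.returncode == 0     # otherwise trust the exit code
    return 0                                # no agent found in any directory

# _example_run_agent(["@sysconfdir@/ha.d/resource.d"], "IPaddr",
#                    "10.0.0.1", "status", "OK|running")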
#
#    A heartbeat configuration class...
#    It reads and parses the heartbeat config files.
#
class HBConfig:

    # Which options have multiple words on the line?
    MultiTokenKeywords = {"mcast" : None, "stonith_host": None}

    def __init__(self, configdir="/etc/ha.d"):
        self.Parameters = {}
        self.ResourceGroups = {}
        self._ReadConfig(os.path.join(configdir, "ha.cf"))

        FirstUp_NodeSelection()
        LastUp_NodeSelection()
        no_failback = NoAutoFailbackPolicy()
        auto_failback = AutoFailbackPolicy()

        #
        # We allow each resource group to have its own failover/back
        # policies
        #
        if self.Parameters.has_key("nice_failback") \
                and self.Parameters["nice_failback"] == "on":
            HBConfig.DefaultFailbackPolicy = no_failback
        elif self.Parameters.has_key("auto_failback") \
                and self.Parameters["auto_failback"] == "off":
            HBConfig.DefaultFailbackPolicy = no_failback
        else:
            HBConfig.DefaultFailbackPolicy = auto_failback

        HBConfig.DefaultNodeSelectionPolicy = NodeSelectionPolicies["FirstUp"]

        self._ReadResourceGroups(os.path.join(configdir, "haresources"))

    # Read the ha.cf config file
    def _ReadConfig(self, ConfigFile):
        self.ConfigPath = ConfigFile
        fp = open(ConfigFile)
        while 1:
            line = fp.readline()
            if not line:
                return
            line = re.sub("#.*", "", line)
            line = string.rstrip(line)
            if len(line) < 1:
                continue

            tokens = line.split()
            key = tokens[0]
            values = tokens[1:]
            if HBConfig.MultiTokenKeywords.has_key(key):
                # Group items from this line together, and separate
                # them from the items on other lines
                values = [values]
            if self.Parameters.has_key(key):
                if key == "node":
                    self.Parameters[key].extend(values)
                else:
                    self.Parameters[key].append(values[0])
            else:
                self.Parameters[key] = values

    # Read a line from the haresources file...
    # - allow for \ continuations...
    def _GetRscLine(self, fp):
        linesofar = None
        continuation = 1
        while continuation:
            continuation = 0
            line = fp.readline()
            if not line:
                break
            line = re.sub("#.*", "", line)

            if line[len(line)-2] == "\\":
                line = line[0:len(line)-2] + "\n"
                continuation = 1
            if linesofar == None:
                linesofar = line
            else:
                linesofar = linesofar + line
        return linesofar

    def _ReadResourceGroups(self, RscFile):
        self.RscPath = RscFile
        fp = open(RscFile)
        thisline = ""
        while 1:
            line = self._GetRscLine(fp)
            if not line:
                return
            line = line.strip()
            if len(line) < 1:
                continue

            tokens = line.split()
            node = tokens[0]
            resources = tokens[1:]
            rscargs = []
            for resource in resources:
                name = resource.split("::", 1)
                if len(name) > 1:
                    args = name[1].split("::")
                else:
                    args = None
                name = name[0]
                rscargs.append(Resource(name, args))

            name = tokens[0] + "__" + tokens[1]
            assert not self.ResourceGroups.has_key(name)

            #
            # Create the resource group
            #
            self.ResourceGroups[name] = ResourceGroup(name
            ,    rscargs
            ,    node.split(",")    # Provide default value
            ,    HBConfig.DefaultNodeSelectionPolicy
            ,    HBConfig.DefaultFailbackPolicy)

    #
    # Return the list of nodes in the cluster...
    #
    def nodes(self):
        result = self.Parameters["node"]
        result.sort()
        return result


class ClusterState:
    pass


class ResourceGroup:
    def __init__(self, name, resourcelist, possiblenodes
    ,    nodeselection_policy, failback_policy):
        self.name = name
        self.resourcelist = resourcelist
        self.possiblenodes = possiblenodes
        self.nodeselection_policy = nodeselection_policy
        self.failback_policy = failback_policy
        self.state = None
        self.attributes = {}
        self.history = []

    def __str__(self):
        result = string.join(self.possiblenodes, ",")
        for rsc in self.resourcelist:
            result = result + " " + str(rsc)
        return result


class Resource:
    def __init__(self, name, arguments=None):
        self.name = name
        self.arguments = arguments

    def __str__(self):
        result = self.name
        try:
            for arg in self.arguments:
                result = result + "::" + arg
        except TypeError:
            pass
        return result
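# Illustrative sketch, not part of the patch: how one haresources line
# decomposes under the rules _ReadResourceGroups() implements above -- the
# first token names the default node, and every remaining token splits on
# "::" into a resource name plus optional arguments.  The helper name and
# the sample line are invented.
def _example_parse_haresources_line(line):
    tokens = line.split()
    node = tokens[0]
    rscargs = []
    for resource in tokens[1:]:
        name = resource.split("::", 1)
        if len(name) > 1:
            args = name[1].split("::")
        else:
            args = None
        rscargs.append(Resource(name[0], args))
    return node, rscargs

# _example_parse_haresources_line(
#     "node1 10.10.10.21 Filesystem::/dev/sda1::/data1::ext2")
# yields ("node1", [a bare "10.10.10.21" resource, and a "Filesystem"
# resource carrying the arguments ["/dev/sda1", "/data1", "ext2"]]).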
#######################################################################
#
#    Base class defining policies for where we put resources
#    when we're starting, or when a failure has occurred...
#
#######################################################################
NodeSelectionPolicies = {}


class NodeSelectionPolicy:
    def __init__(self, name):
        self.name = name
        NodeSelectionPolicies[name] = self

    def name(self):
        return self.name

    #
    # nodenames:     the list of nodes eligible to run this resource
    # ResourceGroup: the group to be started...
    # ClusterState:  cluster state information
    #
    def SelectNode(self, nodenames, ResourceGroup, ClusterState):
        return None


#
#    Choose the first node in the list...
#
class FirstUp_NodeSelection(NodeSelectionPolicy):
    def __init__(self):
        NodeSelectionPolicy.__init__(self, "FirstUp")

    def SelectNode(self, nodenames, ResourceGroup, ClusterState):
        return nodenames[0]


#
#    Choose the last node in the list...
#    (kind of a dumb example)
#
class LastUp_NodeSelection(NodeSelectionPolicy):
    def __init__(self):
        NodeSelectionPolicy.__init__(self, "LastUp")

    def SelectNode(self, nodenames, ResourceGroup, ClusterState):
        return nodenames[len(nodenames)-1]


#######################################################################
#
#    Failback policies...
#
#    Where to locate a resource group when an eligible node rejoins
#    the cluster...
#
#######################################################################
FailbackPolicies = {}


class FailbackPolicy:
    def __init__(self, name):
        self.name = name
        FailbackPolicies[name] = self

    def name(self):
        return self.name

    #
    # currentnode:     the node the service is currently on
    # returningnode:   the node which just rejoined
    # eligiblenodes:   permitted nodes which are up
    # SelectionPolicy: the normal NodeSelectionPolicy
    # ClusterState:    cluster state information...
    #
    def SelectNewNode(self, currentnode, returningnode, eligiblenodes
    ,    SelectionPolicy, ResourceGroup, ClusterState):
        return None


#
#    This FailbackPolicy is like "normal failback" in heartbeat
#
class AutoFailbackPolicy(FailbackPolicy):
    def __init__(self):
        FailbackPolicy.__init__(self, "failback")

    def SelectNewNode(self, currentnode, returningnode, eligiblenodes
    ,    SelectionPolicy, ResourceGroup, ClusterState):
        # Select where it should run based on the current normal policy,
        # just as though we were starting it for the first time.
        return SelectionPolicy(eligiblenodes, ResourceGroup, ClusterState)


#
#    This FailbackPolicy is like "nice failback" in heartbeat
#
class NoAutoFailbackPolicy(FailbackPolicy):
    def __init__(self):
        FailbackPolicy.__init__(self, "failuresonly")

    def SelectNewNode(self, currentnode, returningnode, eligiblenodes
    ,    SelectionPolicy, ResourceGroup):
        # Always leave the resource where it is...
        return currentnode


#######################################################################
#
#    A little test code...
#
#    Which you are advised to completely ignore...
#
#######################################################################
if __name__ == '__main__':
    FirstUp_NodeSelection()
    LastUp_NodeSelection()
    no_failback = NoAutoFailbackPolicy()
    auto_failback = AutoFailbackPolicy()

    cf = HBConfig("/etc/ha.d")
    print "Cluster configuration:\n"
    print "Nodes:", cf.nodes(), "\n"
    print "Config Parameters:", cf.Parameters, "\n"
    for groupname in cf.ResourceGroups.keys():
        print "Resource Group %s:\n\t%s\n" % (groupname, cf.ResourceGroups[groupname])
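# Usage sketch, not part of the patch: the policy classes register
# themselves by name when instantiated, and a failback decision composes a
# FailbackPolicy with the ordinary NodeSelectionPolicy.  The helper and
# node names below are invented.
def _example_failback_decision():
    FirstUp_NodeSelection()                 # registers itself as "FirstUp"
    pick = NodeSelectionPolicies["FirstUp"].SelectNode
    auto = AutoFailbackPolicy()             # rebalance when a node rejoins
    nice = NoAutoFailbackPolicy()           # stay put unless we failed
    moved = auto.SelectNewNode("n2", "n1", ["n1", "n2"], pick, None, None)
    stays = nice.SelectNewNode("n2", "n1", ["n1", "n2"], pick, None)
    return moved, stays                     # -> ("n1", "n2")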
diff --git a/cts/CTStests.py.in b/cts/CTStests.py.in
index d8ef7f7297..46bbff57df 100644
--- a/cts/CTStests.py.in
+++ b/cts/CTStests.py.in
@@ -1,1723 +1,1721 @@
#!@PYTHON@
'''CTS: Cluster Testing System: Tests module

There are a few things we want to do here:
'''

__copyright__='''
Copyright (C) 2000, 2001 Alan Robertson
Licensed under the GNU GPL.

Add ResourceRecover testcase
Zhao Kai
'''

#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

import CTS
from CM_hb import HBConfig
import CTSaudits
import time, os, re, types, string, tempfile
from CTSaudits import *
from stat import *

#    List of all class objects for tests which we ought to
#    consider running.


class RandomTests:
    '''
    A collection of tests which are run at random.
    '''
    def __init__(self, scenario, cm, tests, Audits):
        self.CM = cm
        self.Env = cm.Env
        self.Scenario = scenario
        self.Tests = []
        self.Audits = []

        for test in tests:
            if not issubclass(test.__class__, CTSTest):
                raise ValueError("Init value must be a subclass of CTSTest")
            if test.is_applicable():
                self.Tests.append(test)

        if not scenario.IsApplicable():
            raise ValueError("Scenario not applicable in"
                             " given Environment")

        self.Stats = {"success":0, "failure":0, "BadNews":0}
        self.IndividualStats = {}

        for audit in Audits:
            if not issubclass(audit.__class__, ClusterAudit):
                raise ValueError("Init value must be a subclass of ClusterAudit")
            if audit.is_applicable():
                self.Audits.append(audit)

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if not self.Stats.has_key(name):
            self.Stats[name] = 0
        self.Stats[name] = self.Stats[name]+1

    def audit(self, BadNews, test):
        errcount = 0
        while errcount < 1000:
            match = BadNews.look()
            if match:
                add_err = 1
                ignorelist = []
                if test:
                    ignorelist = test.errorstoignore()
                ignorelist.append(" CTS: ")
                for ignore in ignorelist:
                    if re.search(ignore, match):
                        add_err = 0
                if add_err == 1:
                    ignorelist = self.CM.errorstoignore()
                    for ignore in ignorelist:
                        if re.search(ignore, match):
                            add_err = 0
                if add_err == 1:
                    self.CM.log("BadNews: " + match)
                    self.incr("BadNews")
                    errcount = errcount+1
            else:
                break
        else:
            self.CM.log("Big problems.  Shutting down.")
            self.CM.stopall()
            self.summarize()
            raise ValueError("Looks like we hit the jackpot!  :-)")

        for audit in self.Audits:
            if not audit():
                self.CM.log("Audit " + audit.name() + " FAILED.")
                self.incr("auditfail")
                if test:
                    test.incr("auditfail")

    def summarize(self):
        self.CM.log("****************")
        self.CM.log("Overall Results:" + repr(self.Stats))
        self.CM.log("****************")
        self.CM.log("Detailed Results")
        for test in self.Tests:
            self.CM.log("Test %s: \t%s" % (test.name, repr(test.Stats)))
        self.CM.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")

    def run(self, max=1):
        (
'''
Set up the given scenario, then run the selected tests at
random for the selected number of iterations.
''')
        BadNews = CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"],
                                 timeout=0)
        BadNews.setwatch()

        if not self.Scenario.SetUp(self.CM):
            return None

        testcount = 1
        time.sleep(30)  # This makes sure everything is stabilized before starting...
        self.audit(BadNews, None)

        while testcount <= max:
            test = self.Env.RandomGen.choice(self.Tests)

            # Some tests want a node as an argument.
            nodechoice = self.Env.RandomNode()

            #logsize = os.stat(self.CM["LogFileName"])[ST_SIZE]
            #self.CM.log("Running test %s (%s) \t[%d : %d]"
            #            % (test.name, nodechoice, testcount, logsize))
            self.CM.log("Running test %s (%s) \t[%d]"
                        % (test.name, nodechoice, testcount))
            testcount = testcount + 1

            starttime = time.time()
            test.starttime = starttime
            ret = test(nodechoice)

            if ret:
                self.incr("success")
            else:
                self.incr("failure")
                self.CM.log("Test %s (%s) \t[FAILED]" % (test.name, nodechoice))
                # Better get the current info from the cluster...
                self.CM.statall()

            stoptime = time.time()
            elapsed_time = stoptime - starttime
            test_time = stoptime - test.starttime
            if not test.has_key("min_time"):
                test["elapsed_time"] = elapsed_time
                test["min_time"] = test_time
                test["max_time"] = test_time
            else:
                test["elapsed_time"] = test["elapsed_time"] + elapsed_time
                if test_time < test["min_time"]:
                    test["min_time"] = test_time
                if test_time > test["max_time"]:
                    test["max_time"] = test_time

            self.audit(BadNews, test)

        self.Scenario.TearDown(self.CM)
        self.audit(BadNews, None)

        for test in self.Tests:
            self.IndividualStats[test.name] = test.Stats

        return self.Stats, self.IndividualStats
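# Illustrative distillation, not part of the patch: the filtering rule
# audit() applies above to each "BadNews" log match.  A line is reported
# only if it matches none of the per-test ignore patterns (plus the
# built-in " CTS: " marker) and none of the cluster manager's own ignore
# patterns.  The helper name is invented.
def _example_is_bad_news(match, test_ignores, cm_ignores):
    for ignore in list(test_ignores) + [" CTS: "] + list(cm_ignores):
        if re.search(ignore, match):
            return 0    # expected noise: don't count it
    return 1            # unexplained: count it as BadNews

# _example_is_bad_news("heartbeat: ERROR: oops", [], []) -> 1
# _example_is_bad_news("info: CTS: running a test", [], []) -> 0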
AllTestClasses = [ ]


class CTSTest:
    '''
    A Cluster test.
    We implement the basic set of properties and behaviors for a generic
    cluster test.

    Cluster tests track their own statistics.
    We keep each of the kinds of counts we track as separate
    {name,value} pairs.
    '''

    def __init__(self, cm):
        #self.name="the unnamed test"
        self.Stats = {"calls":0
        ,    "success":0
        ,    "failure":0
        ,    "skipped":0
        ,    "auditfail":0}

#        if not issubclass(cm.__class__, ClusterManager):
#            raise ValueError("Must be a ClusterManager object")
        self.CM = cm
        self.timeout = 120
        self.starttime = 0

    def has_key(self, key):
        return self.Stats.has_key(key)

    def __setitem__(self, key, value):
        self.Stats[key] = value

    def __getitem__(self, key):
        return self.Stats[key]

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if not self.Stats.has_key(name):
            self.Stats[name] = 0
        self.Stats[name] = self.Stats[name]+1

    def failure(self, reason="none"):
        '''Increment the failure count'''
        self.incr("failure")
        self.CM.log("Test " + self.name + " failed [reason:" + reason + "]")
        return None

    def success(self):
        '''Increment the success count'''
        self.incr("success")
        return 1

    def skipped(self):
        '''Increment the skipped count'''
        self.incr("skipped")
        return 1

    def __call__(self, node):
        '''Perform the given test'''
        raise ValueError("Abstract Class member (__call__)")
        self.incr("calls")
        return self.failure()

    def is_applicable(self):
        '''Return TRUE if we are applicable in the current test configuration'''
        raise ValueError("Abstract Class member (is_applicable)")
        return 1

    def canrunnow(self):
        '''Return TRUE if we can meaningfully run right now'''
        return 1

    def errorstoignore(self):
        '''Return list of errors which are 'normal' and should be ignored'''
        return []


###################################################################
class StopTest(CTSTest):
###################################################################
    '''Stop (deactivate) the cluster manager on a node'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "stop"
        self.uspat   = self.CM["Pat:We_stopped"]
        self.thempat = self.CM["Pat:They_stopped"]
        self.allpat  = self.CM["Pat:All_stopped"]

    def __call__(self, node):
        '''Perform the 'stop' test. '''
        self.incr("calls")
        if self.CM.ShouldBeStatus[node] != self.CM["up"]:
            return self.skipped()

        if node == self.CM.OurNode:
            self.incr("us")
            pat = self.uspat
        else:
            if self.CM.upcount() <= 1:
                self.incr("all")
                pat = (self.allpat % node)
            else:
                self.incr("them")
                pat = (self.thempat % node)

        watch = CTS.LogWatcher(self.CM["LogFileName"], [pat],
                               self.CM["DeadTime"]+10)
        watch.setwatch()
        self.CM.StopaCM(node)
        if watch.look():
            return self.success()
        else:
            return self.failure("no match against %s " % pat)

#
# We don't register StopTest because it's better when called by
# another test...
#

###################################################################
class StartTest(CTSTest):
###################################################################
    '''Start (activate) the cluster manager on a node'''
    def __init__(self, cm, debug=None):
        CTSTest.__init__(self, cm)
        self.name = "start"
        self.debug = debug

    def __call__(self, node):
        '''Perform the 'start' test. '''
        self.incr("calls")

        if node == self.CM.OurNode or self.CM.upcount() < 1:
            self.incr("us")
        else:
            self.incr("them")

        if self.CM.ShouldBeStatus[node] != self.CM["down"]:
            return self.skipped()
        elif self.CM.StartaCM(node) == 1:
            return self.success()

        return self.failure("Startup %s on node %s failed"
                            % (self.CM["Name"], node))

    def is_applicable(self):
        '''StartTest is always applicable'''
        return 1

#
# We don't register StartTest because it's better when called by
# another test...
#
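# Schematic recipe, not part of the patch: the watch-then-act idiom
# StopTest uses above.  Arm the LogWatcher for the expected message
# *before* issuing the action, so the match window covers everything
# logged after setwatch(); the names below mirror the code above.
#
#     pat   = cm["Pat:They_stopped"] % node        # message we expect
#     watch = CTS.LogWatcher(cm["LogFileName"], [pat], cm["DeadTime"]+10)
#     watch.setwatch()                             # mark current end of log
#     cm.StopaCM(node)                             # now do the deed
#     ok = watch.look()                            # bounded wait for a match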
###################################################################
class FlipTest(CTSTest):
###################################################################
    '''If it's running, stop it.  If it's stopped, start it.
       Overthrow the status quo...
    '''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "flip"
        self.start = StartTest(cm)
        self.stop = StopTest(cm)

    def __call__(self, node):
        '''Perform the 'flip' test. '''
        self.incr("calls")
        if self.CM.ShouldBeStatus[node] == self.CM["up"]:
            self.incr("stopped")
            ret = self.stop(node)
            type = "up->down"
            # Give the cluster time to recognize it's gone...
            time.sleep(self.CM["StableTime"])
        elif self.CM.ShouldBeStatus[node] == self.CM["down"]:
            self.incr("started")
            ret = self.start(node)
            type = "down->up"
        else:
            return self.skipped()

        self.incr(type)
        if ret:
            return self.success()
        else:
            return self.failure("%s failure" % type)

    def is_applicable(self):
        '''FlipTest is always applicable'''
        return 1

#    Register FlipTest as a good test to run
AllTestClasses.append(FlipTest)


###################################################################
class RestartTest(CTSTest):
###################################################################
    '''Stop and restart a node'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "Restart"
        self.start = StartTest(cm)
        self.stop = StopTest(cm)

    def __call__(self, node):
        '''Perform the 'restart' test. '''
        self.incr("calls")

        self.incr("node:" + node)

        ret1 = 1
        if self.CM.ShouldBeStatus[node] == self.CM["down"]:
            self.incr("WasStopped")
            ret1 = self.start(node)

        self.starttime = time.time()
        ret2 = self.stop(node)
        # Give the cluster time to recognize we're gone...
        time.sleep(self.CM["StableTime"])
        ret3 = self.start(node)

        if not ret1:
            return self.failure("start (setup) failure")
        if not ret2:
            return self.failure("stop failure")
        if not ret3:
            return self.failure("start failure")
        return self.success()

    def is_applicable(self):
        '''RestartTest is always applicable'''
        return 1

#    Register RestartTest as a good test to run
AllTestClasses.append(RestartTest)
###################################################################
class StonithTest(CTSTest):
###################################################################
    '''Reboot a node by whacking it with stonith.'''
    def __init__(self, cm, timeout=600):
        CTSTest.__init__(self, cm)
        self.name = "Stonith"
        self.theystopped = self.CM["Pat:They_dead"]
        self.allstopped  = self.CM["Pat:All_stopped"]
        self.usstart     = self.CM["Pat:We_started"]
        self.themstart   = self.CM["Pat:They_started"]
        self.timeout     = timeout

    def __call__(self, node):
        '''Perform the 'stonith' test.  (Whack the node.)'''
        self.incr("calls")
        stopwatch = None

        #    Figure out what log message to look for when/if it goes down
        if self.CM.ShouldBeStatus[node] != self.CM["down"]:
            if self.CM.upcount() != 1:
                stopwatch = (self.theystopped % node)

        #    Figure out what log message to look for when it comes up
        if (self.CM.upcount() <= 1):
            uppat = (self.usstart % node)
        else:
            uppat = (self.themstart % node)

        upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat],
                                 timeout=self.timeout)

        if stopwatch:
            watch = CTS.LogWatcher(self.CM["LogFileName"], [stopwatch],
                                   timeout=self.CM["DeadTime"]+10)
            watch.setwatch()

        #    Reset (stonith) the node
        StonithWorked = None
        for tries in 1,2,3,4,5:
            if self.CM.Env.ResetNode(node):
                StonithWorked = 1
                break

        if not StonithWorked:
            return self.failure("Stonith failure")

        upwatch.setwatch()

        #    Look() and see if the machine went down
        if stopwatch:
            if watch.look():
                ret1 = 1
            else:
                reason = "Did not find " + stopwatch
                ret1 = 0
        else:
            ret1 = 1

        #    Look() and see if the machine came back up
        if upwatch.look():
            ret2 = 1
        else:
            reason = "Did not find " + uppat
            ret2 = 0

        self.CM.ShouldBeStatus[node] = self.CM["up"]

        # I can't remember why I put this in here :-(
        time.sleep(30)

        if not ret1:
            self.CM.log("When node %s was STONITHed, the other node didn't log it" % node)

        if ret1 and ret2:
            return self.success()
        else:
            return self.failure(reason)

    def is_applicable(self):
        '''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE'''
        if self.CM.Env.has_key("DoStonith"):
            return self.CM.Env["DoStonith"]
        return 1

#    Register StonithTest as a good test to run
AllTestClasses.append(StonithTest)


###################################################################
class IPaddrtest(CTSTest):
###################################################################
    '''Find the machine supporting a particular IP address, and knock it down.

    [Hint:  This code isn't finished yet...]
    '''
    def __init__(self, cm, IPaddrs):
        CTSTest.__init__(self, cm)
        self.name = "IPaddrtest"
        self.IPaddrs = IPaddrs
        self.start = StartTest(cm)
        self.stop = StopTest(cm)

    def __call__(self, IPaddr):
        '''Perform the IPaddr test...'''
        self.incr("calls")

        node = self.CM.Env.RandomNode()
        self.incr("node:" + node)

        if self.CM.ShouldBeStatus[node] == self.CM["down"]:
            self.incr("WasStopped")
            self.start(node)

        ret1 = self.stop(node)
        # Give the cluster time to recognize we're gone...
        time.sleep(self.CM["StableTime"])
        ret2 = self.start(node)

        if not ret1:
            return self.failure("Could not stop")
        if not ret2:
            return self.failure("Could not start")

        return self.success()

    def is_applicable(self):
        '''IPaddrtest is always applicable (but shouldn't be)'''
        return 1
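# Condensed sketch, not part of the patch: StonithTest above retries the
# hardware reset a fixed number of times before declaring failure.  The
# helper below (invented name; reset_fn stands in for
# self.CM.Env.ResetNode) isolates that bounded-retry shape.
def _example_reset_with_retries(reset_fn, node, attempts=5):
    for tries in range(attempts):
        if reset_fn(node):
            return 1    # stonith succeeded on this attempt
    return 0            # all attempts failed; report "Stonith failure"

# _example_reset_with_retries(lambda node: 1, "node1") -> 1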
###################################################################
class StartOnebyOne(CTSTest):
###################################################################
    '''Start all the nodes ~ one by one'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "StartOnebyOne"
        self.stopall = SimulStopLite(cm)

    def __call__(self, dummy):
        '''Perform the 'StartOnebyOne' test. '''
        self.incr("calls")

        #    We ignore the "node" parameter...

        #    Shut down all the nodes...
        ret = self.stopall(None)
        if not ret:
            return self.skipped()

        watchpats = [ ]
        pat = self.CM["Pat:We_started"]
        for node in self.CM.Env["nodes"]:
            thispat = (pat % node)
            watchpats.append(thispat)

        #    Start all the nodes - one by one...
        watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats,
                               timeout=self.CM["DeadTime"]+10)
        watch.ReturnOnlyMatch()
        watch.setwatch()

        self.starttime = time.time()
        for node in self.CM.Env["nodes"]:
            self.CM.StartaCM(node)

        if watch.lookforall():
            return self.success()

        did_fail = 0
        for node in self.CM.Env["nodes"]:
            if self.CM.StataCM(node) == 0:
                did_fail = 1

        if did_fail:
            return self.failure("Did not find start pattern(s): "
                                + repr(watch.unmatched))

        return self.failure("All nodes were started but %d may be unstable"
                            % (len(watch.unmatched)))

    def is_applicable(self):
        '''StartOnebyOne is always applicable'''
        return 1

#    Register StartOnebyOne as a good test to run
AllTestClasses.append(StartOnebyOne)


###################################################################
class SimulStart(CTSTest):
###################################################################
    '''Start all the nodes ~ simultaneously'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "SimulStart"
        self.stopall = SimulStopLite(cm)
        self.startall = SimulStartLite(cm)

    def __call__(self, dummy):
        '''Perform the 'SimulStart' test. '''
        self.incr("calls")

        #    We ignore the "node" parameter...

        #    Shut down all the nodes...
        ret = self.stopall(None)
        if not ret:
            return self.failure("Setup failed")

        return self.startall(None)

    def is_applicable(self):
        '''SimulStart is always applicable'''
        return 1

#    Register SimulStart as a good test to run
AllTestClasses.append(SimulStart)


###################################################################
class SimulStop(CTSTest):
###################################################################
    '''Stop all the nodes ~ simultaneously'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "SimulStop"
        self.startall = SimulStartLite(cm)
        self.stopall = SimulStopLite(cm)

    def __call__(self, dummy):
        '''Perform the 'SimulStop' test. '''
        self.incr("calls")

        #    We ignore the "node" parameter...

        #    Start up all the nodes...
        ret = self.startall(None)
        if not ret:
            return self.failure("Setup failed")

        return self.stopall(None)

    def is_applicable(self):
        '''SimulStop is always applicable'''
        return 1

#    Register SimulStop as a good test to run
AllTestClasses.append(SimulStop)


###################################################################
class StopOnebyOne(CTSTest):
###################################################################
    '''Stop all the nodes in order'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "StopOnebyOne"
        self.startall = SimulStartLite(cm)

    def __call__(self, dummy):
        '''Perform the 'StopOnebyOne' test. '''
        self.incr("calls")

        #    We ignore the "node" parameter...

        #    Start up all the nodes...
        ret = self.startall(None)
        if not ret:
            return self.skipped()

        did_fail = 0
        self.starttime = time.time()
        for node in self.CM.Env["nodes"]:
            if not self.CM.StopaCM(node):
                did_fail = did_fail + 1

        if did_fail:
            return self.failure("Could not stop %d nodes" % did_fail)
        return self.success()

    def is_applicable(self):
        '''StopOnebyOne is always applicable'''
        return 1

#    Register StopOnebyOne as a good test to run
AllTestClasses.append(StopOnebyOne)
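# Schematic sketch, not part of the patch: the multi-pattern wait
# StartOnebyOne performs above.  One "we started" pattern is built per
# node, and LogWatcher.lookforall() must see every one of them; whatever
# remains in watch.unmatched names the nodes that never reported in.
#
#     watchpats = []
#     for node in nodes:
#         watchpats.append(cm["Pat:We_started"] % node)
#     watch = CTS.LogWatcher(cm["LogFileName"], watchpats,
#                            timeout=cm["DeadTime"]+10)
#     watch.ReturnOnlyMatch()
#     watch.setwatch()
#     ...start each node in turn...
#     complete = watch.lookforall()    # None => consult watch.unmatched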
###################################################################
class RestartOnebyOne(CTSTest):
###################################################################
    '''Restart all the nodes in order'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "RestartOnebyOne"
        self.startall = SimulStartLite(cm)

    def __call__(self, dummy):
        '''Perform the 'RestartOnebyOne' test. '''
        self.incr("calls")

        #    We ignore the "node" parameter...

        #    Start up all the nodes...
        ret = self.startall(None)
        if not ret:
            return self.skipped()

        did_fail = 0
        self.starttime = time.time()
        self.restart = RestartTest(self.CM)
        for node in self.CM.Env["nodes"]:
            if not self.restart(node):
                did_fail = did_fail + 1

        if did_fail:
            return self.failure("Could not restart %d nodes" % did_fail)
        return self.success()

    def is_applicable(self):
        '''RestartOnebyOne is always applicable'''
        return 1

#    Register RestartOnebyOne as a good test to run
AllTestClasses.append(RestartOnebyOne)
return self.failure("did not find pattern " + pat) def is_applicable(self): '''StandbyTest is applicable when the CM has a Standby command''' if not self.CM.has_key("Standby"): return None else: #if self.CM.Env.has_key("DoStandby"): #flag=self.CM.Env["DoStandby"] #if type(flag) == types.IntType: #return flag #if not re.match("[yt]", flag, re.I): #return None # # We need to strip off everything after the first blank # cmd=self.CM["Standby"] cmd = cmd.split()[0] if not os.access(cmd, os.X_OK): return None cf = self.CM.cf if not cf.Parameters.has_key("auto_failback"): return None elif cf.Parameters["auto_failback"][0] == "legacy": return None return 1 # Register StandbyTest as a good test to run AllTestClasses.append(StandbyTest) ####################################################################### class Fastdetection(CTSTest): ####################################################################### '''Test the time which one node find out the other node is killed very quickly''' def __init__(self,cm,timeout=60): CTSTest.__init__(self, cm) self.name = "DetectionTime" self.they_stopped = self.CM["Pat:They_stopped"] self.timeout = timeout self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.standby = StandbyTest(cm) self.__setitem__("min", 0) self.__setitem__("max", 0) self.__setitem__("totaltime", 0) def __call__(self, node): '''Perform the fastfailureDetection test''' self.incr("calls") ret=self.startall(None) if not ret: return self.skipped() if self.CM.upcount() < 2: return self.skipped() # Make sure they're not holding any resources ret = self.standby(node) if not ret: return ret stoppat = (self.they_stopped % node) stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout) stopwatch.setwatch() if self.CM.rsh(node, "killall -9 heartbeat")==0: Starttime = os.times()[4] if stopwatch.look(): Stoptime = os.times()[4] self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") Detectiontime = Stoptime-Starttime detectms = int(Detectiontime*1000+0.5) self.CM.log("...failure detection time: %d ms" % detectms) self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime if self.Stats["min"] == 0: self.Stats["min"] = Detectiontime if Detectiontime > self.Stats["max"]: self.Stats["max"] = Detectiontime if Detectiontime < self.Stats["min"]: self.Stats["min"] = Detectiontime self.CM.ShouldBeStatus[node] = self.CM["down"] self.start(node) return self.success() else: self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") self.CM.ShouldBeStatus[node] = self.CM["down"] ret=self.start(node) return self.failure("Didn't find the log message") else: return self.failure("Couldn't stop heartbeat") def is_applicable(self): '''This test is applicable when auto_failback != legacy''' return self.standby.is_applicable() def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ "ccm.*ERROR: ccm_control_process:failure to send protoversion request" , "ccm.*ERROR: Lost connection to heartbeat service. Need to bail out" ] AllTestClasses.append(Fastdetection) ############################################################################## class BandwidthTest(CTSTest): ############################################################################## # Tests should not be cluster-manager-specific # If you need to find out cluster manager configuration to do this, then # it should be added to the generic cluster manager API. 
##############################################################################
class BandwidthTest(CTSTest):
##############################################################################
#    Tests should not be cluster-manager-specific.
#    If you need to find out the cluster manager configuration to do this,
#    then it should be added to the generic cluster manager API.
    '''Test the bandwidth which heartbeat uses'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "Bandwidth"
        self.start = StartTest(cm)
        self.__setitem__("min", 0)
        self.__setitem__("max", 0)
        self.__setitem__("totalbandwidth", 0)
        self.tempfile = tempfile.mktemp(".cts")
        self.startall = SimulStartLite(cm)

    def __call__(self, node):
        '''Perform the Bandwidth test'''
        self.incr("calls")

        if self.CM.upcount() < 1:
            return self.skipped()

        Path = self.CM.InternalCommConfig()
        if "ip" not in Path["mediatype"]:
            return self.skipped()

        port = Path["port"][0]
-        if port == None:
-            port = 694
        port = int(port)

        ret = self.startall(None)
        if not ret:
            return self.skipped()
        time.sleep(5)  # We get extra messages right after startup.

        fstmpfile = "/tmp/band_estimate"
        dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
                  % (port, fstmpfile)

        rc = self.CM.rsh(node, dumpcmd)
        if rc == 0:
            farfile = "root@%s:%s" % (node, fstmpfile)
            self.CM.rsh.cp(farfile, self.tempfile)
            Bandwidth = self.countbandwidth(self.tempfile)
            if not Bandwidth:
                self.CM.log("Could not compute bandwidth.")
                return self.success()
            intband = int(Bandwidth + 0.5)
            self.CM.log("...heartbeat bandwidth: %d bits/sec" % intband)
            self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth
            if self.Stats["min"] == 0:
                self.Stats["min"] = Bandwidth
            if Bandwidth > self.Stats["max"]:
                self.Stats["max"] = Bandwidth
            if Bandwidth < self.Stats["min"]:
                self.Stats["min"] = Bandwidth
            self.CM.rsh(node, "rm -f %s" % fstmpfile)
            os.unlink(self.tempfile)
            return self.success()
        else:
            return self.failure("no response from tcpdump command [%d]!" % rc)

    def countbandwidth(self, file):
        fp = open(file, "r")
        fp.seek(0)
        count = 0
        sum = 0

        # Find the first UDP packet and remember its timestamp...
        while 1:
            line = fp.readline()
            if not line:
                return None
            if re.search("udp", line) or re.search("UDP,", line):
                count = count+1
                linesplit = string.split(line, " ")
                for j in range(len(linesplit)-1):
                    if linesplit[j] == "udp": break
                    if linesplit[j] == "length:": break

                try:
                    sum = sum + int(linesplit[j+1])
                except ValueError:
                    self.CM.log("Invalid tcpdump line: %s" % line)
                    return None
                T1 = linesplit[0]
                timesplit = string.split(T1, ":")
                time2split = string.split(timesplit[2], ".")
                time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001
                break

        # ...then accumulate payload lengths until we have seen 100 packets
        while count < 100:
            line = fp.readline()
            if not line:
                return None
            if re.search("udp", line) or re.search("UDP,", line):
                count = count+1
                linessplit = string.split(line, " ")
                for j in range(len(linessplit)-1):
                    if linessplit[j] == "udp": break
                    if linessplit[j] == "length:": break

                try:
                    sum = int(linessplit[j+1]) + sum
                except ValueError:
                    self.CM.log("Invalid tcpdump line: %s" % line)
                    return None

        T2 = linessplit[0]
        timesplit = string.split(T2, ":")
        time2split = string.split(timesplit[2], ".")
        time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001
        time = time2-time1
        if (time <= 0):
            return 0
        return (sum*8)/time

    def is_applicable(self):
        '''BandwidthTest is always applicable'''
        return 1

AllTestClasses.append(BandwidthTest)
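# Worked restatement, not part of the patch, of the arithmetic
# countbandwidth() performs above: given the capture timestamps of the
# first and last heartbeat packets (in seconds) and the sum of their UDP
# payload lengths (in bytes), the estimate is total bits divided by the
# time span.  The sample numbers are invented.
def _example_bandwidth(first_ts, last_ts, total_bytes):
    span = last_ts - first_ts       # seconds between first and last packet
    if span <= 0:
        return 0                    # degenerate capture: report nothing
    return (total_bytes * 8) / span # bits per second

# _example_bandwidth(10.0, 20.0, 15000) -> 12000.0 bits/sec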
##########################################################################
class RedundantpathTest(CTSTest):
##########################################################################
    '''Test heartbeat's redundant communication paths within the cluster.'''
#
#    Tests should not be cluster-manager specific.
#    One needs to isolate what you need from the cluster manager and then
#    add a (new) API to do it.
#
    def __init__(self, cm, timeout=60):
        CTSTest.__init__(self, cm)
        self.name = "RedundantpathTest"
        self.timeout = timeout

    def PathCount(self):
        '''Return the number of communication paths'''
        Path = self.CM.InternalCommConfig()
        cf = self.CM.cf
        eths = []
        serials = []
        num = 0
        for interface in Path["interface"]:
            if re.search("eth", interface):
                eths.append(interface)
                num = num + 1
            if re.search("/dev", interface):
                serials.append(interface)
                num = num + 1
        return (num, eths, serials)

    def __call__(self, node):
        '''Perform the redundant path test'''
        self.incr("calls")

        if self.CM.ShouldBeStatus[node] != self.CM["up"]:
            return self.skipped()

        (num, eths, serials) = self.PathCount()

        for eth in eths:
            if self.CM.rsh(node, "ifconfig %s down" % eth) == 0:
                PathDown = "OK"
                break

        if PathDown != "OK":
            for serial in serials:
                if self.CM.rsh(node, "setserial %s uart none" % serial) == 0:
                    PathDown = "OK"
                    break

        if PathDown != "OK":
            return self.failure("Cannot break the path")

        time.sleep(self.timeout)

        for audit in CTSaudits.AuditList(self.CM):
            if not audit():
                for eth in eths:
                    self.CM.rsh(node, "ifconfig %s up" % eth)
                for serial in serials:
                    self.CM.rsh(node, "setserial %s uart 16550" % serial)
                return self.failure("Redundant path fail")

        for eth in eths:
            self.CM.rsh(node, "ifconfig %s up" % eth)
        for serial in serials:
            self.CM.rsh(node, "setserial %s uart 16550" % serial)

        return self.success()

    def is_applicable(self):
        '''It is applicable when you have more than one connection'''
        return self.PathCount()[0] > 1

# FIXME!!  Why is this one commented out?
#AllTestClasses.append(RedundantpathTest)


##########################################################################
class DRBDTest(CTSTest):
##########################################################################
    '''In heartbeat, DRBD provides replicated storage.'''
    def __init__(self, cm, timeout=10):
        CTSTest.__init__(self, cm)
        self.name = "DRBD"
        self.timeout = timeout

    def __call__(self, dummy):
        '''Perform the 'DRBD' test.'''
        self.incr("calls")

        for node in self.CM.Env["nodes"]:
            if self.CM.ShouldBeStatus[node] == self.CM["down"]:
                return self.skipped()

        # Note:  All these special cases with Start/Stop/StatusDRBD
        # should be reworked to use resource objects instead of
        # being hardwired to bypass the objects here.

        for node in self.CM.Env["nodes"]:
            done = time.time()+self.timeout+1
            while (time.time() < done):
                # ...status-polling body missing from the source...
            if time.time() > done:
                return self.failure("Can't start drbd, please check it")

        device = {}
        for node in self.CM.Env["nodes"]:
            device[node] = self.getdevice(node)

        node = self.CM.Env["nodes"][0]
        done = time.time()+self.timeout+1
        while 1:
            if (time.time() > done):
                return self.failure("the drbd couldn't sync")
            self.CM.rsh(node, "cp /proc/drbd /tmp >/dev/null 2>&1")
            if self.CM.rsh.cp("%s:/tmp/drbd" % node, "/tmp"):
                line = open("/tmp/drbd").readlines()[2]
                p = line.find("Primary")
                s1 = line.find("Secondary")
                s2 = line.rfind("Secondary")
                if s1 != s2:
                    if self.CM.rsh(node, "drbdsetup %s primary" % device[node]):
                        pass
                if p != -1:
                    if p