diff --git a/cts/CM_LinuxHAv2.py.in b/cts/CM_LinuxHAv2.py.in
index ea2f42ebe1..116a217236 100755
--- a/cts/CM_LinuxHAv2.py.in
+++ b/cts/CM_LinuxHAv2.py.in
@@ -1,349 +1,533 @@
 #!@PYTHON@
 '''CTS: Cluster Testing System: LinuxHA v2 dependent modules...
 '''
 __copyright__='''
 Author: Huang Zhen
 Copyright (C) 2004 International Business Machines
+
+Additional Audits: Andrew Beekhof
 '''
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
 # as published by the Free Software Foundation; either version 2
 # of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

 import CTS
 from CTS import *
 from CM_hb import HeartbeatCM
 from xml.dom.minidom import *
 import CTSaudits
 from CTSaudits import ClusterAudit
 import CTStests
 from CTStests import *

 #######################################################################
 #
 #  LinuxHA v2 dependent modules
 #
 #######################################################################

 class LinuxHAv2(HeartbeatCM):
     '''
     The linux-ha version 2 cluster manager class.
     It implements the things we need to talk to and manipulate
     linux-ha version 2 clusters
     '''
     def __init__(self, Environment, randseed=None):
         HeartbeatCM.__init__(self, Environment, randseed=randseed)

         self.update({
             "Name"           : "linux-ha-v2",
             "DeadTime"       : 90,
             "StartCmd"       : "@libdir@/heartbeat/heartbeat >/dev/null 2>&1",
             "StopCmd"        : "@libdir@/heartbeat/heartbeat -k",
             "StatusCmd"      : "@libdir@/heartbeat/crmadmin -S %s 2>/dev/null",
+            "EpocheCmd"      : "@libdir@/heartbeat/ccm_epoche",
             "IsRscRunning"   : "@libdir@/heartbeat/lrmadmin -E %s status 0 0 EVERYTIME 2>/dev/null|grep return",
+            "IsIPAddrRscRunning" : "",
             "ExecuteRscOp"   : "@libdir@/heartbeat/lrmadmin -E %s %s 0 0 EVERYTIME 2>/dev/null",
             "CIBfile"        : "%s:@HA_VARLIBDIR@/heartbeat/crm/cib.xml",

             # Patterns to look for in the log files for various occasions...
             "Pat:We_started"   : " %s crmd: .* State transition .*-> (S_NOT_DC|S_IDLE)",
             "Pat:They_started" : " %s crmd: .* State transition .*-> (S_NOT_DC|S_IDLE)",

             # Bad news Regexes.  Should never occur.
             "BadRegexes"   : (
                 r"Shutting down\.",
                 r"Forcing shutdown\.",
                 r"Timer I_TERMINATE just popped",
                 r"Both machines own .* resources!",
                 r"No one owns .* resources!",
                 r", exiting\.",
                 r"ERROR:",
-                r"CRIT.*:",
+                r"CRIT:",
             ),
         })
-#        self.rsh.cp(self.Env["cib_config"], self["CIBfile"]%self.Env["nodes"][0])
+        default_cts_cib='''
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+'''
+        a_resource='''
+
+
+
+
+
+
+
+'''
         # KLUDGE!  Expedient, but a Kludge (FIXME)
         # CTStests.AllTestClasses = [FlipTest,RestartTest,StartOnebyOne,SimulStart,SimulStop,Split_brainTest,BandwidthTest]
         CTStests.AllTestClasses = [FlipTest, RestartTest, StartOnebyOne, SimulStart, SimulStop]
-        CTSaudits.AllAuditClasses = [CrmdStateAudit, HAResourceAudit]
+#        CTSaudits.AllAuditClasses = [CrmdStateAudit, HAResourceAudit]
+        CTSaudits.AllAuditClasses = [CrmdStateAudit, DcAudit, DcIPaddrAudit]
+
+        if self.Env["ClobberCIB"] != None:
+            if self.Env["CIBfilename"] == None:
+                os.system("echo " + default_cts_cib + " > /tmp/cts.default.cib")
+                self.Env["CIBfilename"] = "/tmp/cts.default.cib"
+            for node in self.Env["nodes"]:
+                os.system("scp /tmp/cts.default.cib root@"+self["CIBfile"]%(node))
+#                self.rsh.cp(self.Env["CIBfilename"], self["CIBfile"]%node)

     def StataCM(self, node):
         '''Report the status of the cluster manager on a given node'''
         out=self.rsh.readaline(node, self["StatusCmd"]%node)
         ret= (string.find(out, 'ok') != -1)
         try:
             if ret:
                 if self.ShouldBeStatus[node] != self["up"]:
                     self.log(
                         "Node status for %s is %s but we think it should be %s"
                         % (node, self["up"], self.ShouldBeStatus[node]))
             else:
                 if self.ShouldBeStatus[node] != self["down"]:
                     self.log(
                         "Node status for %s is %s but we think it should be %s"
                         % (node, self["down"], self.ShouldBeStatus[node]))
         except KeyError:
             pass
         if ret:
             self.ShouldBeStatus[node]=self["up"]
         else:
             self.ShouldBeStatus[node]=self["down"]
         return ret

     def StartaCM(self, node):
         '''Start up the cluster manager on a given node'''
         watch = CTS.LogWatcher(self["LogFileName"]
                                , [self["Pat:We_started"]%node]
                                , 60)
         watch.setwatch()
         self.log ("CM_LinuxHAv2.py: Starting %s on node %s"
                   %(self["Name"], node))
         self.rsh(node, self["StartCmd"])
         if watch.look():
             self.ShouldBeStatus[node]=self["up"]
             return 1
         self.ShouldBeStatus[node]=self["down"]
         self.log ("Could not start %s on node %s" % (self["Name"], node))
         return None

     def Configuration(self):
         if not self.rsh.cp(self["CIBfile"]%self.Env["nodes"][0], self.Env["HAdir"]):
             raise ValueError("Cannot copy file to %s, maybe permission denied"%self.Env["HAdir"])
         cib=parse("%s/cib.xml"%self.Env["HAdir"])
         return cib.getElementsByTagName('configuration')[0]

     def Resources(self):
         ResourceList = []
         # Read resources from the cib
         configuration=self.Configuration()
         resources=configuration.getElementsByTagName('resources')[0]
         rscs=configuration.getElementsByTagName('resource')
         for rsc in rscs:
             ResourceList.append(HAResource(self,rsc))
         return ResourceList

     def Dependancies(self):
         DependancyList = []
         # Read dependencies from the cib
         configuration=self.Configuration()
         constraints=configuration.getElementsByTagName('constraints')[0]
         rsc_to_rscs=configuration.getElementsByTagName('rsc_to_rsc')
         for node in rsc_to_rscs:
             dependancy = {}
             dependancy["id"]=node.getAttribute('id')
             dependancy["from"]=node.getAttribute('from')
             dependancy["to"]=node.getAttribute('to')
             dependancy["type"]=node.getAttribute('type')
             dependancy["strength"]=node.getAttribute('strength')
             DependancyList.append(dependancy)
         return DependancyList

 class HAResourceAudit(ClusterAudit):
     def __init__(self, cm):
         self.CM = cm

     def _RscRunningNodes(self, resource):
         ResourceNodes = []
         for node in self.CM.Env["nodes"]:
             if self.CM.ShouldBeStatus[node] == self.CM["up"]:
                 if resource.IsRunningOn(node):
                     ResourceNodes.append(node)
         return ResourceNodes

     def __call__(self):
         self.CM.log ("Do Audit %s"%self.name())
         passed = 1
         NodeofRsc = {}

         # Make sure the resources are running on one and only one node
         Resources = self.CM.Resources()
         for resource in Resources :
             RunningNodes = self._RscRunningNodes(resource)
             NodeofRsc[resource.rid]=RunningNodes
             if len(RunningNodes) == 0 :
                 print resource.rid + " isn't running anywhere"
                 passed = 0
             if len(RunningNodes) > 1:
                 print resource.rid + " is running more than once: " \
                     + str(RunningNodes)
                 passed = 0

         # Make sure resources tied by a "must"/"placement" constraint
         # are running on the same node
         Dependancies = self.CM.Dependancies()
         for dependancy in Dependancies:
             if dependancy["type"] == "placement" and dependancy["strength"] == "must":
                 if NodeofRsc[dependancy["from"]] != NodeofRsc[dependancy["to"]]:
                     print dependancy["from"] + " and " + dependancy["to"] + " should run on the same node"
                     passed = 0

         return passed

     def name(self):
         return "HAResourceAudit"

 class HAResource(Resource):
     def __init__(self, cm, node):
         '''
         Get information from the xml node
         '''
         self.rid    = node.getAttribute('id')
         self.rclass = node.getAttribute('class')
         self.rtype  = node.getAttribute('type')
         self.rparameters = {}
         attributes = node.getElementsByTagName('instance_attributes')[0]
         parameters = node.getElementsByTagName('rsc_parameters')[0]
         nvpairs = node.getElementsByTagName('nvpair')
         for nvpair in nvpairs:
             name=nvpair.getAttribute('name')
             value=nvpair.getAttribute('value')
             self.rparameters[name]=value
         Resource.__init__(self, cm, self.rtype, self.rid)

     def IsRunningOn(self, nodename):
         '''
         This member function returns true if our resource is running
         on the given node in the cluster.
         We call the status operation for the resource script.
         '''
         out=self.CM.rsh.readaline(nodename, self.CM["IsRscRunning"]%self.rid)
         return re.search("0", out)

     def _ResourceOperation(self, operation, nodename):
         '''
         Execute an operation on the resource
         '''
         self.CM.rsh.readaline(nodename, self.CM["ExecuteRscOp"]%(self.rid, operation))
         return self.CM.rsh.lastrc == 0

     def Start(self, nodename):
         '''
         This member function starts or activates the resource.
         '''
         return self._ResourceOperation("start", nodename)

     def Stop(self, nodename):
         '''
         This member function stops or deactivates the resource.
         '''
         return self._ResourceOperation("stop", nodename)

     def IsWorkingCorrectly(self, nodename):
         return self._ResourceOperation("monitor", nodename)

 class CrmdStateAudit(ClusterAudit):
     def __init__(self, cm):
         self.CM = cm
         self.Stats = {"calls":0
         ,        "success":0
         ,        "failure":0
         ,        "skipped":0
         ,        "auditfail":0}

     def has_key(self, key):
         return self.Stats.has_key(key)

     def __setitem__(self, key, value):
         self.Stats[key] = value

     def __getitem__(self, key):
         return self.Stats[key]

     def incr(self, name):
         '''Increment (or initialize) the value associated with the given name'''
         if not self.Stats.has_key(name):
             self.Stats[name]=0
         self.Stats[name] = self.Stats[name]+1

     def __call__(self):
         self.CM.log ("Do Audit %s"%self.name())
         passed = 1
         dc_count = 0
         up_count = 0
         node_count = 0
         up_are_down = 0
         down_are_up = 0
         slave_count = 0
         unstable_count = 0

         for node in self.CM.Env["nodes"]:
             out=self.CM.rsh.readaline(node, self.CM["StatusCmd"]%node)
             ret = (string.find(out, 'ok') != -1)
             node_count = node_count + 1
             if ret:
                 up_count = up_count + 1
                 if self.CM.ShouldBeStatus[node] == self.CM["down"]:
                     self.CM.log(
                         "Node %s %s when it should be %s"
                         % (node, self.CM["up"], self.CM.ShouldBeStatus[node]))
                     self.CM.ShouldBeStatus[node] = self.CM["up"]
                     down_are_up = down_are_up + 1
                 ret= (string.find(out, 'S_NOT_DC') != -1)
                 if ret:
                     slave_count = slave_count + 1
                 else:
                     ret= (string.find(out, 'S_IDLE') != -1)
                     if ret:
                         dc_count = dc_count + 1
                     else:
                         unstable_count = unstable_count + 1
             else:
                 if self.CM.ShouldBeStatus[node] == self.CM["up"]:
                     self.CM.log(
                         "Node %s %s when it should be %s"
                         % (node, self.CM["down"], self.CM.ShouldBeStatus[node]))
                     self.CM.ShouldBeStatus[node] = self.CM["down"]
                     up_are_down = up_are_down + 1

         if up_count > 0 and dc_count != 1:
             passed = 0
             self.CM.log("Exactly 1 node should be DC.  We found %d (of %d)"
                         %(dc_count, up_count))

         if unstable_count > 0:
             passed = 0
             self.CM.log("Cluster is not stable.  We found %d (of %d) unstable nodes"
                         %(unstable_count, up_count))

         if up_are_down > 0:
             passed = 0
             self.CM.log("%d (of %d) nodes expected to be up were down."
                         %(up_are_down, node_count))

         if down_are_up > 0:
             passed = 0
             self.CM.log("%d (of %d) nodes expected to be down were up."
                         %(down_are_up, node_count))

         return passed

     def name(self):
         return "CrmdStateAudit"

+class DcIPaddrAudit(ClusterAudit):
+    def __init__(self, cm):
+        self.CM = cm
+        self.Stats = {"calls":0
+        ,        "success":0
+        ,        "failure":0
+        ,        "skipped":0
+        ,        "auditfail":0}
+
+    def has_key(self, key):
+        return self.Stats.has_key(key)
+
+    def __setitem__(self, key, value):
+        self.Stats[key] = value
+
+    def __getitem__(self, key):
+        return self.Stats[key]
+
+    def incr(self, name):
+        '''Increment (or initialize) the value associated with the given name'''
+        if not self.Stats.has_key(name):
+            self.Stats[name]=0
+        self.Stats[name] = self.Stats[name]+1
+
+    def __call__(self):
+        self.CM.log ("Do Audit %s"%self.name())
+        passed = 1
+
+        the_dc = self.find_dc()
+        if the_dc == None:
+            return passed
+
+        # Make sure the resource is running on one and only one node
+        Resources = self.CM.Resources()
+        for resource in Resources :
+            if resource.rid == "DcIPaddr":
+                if self.audit_ip_addr(resource, the_dc) == 0:
+                    passed = 0
+
+        return passed
+
+    def is_node_dc(self, node):
+        out=self.CM.rsh.readaline(node, self.CM["StatusCmd"]%node)
+        return (string.find(out, 'S_IDLE') != -1)
+
+    def audit_ip_addr(self, resource, node):
+        self.CM.log ("Auditing %s"%(resource))
+        RunningNodes = self._RscRunningNodes(resource)
+        if len(RunningNodes) == 0 :
+            self.CM.log("%s is not running" %(resource))
+            return 0
+
+        if len(RunningNodes) > 1:
+            self.CM.log("%s is running more than once" %(resource))
+
+        for running_on in RunningNodes:
+            if self.is_node_dc(running_on) == 0:
+                self.CM.log("%s is running on a non-DC node %s"
+                            %(resource, running_on))
+                return 0
+
+        return 1
+
+    def name(self):
+        return "DcIPaddrAudit"
+
+    def find_dc(self):
+        for node in self.CM.Env["nodes"]:
+            if self.is_node_dc(node):
+                return node
+        return None
+
+    def _RscRunningNodes(self, resource):
+        ResourceNodes = []
+        for node in self.CM.Env["nodes"]:
+            if resource.IsRunningOn(node):
+                ResourceNodes.append(node)
+        return ResourceNodes
+
+class DcAudit(ClusterAudit):
+    def __init__(self, cm):
+        self.CM = cm
+        self.Stats = {"calls":0
+        ,        "success":0
+        ,        "failure":0
+        ,        "skipped":0
+        ,        "auditfail":0}
+        self.NodeEpoche={}
+
+    def has_key(self, key):
+        return self.Stats.has_key(key)
+
+    def __setitem__(self, key, value):
+        self.Stats[key] = value
+
+    def __getitem__(self, key):
+        return self.Stats[key]
+
+    def incr(self, name):
+        '''Increment (or initialize) the value associated with the given name'''
+        if not self.Stats.has_key(name):
+            self.Stats[name]=0
+        self.Stats[name] = self.Stats[name]+1
+
+    def __call__(self):
+        self.CM.log ("Do Audit %s"%self.name())
+        passed = 0
+        lowest_epoche = None
+        nodes_up = 0
+
+        dc_allowed_list=[]
+
+        for node in self.CM.Env["nodes"]:
+            if self.CM.ShouldBeStatus[node] == self.CM["up"]:
+                nodes_up = nodes_up + 1
+                out=self.CM.rsh.readaline(node, self.CM["EpocheCmd"])
+                self.NodeEpoche[node] = out
+                if lowest_epoche == None or self.NodeEpoche[node] < lowest_epoche:
+                    lowest_epoche = self.NodeEpoche[node]
+
+        if nodes_up == 0:
+            print ("No nodes running")
+            return 1
+
+        for node in self.CM.Env["nodes"]:
+            if self.CM.ShouldBeStatus[node] == self.CM["up"]:
+                if self.NodeEpoche[node] == lowest_epoche:
+                    dc_allowed_list.append(node)
+
+        for node in dc_allowed_list:
+            if self.is_node_dc(node):
+                passed = 1
+
+        if passed == 0:
+            self.CM.log ("DC not found on any of the %d allowed nodes"
+                         %(len(dc_allowed_list)))
+
+        return passed
+
+    def is_node_dc(self, node):
+        out=self.CM.rsh.readaline(node, self.CM["StatusCmd"]%node)
+        return (string.find(out, 'S_IDLE') != -1)
+
+    def name(self):
+        return "DcAudit"
+
 #######################################################################
 #
 #   A little test code...
 #
 #   Which you are advised to completely ignore...
 #
 #######################################################################
 if __name__ == '__main__':
     pass
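
A note on DcAudit above: it stores the raw readaline() string for each node and compares those strings directly, so epochs are ordered lexically rather than numerically ("10" sorts below "9"). A minimal sketch of a numeric comparison, under the unverified assumption that ccm_epoche prints a single integer:

    import re

    def parse_epoche(out):
        # Hypothetical helper -- assumes (unverified) that ccm_epoche
        # emits one integer epoch on its first line of output.
        match = re.search(r"(\d+)", out)
        if match:
            return int(match.group(1))
        return None

With something like this, DcAudit could compare parse_epoche() results instead of raw strings when building dc_allowed_list.
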
diff --git a/cts/CTS.py.in b/cts/CTS.py.in
index 42a557af46..d6ea251c84 100755
--- a/cts/CTS.py.in
+++ b/cts/CTS.py.in
@@ -1,831 +1,837 @@
 #!@PYTHON@
 '''CTS: Cluster Testing System: Main module

 Classes related to testing high-availability clusters...

 Lots of things are implemented.

 Lots of things are not implemented.

 We have many more ideas of what to do than we've implemented.
 '''

 __copyright__='''
 Copyright (C) 2000, 2001 Alan Robertson
 Licensed under the GNU GPL.
 '''
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
 # as published by the Free Software Foundation; either version 2
 # of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

 import types, string, select, sys, time, re, os, struct, signal
 from UserDict import UserDict
 from syslog import *
 from popen2 import Popen3

 class RemoteExec:
     '''This is an abstract remote execution class.
     It runs a command on another machine - somehow.  The somehow is
     up to us.  This particular class uses ssh.
     Most of the work is done by fork/exec of ssh or scp.
     '''

     def __init__(self):
         #   -n: no stdin, -x: no X11
         self.Command = "@SSH@ -l root -n -x"
         #   -f: ssh to background
         self.CommandnoBlock = "@SSH@ -f -l root -n -x"
         #   -B: batch mode, -q: no stats (quiet)
         self.CpCommand = "@SCP@ -B -q"

     def setcmd(self, rshcommand):
         '''Set the name of the remote shell command'''
         self.Command = rshcommand

     def _fixcmd(self, cmd):
         return re.sub("\'", "'\\''", cmd)

     def _cmd(self, *args):
         '''Compute the string that will run the given command
         on the given remote system'''
         args= args[0]
         sysname = args[0]
         command = args[1]
         ret = self.Command + " " + sysname + " '" + self._fixcmd(command) + "'"
         #print ("About to run %s\n" % ret)
         return ret

     def __call__(self, *args):
         '''Run the given command on the given remote system.
         If you call this class like a function, this is the function that gets
         called.  It just runs it roughly as though it were a system() call
         on the remote machine.  The first argument is the name of the machine
         to run it on.
         '''
         count=0
         rc = 0
         while count < 3:
             rc = os.system(self._cmd(args))
             if rc == 0:
                 return rc
             print "Retrying command %s" % self._cmd(args)
             count=count+1
         return rc

     def popen(self, *args):
         '''popen the given remote command on the remote system.
         As in __call__, the first argument is the name of the machine
         to run it on.
         '''
         #print "Now running %s\n" % self._cmd(args)
         return Popen3(self._cmd(args), None)

     def readaline(self, *args):
         '''Run a command on the remote machine and capture 1 line of
         stdout from the given remote command.
         As in __call__, the first argument is the name of the machine
         to run it on.
         '''
         p = self.popen(args[0], args[1])
         p.tochild.close()
         result = p.fromchild.readline()
         p.fromchild.close()
         self.lastrc = p.wait()
         return result

     def cp(self, *args):
         '''Perform a remote copy'''
         cpstring=self.CpCommand
         for arg in args:
             cpstring = cpstring + " \'" + arg + "\'"
         return os.system(cpstring) == 0

     def noBlock(self, *args):
         '''Perform a remote execution without waiting for it to finish'''
         sshnoBlock = self.CommandnoBlock
         for arg in args:
             sshnoBlock = sshnoBlock + " \'" + arg + "\'"
         return os.system(sshnoBlock) == 0
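
RemoteExec objects are callable, which is how the rest of CTS uses them. A short usage sketch (the hostname is a placeholder):

    rsh = RemoteExec()
    rc = rsh("node1", "uptime")                  # run remotely; retried up to 3 times
    line = rsh.readaline("node1", "uname -r")    # capture one line of stdout
    ok = rsh.cp("/etc/ha.d/ha.cf", "node1:/etc/ha.d/ha.cf")  # scp-based copy
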
 class LogWatcher:
     '''This class watches logs for messages that fit certain regular
        expressions.  Watching logs for events isn't the ideal way
        to do business, but it's better than nothing :-)

        On the other hand, this class is really pretty cool ;-)

        The way you use this class is as follows:
           Construct a LogWatcher object
           Call setwatch() when you want to start watching the log
           Call look() to scan the log looking for the patterns
     '''

     def __init__(self, log, regexes, timeout=10, debug=None):
         '''This is the constructor for the LogWatcher class.  It takes a
         log name to watch, and a list of regular expressions to watch for.
         '''
         #  Validate our arguments.  Better sooner than later ;-)
         for regex in regexes:
             assert re.compile(regex)
         self.regexes = regexes
         self.filename = log
         self.debug=debug
         self.whichmatch = -1
         self.unmatched = None
         if self.debug:
             print "Debug now on for log", log
         self.Timeout = int(timeout)
         self.returnonlymatch = None
         if not os.access(log, os.R_OK):
             raise ValueError("File [" + log + "] not accessible (r)")

     def setwatch(self, frombeginning=None):
         '''Mark the place to start watching the log from.
         '''
         self.file = open(self.filename, "r")
         self.size = os.path.getsize(self.filename)
         if not frombeginning:
             self.file.seek(0,2)

     def ReturnOnlyMatch(self, onlymatch=1):
         '''Return only the given match group, rather than the whole line.
         '''
         self.returnonlymatch = onlymatch

     def look(self, timeout=None):
         '''Examine the log looking for the given patterns.
         It starts looking from the place marked by setwatch().
         This function looks in the file in the fashion of tail -f.
         It properly recovers from log file truncation, but not from
         removing and recreating the log.  It would be nice if it
         recovered from this as well :-)

         We return the first line which matches any of our patterns.
         '''
         if timeout == None:
             timeout = self.Timeout

         done=time.time()+timeout+1
         while (time.time() <= done):
             newsize=os.path.getsize(self.filename)
             if self.debug:
                 print "newsize = %d" % newsize
             if newsize < self.size:
                 # Somebody truncated the log!
                 if self.debug:
                     print "Log truncated!"
                 self.setwatch(frombeginning=1)
                 continue
             if newsize > self.file.tell():
                 line=self.file.readline()
                 if self.debug:
                     print "Looking at line:", line
                 if line:
                     which=-1
                     for regex in self.regexes:
                         which=which+1
                         if self.debug:
                             print "Comparing line to ", regex
                         matchobj = re.search(regex, line)
                         if matchobj:
                             self.whichmatch=which
                             if self.returnonlymatch:
                                 return matchobj.group(self.returnonlymatch)
                             else:
                                 return line
             newsize=os.path.getsize(self.filename)
             if self.file.tell() == newsize:
                 if timeout > 0:
                     time.sleep(0.025)
                 else:
                     return None
         return None

     def lookforall(self, timeout=None):
         '''Examine the log looking for ALL of the given patterns.
         It starts looking from the place marked by setwatch().

         We return when the timeout is reached, or when we have found
         ALL of the regexes that were part of the watch
         '''
         if timeout == None:
             timeout = self.Timeout
         save_regexes = self.regexes
         returnresult = []
         while (len(self.regexes) > 0):
             oneresult = self.look(timeout)
             if not oneresult:
                 self.unmatched = self.regexes
                 self.regexes = save_regexes
                 return None
             returnresult.append(oneresult)
             del self.regexes[self.whichmatch]
         self.unmatched = None
         self.regexes = save_regexes
         return returnresult
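
The three-step recipe from the class docstring, in code (log path and pattern are illustrative only):

    watch = LogWatcher("/var/log/ha-log", [r"State transition .*-> S_IDLE"], timeout=60)
    watch.setwatch()        # remember the current end of the log
    line = watch.look()     # poll (up to the timeout) for a matching line
    if line:
        print "matched:", line
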
 class ClusterManager(UserDict):
     '''The Cluster Manager class.
     This is a subclass of the Python dictionary class.
     (This is because it contains lots of {name,value} pairs,
     not because its behavior is terribly similar to a
     dictionary in other ways.)

     This is an abstract class which implements high-level
     operations on the cluster and/or its cluster managers.
     Actual cluster manager classes are subclassed from this type.

     One of the things we do is track the state we think every node should
     be in.
     '''

     def __InitialConditions(self):
         #if os.geteuid() != 0:
         #  raise ValueError("Must Be Root!")
         None

     def _finalConditions(self):
         for key in self.keys():
             if self[key] == None:
                 raise ValueError("Improper derivation: self[" + key
                                  + "] must be overridden by subclass.")

     def __init__(self, Environment, randseed=None):
         self.Env = Environment
         self.__InitialConditions()
         self.data = {
             "up"             : "up",        # Status meaning up
             "down"           : "down",      # Status meaning down
             "StonithCmd"     : "@sbindir@/stonith -t baytech -p '10.10.10.100 admin admin' %s",
             "DeadTime"       : 30,          # Max time to detect dead node...
             "StartTime"      : 90,          # Max time to start up
             #
             # These next values need to be overridden in the derived class.
             #
             "Name"           : None,
             "StartCmd"       : None,
             "StopCmd"        : None,
             "StatusCmd"      : None,
             "RereadCmd"      : None,
             "StartDRBDCmd"   : None,
             "StopDRBDCmd"    : None,
             "StatusDRBDCmd"  : None,
             "DRBDCheckconf"  : None,
             "BreakCommCmd"   : None,
             "FixCommCmd"     : None,
             "TestConfigDir"  : None,
             "LogFileName"    : None,
             "Pat:We_started"   : None,
             "Pat:They_started" : None,
             "Pat:We_stopped"   : None,
             "Pat:They_stopped" : None,
             "BadRegexes"     : None,        # A set of "bad news" regexes
                                             # to apply to the log
         }
         self.rsh = RemoteExec()
         self.ShouldBeStatus={}
         self.OurNode=string.lower(os.uname()[1])

     def log(self, args):
         self.Env.log(args)

     def prepare(self):
         '''Finish the Initialization process.  Prepare to test...'''
         for node in self.Env["nodes"]:
             if self.StataCM(node):
                 self.ShouldBeStatus[node]=self["up"]
             else:
                 self.ShouldBeStatus[node]=self["down"]

     def upcount(self):
         '''How many nodes are up?'''
         count=0
         for node in self.Env["nodes"]:
             if self.ShouldBeStatus[node]==self["up"]:
                 count=count+1
         return count

     def TruncLogs(self):
         '''Truncate the log for the cluster manager so we can start clean'''
         if self["LogFileName"] != None:
             os.system("cp /dev/null " + self["LogFileName"])

     def StartaCM(self, node):
         '''Start up the cluster manager on a given node'''
+        self.log ("Starting %s on node %s"
+                  % (self["Name"], node))
         rc=self.rsh(node, self["StartCmd"])
         if rc == 0:
             self.ShouldBeStatus[node]=self["up"]
             return 1
         else:
             self.log ("Could not start %s on node %s"
                       % (self["Name"], node))
         return None

     def StartaCMnoBlock(self, node):
         '''Start up the cluster manager on a given node in non-blocking mode'''
         self.rsh.noBlock(node, self["StartCmd"])
         self.ShouldBeStatus[node]=self["up"]
         return 1

     def StopaCM(self, node):
         '''Stop the cluster manager on a given node'''
+        self.log ("Stopping %s on node %s"
+                  % (self["Name"], node))
+
         rc=self.rsh(node, self["StopCmd"])
         if rc == 0:
             self.ShouldBeStatus[node]=self["down"]
             return 1
         else:
             self.log ("Could not stop %s on node %s"
                       % (self["Name"], node))
         return None

     def StopaCMnoBlock(self, node):
         '''Stop the cluster manager on a given node in non-blocking mode'''
         self.rsh.noBlock(node, self["StopCmd"])
         self.ShouldBeStatus[node]=self["down"]
         return 1

     def RereadCM(self, node):
         '''Force the cluster manager on a given node to reread its config.
            This may be a no-op on certain cluster managers.
         '''
         rc=self.rsh(node, self["RereadCmd"])
         if rc == 0:
             return 1
         else:
             self.log ("Could not force %s on node %s to reread its config"
                       % (self["Name"], node))
         return None

     def StataCM(self, node):
         '''Report the status of the cluster manager on a given node'''
         out=self.rsh.readaline(node, self["StatusCmd"])
         ret= (string.find(out, 'stopped') == -1)
         try:
             if ret:
                 if self.ShouldBeStatus[node] != self["up"]:
                     self.log(
                         "Node status for %s is %s but we think it should be %s"
                         % (node, self["up"], self.ShouldBeStatus[node]))
             else:
                 if self.ShouldBeStatus[node] != self["down"]:
                     self.log(
                         "Node status for %s is %s but we think it should be %s"
                         % (node, self["down"], self.ShouldBeStatus[node]))
         except KeyError:
             pass
         if ret:
             self.ShouldBeStatus[node]=self["up"]
         else:
             self.ShouldBeStatus[node]=self["down"]
         return ret

     def startall(self, nodelist=None):
         '''Start the cluster manager on every node in the cluster.
         We can do it on a subset of the cluster if nodelist is not None.
         '''
         map = {}
         if not nodelist:
             nodelist=self.Env["nodes"]
         for node in nodelist:
             if self.ShouldBeStatus[node] == self["down"]:
                 self.StartaCM(node)

     def stopall(self, nodelist=None):
         '''Stop the cluster managers on every node in the cluster.
         We can do it on a subset of the cluster if nodelist is not None.
         '''
         map = {}
         if not nodelist:
             nodelist=self.Env["nodes"]
         for node in self.Env["nodes"]:
             if self.ShouldBeStatus[node] == self["up"]:
                 self.StopaCM(node)

     def rereadall(self, nodelist=None):
         '''Force the cluster managers on every node in the cluster
         to reread their config files.  We can do it on a subset of the
         cluster if nodelist is not None.
         '''
         map = {}
         if not nodelist:
             nodelist=self.Env["nodes"]
         for node in self.Env["nodes"]:
             if self.ShouldBeStatus[node] == self["up"]:
                 self.RereadCM(node)

     def statall(self, nodelist=None):
         '''Return the status of the cluster managers in the cluster.
         We can do it on a subset of the cluster if nodelist is not None.
         '''
         result={}
         if not nodelist:
             nodelist=self.Env["nodes"]
         for node in nodelist:
             if self.StataCM(node):
                 result[node] = self["up"]
             else:
                 result[node] = self["down"]
         return result

     def isolate_node(self, node):
         '''Isolate the communication between the nodes'''
         rc = self.rsh(node, self["BreakCommCmd"])
         if rc == 0:
             return 1
         else:
             self.log("Could not break the communication between the nodes from node: %s"
                      % node)
         return None

     def unisolate_node(self, node):
         '''Fix the communication between the nodes'''
         rc = self.rsh(node, self["FixCommCmd"])
         if rc == 0:
             return 1
         else:
             self.log("Could not fix the communication between the nodes from node: %s"
                      % node)
         return None

     def SyncTestConfigs(self):
         '''Synchronize test configurations throughout the cluster.
         This one's a no-op for FailSafe, since it does that by itself.
         '''
         fromdir=self["TestConfigDir"]

         if not os.access(fromdir, os.F_OK | os.R_OK | os.W_OK):
             raise ValueError("Directory [" + fromdir + "] not accessible (rwx)")

         for node in self.Env["nodes"]:
             if node == self.OurNode:
                 continue
             self.log("Syncing test configurations on " + node)
             # Perhaps I ought to use rsync...
             self.rsh.cp("-r", fromdir, node + ":" + fromdir)

     def SetClusterConfig(self, configpath="default", nodelist=None):
         '''Activate the named test configuration throughout the cluster.
         It would be useful to implement this :-)
         '''
         pass
         return 1

     def ResourceGroups(self):
         "Return a list of resource type/instance pairs for the cluster"
         raise ValueError("Abstract Class member (ResourceGroups)")

     def InternalCommConfig(self):
         "Return a list of paths: each path consists of a tuple"
         raise ValueError("Abstract Class member (InternalCommConfig)")

     def HasQuorum(self):
         "Return TRUE if the cluster currently has quorum"
         raise ValueError("Abstract Class member (HasQuorum)")
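
A concrete subclass (HeartbeatCM, or LinuxHAv2 from the first hunk) fills in the command templates above; the harness then drives it through this small surface. A sketch, assuming an already-prepared Environment:

    cm = LinuxHAv2(Environment)
    cm.prepare()          # record which nodes are currently up or down
    cm.startall()         # start the cluster manager wherever it is down
    print cm.statall()    # e.g. {'node1': 'up', 'node2': 'up'}
    cm.stopall()
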
 class Resource:
     '''
     This is an HA resource (not a resource group).
     A resource group is just an ordered list of Resource objects.
     '''

     def __init__(self, cm, rsctype=None, instance=None):
         self.CM = cm
         self.ResourceType = rsctype
         self.Instance = instance

     def Type(self):
         return self.ResourceType

     def Instance(self, nodename):
         return self.Instance

     def IsRunningOn(self, nodename):
         '''
         This member function returns true if our resource is running
         on the given node in the cluster.
         It is analogous to the "status" operation on SystemV init scripts and
         heartbeat scripts.  FailSafe calls it the "exclusive" operation.
         '''
         raise ValueError("Abstract Class member (IsRunningOn)")
         return None

     def IsWorkingCorrectly(self, nodename):
         '''
         This member function returns true if our resource is operating
         correctly on the given node in the cluster.
         Heartbeat does not require this operation, but it might be called
         the Monitor operation, which is what FailSafe calls it.
         For remotely monitorable resources (like IP addresses), they *should*
         be monitored remotely for testing.
         '''
         raise ValueError("Abstract Class member (IsWorkingCorrectly)")
         return None

     def Start(self, nodename):
         '''
         This member function starts or activates the resource.
         '''
         raise ValueError("Abstract Class member (Start)")
         return None

     def Stop(self, nodename):
         '''
         This member function stops or deactivates the resource.
         '''
         raise ValueError("Abstract Class member (Stop)")
         return None

     def __repr__(self):
         if (self.Instance and len(self.Instance) > 1):
             return "{" + self.ResourceType + "::" + self.Instance + "}"
         else:
             return "{" + self.ResourceType + "}"

 class ScenarioComponent:

     def __init__(self, Env):
         self.Env = Env

     def IsApplicable(self):
         '''Return TRUE if the current ScenarioComponent is applicable
         in the LabEnvironment given to the constructor.
         '''
         raise ValueError("Abstract Class member (IsApplicable)")

     def SetUp(self, CM):
         '''Set up the given ScenarioComponent'''
         raise ValueError("Abstract Class member (SetUp)")

     def TearDown(self, CM):
         '''Tear down (undo) the given ScenarioComponent'''
         raise ValueError("Abstract Class member (TearDown)")

 class Scenario:
     (
 '''The basic idea of a scenario is that of an ordered list of
 ScenarioComponent objects.  Each ScenarioComponent is SetUp() in turn,
 and then after the tests have been run, they are torn down using TearDown()
 (in reverse order).

 A Scenario is applicable to a particular cluster manager iff each
 ScenarioComponent is applicable.

 A partially set up scenario is torn down if it fails during setup.
 ''')

     def __init__(self, Components):
         "Initialize the Scenario from the list of ScenarioComponents"
         for comp in Components:
             if not issubclass(comp.__class__, ScenarioComponent):
                 raise ValueError("Init value must be subclass of"
                                  " ScenarioComponent")
         self.Components = Components

     def IsApplicable(self):
         (
 '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
 '''
         )
         for comp in self.Components:
             if not comp.IsApplicable():
                 return None
         return 1

     def SetUp(self, CM):
         '''Set up the Scenario.  Return TRUE on success.'''
         j=0
         while j < len(self.Components):
             if not self.Components[j].SetUp(CM):
                 # OOPS!  We failed.  Tear partial setups down.
                 CM.log("Tearing down partial setup")
                 self.TearDown(CM, j)
                 return None
             j=j+1
         return 1

     def TearDown(self, CM, max=None):
         '''Tear Down the Scenario - in reverse order.'''
         if max == None:
             max = len(self.Components)-1
         j=max
         while j >= 0:
             self.Components[j].TearDown(CM)
             j=j-1
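
A sketch of composing a Scenario from components; the Quiesce component here is invented purely for illustration (InitClusterManager below is the real, minimal example):

    class Quiesce(ScenarioComponent):
        '''Hypothetical component: ensure no cluster manager is running.'''
        def IsApplicable(self):
            return 1
        def SetUp(self, CM):
            CM.stopall()
            return 1
        def TearDown(self, CM):
            pass

    scenario = Scenario([Quiesce(Environment), InitClusterManager(Environment)])
    if scenario.IsApplicable() and scenario.SetUp(cm):
        # ... run tests ...
        scenario.TearDown(cm)   # components are undone in reverse order
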
 class InitClusterManager(ScenarioComponent):
     (
 '''InitClusterManager is the most basic of ScenarioComponents.
 This ScenarioComponent simply starts the cluster manager on all the nodes.
 It is fairly robust as it waits for all nodes to come up before starting,
 as they might have been rebooted or crashed for some reason beforehand.
 ''')

     def IsApplicable(self):
         '''InitClusterManager is so generic it is always Applicable'''
         return 1

     def _IsNodeBooted(self, node):
         '''Return TRUE if the given node is booted (responds to pings)'''
         return os.system("@PING@ -nq -c1 @PING_TIMEOUT_OPT@ %s >/dev/null 2>&1"
                          % node) == 0

     def _WaitForNodeToComeUp(self, node, Timeout=300):
         '''Return TRUE when given node comes up, or FALSE if timeout'''
         timeout=Timeout
         anytimeouts=0
         while timeout > 0:
             if self._IsNodeBooted(node):
                 if anytimeouts:
                     # Fudge to wait for the system to finish coming up
                     time.sleep(30)
                     self.Env.log("Node %s now up" % node)
                 return 1
             time.sleep(1)
             if (not anytimeouts):
                 self.Env.log("Waiting for node %s to come up" % node)
             anytimeouts=1
             timeout = timeout - 1
         self.Env.log("%s did not come up within %d tries" % (node, Timeout))
         return None

     def _WaitForAllNodesToComeUp(self, nodes, timeout=300):
         '''Return TRUE when all nodes come up, or FALSE if timeout'''
         for node in nodes:
             if not self._WaitForNodeToComeUp(node, timeout):
                 return None
         return 1

     def SetUp(self, CM):
         '''Basic Cluster Manager startup.  Start everything'''
         if not self._WaitForAllNodesToComeUp(CM.Env["nodes"]):
             return None
         CM.prepare()

         #        Clear out the cobwebs ;-)
         self.TearDown(CM)
+
         for node in CM.Env["nodes"]:
             CM.rsh(node, CM["DelFileCommCmd"]+ "; true")

         # Now start the Cluster Manager on all the nodes.
         CM.log("Starting Cluster Manager on all nodes.")
         CM.startall()
         return 1

     def TearDown(self, CM):
         '''Tear down the given ScenarioComponent'''
         self._WaitForAllNodesToComeUp(CM.Env["nodes"])

         # Stop the cluster manager everywhere
         CM.log("Stopping Cluster Manager on all nodes")
         CM.stopall()

 class PingFest(ScenarioComponent):
     (
 '''PingFest does a flood ping to each node in the cluster from the test machine.

 If the LabEnvironment Parameter PingSize is set, it will be used as the size
 of ping packet requested (via the -s option).  If it is not set, it defaults
 to 1024 bytes.

 According to the manual page for ping:
     Outputs packets as fast as they come back or one hundred times per
     second, whichever is more.  For every ECHO_REQUEST sent a period ``.''
     is printed, while for every ECHO_REPLY received a backspace is printed.
     This provides a rapid display of how many packets are being dropped.
     Only the super-user may use this option.  This can be very hard on a net-
     work and should be used with caution.
 ''')

     def __init__(self, Env):
         self.Env = Env

     def IsApplicable(self):
         '''PingFests are always applicable ;-)'''
         return 1

     def SetUp(self, CM):
         '''Start the PingFest!'''
         self.PingSize=1024
         if CM.Env.has_key("PingSize"):
             self.PingSize=CM.Env["PingSize"]

         CM.log("Starting %d byte flood pings" % self.PingSize)

         self.PingPids=[]
         for node in CM.Env["nodes"]:
             self.PingPids.append(self._pingchild(node))

         CM.log("Ping PIDs: " + repr(self.PingPids))
         return 1

     def TearDown(self, CM):
         '''Stop it right now!  My ears are pinging!!'''
         for pid in self.PingPids:
             if pid != None:
                 CM.log("Stopping ping process %d" % pid)
                 os.kill(pid, signal.SIGKILL)

     def _pingchild(self, node):
         Args = ["ping", "-qfn", "-s", str(self.PingSize), node]

         sys.stdin.flush()
         sys.stdout.flush()
         sys.stderr.flush()
         pid = os.fork()

         if pid < 0:
             self.Env.log("Cannot fork ping child")
             return None
         if pid > 0:
             return pid

         # Otherwise, we're the child process.
         os.execvp("ping", Args)
         self.Env.log("Cannot execvp ping: " + repr(Args))
         sys.exit(1)
diff --git a/cts/CTSlab.py.in b/cts/CTSlab.py.in
index 7160865062..f19019583e 100755
--- a/cts/CTSlab.py.in
+++ b/cts/CTSlab.py.in
@@ -1,420 +1,429 @@
 #!@PYTHON@
 '''CTS: Cluster Testing System: Lab environment module
 '''
 __copyright__='''
 Copyright (C) 2001 Alan Robertson
 Licensed under the GNU GPL.
 '''
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
 # as published by the Free Software Foundation; either version 2
 # of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

 from UserDict import UserDict
 import sys, time, types, syslog, whrandom, os, struct, string
 from CTS import ClusterManager
 from CM_hb import HeartbeatCM
 from socket import gethostbyname_ex

 class ResetMechanism:
     def reset(self, node):
         raise ValueError("Abstract class member (reset)")

 class Stonith(ResetMechanism):
     def __init__(self, sttype="ssh", parm="foobar"
     ,        path="@sbindir@/stonith"):
         self.pathname=path
         self.configstring=parm
         self.stonithtype=sttype

     def reset(self, node):
         cmdstring = "%s -t '%s' -p '%s' '%s' 2>/dev/null" % (self.pathname
         ,        self.stonithtype, self.configstring, node)
         return (os.system(cmdstring) == 0)

 class Logger:
     TimeFormat = "%Y/%m/%d_%H:%M:%S\t"

     def __call__(self, lines):
         raise ValueError("Abstract class member (__call__)")

 class SysLog(Logger):
     defaultsource="CTS"
     defaultlevel= (23 << 3)

     def __init__(self, labinfo):
         if labinfo.has_key("syslogsource"):
             self.source=labinfo["syslogsource"]
         else:
             self.source=SysLog.defaultsource

         if labinfo.has_key("sysloglevel"):
             self.level=labinfo["sysloglevel"]
         else:
             self.level=SysLog.defaultlevel

         syslog.openlog(self.source, 0, self.level)

     def __call__(self, lines):
         if isinstance(lines, types.StringType):
             syslog.syslog(lines)
         else:
             for line in lines:
                 syslog.syslog(line)

 class StdErrLog(Logger):
     def __init__(self, labinfo):
         pass

     def __call__(self, lines):
         t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
         if isinstance(lines, types.StringType):
             sys.__stderr__.writelines([t, lines, "\n"])
         else:
             for line in lines:
                 sys.__stderr__.writelines([t, line, "\n"])
         sys.__stderr__.flush()

 class FileLog(Logger):
     def __init__(self, labinfo, filename=None):
         if filename == None:
             filename=labinfo["logfile"]
         self.logfile=filename

     def __call__(self, lines):
         fd = open(self.logfile, "a")
         t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
         if isinstance(lines, types.StringType):
             fd.writelines([t, lines, "\n"])
         else:
             for line in lines:
                 fd.writelines([t, line, "\n"])
         fd.close()

 class CtsLab(UserDict):
     '''This class defines the Lab Environment for the Cluster Test System.
     It defines those things which are expected to change from test
     environment to test environment for the same cluster manager.

     It is where you define the set of nodes that are in your test lab,
     what kind of reset mechanism you use, etc.

     This class is derived from a UserDict because we hold many
     different parameters of different kinds, and this provides
     a uniform and extensible interface useful for any kind of communication
     between the user/administrator/tester and CTS.

     At this point in time, it is the intent of this class to model static
     configuration and/or environmental data about the environment which
     doesn't change as the tests proceed.

     Well-known names (keys) are an important concept in this class.
     The HasMinimalKeys member function knows the minimal set of
     well-known names for the class.

     The following names are standard (well-known) at this time:

         nodes           An array of the nodes in the cluster
         reset           A ResetMechanism object
         logger          An array of objects that log strings...
         CMclass         The type of ClusterManager we are running
                         (This is a class object, not a class instance)
         RandSeed        Random seed.  It is a triple of bytes. (optional)
         HAdir           Base directory for HA installation

     The CTS code ignores names it doesn't know about/need.
     The individual tests have access to this information, and it is
     perfectly acceptable to provide hints, tweaks, fine-tuning
     directions, or other information to the tests through this mechanism.
     '''

     def __init__(self, nodes):
         self.data = {}
         self["nodes"] = nodes
         self.MinimalKeys=["nodes", "reset", "logger", "CMclass", "HAdir"]

     def HasMinimalKeys(self):
         'Return TRUE if our object has the minimal set of keys/values in it'
         result = 1
         for key in self.MinimalKeys:
             if not self.has_key(key):
                 result = None
         return result

     def SupplyDefaults(self):
         if not self.has_key("logger"):
             self["logger"] = (SysLog(self), StdErrLog(self))
         if not self.has_key("reset"):
             self["reset"] = Stonith()
         if not self.has_key("CMclass"):
             self["CMclass"] = HeartbeatCM
         if not self.has_key("HAdir"):
             self["HAdir"] = "@sysconfdir@/ha.d"
         if not self.has_key("LogFileName"):
             self["LogFileName"] = "/var/log/ha-log"

         #
         #  Now set up our random number generator...
         #
         self.RandomGen = whrandom.whrandom()

         #  Get a random seed for the random number generator.
         if self.has_key("RandSeed"):
             randseed = self["RandSeed"]
         else:
             f=open("/dev/urandom", "r")
             string=f.read(3)
             f.close()
             randseed=struct.unpack("BBB", string)

         self.log("Random seed is: " + str(randseed))
         self.randseed=randseed
         self.RandomGen.seed(randseed[0], randseed[1], randseed[2])

     def log(self, args):
         "Log using each of the supplied logging methods"
         for logfcn in self._logfunctions:
             logfcn(string.strip(args))

     def __setitem__(self, key, value):
         '''Since this function gets called whenever we modify the
         dictionary (object), we can (and do) validate those keys that we
         know how to validate.  For the most part, we know how to validate
         the "MinimalKeys" elements.
         '''
         #
         #        List of nodes in the system
         #
         if key == "nodes":
             self.Nodes = {}
             for node in value:
                 # I don't think I need the IP address, etc. but this validates
                 # the node name against /etc/hosts and/or DNS, so it's a
                 # GoodThing(tm).
                 self.Nodes[node] = gethostbyname_ex(node)
             if len(value) < 2:
                 raise ValueError("Must have at least two nodes in system")
         #
         #        Reset Mechanism
         #
         elif key == "reset":
             if not issubclass(value.__class__, ResetMechanism):
                 raise ValueError("'reset' Value must be a subclass"
                                  " of ResetMechanism")
         #
         #        List of Logging Mechanism(s)
         #
         elif key == "logger":
             if len(value) < 1:
                 raise ValueError("Must have at least one logging mechanism")
             for logger in value:
                 if not callable(logger):
                     raise ValueError("'logger' elements must be callable")
             self._logfunctions = value
         #
         #        Cluster Manager Class
         #
         elif key == "CMclass":
             if not issubclass(value, ClusterManager):
                 raise ValueError("'CMclass' must be a subclass of"
                                  " ClusterManager")
         #
         #        Initial Random seed...
         #
         elif key == "RandSeed":
             if len(value) != 3:
                 raise ValueError("'Randseed' must be a 3-element list/tuple")
             for elem in value:
                 if not isinstance(elem, types.IntType):
                     raise ValueError("'Randseed' list must all be ints")

         self.data[key] = value

     def IsValidNode(self, node):
         'Return TRUE if the given node is valid'
         return self.Nodes.has_key(node)

     def __CheckNode(self, node):
         "Raise a ValueError if the given node isn't valid"
         if not self.IsValidNode(node):
             raise ValueError("Invalid node [%s] in CheckNode" % node)

     def RandomNode(self):
         '''Choose a random node from the cluster'''
         return self.RandomGen.choice(self["nodes"])

     def ResetNode(self, node):
         "Reset a node, (normally) using a hardware mechanism"
         self.__CheckNode(node)
         return self["reset"].reset(node)
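
A sketch of assembling a lab environment by hand, mirroring what the __main__ block below does from HBConfig (hostnames are placeholders):

    Environment = CtsLab(["node1", "node2"])   # names validated via gethostbyname_ex
    Environment["HAdir"] = "/etc/ha.d"
    Environment["reset"] = Stonith(sttype="ssh")
    Environment.SupplyDefaults()   # fills in logger, CMclass, random seed, ...
    Environment.log("Lab ready: " + repr(Environment["nodes"]))
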
 def usage(arg):
     print "Illegal argument " + arg
     print "usage: " + sys.argv[0] \
     +  " --directory config-directory" \
     +  " -D config-directory" \
     +  " --logfile system-logfile-name" \
     +  " -L system-logfile-name" \
     +  " -v2" \
     +  " --stonith (1 | 0 | yes | no)" \
     +  " --standby (1 | 0 | yes | no)" + " [number-of-iterations]"
     sys.exit(1)

 #
 #   A little test code...
 #
 if __name__ == '__main__':

     from CTSaudits import AuditList
     from CTStests import TestList,RandomTests
     from CTS import Scenario, InitClusterManager, PingFest

     import CM_hb

     HAdir = "/etc/ha.d"
     LogFile = "/var/log/ha-log-local7"
     DoStonith = 1
     DoStandby = 1
     NumIter = 500
     SuppressMonitoring = None
     Version = 1
+    CIBfilename = None
+    ClobberCIB = 0

     #
     # The values of the rest of the parameters are now properly derived from
     # the configuration files.
     #
     # Stonith is configurable because it's slow, I have a few machines which
     # don't reboot very reliably, and it can do mild damage to your machine if
     # you're using a real power switch.
     #
     # Standby is configurable because the test is very heartbeat specific
     # and I haven't written the code to set it properly yet.  Patches are
     # being accepted...

     # Process arguments...

     skipthis=None
     args=sys.argv[1:]
     for i in range(0, len(args)):
         if skipthis:
             skipthis=None
             continue
         elif args[i] == "-D" or args[i] == "--directory":
             skipthis=1
             HAdir = args[i+1]
         elif args[i] == "-L" or args[i] == "--logfile":
             skipthis=1
             LogFile = args[i+1]
         elif args[i] == "-v2":
             Version=2
         elif args[i] == "--stonith":
             skipthis=1
             if args[i+1] == "1" or args[i+1] == "yes":
                 DoStonith=1
             elif args[i+1] == "0" or args[i+1] == "no":
                 DoStonith=0
             else:
                 usage(args[i+1])
         elif args[i] == "--standby":
             skipthis=1
             if args[i+1] == "1" or args[i+1] == "yes":
                 DoStandby=1
             elif args[i+1] == "0" or args[i+1] == "no":
                 DoStandby=0
             else:
                 usage(args[i+1])
         elif args[i] == "--suppressmonitoring":
             SuppressMonitoring = 1
         elif args[i] == "-2" or args[i] == "--crm":
             Version = 2
         elif args[i] == "-1" or args[i] == "--classic":
             Version = 1
+        elif args[i] == "--clobber-cib" or args[i] == "-c":
+            ClobberCIB = 1
+        elif args[i] == "--cib-filename":
+            skipthis=1
+            CIBfilename = args[i+1]
         else:
             NumIter=int(args[i])

     #
     # This reading of HBconfig here is ugly, and I suppose ought to
     # be done by the Cluster manager.  This would probably mean moving the
     # list of cluster nodes into the ClusterManager class.  A good thought
     # for our Copious Spare Time in the future...
     #
     config = CM_hb.HBConfig(HAdir)

     Environment = CtsLab(config.Parameters["node"])
     Environment["HAdir"] = HAdir
+    Environment["ClobberCIB"] = ClobberCIB
+    Environment["CIBfilename"] = CIBfilename
     Environment["LogFileName"] = LogFile
     Environment["DoStonith"] = DoStonith
     Environment["DoStandby"] = DoStandby
     Environment["SuppressMonitoring"] = SuppressMonitoring

     if Version == 2:
         from CM_LinuxHAv2 import LinuxHAv2
         Environment['CMclass']=LinuxHAv2

     #Environment["RandSeed"] = (156, 104, 218)

     Environment.SupplyDefaults()

     # Your basic start up the world type of test scenario...

     #scenario = Scenario(
     #[        InitClusterManager(Environment)
     #,        PingFest(Environment)])

     scenario = Scenario(
     [        InitClusterManager(Environment)])

     # Create the Cluster Manager object
     cm = Environment['CMclass'](Environment)

     cm.log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TESTS ")
     cm.log("HA configuration directory: " + Environment["HAdir"])
     cm.log("System log files: " + Environment["LogFileName"])
     cm.log("Enable Stonith: " + ("%d" % Environment["DoStonith"]))
     cm.log("Enable Standby: " + ("%d" % Environment["DoStandby"]))

     if Environment.has_key("SuppressMonitoring") \
     and Environment["SuppressMonitoring"]:
         cm.log("Resource Monitoring is disabled")

     cm.log("Cluster nodes: " + repr(config.Parameters["node"]))

     Audits = AuditList(cm)
     Tests = TestList(cm)

     tests = RandomTests(scenario, cm, Tests, Audits)
     overall, detailed = tests.run(NumIter)

     cm.log("****************")
     cm.log("Overall Results:" + repr(overall))
     cm.log("****************")
     cm.log("Detailed Results")
     for test in detailed.keys():
         cm.log("Test %s:" % test + repr(detailed[test]))
     cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
diff --git a/cts/CTStests.py.in b/cts/CTStests.py.in
index eabd7dad86..82c37b9d7d 100644
--- a/cts/CTStests.py.in
+++ b/cts/CTStests.py.in
@@ -1,1233 +1,1234 @@
 #!@PYTHON@
 '''CTS: Cluster Testing System: Tests module

 There are a few things we want to do here:
 '''

 __copyright__='''
 Copyright (C) 2000, 2001 Alan Robertson
 Licensed under the GNU GPL.
 '''
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
 # as published by the Free Software Foundation; either version 2
 # of the License, or (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

 import CTS
 from CM_hb import HBConfig
 import CTSaudits
 import time, os, re, types, string, tempfile

 #        List of all class objects for tests which we ought to
 #        consider running.

 class RandomTests:
     '''
     A collection of tests which are run at random.
     '''
     def __init__(self, scenario, cm, tests, Audits):
         self.CM = cm
         self.Env = cm.Env
         self.Scenario = scenario
         self.Tests = []

         for test in tests:
             if not issubclass(test.__class__, CTSTest):
                 raise ValueError("Init value must be a subclass of CTSTest")
             if test.is_applicable():
                 self.Tests.append(test)

         if not scenario.IsApplicable():
             raise ValueError("Scenario not applicable in"
                              " given Environment")

         self.Stats = {"success":0, "failure":0, "BadNews":0}
         self.IndividualStats= {}
         self.Audits = Audits

     def incr(self, name):
         '''Increment (or initialize) the value associated with the given name'''
         if not self.Stats.has_key(name):
             self.Stats[name]=0
         self.Stats[name] = self.Stats[name]+1

     def run(self, max=1):
         (
 '''
 Set up the given scenario, then run the selected tests at
 random for the selected number of iterations.
 ''')
         if not self.Scenario.SetUp(self.CM):
             return None

         BadNews=CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"]
         ,        timeout=0)
         BadNews.setwatch()

         testcount=1
         time.sleep(30)  # This makes sure everything is stabilized before starting...

         for audit in self.Audits:
             if not audit():
                 self.CM.log("Audit " + audit.name() + " Failed.")
                 self.incr("auditfail")

         while testcount <= max:
             test = self.Env.RandomGen.choice(self.Tests)

             # Some tests want a node as an argument.
             nodechoice = self.Env.RandomNode()
+            self.CM.log("Running test %s (%s) [%d]" % (test.name, nodechoice, testcount))
             testcount = testcount + 1

             starttime=time.time()
             ret=test(nodechoice)

             if ret:
                 self.incr("success")
             else:
                 self.incr("failure")
                 # Better get the current info from the cluster...
                 self.CM.statall()

             stoptime=time.time()
             elapsed_time = stoptime - starttime

             if not test.has_key("min_time"):
                 test["elapsed_time"] = elapsed_time
                 test["min_time"] = elapsed_time
                 test["max_time"] = elapsed_time
             else:
                 test["elapsed_time"] = test["elapsed_time"] + elapsed_time
                 if elapsed_time < test["min_time"]:
                     test["min_time"] = elapsed_time
                 if elapsed_time > test["max_time"]:
                     test["max_time"] = elapsed_time

             errcount=0
             while errcount < 100:
                 match=BadNews.look()
                 if match:
                     ignorelist=test.errorstoignore()
                     ignorelist.append(" CTS: ")
                     for ignore in ignorelist:
                         if re.search(ignore, match):
                             break
                     else:
                         self.CM.log(match)
                         self.incr("BadNews")
                     errcount=errcount+1
                 else:
                     break
             else:
                 self.CM.log("Big problems.  Shutting down.")
                 self.CM.stopall()
                 raise ValueError("Looks like we hit the jackpot!  :-)")

         for audit in self.Audits:
             if not audit():
                 self.CM.log("Audit " + audit.name() + " Failed.")
                 test.incr("auditfail")
                 self.incr("auditfail")

         self.Scenario.TearDown(self.CM)

         for test in self.Tests:
             self.IndividualStats[test.name] = test.Stats

         return self.Stats, self.IndividualStats

 AllTestClasses = [ ]
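
The BadNews scan in run() above relies on Python's for/else: the else clause fires only when no ignore pattern caused a break out of the loop. A stripped-down illustration:

    import re

    line = "some log line"
    for ignore in [" CTS: ", "harmless"]:
        if re.search(ignore, line):
            break               # an ignore pattern matched: stay quiet
    else:
        print "BadNews:", line  # nothing matched: report it
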
 class CTSTest:
     '''
     A Cluster test.
     We implement the basic set of properties and behaviors for a generic
     cluster test.

     Cluster tests track their own statistics.
     We keep each of the kinds of counts we track as separate {name,value}
     pairs.
     '''

     def __init__(self, cm):
         #self.name="the unnamed test"
         self.Stats = {"calls":0
         ,        "success":0
         ,        "failure":0
         ,        "skipped":0
         ,        "auditfail":0}

 #        if not issubclass(cm.__class__, ClusterManager):
 #            raise ValueError("Must be a ClusterManager object")
         self.CM = cm
         self.timeout=120

     def has_key(self, key):
         return self.Stats.has_key(key)

     def __setitem__(self, key, value):
         self.Stats[key] = value

     def __getitem__(self, key):
         return self.Stats[key]

     def incr(self, name):
         '''Increment (or initialize) the value associated with the given name'''
         if not self.Stats.has_key(name):
             self.Stats[name]=0
         self.Stats[name] = self.Stats[name]+1

     def failure(self, reason="none"):
         '''Increment the failure count'''
         self.incr("failure")
         self.CM.log("Test " + self.name + " failed [reason:" + reason + "]")
         return None

     def success(self):
         '''Increment the success count'''
         self.incr("success")
         return 1

     def skipped(self):
         '''Increment the skipped count'''
         self.incr("skipped")
         return 1

     def __call__(self, node):
         '''Perform the given test'''
         raise ValueError("Abstract Class member (__call__)")
         self.incr("calls")
         return self.failure()

     def is_applicable(self):
         '''Return TRUE if we are applicable in the current test configuration'''
         raise ValueError("Abstract Class member (is_applicable)")
         return 1

     def canrunnow(self):
         '''Return TRUE if we can meaningfully run right now'''
         return 1

     def errorstoignore(self):
         '''Return list of errors which are 'normal' and should be ignored'''
         return []
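
A hedged sketch of the contract a concrete test fulfills (NoopTest is invented for illustration; the real tests below follow the same shape):

    class NoopTest(CTSTest):
        '''Hypothetical test: do nothing, successfully.'''
        def __init__(self, cm):
            CTSTest.__init__(self, cm)
            self.name="noop"
        def __call__(self, node):
            self.incr("calls")       # every test counts its invocations
            return self.success()    # or self.failure("reason") / self.skipped()
        def is_applicable(self):
            return 1
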
''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["down"]: return self.skipped() if node == self.CM.OurNode or self.CM.upcount() < 1: self.incr("us") pat = (self.uspat % node) else: self.incr("them") pat = (self.thempat % node) watch = CTS.LogWatcher(self.CM["LogFileName"], [pat] , timeout=self.CM["StartTime"]+10, debug=self.debug) watch.setwatch() self.CM.StartaCM(node) if watch.look(): return self.success() else: self.CM.log("START FAILURE: did not find pattern " + pat) self.CM.log("START TIMEOUT = %d " % self.CM["StartTime"]) return self.failure("did not find pattern " + pat) def is_applicable(self): '''StartTest is always applicable''' return 1 # # We don't register StartTest because it's better when called by # another test... # ################################################################### class FlipTest(CTSTest): ################################################################### '''If it's running, stop it. If it's stopped start it. Overthrow the status quo... ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="flip" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'flip' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("stopped") ret = self.stop(node) type="up->down" # Give the cluster time to recognize it's gone... time.sleep(self.CM["DeadTime"]+2) elif self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("started") ret = self.start(node) type="down->up" else: return self.skipped() self.incr(type) if ret: return self.success() else: return self.failure("%s failure" % type) def is_applicable(self): '''FlipTest is always applicable''' return 1 # Register FlipTest as a good test to run AllTestClasses.append(FlipTest) ################################################################### class RestartTest(CTSTest): ################################################################### '''Stop and restart a node''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Restart" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'restart' test. ''' self.incr("calls") self.incr("node:" + node) if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") self.start(node) ret1 = self.stop(node) # Give the cluster time to recognize we're gone... time.sleep(self.CM["DeadTime"]+2) ret2 = self.start(node) if not ret1: return self.failure("stop failure") if not ret2: return self.failure("start failure") return self.success() def is_applicable(self): '''RestartTest is always applicable''' return 1 # Register RestartTest as a good test to run AllTestClasses.append(RestartTest) ################################################################### class StonithTest(CTSTest): ################################################################### '''Reboot a node by whacking it with stonith.''' def __init__(self, cm, timeout=600): CTSTest.__init__(self,cm) self.name="Stonith" self.theystopped = self.CM["Pat:They_stopped"] self.allstopped = self.CM["Pat:All_stopped"] self.usstart = self.CM["Pat:We_started"] self.themstart = self.CM["Pat:They_started"] self.timeout = timeout def __call__(self, node): '''Perform the 'stonith' test. 
###################################################################
class StonithTest(CTSTest):
###################################################################
    '''Reboot a node by whacking it with stonith.'''
    def __init__(self, cm, timeout=600):
        CTSTest.__init__(self, cm)
        self.name = "Stonith"
        self.theystopped = self.CM["Pat:They_stopped"]
        self.allstopped  = self.CM["Pat:All_stopped"]
        self.usstart     = self.CM["Pat:We_started"]
        self.themstart   = self.CM["Pat:They_started"]
        self.timeout     = timeout

    def __call__(self, node):
        '''Perform the 'stonith' test. (whack the node)'''
        self.incr("calls")
        stopwatch = None

        # Figure out what log message to look for when/if it goes down
        if self.CM.ShouldBeStatus[node] != self.CM["down"]:
            if self.CM.upcount() != 1:
                stopwatch = (self.theystopped % node)

        # Figure out what log message to look for when it comes up
        if (self.CM.upcount() <= 1):
            uppat = (self.usstart % node)
        else:
            uppat = (self.themstart % node)

        upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat]
        ,       timeout=self.timeout)

        if stopwatch:
            watch = CTS.LogWatcher(self.CM["LogFileName"], [stopwatch]
            ,       timeout=self.CM["DeadTime"]+10)
            watch.setwatch()

        # Reset (stonith) the node
        StonithWorked = None
        for tries in 1,2,3,4,5:
            if self.CM.Env.ResetNode(node):
                StonithWorked = 1
                break
        if not StonithWorked:
            return self.failure("Stonith failure")

        upwatch.setwatch()

        # Look() and see if the machine went down
        if stopwatch:
            if watch.look():
                ret1 = 1
            else:
                reason = "Did not find " + stopwatch
                ret1 = 0
        else:
            ret1 = 1

        # Look() and see if the machine came back up
        if upwatch.look():
            ret2 = 1
        else:
            reason = "Did not find " + uppat
            ret2 = 0

        self.CM.ShouldBeStatus[node] = self.CM["up"]

        # I can't remember why I put this in here :-(
        time.sleep(10)

        if not ret1:
            self.CM.log("When node %s was STONITHed, the other node didn't log it" % node)

        if ret1 and ret2:
            return self.success()
        else:
            return self.failure(reason)

    def is_applicable(self):
        '''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE'''
        if self.CM.Env.has_key("DoStonith"):
            return self.CM.Env["DoStonith"]
        return 1

# Register StonithTest as a good test to run
AllTestClasses.append(StonithTest)
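# StonithTest above arms two watchers because a reset produces two separate
# log events: a survivor reporting that the victim left, then the victim
# reporting that it came back.  The arming order matters; a compressed sketch
# of it (hypothetical names and timeout values, same CTS.LogWatcher API):
#
#     upwatch = CTS.LogWatcher(logfile, [uppat], timeout=600)   # created early
#     downwatch = CTS.LogWatcher(logfile, [downpat], timeout=deadtime)
#     downwatch.setwatch()        # armed *before* the reset
#     cm.Env.ResetNode(node)
#     upwatch.setwatch()          # armed after the reset, before the reboot completes
#     ok = downwatch.look() and upwatch.look()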
###################################################################
class IPaddrtest(CTSTest):
###################################################################
    '''Find the machine supporting a particular IP address, and knock it down.

    [Hint:  This code isn't finished yet...]
    '''
    def __init__(self, cm, IPaddrs):
        CTSTest.__init__(self, cm)
        self.name = "IPaddrtest"
        self.IPaddrs = IPaddrs
        self.start = StartTest(cm)
        self.stop = StopTest(cm)

    def __call__(self, IPaddr):
        '''Perform the IPaddr test...'''
        self.incr("calls")

        node = self.CM.Env.RandomNode()
        self.incr("node:" + node)

        if self.CM.ShouldBeStatus[node] == self.CM["down"]:
            self.incr("WasStopped")
            self.start(node)

        ret1 = self.stop(node)
        # Give the cluster time to recognize we're gone...
        time.sleep(self.CM["DeadTime"]+10)
        ret2 = self.start(node)

        if not ret1:
            return self.failure("Could not stop")
        if not ret2:
            return self.failure("Could not start")

        return self.success()

    def is_applicable(self):
        '''IPaddrtest is always applicable (but shouldn't be)'''
        return 1

###################################################################
class StartOnebyOne(CTSTest):
###################################################################
    '''Start all the nodes ~ one by one'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "StartOnebyOne"

    def __call__(self, dummy):
        '''Perform the 'StartOnebyOne' test. '''
        self.incr("calls")

        # We ignore the "node" parameter...

        # Shut down all the nodes...
        for node in self.CM.Env["nodes"]:
            if self.CM.ShouldBeStatus[node] != self.CM["down"]:
                self.incr("stops")
                self.stop = StopTest(self.CM)
                self.stop(node)

        watchpats = [ ]
        pat = self.CM["Pat:We_started"]
        for node in self.CM.Env["nodes"]:
            thispat = (pat % node)
            watchpats.append(thispat)

        # Start all the nodes - one by one...
        watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
        ,       timeout=self.CM["DeadTime"]+10)
        watch.ReturnOnlyMatch()
        watch.setwatch()

        for node in self.CM.Env["nodes"]:
            self.CM.StartaCM(node)

        if watch.lookforall():
            return self.success()

        return self.failure("Did not find start pattern(s): " + repr(watch.unmatched))

    def is_applicable(self):
        '''StartOnebyOne is always applicable'''
        return 1

# Register StartOnebyOne as a good test to run
AllTestClasses.append(StartOnebyOne)

###################################################################
class SimulStart(CTSTest):
###################################################################
    '''Start all the nodes ~ simultaneously'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "SimulStart"

    def __call__(self, dummy):
        '''Perform the 'SimulStart' test. '''
        self.incr("calls")

        # We ignore the "node" parameter...

        # Shut down all the nodes...
        for node in self.CM.Env["nodes"]:
            if self.CM.ShouldBeStatus[node] != self.CM["down"]:
                self.incr("stops")
                self.stop = StopTest(self.CM)
                self.stop(node)

        watchpats = [ ]
        pat = self.CM["Pat:We_started"]
        for node in self.CM.Env["nodes"]:
            thispat = (pat % node)
            watchpats.append(thispat)

        # Start all the nodes - at about the same time...
        watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
        ,       timeout=self.CM["DeadTime"]+10)
        watch.ReturnOnlyMatch()
        watch.setwatch()

        for node in self.CM.Env["nodes"]:
            self.CM.StartaCMnoBlock(node)

        if watch.lookforall():
            return self.success()

        return self.failure("Did not find start pattern(s): " + repr(watch.unmatched))

    def is_applicable(self):
        '''SimulStart is always applicable'''
        return 1

# Register SimulStart as a good test to run
AllTestClasses.append(SimulStart)

###################################################################
class SimulStop(CTSTest):
###################################################################
    '''Stop all the nodes ~ simultaneously'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "SimulStop"

    def __call__(self, dummy):
        '''Perform the 'SimulStop' test. '''
        self.incr("calls")

        # We ignore the "node" parameter...

        # Start up all the nodes...
        for node in self.CM.Env["nodes"]:
            if self.CM.ShouldBeStatus[node] != self.CM["up"]:
                self.incr("started")
                self.start = StartTest(self.CM)
                self.start(node)

        watchpats = [ ]
        pat = self.CM["Pat:We_stopped"]
        for node in self.CM.Env["nodes"]:
            prefix = ('(%s) .* ' % node)
            thispat = prefix + pat
            watchpats.append(thispat)

        # Stop all the nodes - at about the same time...
        watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
        ,       timeout=self.CM["DeadTime"]+10)
        watch.ReturnOnlyMatch()
        watch.setwatch()

        for node in self.CM.Env["nodes"]:
            self.CM.StopaCM(node)

        if watch.lookforall():
            return self.success()

        return self.failure("Did not find stop pattern(s): " + repr(watch.unmatched))

    def is_applicable(self):
        '''SimulStop is always applicable'''
        return 1

# Register SimulStop as a good test to run
AllTestClasses.append(SimulStop)
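# StartOnebyOne, SimulStart and SimulStop all verify an n-way event the same
# way: expand one pattern per node and require every expansion to match
# (ReturnOnlyMatch() plus lookforall()).  An illustrative expansion with
# made-up node names, using the "Pat:We_started" convention from above:
#
#     pat = self.CM["Pat:We_started"]      # contains one %s for the node name
#     watchpats = []
#     for node in ["node1", "node2"]:      # hypothetical node list
#         watchpats.append(pat % node)     # one pattern per expected event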
###################################################################
class StandbyTest(CTSTest):
###################################################################
    '''Put a node in standby mode'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "standby"
        self.successpat   = self.CM["Pat:StandbyOK"]
        self.nostandbypat = self.CM["Pat:StandbyNONE"]
        self.transient    = self.CM["Pat:StandbyTRANSIENT"]

    def __call__(self, node):
        '''Perform the 'standby' test. '''
        self.incr("calls")

        if self.CM.ShouldBeStatus[node] == self.CM["down"]:
            return self.skipped()

        if self.CM.upcount() < 2:
            self.incr("nostandby")
            pat = self.nostandbypat
        else:
            self.incr("standby")
            pat = self.successpat

        #
        # You could make a good argument that the cluster manager
        # ought to give us good clues on when it's a bad time to
        # switch over to the other side, but heartbeat doesn't...
        # It could also queue the request.  But, heartbeat
        # doesn't do that either :-)
        #
        retrycount = 0
        while (retrycount < 10):
            watch = CTS.LogWatcher(self.CM["LogFileName"]
            ,       [pat, self.transient]
            ,       timeout=self.CM["DeadTime"]+10)
            watch.setwatch()

            self.CM.rsh(node, self.CM["Standby"])

            match = watch.look()
            if match:
                if re.search(self.transient, match):
                    self.incr("retries")
                    time.sleep(2)
                    retrycount = retrycount + 1
                else:
                    return self.success()
            else:
                break   # No point in retrying...

        return self.failure("did not find pattern " + pat)

    def is_applicable(self):
        '''StandbyTest is applicable when the CM has a Standby command'''
        if not self.CM.has_key("Standby"):
            return None
        else:
            #if self.CM.Env.has_key("DoStandby"):
                #flag = self.CM.Env["DoStandby"]
                #if type(flag) == types.IntType:
                    #return flag
                #if not re.match("[yt]", flag, re.I):
                    #return None
            #
            # We need to strip off everything after the first blank
            #
            cmd = self.CM["Standby"]
            cmd = cmd.split()[0]
            if not os.access(cmd, os.X_OK):
                return None

            cf = self.CM.cf
            if not cf.Parameters.has_key("auto_failback"):
                return None
            elif cf.Parameters["auto_failback"][0] == "legacy":
                return None
            return 1

# Register StandbyTest as a good test to run
AllTestClasses.append(StandbyTest)
#######################################################################
class Fastdetection(CTSTest):
#######################################################################
    '''Test how quickly the surviving node detects that another node
    has been killed'''
    def __init__(self, cm, timeout=60):
        CTSTest.__init__(self, cm)
        self.name = "DetectionTime"
        self.they_stopped = self.CM["Pat:They_stopped"]
        self.timeout = timeout
        self.start = StartTest(cm)
        self.standby = StandbyTest(cm)
        self.__setitem__("min", 0)
        self.__setitem__("max", 0)
        self.__setitem__("totaltime", 0)

    def __call__(self, node):
        '''Perform the fast failure detection test'''
        self.incr("calls")
        if self.CM.ShouldBeStatus[node] != self.CM["up"]:
            ret = self.start(node)
            if not ret:
                return ret

        if self.CM.upcount() < 2:
            return self.skipped()

        # Make sure they're not holding any resources
        ret = self.standby(node)
        if not ret:
            return ret

        stoppat = (self.they_stopped % node)
        stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout)
        stopwatch.setwatch()

        if self.CM.rsh(node, "killall -9 heartbeat") == 0:
            Starttime = os.times()[4]
            if stopwatch.look():
                Stoptime = os.times()[4]
                self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true")
                Detectiontime = Stoptime - Starttime
                detectms = int(Detectiontime*1000+0.5)
                self.CM.log("...failure detection time: %d ms" % detectms)
                self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime
                if self.Stats["min"] == 0:
                    self.Stats["min"] = Detectiontime
                if Detectiontime > self.Stats["max"]:
                    self.Stats["max"] = Detectiontime
                if Detectiontime < self.Stats["min"]:
                    self.Stats["min"] = Detectiontime
                self.CM.ShouldBeStatus[node] = self.CM["down"]
                self.start(node)
                return self.success()
            else:
                self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true")
                self.CM.ShouldBeStatus[node] = self.CM["down"]
                ret = self.start(node)
                return self.failure("Didn't find the log message")
        else:
            return self.failure("Couldn't stop heartbeat")

    def is_applicable(self):
        '''This test is applicable when auto_failback != legacy'''
        return self.standby.is_applicable()

AllTestClasses.append(Fastdetection)

##############################################################################
class BandwidthTest(CTSTest):
##############################################################################
#        Tests should not be cluster-manager-specific
#        If you need to find out cluster manager configuration to do this, then
#        it should be added to the generic cluster manager API.
    '''Test the bandwidth which heartbeat uses'''
    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "Bandwidth"
        self.start = StartTest(cm)
        self.__setitem__("min", 0)
        self.__setitem__("max", 0)
        self.__setitem__("totalbandwidth", 0)
        self.tempfile = tempfile.mktemp(".cts")

    def __call__(self, node):
        '''Perform the Bandwidth test'''
        self.incr("calls")

        if self.CM.upcount() < 1:
            return self.skipped()

        Path = self.CM.InternalCommConfig()
        if "ip" not in Path["mediatype"]:
            return self.skipped()

        port = Path["port"][0]
        if port == None:
            port = 694
        port = int(port)

        if self.CM.ShouldBeStatus[node] != self.CM["up"]:
            ret = self.start(node)
            if not ret:
                return ret
        time.sleep(5)  # We get extra messages right after startup.

        fstmpfile = "/tmp/band_estimate"
        dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
        %       (port, fstmpfile)

        rc = self.CM.rsh(node, dumpcmd)
        if rc == 0:
            farfile = "root@%s:%s" % (node, fstmpfile)
            self.CM.rsh.cp(farfile, self.tempfile)
            Bandwidth = self.countbandwidth(self.tempfile)
            if not Bandwidth:
                self.CM.log("Could not compute bandwidth.")
                return self.success()
            intband = int(Bandwidth + 0.5)
            self.CM.log("...heartbeat bandwidth: %d bits/sec" % intband)
            self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth
            if self.Stats["min"] == 0:
                self.Stats["min"] = Bandwidth
            if Bandwidth > self.Stats["max"]:
                self.Stats["max"] = Bandwidth
            if Bandwidth < self.Stats["min"]:
                self.Stats["min"] = Bandwidth
            self.CM.rsh(node, "rm -f %s" % fstmpfile)
            os.unlink(self.tempfile)
            return self.success()
        else:
            return self.failure("no response from tcpdump command [%d]!" % rc)
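    # countbandwidth() below sums the "length:" fields of ~100 captured UDP
    # heartbeat packets and divides by the elapsed capture time taken from
    # the tcpdump timestamps, giving bits per second.  A worked example with
    # made-up numbers: 100 packets of 150 bytes captured over 30 seconds is
    #
    #     (100 * 150 * 8) / 30 = 4000 bits/sec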
    def countbandwidth(self, file):
        fp = open(file, "r")
        fp.seek(0)
        count = 0
        sum = 0

        # Find the first UDP packet; note its length and timestamp...
        while 1:
            line = fp.readline()
            if not line:
                return None
            if re.search("udp", line) or re.search("UDP,", line):
                count = count + 1
                linesplit = string.split(line, " ")
                for j in range(len(linesplit)-1):
                    if linesplit[j] == "udp":
                        break
                    if linesplit[j] == "length:":
                        break
                try:
                    sum = sum + int(linesplit[j+1])
                except ValueError:
                    self.CM.log("Invalid tcpdump line: %s" % line)
                    return None
                T1 = linesplit[0]
                timesplit = string.split(T1, ":")
                time2split = string.split(timesplit[2], ".")
                time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001
                break

        # ...then accumulate the lengths of the following UDP packets
        while count < 100:
            line = fp.readline()
            if not line:
                return None
            if re.search("udp", line) or re.search("UDP,", line):
                count = count + 1
                linessplit = string.split(line, " ")
                for j in range(len(linessplit)-1):
                    if linessplit[j] == "udp":
                        break
                    if linessplit[j] == "length:":
                        break
                try:
                    sum = int(linessplit[j+1]) + sum
                except ValueError:
                    self.CM.log("Invalid tcpdump line: %s" % line)
                    return None
                T2 = linessplit[0]
                timesplit = string.split(T2, ":")
                time2split = string.split(timesplit[2], ".")
                time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001

        time = time2 - time1
        if (time <= 0):
            return 0
        return (sum*8)/time

    def is_applicable(self):
        '''BandwidthTest is always applicable'''
        return 1

AllTestClasses.append(BandwidthTest)

##########################################################################
class RedundantpathTest(CTSTest):
##########################################################################
    '''Test the redundant communication paths heartbeat uses within the cluster'''
#
#        Tests should not be cluster-manager specific
#        One needs to isolate what one needs from the cluster manager and then
#        add a (new) API to do it.
#
    def __init__(self, cm, timeout=60):
        CTSTest.__init__(self, cm)
        self.name = "RedundantpathTest"
        self.timeout = timeout

    def PathCount(self):
        '''Return number of communication paths'''
        Path = self.CM.InternalCommConfig()
        cf = self.CM.cf
        eths = []
        serials = []
        num = 0
        for interface in Path["interface"]:
            if re.search("eth", interface):
                eths.append(interface)
                num = num + 1
            if re.search("/dev", interface):
                serials.append(interface)
                num = num + 1
        return (num, eths, serials)

    def __call__(self, node):
        '''Perform redundant path test'''
        self.incr("calls")

        if self.CM.ShouldBeStatus[node] != self.CM["up"]:
            return self.skipped()

        (num, eths, serials) = self.PathCount()

        PathDown = None    # stays None unless we manage to break a path
        for eth in eths:
            if self.CM.rsh(node, "ifconfig %s down" % eth) == 0:
                PathDown = "OK"
                break

        if PathDown != "OK":
            for serial in serials:
                if self.CM.rsh(node, "setserial %s uart none" % serial) == 0:
                    PathDown = "OK"
                    break

        if PathDown != "OK":
            return self.failure("Cannot break the path")

        time.sleep(self.timeout)

        for audit in CTSaudits.AuditList(self.CM):
            if not audit():
                for eth in eths:
                    self.CM.rsh(node, "ifconfig %s up" % eth)
                for serial in serials:
                    self.CM.rsh(node, "setserial %s uart 16550" % serial)
                return self.failure("Redundant path fail")

        for eth in eths:
            self.CM.rsh(node, "ifconfig %s up" % eth)
        for serial in serials:
            self.CM.rsh(node, "setserial %s uart 16550" % serial)

        return self.success()

    def is_applicable(self):
        '''Applicable when there is more than one communication path'''
        return self.PathCount()[0] > 1

#AllTestClasses.append(RedundantpathTest)

##########################################################################
class DRBDTest(CTSTest):
##########################################################################
    '''In heartbeat, DRBD provides replicated storage.'''
    def __init__(self, cm, timeout=10):
        CTSTest.__init__(self, cm)
        self.name = "DRBD"
        self.timeout = timeout

    def __call__(self, dummy):
        '''Perform the 'DRBD' test.'''
        self.incr("calls")

        for node in self.CM.Env["nodes"]:
            if self.CM.ShouldBeStatus[node] == self.CM["down"]:
                return self.skipped()

        # Note:  All these special cases with Start/Stop/StatusDRBD
        # should be reworked to use resource objects instead of
        # being hardwired to bypass the objects here.

        for node in self.CM.Env["nodes"]:
            done = time.time() + self.timeout + 1
            while (time.time() < done):
                # [polling body lost in the source]
                pass
            if time.time() > done:
                return self.failure("Can't start drbd, please check it")

        device = {}
        for node in self.CM.Env["nodes"]:
            device[node] = self.getdevice(node)

        node = self.CM.Env["nodes"][0]
        done = time.time() + self.timeout + 1
        while 1:
            if (time.time() > done):
                return self.failure("the drbd couldn't sync")

            self.CM.rsh(node, "cp /proc/drbd /tmp >/dev/null 2>&1")
            if self.CM.rsh.cp("%s:/tmp/drbd" % node, "/tmp"):
                line = open("/tmp/drbd").readlines()[2]
                p  = line.find("Primary")
                s1 = line.find("Secondary")
                s2 = line.rfind("Secondary")
                if s1 != s2:
                    if self.CM.rsh(node, "drbdsetup %s primary" % device[node]):
                        pass
                if p != -1:
                    if p