diff --git a/cts/CM_LinuxHAv2.py.in b/cts/CM_LinuxHAv2.py.in index f2febdaae0..d29e2b710f 100755 --- a/cts/CM_LinuxHAv2.py.in +++ b/cts/CM_LinuxHAv2.py.in @@ -1,736 +1,739 @@ #!@PYTHON@ '''CTS: Cluster Testing System: LinuxHA v2 dependent modules... ''' __copyright__=''' Author: Huang Zhen Copyright (C) 2004 International Business Machines Additional Audits, Revised Start action, Default Configuration: Copyright (C) 2004 Andrew Beekhof ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import os,sys,CTS,CTSaudits,CTStests, warnings from CTS import * from CM_hb import HeartbeatCM from CTSaudits import ClusterAudit from CTStests import * from CIB import * try: from xml.dom.minidom import * except ImportError: sys.__stdout__.write("Python module xml.dom.minidom not found\n") sys.__stdout__.write("Please install python-xml or similar before continuing\n") sys.__stdout__.flush() sys.exit(1) ####################################################################### # # LinuxHA v2 dependent modules # ####################################################################### class LinuxHAv2(HeartbeatCM): ''' The linux-ha version 2 cluster manager class. It implements the things we need to talk to and manipulate linux-ha version 2 clusters ''' def __init__(self, Environment, randseed=None): HeartbeatCM.__init__(self, Environment, randseed=randseed) self.fastfail = 0 self.clear_cache = 0 self.cib_installed = 0 self.config = None self.cluster_monitor = 0 self.use_short_names = 1 self.update({ "Name" : "linux-ha-v2", "DeadTime" : 300, "StartTime" : 300, # Max time to start up "StableTime" : 30, "StartCmd" : "@INITDIR@/heartbeat@INIT_EXT@ start > /dev/null 2>&1", "StopCmd" : "@INITDIR@/heartbeat@INIT_EXT@ stop > /dev/null 2>&1", "ElectionCmd" : "@sbindir@/crmadmin -E %s", "StatusCmd" : "@sbindir@/crmadmin -S %s 2>/dev/null", "EpocheCmd" : "@sbindir@/ccm_tool -e", "QuorumCmd" : "@sbindir@/ccm_tool -q", "CibQuery" : "@sbindir@/cibadmin -Ql", "ParitionCmd" : "@sbindir@/ccm_tool -p", "IsRscRunning" : "@libdir@/heartbeat/lrmadmin -E %s monitor 0 0 EVERYTIME 2>/dev/null|grep return", "ExecuteRscOp" : "@libdir@/heartbeat/lrmadmin -n %s -E %s %s 0 %d EVERYTIME 2>/dev/null", "CIBfile" : "%s:@HA_VARLIBDIR@/heartbeat/crm/cib.xml", "TmpDir" : "/tmp", "BreakCommCmd2" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm break-communication %s>/dev/null 2>&1", "IsIPAddrRscRunning" : "", "StandbyCmd" : "@sbindir@/crm_standby -U %s -v %s 2>/dev/null", "UUIDQueryCmd" : "@sbindir@/crmadmin -N", "StandbyQueryCmd" : "@sbindir@/crm_standby -GQ -U %s 2>/dev/null", # Patterns to look for in the log files for various occasions... "Pat:DC_IDLE" : "crmd.*State transition.*-> S_IDLE", # This wont work if we have multiple partitions - # Use: "Pat:They_started" : "%s crmd:.*State transition.*-> S_NOT_DC", - "Pat:They_started" : "Updating node state to member for %s", - "Pat:We_started" : "%s crmd:.* State transition.*-> S_IDLE", + "Pat:Local_started" : "%s crmd:.*The local CRM is operational", + "Pat:Slave_started" : "%s crmd:.*State transition.*-> S_NOT_DC", + "Pat:Master_started" : "%s crmd:.* State transition.*-> S_IDLE", "Pat:We_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete", "Pat:Logd_stopped" : "%s logd:.*Exiting write process", "Pat:They_stopped" : "%s crmd:.*LOST:.* %s ", "Pat:All_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete", "Pat:They_dead" : "node %s.*: is dead", "Pat:TransitionComplete" : "Transition status: Complete: complete", # Bad news Regexes. Should never occur. "BadRegexes" : ( r"ERROR:", r"CRIT:", r"Shutting down\.", r"Forcing shutdown\.", r"Timer I_TERMINATE just popped", r"input=I_ERROR", r"input=I_FAIL", r"input=I_INTEGRATED cause=C_TIMER_POPPED", r"input=I_FINALIZED cause=C_TIMER_POPPED", r"input=I_ERROR", r", exiting\.", r"WARN.*Ignoring HA message.*vote.*not in our membership list", r"pengine.*Attempting recovery of resource", r"tengine.*is taking more than 2x its timeout", r"Confirm not received from", r"Welcome reply not received from", r"Attempting to schedule .* after a stop", r"Resource .* was active at shutdown", r"duplicate entries for call_id", r"Search terminated:", r"No need to invoke the TE", r":global_timer_callback", r"Faking parameter digest creation", r"Parameters to .* action changed:", r"Parameters to .* changed", ), }) del self["Standby"] if self.Env["DoBSC"]: del self["Pat:They_stopped"] del self["Pat:Logd_stopped"] self.Env["use_logd"] = 0 self.check_transitions = 0 self.check_elections = 0 self.CIBsync = {} self.default_cts_cib=CIB(self).cib() self.debug(self.default_cts_cib) def errorstoignore(self): # At some point implement a more elegant solution that # also produces a report at the end '''Return list of errors which are known and very noisey should be ignored''' if 1: return [ "ERROR: Message hist queue is filling up", "stonithd: .*CRIT: external_hostlist: 'vmware gethosts' returned an empty hostlist", "stonithd: .*ERROR: Could not list nodes for stonith RA external/vmware.", - + "pengine: Preventing .* from re-starting", ] return [] def install_config(self, node): if not self.ns.WaitForNodeToComeUp(node): self.log("Node %s is not up." % node) return None if not self.CIBsync.has_key(node) and self.Env["ClobberCIB"] == 1: self.CIBsync[node] = 1 self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml") self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml.sig") self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml.last") self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml.sig.last") # Only install the CIB on the first node, all the other ones will pick it up from there if self.cib_installed == 1: return None self.cib_installed = 1 if self.Env["CIBfilename"] == None: self.debug("Installing Generated CIB on node %s" %(node)) warnings.filterwarnings("ignore") cib_file=os.tmpnam() warnings.resetwarnings() os.system("rm -f "+cib_file) self.debug("Creating new CIB for " + node + " in: " + cib_file) os.system("echo \'" + self.default_cts_cib + "\' > " + cib_file) if 0!=self.rsh.echo_cp(None, cib_file, node, "@HA_VARLIBDIR@/heartbeat/crm/cib.xml"): raise ValueError("Can not create CIB on %s "%node) os.system("rm -f "+cib_file) else: self.debug("Installing CIB (%s) on node %s" %(self.Env["CIBfilename"], node)) if 0!=self.rsh.cp(self.Env["CIBfilename"], "root@" + (self["CIBfile"]%node)): raise ValueError("Can not scp file to %s "%node) self.rsh.remote_py(node, "os", "system", "chown @HA_CCMUSER@ @HA_VARLIBDIR@/heartbeat/crm/cib.xml") def prepare(self): '''Finish the Initialization process. Prepare to test...''' for node in self.Env["nodes"]: self.ShouldBeStatus[node] = "" self.StataCM(node) def test_node_CM(self, node): '''Report the status of the cluster manager on a given node''' watchpats = [ ] watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") - watchpats.append(self["Pat:They_started"]%node) + watchpats.append(self["Pat:Slave_started"]%node) idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats) idle_watch.setwatch() out=self.rsh.readaline(node, self["StatusCmd"]%node) self.debug("Node %s status: '%s'" %(node, out)) if not out or string.find(out, 'ok') < 0: if self.ShouldBeStatus[node] == self["up"]: self.log( "Node status for %s is %s but we think it should be %s" %(node, self["down"], self.ShouldBeStatus[node])) self.ShouldBeStatus[node]=self["down"] return 0 if self.ShouldBeStatus[node] == self["down"]: self.log( "Node status for %s is %s but we think it should be %s: %s" %(node, self["up"], self.ShouldBeStatus[node], out)) self.ShouldBeStatus[node]=self["up"] # check the output first - because syslog-ng looses messages if string.find(out, 'S_NOT_DC') != -1: # Up and stable return 2 if string.find(out, 'S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" %(node, out)) return 1 # Up and stable return 2 # Is the node up or is the node down def StataCM(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) > 0: return 1 return None # Being up and being stable is not the same question... def node_stable(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) == 2: return 1 self.log("Warn: Node %s not stable" %(node)) return None def cluster_stable(self, timeout=None): watchpats = [ ] watchpats.append("Current ping state: S_IDLE") watchpats.append(self["Pat:DC_IDLE"]) + self.debug("Waiting for cluster stability...") if timeout == None: timeout = self["DeadTime"] idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats, timeout) idle_watch.setwatch() any_up = 0 for node in self.Env["nodes"]: # have each node dump its current state if self.ShouldBeStatus[node] == self["up"]: self.rsh.readaline(node, (self["StatusCmd"] %node) ) any_up = 1 if any_up == 0: self.debug("Cluster is inactive") return 1 ret = idle_watch.look() if ret: self.debug(ret) return 1 self.log("Warn: Cluster Master not IDLE after %ds" % timeout) return None def is_node_dc(self, node, status_line=None): rc = 0 if not status_line: status_line = self.rsh.readaline(node, self["StatusCmd"]%node) if not status_line: rc = 0 elif string.find(status_line, 'S_IDLE') != -1: rc = 1 elif string.find(status_line, 'S_INTEGRATION') != -1: rc = 1 elif string.find(status_line, 'S_FINALIZE_JOIN') != -1: rc = 1 elif string.find(status_line, 'S_POLICY_ENGINE') != -1: rc = 1 elif string.find(status_line, 'S_TRANSITION_ENGINE') != -1: rc = 1 if rc == 1: self.debug("%s _is_ the DC" % node) return rc def active_resources(self, node): (rc, output) = self.rsh.remote_py( node, "os", "system", """@sbindir@/crm_mon -1 | grep "Started %s" """ % node) resources = [] for line in output: fields = line.split() resources.append(fields[0]) return resources def ResourceOp(self, resource, op, node, interval=0, app="lrmadmin"): ''' Execute an operation on a resource ''' self.rsh.readaline(node, self["ExecuteRscOp"] % (app, resource, op, interval)) return self.rsh.lastrc def ResourceLocation(self, rid): ResourceNodes = [] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["up"]: if self.ResourceOp(rid, "monitor", node) == 0: ResourceNodes.append(node) return ResourceNodes def isolate_node(self, node, allowlist): '''isolate the communication between the nodes''' rc = self.rsh(node, self["BreakCommCmd2"]%allowlist) if rc == 0: return 1 else: self.log("Could not break the communication from node: %s",node) return None def Configuration(self): if self.config: return self.config.getElementsByTagName('configuration')[0] warnings.filterwarnings("ignore") cib_file=os.tmpnam() warnings.resetwarnings() os.system("rm -f "+cib_file) if self.Env["ClobberCIB"] == 1: if self.Env["CIBfilename"] == None: self.debug("Creating new CIB in: " + cib_file) os.system("echo \'"+ self.default_cts_cib +"\' > "+ cib_file) else: os.system("cp "+self.Env["CIBfilename"]+" "+cib_file) else: if 0 != self.rsh.echo_cp( self.Env["nodes"][0], "@HA_VARLIBDIR@/heartbeat/crm/cib.xml", None, cib_file): raise ValueError("Can not copy file to %s, maybe permission denied"%cib_file) self.config = parse(cib_file) os.remove(cib_file) return self.config.getElementsByTagName('configuration')[0] def Resources(self): ResourceList = [] #read resources in cib configuration = self.Configuration() resources = configuration.getElementsByTagName('resources')[0] rscs = configuration.getElementsByTagName('primitive') incs = configuration.getElementsByTagName('clone') groups = configuration.getElementsByTagName('group') for rsc in rscs: if rsc in resources.childNodes: ResourceList.append(HAResource(self,rsc)) for grp in groups: for rsc in rscs: if rsc in grp.childNodes: if self.use_short_names: resource = HAResource(self,rsc) else: resource = HAResource(self,rsc,grp.getAttribute('id')) ResourceList.append(resource) for inc in incs: max = 0 inc_name = inc.getAttribute("id") instance_attributes = inc.getElementsByTagName('instance_attributes')[0] attributes = instance_attributes.getElementsByTagName('attributes')[0] nvpairs = attributes.getElementsByTagName('nvpair') for nvpair in nvpairs: if nvpair.getAttribute("name") == "clone_max": max = int(nvpair.getAttribute("value")) inc_rsc = inc.getElementsByTagName('primitive')[0] for i in range(0,max): rsc = HAResource(self,inc_rsc) rsc.inc_no = i rsc.inc_name = inc_name rsc.inc_max = max if self.use_short_names: rsc.rid = rsc.rid + ":%d"%i else: rsc.rid = inc_name+":"+rsc.rid + ":%d"%i rsc.Instance = rsc.rid ResourceList.append(rsc) return ResourceList def ResourceGroups(self): GroupList = [] #read resources in cib configuration = self.Configuration() groups = configuration.getElementsByTagName('group') rscs = configuration.getElementsByTagName('primitive') for grp in groups: group = [] GroupList.append(group) for rsc in rscs: if rsc in grp.childNodes: if self.use_short_names: resource = HAResource(self,rsc) else: resource = HAResource(self,rsc,grp.getAttribute('id')) group.append(resource) return GroupList def Dependencies(self): DependencyList = [] #read dependency in cib configuration=self.Configuration() constraints=configuration.getElementsByTagName('constraints')[0] rsc_to_rscs=configuration.getElementsByTagName('rsc_to_rsc') for node in rsc_to_rscs: dependency = {} dependency["id"]=node.getAttribute('id') dependency["from"]=node.getAttribute('from') dependency["to"]=node.getAttribute('to') dependency["type"]=node.getAttribute('type') dependency["strength"]=node.getAttribute('strength') DependencyList.append(dependency) return DependencyList def find_partitions(self): ccm_partitions = [] for node in self.Env["nodes"]: self.debug("Retrieving partition details for %s" %node) if self.ShouldBeStatus[node] == self["up"]: partition = self.rsh.readaline(node, self["ParitionCmd"]) if not partition: self.log("no partition details for %s" %node) elif len(partition) > 2: partition = partition[:-1] found=0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug("Adding partition from %s: %s" %(node, partition)) ccm_partitions.append(partition) else: self.log("bad partition details for %s" %node) return ccm_partitions def HasQuorum(self, node_list): # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.Env["nodes"] for node in node_list: if self.ShouldBeStatus[node] == self["up"]: quorum = self.rsh.readaline(node, self["QuorumCmd"]) if string.find(quorum, "1") != -1: return 1 elif string.find(quorum, "0") != -1: return 0 else: self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum) return 0 def Components(self): complist = [] common_ignore = [ "Pending action:", "ERROR: crm_log_message_adv:", "ERROR: MSG: No message to dump", "pending LRM operations at shutdown", "Lost connection to the CIB service", "Connection to the CIB terminated...", "Sending message to CIB service FAILED", "crmd: .*Action A_RECOVER .* not supported", "ERROR: stonithd_op_result_ready: not signed on", "send_ipc_message: IPC Channel to .* is not connected", "unconfirmed_actions: Waiting on .* unconfirmed actions", "cib_native_msgready: Message pending on command channel", "crmd:.*do_exit: Performing A_EXIT_1 - forcefully exiting the CRMd", "verify_stopped: Resource .* was active at shutdown. You may ignore this error if it is unmanaged.", ] stonith_ignore = [ "ERROR: stonithd_signon: ", "update_failcount: Updating failcount for child_DoFencing", "ERROR: te_connect_stonith: Sign-in failed: triggered a retry", ] stonith_ignore.extend(common_ignore) ccm = Process("ccm", 0, [ "State transition S_IDLE", "CCM connection appears to have failed", "crmd: .*Action A_RECOVER .* not supported", "crmd: .*Input I_TERMINATE from do_recover", "Exiting to recover from CCM connection failure", "crmd:.*do_exit: Could not recover from internal error", "crmd: .*I_ERROR.*(ccm_dispatch|crmd_cib_connection_destroy)", # "WARN: determine_online_status: Node .* is unclean", # "Scheduling Node .* for STONITH", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", - "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN", + "A new node joined the cluster", +# "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE", +# "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN", "State transition S_STARTING -> S_PENDING", ], [], common_ignore, self.fastfail, self) cib = Process("cib", 0, [ "State transition S_IDLE", "Lost connection to the CIB service", "Connection to the CIB terminated...", "crmd: .*Input I_TERMINATE from do_recover", "crmd: .*I_ERROR.*crmd_cib_connection_destroy", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self.fastfail, self) lrmd = Process("lrmd", 0, [ "State transition S_IDLE", "LRM Connection failed", "crmd: .*I_ERROR.*lrm_dispatch", "State transition S_STARTING -> S_PENDING", ".*crmd .*exited with return code 2.", "crmd: .*Input I_TERMINATE from do_recover", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self.fastfail, self) crmd = Process("crmd", 0, [ # "WARN: determine_online_status: Node .* is unclean", # "Scheduling Node .* for STONITH", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", "State transition S_IDLE", "State transition S_STARTING -> S_PENDING", ], [ "tengine: .*ERROR: subsystem_msg_dispatch: The server .* has left us: Shutting down...NOW", "pengine: .*ERROR: subsystem_msg_dispatch: The server .* has left us: Shutting down...NOW", ], common_ignore, self.fastfail, self) pengine = Process("pengine", 1, [ "State transition S_IDLE", ".*crmd .*exited with return code 2.", "crmd: .*Input I_TERMINATE from do_recover", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self.fastfail, self) tengine = Process("tengine", 1, [ "State transition S_IDLE", ".*crmd .*exited with return code 2.", "crmd: .*Input I_TERMINATE from do_recover", "crmd:.*do_exit: Could not recover from internal error", ], [], common_ignore, self.fastfail, self) if self.Env["DoFencing"] == 1 : complist.append(Process("stonithd", 0, [], [ "tengine_stonith_connection_destroy: Fencing daemon has left us", "Attempting connection to fencing daemon", "te_connect_stonith: Connected", ], stonith_ignore, 0, self)) # complist.append(Process("heartbeat", 0, [], [], [], None, self)) if self.fastfail == 0: ccm.pats.extend([ "ERROR: Client .*attrd exited with return code 1", "ERROR: Respawning client .*attrd", "ERROR: Client .*cib exited with return code 2", "ERROR: Respawning client .*cib", "ERROR: Client .*crmd exited with return code 2", "ERROR: Respawning client .*crmd" ]) cib.pats.extend([ "ERROR: Client .*attrd exited with return code 1", "ERROR: Respawning client .*attrd", "ERROR: Client .*crmd exited with return code 2", "ERROR: Respawning client .*crmd" ]) lrmd.pats.extend([ "ERROR: Client .*crmd exited with return code 2", "ERROR: Respawning client .*crmd" ]) pengine.pats.extend([ "ERROR: Client .*crmd exited with return code 2", "ERROR: Respawning client .*crmd" ]) tengine.pats.extend([ "ERROR: Client .*crmd exited with return code 2", "ERROR: Respawning client .*crmd" ]) complist.append(ccm) complist.append(cib) complist.append(lrmd) complist.append(crmd) complist.append(pengine) complist.append(tengine) return complist def NodeUUID(self, node): lines = self.rsh.readlines(node, self["UUIDQueryCmd"]) for line in lines: self.debug("UUIDLine:"+ line) m = re.search(r'%s.+\((.+)\)' % node, line) if m: return m.group(1) return "" def StandbyStatus(self, node): out=self.rsh.readaline(node, self["StandbyQueryCmd"]%node) if not out: return "off" out = out[:-1] self.debug("Standby result: "+out) return out # status == "on" : Enter Standby mode # status == "off": Enter Active mode def SetStandbyMode(self, node, status): current_status = self.StandbyStatus(node) cmd = self["StandbyCmd"] % (node, status) ret = self.rsh(node, cmd) return True class HAResource(Resource): def __init__(self, cm, node, group=None): ''' Get information from xml node ''' if group == None : self.rid = str(node.getAttribute('id')) else : self.rid = group + ":" + str(node.getAttribute('id')) self.rclass = str(node.getAttribute('class')) self.rtype = str(node.getAttribute('type')) self.inc_name = None self.inc_no = -1 self.inc_max = -1 self.rparameters = {} nvpairs = [] list = node.getElementsByTagName('instance_attributes') if len(list) > 0: attributes = list[0] list = attributes.getElementsByTagName('attributes') if len(list) > 0: parameters = list[0] nvpairs = parameters.getElementsByTagName('nvpair') for nvpair in nvpairs: name=nvpair.getAttribute('name') value=nvpair.getAttribute('value') self.rparameters[name]=value # This should normally be called first... FIXME! Resource.__init__(self, cm, self.rtype, self.rid) # resources that dont need quorum will have: # ops = node.getElementsByTagName('op') for op in ops: if op.getAttribute('name') == "start" and op.getAttribute('prereq') == "nothing": self.needs_quorum = 0 def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. We call the status operation for the resource script. ''' rc = self.CM.ResourceOp(self.rid, "monitor", nodename) return (rc == 0) def RunningNodes(self): return self.CM.ResourceLocation(self.rid) def Start(self, nodename): ''' This member function starts or activates the resource. ''' return self.CM.ResourceOp(self.rid, "start", nodename) def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' return self.CM.ResourceOp(self.rid, "stop", nodename) def IsWorkingCorrectly(self, nodename): return self.IsRunningOn(nodename) ####################################################################### # # A little test code... # # Which you are advised to completely ignore... # ####################################################################### if __name__ == '__main__': pass diff --git a/cts/CM_fs.py.in b/cts/CM_fs.py.in index 328863ac8f..26dc83d1e1 100644 --- a/cts/CM_fs.py.in +++ b/cts/CM_fs.py.in @@ -1,73 +1,73 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Failsafe dependent modules... Classes related to testing high-availability clusters... Lots of things are implemented. Lots of things are not implemented. We have many more ideas of what to do than we've implemented. ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. #import types, string, select, sys, time, re, os, random, struct #from os import system #from UserDict import UserDict #from syslog import * #from popen2 import Popen3 class FailSafeCM(ClusterManager): ''' The FailSafe cluster manager class. Not implemented yet. ''' def __init__(self, randseed=None): ClusterManager.__init__(self, randseed=randseed) self.update({ "Name" : "FailSafe", "StartCmd" : None, # Fix me! "StopCmd" : None, # Fix me! "StatusCmd" : None, # Fix me! "RereadCmd" : None, # Fix me! "TestConfigDir" : None, # Fix me! "LogFileName" : None, # Fix me! - "Pat:We_started" : None, # Fix me! - "Pat:They_started" : None, # Fix me! + "Pat:Master_started" : None, # Fix me! + "Pat:Slave_started" : None, # Fix me! "Pat:We_stopped" : None, # Fix me! "Pat:They_stopped" : None, # Fix me! "BadRegexes" : None, # Fix me! }) self._finalConditions() def SyncTestConfigs(self): pass def SetClusterConfig(self): pass def ResourceGroups(self): raise ValueError("Forgot to write ResourceGroups()") diff --git a/cts/CM_hb.py.in b/cts/CM_hb.py.in index 2a4929f9ff..5fc8d6e6d7 100755 --- a/cts/CM_hb.py.in +++ b/cts/CM_hb.py.in @@ -1,649 +1,649 @@ #!@PYTHON@ '''CTS: Cluster Testing System: heartbeat dependent modules... Classes related to testing high-availability clusters... Lots of things are implemented. Lots of things are not implemented. We have many more ideas of what to do than we've implemented. ''' __copyright__=''' Copyright (C) 2000,2001,2005 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. from CTS import * class HeartbeatCM(ClusterManager): ''' The heartbeat cluster manager class. It implements the things we need to talk to and manipulate heartbeat clusters ''' def __init__(self, Environment, randseed=None): self.ResourceDirs = ["@sysconfdir@/ha.d/resource.d", "@sysconfdir@/rc.d/init.d", "@sysconfdir@/rc.d/"] self.ResourceFile = Environment["HAdir"] + "/haresources" self.ConfigFile = Environment["HAdir"]+ "/ha.cf" ClusterManager.__init__(self, Environment, randseed=randseed) self.update({ "Name" : "heartbeat", "DeadTime" : 30, "StableTime" : 30, "StartCmd" : "@libdir@/heartbeat/ha_logd -d >/dev/null 2>&1; MALLOC_CHECK_=2 @libdir@/heartbeat/heartbeat >/dev/null 2>&1", "StopCmd" : "@libdir@/heartbeat/heartbeat -k", "StatusCmd" : "@libdir@/heartbeat/heartbeat -s", "RereadCmd" : "@libdir@/heartbeat/heartbeat -r", "StartDRBDCmd" : "@sysconfdir@/init.d/drbd start >/dev/null 2>&1", "StopDRBDCmd" : "@sysconfdir@/init.d/drbd stop", "StatusDRBDCmd" : "@sysconfdir@/init.d/drbd status", "DRBDCheckconf" : "@sysconfdir@/init.d/drbd checkconfig >/var/run/drbdconf 2>&1", "BreakCommCmd" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm break-communication >/dev/null 2>&1", "FixCommCmd" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm fix-communication >/dev/null 2>&1", "DelFileCommCmd" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm delete-testingfile >/dev/null 2>&1", "SaveFileCmd" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm save-testingfile /tmp/OnlyForTesting >/dev/null 2>&1", "ReduceCommCmd" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm reduce-communication %s %s>/dev/null 2>&1", "RestoreCommCmd" : "@HA_NOARCHDATAHBDIR@/TestHeartbeatComm restore-communication /tmp/OnlyForTesting >/dev/null 2>&1", "IPaddrCmd" : "@sysconfdir@/ha.d/resource.d/IPaddr %s status", "Standby" : "@HA_NOARCHDATAHBDIR@/hb_standby >/dev/null 2>&1", "TestConfigDir" : "@sysconfdir@/ha.d/testconfigs", "LogFileName" : Environment["LogFileName"], # Patterns to look for in the log files for various occasions... - "Pat:We_started" : " (%s) .* Initial resource acquisition complete", - "Pat:They_started" : " (%s) .* Initial resource acquisition complete", + "Pat:Master_started" : " (%s) .* Initial resource acquisition complete", + "Pat:Slave_started" : " (%s) .* Initial resource acquisition complete", "Pat:We_stopped" : "%s heartbeat.*Heartbeat shutdown complete", "Pat:Logd_stopped" : "%s logd:.*Exiting write process", "Pat:They_stopped" : "%s heartbeat.*node (%s).*: is dead", "Pat:They_dead" : "node (%s).*: is dead", "Pat:All_stopped" : " (%s).*heartbeat.*Heartbeat shutdown complete", "Pat:StandbyOK" : "Standby resource acquisition done", "Pat:StandbyNONE" : "No reply to standby request", "Pat:StandbyTRANSIENT" : "standby message.*ignored.*in flux", "Pat:Return_partition" : "Cluster node %s returning after partition", # Bad news Regexes. Should never occur. "BadRegexes" : ( r"Shutting down\.", r"Forcing shutdown\.", r"Both machines own .* resources!", r"No one owns .* resources!", r", exiting\.", r"ERROR:", r"CRIT.*:", ), }) self.cf=HBConfig(Environment["HAdir"]) self._finalConditions() def SetClusterConfig(self, configpath="default", nodelist=None): '''Activate the named test configuration throughout the cluster. This code is specialized to heartbeat. ''' rc=1 Command=''' cd %s%s%s; : cd to test configuration directory for j in * do if [ -f "@sysconfdir@/ha.d/$j" ]; then if cmp $j @sysconfdir@/ha.d/$j >/dev/null 2>&1; then : Config file $j is already up to correct. else echo "Touching $j" cp $j @sysconfdir@/ha.d/$j fi fi done ''' % (self["TestConfigDir"], os.sep, configpath) if nodelist == None: nodelist=self.Env["nodes"] for node in nodelist: if not self.rsh(node, Command): rc=None self.rereadall() return rc def ResourceGroups(self): ''' Return the list of resources groups defined in this configuration. This code is specialized to heartbeat. We make the assumption that the resource file on the local machine is the same as that of a cluster member. We aren't necessarily a member of the cluster (In fact, we usually aren't). ''' RscGroups=[] file = open(self.ResourceFile, "r") while (1): line = file.readline() if line == "": break idx=string.find(line, '#') if idx >= 0: line=line[:idx] if line == "": continue line = string.strip(line) # Is this wrong? tokens = re.split("[ \t]+", line) # Ignore the default server for this resource group del tokens[0] Group=[] for token in tokens: if token != "": idx=string.find(token, "::") if idx > 0: tuple=string.split(token, "::") else: # # Is this an IPaddr default resource type? # if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$" , token): tuple=["IPaddr", token] else: tuple = [token, None] Resource = self.hbResource(tuple[0], tuple[1]) Group.append(Resource) RscGroups.append(Group) file.close() return RscGroups def InternalCommConfig(self): ''' Return a list of communication paths. Each path consists of a tuple like this: mediatype serial | ip interface/dev name eth0 | /dev/ttyS0... protocol tcp?? | udp | None port Number | None ''' Path = {"mediatype" : None, "interface": None, "protocol" : None, "port": None} cf = self.cf for cfp in cf.Parameters: if cfp == "serial": if Path["mediatype"] == None: Path["mediatype"] = ["serial"] else: Path["mediatype"].append("serial") if Path["interface"] == None: Path["interface"] = cf.Parameters["serial"] else: for serial in cf.Parameters["serial"]: Path["interface"].append(serial) if cfp == "bcast" or cfp == "mcast" or cfp == "ucast" : if Path["mediatype"] == None: Path["mediatype"] = ["ip"] else: Path["mediatype"].append("ip") if cfp == "bcast": interfaces = cf.Parameters[cfp] if cfp == "ucast": interfaces = [cf.Parameters[cfp][0]] if cfp == "mcast": Path["port"] = [cf.Parameters[cfp][0][2]] Path["protocol"] = "udp" interfaces = [cf.Parameters[cfp][0][0]] if Path["interface"] == None: Path["interface"] = interfaces else: for interface in interfaces: if interface not in Path["interface"]: Path["interface"].append(interface) if cfp == "udpport": Path["port"] = cf.Parameters["udpport"] Path["protocol"] = ["udp"] if Path["port"] == None: Path["port"] = [694] return Path def HasQuorum(self, node_list): ( '''Return TRUE if the cluster currently has quorum. According to current heartbeat code this means one node is up. ''') return self.upcount() >= 1 def hbResource(self, type, instance): ''' Our job is to create the right kind of resource. For most resources, we just create an HBResource object, but for IP addresses, we create an HBipResource instead. Some other types of resources may also be added as special cases. ''' if type == "IPaddr": return HBipResource(self, type, instance) return HBResource(self, type, instance) class HBResource(Resource): def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. We call the status operation for the resource script. ''' return self._ResourceOperation("status", "OK|running", nodename) def _ResourceOperation(self, operation, pattern, nodename): ''' We call the requested operation for the resource script. We don't care what kind of operation we were called to do particularly. When we were created, we were bound to a cluster manager, which has its own remote execution method (which we use here). ''' if self.Instance == None: instance = "" else: instance = self.Instance Rlist = 'LIST="' for dir in self.CM.ResourceDirs: Rlist = Rlist + " " + dir Rlist = Rlist + '"; ' Script= Rlist + ''' T="''' + self.ResourceType + '''"; I="''' + instance + '''"; for dir in $LIST; do if [ -f "$dir/$T" -a -x "$dir/$T" ] then "$dir/$T" $I ''' + operation + ''' exit $? fi done 2>&1; exit 1;''' #print "Running " + Script + "\n" line = self.CM.rsh.readaline(nodename, Script) if operation == "status": if re.search(pattern, line): return 1 return self.CM.rsh.lastrc == 0 def Start(self, nodename): ''' This member function starts or activates the resource. ''' return self._ResourceOperation("start", None, nodename) def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' return self._ResourceOperation("stop", None, nodename) # def IsWorkingCorrectly(self, nodename): # "We default to returning TRUE for this one..." # if self.Instance == None: # self.CM.log("Faking out: " + self.ResourceType) # else: # self.CM.log("Faking out: " + self.ResourceType + self.Instance) # return 1 def IsWorkingCorrectly(self, nodename): return self._ResourceOperation("monitor", "OK", nodename) class HBipResource(HBResource): ''' We are a specialized IP address resource which knows how to test to see if our resource type is actually being served. We are cheat and run the IPaddr resource script on the current machine, because it's a more interesting case. ''' def IsWorkingCorrectly(self, nodename): return self._ResourceOperation("monitor", "OK", self.CM.OurNode) # # A heartbeat configuration class... # It reads and parses the heartbeat config # files # class HBConfig: # Which options have multiple words on the line? MultiTokenKeywords = {"mcast" : None , "stonith_host": None} def __init__(self, configdir="/etc/ha.d"): self.Parameters = {} self.ResourceGroups = {} self._ReadConfig(os.path.join(configdir, "ha.cf")) FirstUp_NodeSelection() LastUp_NodeSelection() no_failback = NoAutoFailbackPolicy() auto_failback = AutoFailbackPolicy() # # We allow each resource group to have its own failover/back # policies # if self.Parameters.has_key("nice_failback") \ and self.Parameters["nice_failback"] == "on": HBConfig.DefaultFailbackPolicy = no_failback elif self.Parameters.has_key("auto_failback") \ and self.Parameters["auto_failback"] == "off": HBConfig.DefaultFailbackPolicy = no_failback else: HBConfig.DefaultFailbackPolicy = auto_failback HBConfig.DefaultNodeSelectionPolicy = NodeSelectionPolicies["FirstUp"] self._ReadResourceGroups(os.path.join(configdir, "haresources")) # Read ha.cf config file def _ReadConfig(self, ConfigFile): self.ConfigPath = ConfigFile; fp = open(ConfigFile) while 1: line=fp.readline() if not line: return line = re.sub("#.*", "", line) line = string.rstrip(line) if len(line) < 1: continue tokens = line.split() key = tokens[0] values = tokens[1:] if HBConfig.MultiTokenKeywords.has_key(key): # group items from this line together, and separate # from the items on other lines values = [values] if self.Parameters.has_key(key): if key == "node": self.Parameters[key].extend(values) else: self.Parameters[key].append(values[0]) else: self.Parameters[key] = values # Read a line from the haresources file... # - allow for \ continuations... def _GetRscLine(self, fp): linesofar = None continuation=1 while continuation: continuation = 0 line=fp.readline() if not line: break line = re.sub("#.*", "", line) if line[len(line)-2] == "\\": line = line[0:len(line)-2] + "\n" continuation=1 if linesofar == None: linesofar = line else: linesofar = linesofar + line return linesofar def _ReadResourceGroups(self, RscFile): self.RscPath = RscFile; fp = open(RscFile) thisline = "" while 1: line=self._GetRscLine(fp) if not line: return line = line.strip() if len(line) < 1: continue tokens = line.split() node = tokens[0] resources = tokens[1:] rscargs=[] for resource in resources: name=resource.split("::", 1) if len(name) > 1: args=name[1].split("::") else: args=None name = name[0] rscargs.append(Resource(name, args)) name = tokens[0] + "__" + tokens[1] assert not self.ResourceGroups.has_key(name) # # Create the resource group # self.ResourceGroups[name] = ResourceGroup(name \ , rscargs , node.split(",") # Provide default value , HBConfig.DefaultNodeSelectionPolicy , HBConfig.DefaultFailbackPolicy) # # Return the list of nodes in the cluster... # def nodes(self): result = self.Parameters["node"] result.sort() return result class ClusterState: pass class ResourceGroup: def __init__(self, name, resourcelist, possiblenodes , nodeselection_policy, failback_policy): self.name = name self.resourcelist = resourcelist self.possiblenodes = possiblenodes self.nodeselection_policy = nodeselection_policy self.failback_policy = failback_policy self.state = None self.attributes = {} self.history = [] def __str__(self): result = string.join(self.possiblenodes, ",") for rsc in self.resourcelist: result = result + " " + str(rsc) return result class Resource: def __init__(self, name, arguments=None): self.name = name self.arguments = arguments def __str__(self): result = self.name try: for arg in self.arguments: result = result + "::" + arg except TypeError: pass return result ####################################################################### # # Base class defining policies for where we put resources # when we're starting, or when a failure has occurred... # ####################################################################### NodeSelectionPolicies = {} class NodeSelectionPolicy: def __init__(self, name): self.name = name NodeSelectionPolicies[name] = self def name(self): return self.name # # nodenames: the list of nodes eligible to run this resource # ResourceGroup: the group to be started... # ClusterState: Cluster state information # def SelectNode(self, nodenames, ResourceGroup, ClusterState): return None # # Choose the first node in the list... # class FirstUp_NodeSelection(NodeSelectionPolicy): def __init__(self): NodeSelectionPolicy.__init__(self, "FirstUp") def SelectNode(self, nodenames, ResourceGroup, ClusterState): return nodenames[0] # # Choose the last node in the list... # (kind of a dumb example) # class LastUp_NodeSelection(NodeSelectionPolicy): def __init__(self): NodeSelectionPolicy.__init__(self, "LastUp") def SelectNode(self, nodenames, ResourceGroup, ClusterState): return nodenames[len(nodenames)-1] ####################################################################### # # Failback policies... # # Where to locate a resource group when an eligible node rejoins # the cluster... # ####################################################################### FailbackPolicies = {} class FailbackPolicy: def __init__(self, name): self.name = name FailbackPolicies[name] = self def name(self): return self.name # # currentnode: The node the service is currently on # returningnode: The node which just rejoined # eligiblenodes: Permitted nodes which are up # SelectionPolicy: the normal NodeSelectionPolicy # Cluster state information... # def SelectNewNode(self, currentnode, returningnode, eligiblenodes , SelectionPolicy, ResourceGroup, ClusterState): return None # # This FailbackPolicy is like "normal failback" in heartbeat # class AutoFailbackPolicy(FailbackPolicy): def __init__(self): FailbackPolicy.__init__(self, "failback") def SelectNewNode(self, currentnode, returningnode, eligiblenodes , SelectionPolicy, ResourceGroup, ClusterState): # Select where it should run based on current normal policy # just as though we were starting it for the first time. return SelectionPolicy(eligiblenodes, ResourceGroup, ClusterState) # # This FailbackPolicy is like "nice failback" in heartbeat # class NoAutoFailbackPolicy(FailbackPolicy): def __init__(self): FailbackPolicy.__init__(self, "failuresonly") def SelectNewNode(self, currentnode, returningnode, eligiblenodes , SelectionPolicy, ResourceGroup): # Always leave the resource where it is... return currentnode ####################################################################### # # A little test code... # # Which you are advised to completely ignore... # ####################################################################### if __name__ == '__main__': FirstUp_NodeSelection() LastUp_NodeSelection() no_failback = NoAutoFailbackPolicy() auto_failback = AutoFailbackPolicy() cf=HBConfig("/etc/ha.d") print "Cluster configuration:\n" print "Nodes:", cf.nodes(), "\n" print "Config Parameters:", cf.Parameters, "\n" for groupname in cf.ResourceGroups.keys(): print "Resource Group %s:\n\t%s\n" % (groupname, cf.ResourceGroups[groupname]) diff --git a/cts/CTS.py.in b/cts/CTS.py.in index 85197b60ce..b132627379 100755 --- a/cts/CTS.py.in +++ b/cts/CTS.py.in @@ -1,1211 +1,1212 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Main module Classes related to testing high-availability clusters... Lots of things are implemented. Lots of things are not implemented. We have many more ideas of what to do than we've implemented. ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import types, string, select, sys, time, re, os, struct, os, signal import base64, pickle, binascii from UserDict import UserDict from syslog import * from popen2 import Popen3 class RemoteExec: '''This is an abstract remote execution class. It runs a command on another machine - somehow. The somehow is up to us. This particular class uses ssh. Most of the work is done by fork/exec of ssh or scp. ''' def __init__(self): # -n: no stdin, -x: no X11 self.Command = "@SSH@ -l root -n -x" # -f: ssh to background self.CommandnoBlock = "@SSH@ -f -l root -n -x" # -B: batch mode, -q: no stats (quiet) self.CpCommand = "@SCP@ -B -q" self.OurNode=string.lower(os.uname()[1]) def setcmd(self, rshcommand): '''Set the name of the remote shell command''' self.Command = rshcommand def _fixcmd(self, cmd): return re.sub("\'", "'\\''", cmd) def _cmd(self, *args): '''Compute the string that will run the given command on the given remote system''' args= args[0] sysname = args[0] command = args[1] #print "sysname: %s, us: %s" % (sysname, self.OurNode) if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost": ret = command else: ret = self.Command + " " + sysname + " '" + self._fixcmd(command) + "'" #print ("About to run %s\n" % ret) return ret def _cmd_noblock(self, *args): '''Compute the string that will run the given command on the given remote system''' args= args[0] sysname = args[0] command = args[1] #print "sysname: %s, us: %s" % (sysname, self.OurNode) if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost": ret = command + " &" else: ret = self.CommandnoBlock + " " + sysname + " '" + self._fixcmd(command) + "'" #print ("About to run %s\n" % ret) return ret def __call__(self, *args): '''Run the given command on the given remote system If you call this class like a function, this is the function that gets called. It just runs it roughly as though it were a system() call on the remote machine. The first argument is name of the machine to run it on. ''' count=0; rc = 0; while count < 3: rc = os.system(self._cmd(args)) if rc == 0: return rc print "Retrying command %s" % self._cmd(args) count=count+1 return rc def popen(self, *args): '''popen the given remote command on the remote system. As in __call__, the first argument is name of the machine to run it on. ''' #print "Now running %s\n" % self._cmd(args) return Popen3(self._cmd(args), None) def readaline(self, *args): '''Run a command on the remote machine and capture 1 line of stdout from the given remote command As in __call__, the first argument is name of the machine to run it on. ''' p = self.popen(args[0], args[1]) p.tochild.close() result = p.fromchild.readline() p.fromchild.close() self.lastrc = p.wait() return result def readlines(self, *args): p = self.popen(args[0], args[1]) p.tochild.close() result = p.fromchild.readlines() p.fromchild.close() self.lastrc = p.wait() return result def cp(self, *args): '''Perform a remote copy''' cpstring=self.CpCommand for arg in args: cpstring = cpstring + " \'" + arg + "\'" count=0; rc = 0; for i in range(3): rc = os.system(cpstring) if rc == 0: return rc print "Retrying command %s" % cpstring return rc def echo_cp(self, src_host, src_file, dest_host, dest_file): '''Perform a remote copy via echo''' (rc, lines) = self.remote_py(src_host, "os", "system", "cat %s" % src_file) if rc != 0: print "Copy of %s:%s failed" % (src_host, src_file) elif dest_host == None: fd = open(dest_file, "w") fd.writelines(lines) fd.close() else: big_line="" for line in lines: big_line = big_line + line (rc, lines) = self.remote_py(dest_host, "os", "system", "echo '%s' > %s" % (big_line, dest_file)) return rc def noBlock(self, *args): '''Perform a remote execution without waiting for it to finish''' sshnoBlock = self._cmd_noblock(args) count=0; rc = 0; for i in range(3): rc = os.system(sshnoBlock) if rc == 0: return rc print "Retrying command %s" % sshnoBlock return rc def remote_py(self, node, module, func, *args): '''Execute a remote python function If the call success, lastrc == 0 and return result. If the call fail, lastrc == 1 and return the reason (string) ''' encode_args = binascii.b2a_base64(pickle.dumps(args)) encode_cmd = string.join(["@HA_NOARCHDATAHBDIR@/cts/CTSproxy.py",module,func,encode_args]) #print "%s: %s.%s %s" % (node, module, func, repr(args)) result = self.readlines(node, encode_cmd) if result != None: result.pop() if self.lastrc == 0: last_line="" if result != None: array_len = len(result) if array_len > 0: last_line=result.pop() #print "result: %s" % repr(last_line) return pickle.loads(binascii.a2b_base64(last_line)), result return -1, result class LogWatcher: '''This class watches logs for messages that fit certain regular expressions. Watching logs for events isn't the ideal way to do business, but it's better than nothing :-) On the other hand, this class is really pretty cool ;-) The way you use this class is as follows: Construct a LogWatcher object Call setwatch() when you want to start watching the log Call look() to scan the log looking for the patterns ''' def __init__(self, log, regexes, timeout=10, debug=None): '''This is the constructor for the LogWatcher class. It takes a log name to watch, and a list of regular expressions to watch for." ''' # Validate our arguments. Better sooner than later ;-) for regex in regexes: assert re.compile(regex) self.regexes = regexes self.filename = log self.debug=debug self.whichmatch = -1 self.unmatched = None if self.debug: print "Debug now on for for log", log self.Timeout = int(timeout) self.returnonlymatch = None if not os.access(log, os.R_OK): raise ValueError("File [" + log + "] not accessible (r)") def setwatch(self, frombeginning=None): '''Mark the place to start watching the log from. ''' self.file = open(self.filename, "r") self.size = os.path.getsize(self.filename) if not frombeginning: self.file.seek(0,2) def ReturnOnlyMatch(self, onlymatch=1): '''Mark the place to start watching the log from. ''' self.returnonlymatch = onlymatch def look(self, timeout=None): '''Examine the log looking for the given patterns. It starts looking from the place marked by setwatch(). This function looks in the file in the fashion of tail -f. It properly recovers from log file truncation, but not from removing and recreating the log. It would be nice if it recovered from this as well :-) We return the first line which matches any of our patterns. ''' last_line=None first_line=None if timeout == None: timeout = self.Timeout done=time.time()+timeout+1 if self.debug: print "starting search: timeout=%d" % timeout for regex in self.regexes: print "Looking for regex: ", regex while (timeout <= 0 or time.time() <= done): newsize=os.path.getsize(self.filename) if self.debug > 4: print "newsize = %d" % newsize if newsize < self.size: # Somebody truncated the log! if self.debug: print "Log truncated!" self.setwatch(frombeginning=1) continue if newsize > self.file.tell(): line=self.file.readline() if self.debug > 2: print "Looking at line:", line if line: last_line=line if not first_line: first_line=line if self.debug: print "First line: "+ line which=-1 for regex in self.regexes: which=which+1 if self.debug > 3: print "Comparing line to ", regex #matchobj = re.search(string.lower(regex), string.lower(line)) matchobj = re.search(regex, line) if matchobj: self.whichmatch=which if self.returnonlymatch: return matchobj.group(self.returnonlymatch) else: if self.debug: print "Returning line" return line newsize=os.path.getsize(self.filename) if self.file.tell() == newsize: if timeout > 0: time.sleep(0.025) else: if self.debug: print "End of file" if self.debug: print "Last line: "+last_line return None if self.debug: print "Timeout" if self.debug: print "Last line: "+last_line return None def lookforall(self, timeout=None): '''Examine the log looking for ALL of the given patterns. It starts looking from the place marked by setwatch(). We return when the timeout is reached, or when we have found ALL of the regexes that were part of the watch ''' if timeout == None: timeout = self.Timeout save_regexes = self.regexes returnresult = [] while (len(self.regexes) > 0): oneresult = self.look(timeout) if not oneresult: self.unmatched = self.regexes self.regexes = save_regexes return None returnresult.append(oneresult) del self.regexes[self.whichmatch] self.unmatched = None self.regexes = save_regexes return returnresult # In case we ever want multiple regexes to match a single line... #- del self.regexes[self.whichmatch] #+ tmp_regexes = self.regexes #+ self.regexes = [] #+ which = 0 #+ for regex in tmp_regexes: #+ matchobj = re.search(regex, oneresult) #+ if not matchobj: #+ self.regexes.append(regex) class NodeStatus: def __init__(self, Env): self.Env = Env self.rsh = RemoteExec() def IsNodeBooted(self, node): '''Return TRUE if the given node is booted (responds to pings)''' return os.system("@PING@ -nq -c1 @PING_TIMEOUT_OPT@ %s >/dev/null 2>&1" % node) == 0 def IsSshdUp(self, node): return self.rsh(node, "true") == 0; def WaitForNodeToComeUp(self, node, Timeout=300): '''Return TRUE when given node comes up, or None/FALSE if timeout''' timeout=Timeout anytimeouts=0 while timeout > 0: if self.IsNodeBooted(node) and self.IsSshdUp(node): if anytimeouts: # Fudge to wait for the system to finish coming up time.sleep(30) self.Env.log("Node %s now up" % node) return 1 time.sleep(1) if (not anytimeouts): self.Env.log("Waiting for node %s to come up" % node) anytimeouts=1 timeout = timeout - 1 self.Env.log("%s did not come up within %d tries" % (node, Timeout)) def WaitForAllNodesToComeUp(self, nodes, timeout=300): '''Return TRUE when all nodes come up, or FALSE if timeout''' for node in nodes: if not self.WaitForNodeToComeUp(node, timeout): return None return 1 class ClusterManager(UserDict): '''The Cluster Manager class. This is an subclass of the Python dictionary class. (this is because it contains lots of {name,value} pairs, not because it's behavior is that terribly similar to a dictionary in other ways.) This is an abstract class which class implements high-level operations on the cluster and/or its cluster managers. Actual cluster managers classes are subclassed from this type. One of the things we do is track the state we think every node should be in. ''' def __InitialConditions(self): #if os.geteuid() != 0: # raise ValueError("Must Be Root!") None def _finalConditions(self): for key in self.keys(): if self[key] == None: raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") def __init__(self, Environment, randseed=None): self.Env = Environment self.__InitialConditions() self.clear_cache = 0 self.TestLoggingLevel=0 self.data = { "up" : "up", # Status meaning up "down" : "down", # Status meaning down "StonithCmd" : "@sbindir@/stonith -t baytech -p '10.10.10.100 admin admin' %s", "DeadTime" : 30, # Max time to detect dead node... "StartTime" : 90, # Max time to start up # # These next values need to be overridden in the derived class. # "Name" : None, "StartCmd" : None, "StopCmd" : None, "StatusCmd" : None, "RereadCmd" : None, "StartDRBDCmd" : None, "StopDRBDCmd" : None, "StatusDRBDCmd" : None, "DRBDCheckconf" : None, "BreakCommCmd" : None, "FixCommCmd" : None, "TestConfigDir" : None, "LogFileName" : None, - "Pat:We_started" : None, - "Pat:They_started" : None, + "Pat:Master_started" : None, + "Pat:Slave_started" : None, "Pat:We_stopped" : None, "Pat:They_stopped" : None, "BadRegexes" : None, # A set of "bad news" regexes # to apply to the log } self.rsh = RemoteExec() self.ShouldBeStatus={} self.OurNode=string.lower(os.uname()[1]) self.ShouldBeStatus={} self.ns = NodeStatus(self.Env) def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] def log(self, args): self.Env.log(args) def debug(self, args): self.Env.debug(args) def prepare(self): '''Finish the Initialization process. Prepare to test...''' for node in self.Env["nodes"]: if self.StataCM(node): self.ShouldBeStatus[node]=self["up"] else: self.ShouldBeStatus[node]=self["down"] def upcount(self): '''How many nodes are up?''' count=0 for node in self.Env["nodes"]: if self.ShouldBeStatus[node]==self["up"]: count=count+1 return count def TruncLogs(self): '''Truncate the log for the cluster manager so we can start clean''' if self["LogFileName"] != None: os.system("cp /dev/null " + self["LogFileName"]) def install_config(self, node): return None def clear_all_caches(self): if self.clear_cache: for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["down"]: self.debug("Removing cache file on: "+node) self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBHBDIR@/hostcache") else: self.debug("NOT Removing cache file on: "+node) def StartaCM(self, node): '''Start up the cluster manager on a given node''' self.debug("Starting %s on node %s" %(self["Name"], node)) ret = 1 if not self.ShouldBeStatus.has_key(node): self.ShouldBeStatus[node] = self["down"] if self.ShouldBeStatus[node] != self["down"]: return 1 patterns = [] # Technically we should always be able to notice ourselves starting + patterns.append(self["Pat:Local_started"] % node) if self.upcount() == 0: - patterns.append(self["Pat:We_started"] % node) + patterns.append(self["Pat:Master_started"] % node) else: - patterns.append(self["Pat:They_started"] % node) + patterns.append(self["Pat:Slave_started"] % node) watch = LogWatcher( self["LogFileName"], patterns, timeout=self["StartTime"]+10) watch.setwatch() self.install_config(node) self.ShouldBeStatus[node] = "any" if self.StataCM(node) and self.cluster_stable(self["DeadTime"]): self.log ("%s was already started" %(node)) return 1 # Clear out the host cache so autojoin can be exercised if self.clear_cache: self.debug("Removing cache file on: "+node) self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBHBDIR@/hostcache") if self.rsh(node, self["StartCmd"]) != 0: self.log ("Warn: Start command failed on node %s" %(node)) return None self.ShouldBeStatus[node]=self["up"] watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.log ("Warn: Startup pattern not found: %s" %(regex)) if watch_result: #self.debug("Found match: "+ repr(watch_result)) self.cluster_stable(self["DeadTime"]) return 1 if self.StataCM(node) and self.cluster_stable(self["DeadTime"]): return 1 self.log ("Warn: Start failed for node %s" %(node)) return None def StartaCMnoBlock(self, node): '''Start up the cluster manager on a given node with none-block mode''' self.debug("Starting %s on node %s" %(self["Name"], node)) # Clear out the host cache so autojoin can be exercised if self.clear_cache: self.debug("Removing cache file on: "+node) self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBHBDIR@/hostcache") self.rsh.noBlock(node, self["StartCmd"]) self.ShouldBeStatus[node]=self["up"] return 1 def StopaCM(self, node): '''Stop the cluster manager on a given node''' self.debug("Stopping %s on node %s" %(self["Name"], node)) if self.ShouldBeStatus[node] != self["up"]: return 1 if self.rsh(node, self["StopCmd"]) == 0: self.ShouldBeStatus[node]=self["down"] self.cluster_stable(self["DeadTime"]) return 1 else: self.log ("Could not stop %s on node %s" %(self["Name"], node)) return None def StopaCMnoBlock(self, node): '''Stop the cluster manager on a given node with none-block mode''' self.debug("Stopping %s on node %s" %(self["Name"], node)) self.rsh.noBlock(node, self["StopCmd"]) self.ShouldBeStatus[node]=self["down"] return 1 def cluster_stable(self, timeout = None): time.sleep(self["StableTime"]) return 1 def node_stable(self, node): return 1 def RereadCM(self, node): '''Force the cluster manager on a given node to reread its config This may be a no-op on certain cluster managers. ''' rc=self.rsh(node, self["RereadCmd"]) if rc == 0: return 1 else: self.log ("Could not force %s on node %s to reread its config" % (self["Name"], node)) return None def StataCM(self, node): '''Report the status of the cluster manager on a given node''' out=self.rsh.readaline(node, self["StatusCmd"]) ret= (string.find(out, 'stopped') == -1) try: if ret: if self.ShouldBeStatus[node] == self["down"]: self.log( "Node status for %s is %s but we think it should be %s" % (node, self["up"], self.ShouldBeStatus[node])) else: if self.ShouldBeStatus[node] == self["up"]: self.log( "Node status for %s is %s but we think it should be %s" % (node, self["down"], self.ShouldBeStatus[node])) except KeyError: pass if ret: self.ShouldBeStatus[node]=self["up"] else: self.ShouldBeStatus[node]=self["down"] return ret def startall(self, nodelist=None): '''Start the cluster manager on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist=self.Env["nodes"] for node in nodelist: if self.ShouldBeStatus[node] == self["down"]: if not self.StartaCM(node): ret = 0 return ret def stopall(self, nodelist=None): '''Stop the cluster managers on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist=self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["up"]: if not self.StopaCM(node): ret = 0 return ret def rereadall(self, nodelist=None): '''Force the cluster managers on every node in the cluster to reread their config files. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist=self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == self["up"]: self.RereadCM(node) def statall(self, nodelist=None): '''Return the status of the cluster managers in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' result={} if not nodelist: nodelist=self.Env["nodes"] for node in nodelist: if self.StataCM(node): result[node] = self["up"] else: result[node] = self["down"] return result def isolate_node(self, node): '''isolate the communication between the nodes''' rc = self.rsh(node, self["BreakCommCmd"]) if rc == 0: return 1 else: self.log("Could not break the communication between the nodes frome node: %s" % node) return None def unisolate_node(self, node): '''fix the communication between the nodes''' rc = self.rsh(node, self["FixCommCmd"]) if rc == 0: return 1 else: self.log("Could not fix the communication between the nodes from node: %s" % node) return None def reducecomm_node(self,node): '''reduce the communication between the nodes''' rc = self.rsh(node, self["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"])) if rc == 0: return 1 else: self.log("Could not reduce the communication between the nodes from node: %s" % node) return None def savecomm_node(self,node): '''save current the communication between the nodes''' rc = 0 if float(self.Env["XmitLoss"])!=0 or float(self.Env["RecvLoss"])!=0 : rc = self.rsh(node, self["SaveFileCmd"]); if rc == 0: return 1 else: self.log("Could not save the communication between the nodes from node: %s" % node) return None def restorecomm_node(self,node): '''restore the saved communication between the nodes''' rc = 0 if float(self.Env["XmitLoss"])!=0 or float(self.Env["RecvLoss"])!=0 : rc = self.rsh(node, self["RestoreCommCmd"]); if rc == 0: return 1 else: self.log("Could not restore the communication between the nodes from node: %s" % node) return None def SyncTestConfigs(self): '''Synchronize test configurations throughout the cluster. This one's a no-op for FailSafe, since it does that by itself. ''' fromdir=self["TestConfigDir"] if not os.access(fromdir, os.F_OK | os.R_OK | os.W_OK): raise ValueError("Directory [" + fromdir + "] not accessible (rwx)") for node in self.Env["nodes"]: if node == self.OurNode: continue self.log("Syncing test configurations on " + node) # Perhaps I ought to use rsync... self.rsh.cp("-r", fromdir, node + ":" + fromdir) def SetClusterConfig(self, configpath="default", nodelist=None): '''Activate the named test configuration throughout the cluster. It would be useful to implement this :-) ''' pass return 1 def ResourceGroups(self): "Return a list of resource type/instance pairs for the cluster" raise ValueError("Abstract Class member (ResourceGroups)") def InternalCommConfig(self): "Return a list of paths: each patch consists of a tuple" raise ValueError("Abstract Class member (InternalCommConfig)") def HasQuorum(self, node_list): "Return TRUE if the cluster currently has quorum" # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes raise ValueError("Abstract Class member (HasQuorum)") def Components(self): raise ValueError("Abstract Class member (Components)") def RestartClusterLogging(self): self.log("WARN: Restarting logging on cluster nodes") for node in self.Env["nodes"]: cmd=self.Env["logrestartcmd"] if self.rsh.noBlock(node, cmd) != 0: self.log ("ERROR: Cannot restart logging on %s [%s failed]" % (node, cmd)) def TestLogging(self): self.TestLoggingLevel=self.TestLoggingLevel+1 ret=1 if self.TestLoggingLevel > 3: self.log("ERROR: Unable to fix remote logging. Stopping tests.") self.TestLoggingLevel=self.TestLoggingLevel-1 return None patterns= [] prefix="Test message from " for node in self.Env["nodes"]: patterns.append(prefix + node) watch = LogWatcher(self["LogFileName"], patterns, 30 + len(self.Env["nodes"])) watch.setwatch() logpri = self.Env["logfacility"] + ".info" for node in self.Env["nodes"]: cmd="logger -p %s %s%s" % (logpri, prefix, node) if self.rsh.noBlock(node, cmd) != 0: self.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) watch_result = watch.lookforall() if watch.unmatched: self.log("ERROR: Remote logging is not working.") for regex in watch.unmatched: self.log ("ERROR: Test message [%s] not found in logs." % (regex)) self.RestartClusterLogging() time.sleep(30*self.TestLoggingLevel) ret=self.TestLogging() if ret: self.log("NOTE: Cluster logging now working.") self.TestLoggingLevel=self.TestLoggingLevel-1 return ret def CheckDf(self): dfcmd="df -k /var/log | tail -1 | tr -s ' ' | cut -d' ' -f2" dfmin=500000 result=1 for node in self.Env["nodes"]: dfout=self.rsh.readaline(node, dfcmd) if not dfout: self.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) else: try: idfout = int(dfout) except (ValueError, TypeError): self.log("Warning: df output from %s was invalid [%s]" % (node, dfout)) else: if idfout == 0: self.log("CRIT: Completely out of log disk space on %s" % node) result=None elif idfout <= 1000: self.log("WARN: Low on log disk space (%d Mbytes) on %s" % (idfout, node)) return result class Resource: ''' This is an HA resource (not a resource group). A resource group is just an ordered list of Resource objects. ''' def __init__(self, cm, rsctype=None, instance=None): self.CM = cm self.ResourceType = rsctype self.Instance = instance self.needs_quorum = 1 def Type(self): return self.ResourceType def Instance(self, nodename): return self.Instance def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. It is analagous to the "status" operation on SystemV init scripts and heartbeat scripts. FailSafe calls it the "exclusive" operation. ''' raise ValueError("Abstract Class member (IsRunningOn)") return None def IsWorkingCorrectly(self, nodename): ''' This member function returns true if our resource is operating correctly on the given node in the cluster. Heartbeat does not require this operation, but it might be called the Monitor operation, which is what FailSafe calls it. For remotely monitorable resources (like IP addresses), they *should* be monitored remotely for testing. ''' raise ValueError("Abstract Class member (IsWorkingCorrectly)") return None def Start(self, nodename): ''' This member function starts or activates the resource. ''' raise ValueError("Abstract Class member (Start)") return None def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' raise ValueError("Abstract Class member (Stop)") return None def __repr__(self): if (self.Instance and len(self.Instance) > 1): return "{" + self.ResourceType + "::" + self.Instance + "}" else: return "{" + self.ResourceType + "}" class Component: def kill(self, node): None class Process(Component): def __init__(self, name, dc_only, pats, dc_pats, badnews_ignore, triggersreboot, cm): self.name = str(name) self.dc_only = dc_only self.pats = pats self.dc_pats = dc_pats self.CM = cm self.badnews_ignore = badnews_ignore self.triggersreboot = triggersreboot self.KillCmd = "killall -9 " + self.name def kill(self, node): if self.CM.rsh(node, self.KillCmd) != 0: self.CM.log ("ERROR: Kill %s failed on node %s" %(name,node)) return None return 1 class ScenarioComponent: def __init__(self, Env): self.Env = Env def IsApplicable(self): '''Return TRUE if the current ScenarioComponent is applicable in the given LabEnvironment given to the constructor. ''' raise ValueError("Abstract Class member (IsApplicable)") def SetUp(self, CM): '''Set up the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") def TearDown(self, CM): '''Tear down (undo) the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") class Scenario: ( '''The basic idea of a scenario is that of an ordered list of ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn, and then after the tests have been run, they are torn down using TearDown() (in reverse order). A Scenario is applicable to a particular cluster manager iff each ScenarioComponent is applicable. A partially set up scenario is torn down if it fails during setup. ''') def __init__(self, Components): "Initialize the Scenario from the list of ScenarioComponents" for comp in Components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of" " ScenarioComponent") self.Components = Components def IsApplicable(self): ( '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable() ''' ) for comp in self.Components: if not comp.IsApplicable(): return None return 1 def SetUp(self, CM): '''Set up the Scenario. Return TRUE on success.''' j=0 while j < len(self.Components): if not self.Components[j].SetUp(CM): # OOPS! We failed. Tear partial setups down. CM.log("Tearing down partial setup") self.TearDown(CM, j) return None j=j+1 return 1 def TearDown(self, CM, max=None): '''Tear Down the Scenario - in reverse order.''' if max == None: max = len(self.Components)-1 j=max while j >= 0: self.Components[j].TearDown(CM) j=j-1 class InitClusterManager(ScenarioComponent): ( '''InitClusterManager is the most basic of ScenarioComponents. This ScenarioComponent simply starts the cluster manager on all the nodes. It is fairly robust as it waits for all nodes to come up before starting as they might have been rebooted or crashed for some reason beforehand. ''') def __init__(self, Env): pass def IsApplicable(self): '''InitClusterManager is so generic it is always Applicable''' return 1 def SetUp(self, CM): '''Basic Cluster Manager startup. Start everything''' CM.prepare() # Clear out the cobwebs ;-) self.TearDown(CM) for node in CM.Env["nodes"]: CM.rsh(node, CM["DelFileCommCmd"]+ "; true") # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on all nodes.") return CM.startall() def TearDown(self, CM): '''Set up the given ScenarioComponent''' # Stop the cluster manager everywhere CM.log("Stopping Cluster Manager on all nodes") return CM.stopall() class PingFest(ScenarioComponent): ( '''PingFest does a flood ping to each node in the cluster from the test machine. If the LabEnvironment Parameter PingSize is set, it will be used as the size of ping packet requested (via the -s option). If it is not set, it defaults to 1024 bytes. According to the manual page for ping: Outputs packets as fast as they come back or one hundred times per second, whichever is more. For every ECHO_REQUEST sent a period ``.'' is printed, while for every ECHO_REPLY received a backspace is printed. This provides a rapid display of how many packets are being dropped. Only the super-user may use this option. This can be very hard on a net- work and should be used with caution. ''' ) def __init__(self, Env): self.Env = Env def IsApplicable(self): '''PingFests are always applicable ;-) ''' return 1 def SetUp(self, CM): '''Start the PingFest!''' self.PingSize=1024 if CM.Env.has_key("PingSize"): self.PingSize=CM.Env["PingSize"] CM.log("Starting %d byte flood pings" % self.PingSize) self.PingPids=[] for node in CM.Env["nodes"]: self.PingPids.append(self._pingchild(node)) CM.log("Ping PIDs: " + repr(self.PingPids)) return 1 def TearDown(self, CM): '''Stop it right now! My ears are pinging!!''' for pid in self.PingPids: if pid != None: CM.log("Stopping ping process %d" % pid) os.kill(pid, signal.SIGKILL) def _pingchild(self, node): Args = ["ping", "-qfn", "-s", str(self.PingSize), node] sys.stdin.flush() sys.stdout.flush() sys.stderr.flush() pid = os.fork() if pid < 0: self.Env.log("Cannot fork ping child") return None if pid > 0: return pid # Otherwise, we're the child process. os.execvp("ping", Args) self.Env.log("Cannot execvp ping: " + repr(Args)) sys.exit(1) class PacketLoss(ScenarioComponent): ( ''' It would be useful to do some testing of CTS with a modest amount of packet loss enabled - so we could see that everything runs like it should with a certain amount of packet loss present. ''') def IsApplicable(self): '''always Applicable''' return 1 def SetUp(self, CM): '''Reduce the reliability of communications''' if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 : return 1 for node in CM.Env["nodes"]: CM.reducecomm_node(node) CM.log("Reduce the reliability of communications") return 1 def TearDown(self, CM): '''Fix the reliability of communications''' if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 : return 1 for node in CM.Env["nodes"]: CM.unisolate_node(node) CM.log("Fix the reliability of communications") class BasicSanityCheck(ScenarioComponent): ( ''' ''') def IsApplicable(self): return self.Env["DoBSC"] def SetUp(self, CM): CM.prepare() # Clear out the cobwebs self.TearDown(CM) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on BSC node(s).") return CM.startall() def TearDown(self, CM): CM.log("Stopping Cluster Manager on BSC node(s).") return CM.stopall() diff --git a/cts/CTStests.py.in b/cts/CTStests.py.in index 4dddfc0f54..c79801db22 100644 --- a/cts/CTStests.py.in +++ b/cts/CTStests.py.in @@ -1,2516 +1,2520 @@ #!@PYTHON@ '''CTS: Cluster Testing System: Tests module There are a few things we want to do here: ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. Add RecourceRecover testcase Zhao Kai ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # SPECIAL NOTE: # # Tests may NOT implement any cluster-manager-specific code in them. # EXTEND the ClusterManager object to provide the base capabilities # the test needs if you need to do something that the current CM classes # do not. Otherwise you screw up the whole point of the object structure # in CTS. # # Thank you. # import CTS from CM_hb import HBConfig import CTSaudits import time, os, re, types, string, tempfile, sys from CTSaudits import * from stat import * # List of all class objects for tests which we ought to # consider running. class RandomTests: ''' A collection of tests which are run at random. ''' def __init__(self, scenario, cm, tests, Audits): self.CM = cm self.Env = cm.Env self.Scenario = scenario self.Tests = [] self.Audits = [] self.ns=CTS.NodeStatus(self.Env) for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") if test.is_applicable(): self.Tests.append(test) if not scenario.IsApplicable(): raise ValueError("Scenario not applicable in" " given Environment") self.Stats = {"success":0, "failure":0, "BadNews":0} self.IndividualStats= {} for audit in Audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be a subclass of ClusterAudit") if audit.is_applicable(): self.Audits.append(audit) def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def audit(self, BadNews, test): errcount=0 BadNewsDebug=0 #BadNews.debug=1 ignorelist = [] ignorelist.append(" CTS: ") ignorelist.append("BadNews:") ignorelist.extend(self.CM.errorstoignore()) if test: ignorelist.extend(test.errorstoignore()) while errcount < 1000: if BadNewsDebug: print "Looking for BadNews" match=BadNews.look(0) if match: if BadNewsDebug: print "BadNews found: "+match add_err = 1 for ignore in ignorelist: if add_err == 1 and re.search(ignore, match): if BadNewsDebug: print "Ignoring based on pattern: ("+ignore+")" add_err = 0 if add_err == 1: self.CM.log("BadNews: " + match) self.incr("BadNews") errcount=errcount+1 else: break else: self.CM.log("Big problems. Shutting down.") self.CM.stopall() self.summarize() raise ValueError("Looks like we hit the jackpot! :-)") for audit in self.Audits: if not audit(): self.CM.log("Audit " + audit.name() + " FAILED.") self.incr("auditfail") if test: test.incr("auditfail") def summarize(self): self.CM.log("****************") self.CM.log("Overall Results:" + repr(self.Stats)) self.CM.log("****************") self.CM.log("Detailed Results") for test in self.Tests: self.CM.log("Test %s: \t%s" %(test.name, repr(test.Stats))) self.CM.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def run(self, max=1): ( ''' Set up the given scenario, then run the selected tests at random for the selected number of iterations. ''') BadNews=CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"] , timeout=0) BadNews.setwatch() self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.log("Enabling oprofile on %s" % node) self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init") self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start") if not self.Scenario.SetUp(self.CM): return None for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.rsh.remote_py( node, "os", "system", "opcontrol --save=cts.setup") testcount=1 time.sleep(30) # This makes sure everything is stabilized before starting... self.audit(BadNews, None) while testcount <= max: test = self.Env.RandomGen.choice(self.Tests) # Some tests want a node as an argument. nodechoice = self.Env.RandomNode() #logsize = os.stat(self.CM["LogFileName"])[ST_SIZE] #self.CM.log("Running test %s (%s) \t[%d : %d]" # % (test.name, nodechoice, testcount, logsize)) self.CM.log("Running test %s (%s) \t[%d]" % (test.name, nodechoice, testcount)) testcount = testcount + 1 starttime=time.time() test.starttime=starttime ret=test(nodechoice) for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.rsh.remote_py( node, "os", "system", "opcontrol --save=cts.%d" % (testcount-1)) if ret: self.incr("success") else: self.incr("failure") self.CM.log("Test %s (%s) \t[FAILED]" %(test.name,nodechoice)) # Better get the current info from the cluster... self.CM.statall() # Make sure logging is working and we have enough disk space... if not self.CM.Env["DoBSC"]: if not self.CM.TestLogging(): sys.exit(1) if not self.CM.CheckDf(): sys.exit(1) stoptime=time.time() elapsed_time = stoptime - starttime test_time = stoptime - test.starttime if not test.has_key("min_time"): test["elapsed_time"] = elapsed_time test["min_time"] = test_time test["max_time"] = test_time else: test["elapsed_time"] = test["elapsed_time"] + elapsed_time if test_time < test["min_time"]: test["min_time"] = test_time if test_time > test["max_time"]: test["max_time"] = test_time self.audit(BadNews, test) self.Scenario.TearDown(self.CM) for node in self.CM.Env["nodes"]: if node in self.CM.Env["oprofile"]: self.CM.log("Disabling oprofile on %s" % node) self.CM.rsh.remote_py(node, "os", "system", "opcontrol --shutdown") self.audit(BadNews, None) for test in self.Tests: self.IndividualStats[test.name] = test.Stats return self.Stats, self.IndividualStats AllTestClasses = [ ] class CTSTest: ''' A Cluster test. We implement the basic set of properties and behaviors for a generic cluster test. Cluster tests track their own statistics. We keep each of the kinds of counts we track as separate {name,value} pairs. ''' def __init__(self, cm): #self.name="the unnamed test" self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} # if not issubclass(cm.__class__, ClusterManager): # raise ValueError("Must be a ClusterManager object") self.CM = cm self.timeout=120 self.starttime=0 def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def failure(self, reason="none"): '''Increment the failure count''' self.incr("failure") self.CM.log("Test " + self.name + " failed [reason:" + reason + "]") return None def success(self): '''Increment the success count''' self.incr("success") return 1 def skipped(self): '''Increment the skipped count''' self.incr("skipped") return 1 def __call__(self, node): '''Perform the given test''' raise ValueError("Abstract Class member (__call__)") self.incr("calls") return self.failure() def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def canrunnow(self): '''Return TRUE if we can meaningfully run right now''' return 1 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] ################################################################### class StopTest(CTSTest): ################################################################### '''Stop (deactivate) the cluster manager on a node''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="Stop" - self.uspat = self.CM["Pat:We_stopped"] - self.thempat = self.CM["Pat:They_stopped"] def __call__(self, node): '''Perform the 'stop' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["up"]: return self.skipped() patterns = [] # Technically we should always be able to notice ourselves stopping patterns.append(self.CM["Pat:We_stopped"] % node) if self.CM.Env["use_logd"]: patterns.append(self.CM["Pat:Logd_stopped"] % node) # Any active node needs to notice this one left # NOTE: This wont work if we have multiple partitions for other in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node: patterns.append(self.CM["Pat:They_stopped"] %(other, node)) #self.debug("Checking %s will notice %s left"%(other, node)) watch = CTS.LogWatcher( self.CM["LogFileName"], patterns, self.CM["DeadTime"]) watch.setwatch() if node == self.CM.OurNode: self.incr("us") else: if self.CM.upcount() <= 1: self.incr("all") else: self.incr("them") self.CM.StopaCM(node) watch_result = watch.lookforall() failreason=None UnmatchedList = "||" if watch.unmatched: (rc, output) = self.CM.rsh.remote_py(node, "os", "system", "/bin/ps axf") for line in output: self.CM.debug(line) for regex in watch.unmatched: self.CM.log ("ERROR: Shutdown pattern not found: %s" % (regex)) UnmatchedList += regex + "||"; failreason="Missing shutdown pattern" self.CM.cluster_stable(self.CM["DeadTime"]) if not watch.unmatched or self.CM.upcount() == 0: return self.success() if len(watch.unmatched) >= self.CM.upcount(): return self.failure("no match against (%s)" % UnmatchedList) if failreason == None: return self.success() else: return self.failure(failreason) # # We don't register StopTest because it's better when called by # another test... # ################################################################### class StartTest(CTSTest): ################################################################### '''Start (activate) the cluster manager on a node''' def __init__(self, cm, debug=None): CTSTest.__init__(self,cm) self.name="start" self.debug = debug - self.uspat = self.CM["Pat:We_started"] - self.thempat = self.CM["Pat:They_started"] def __call__(self, node): '''Perform the 'start' test. ''' self.incr("calls") if self.CM.upcount() == 0: self.incr("us") else: self.incr("them") if self.CM.ShouldBeStatus[node] != self.CM["down"]: return self.skipped() elif self.CM.StartaCM(node): return self.success() else: return self.failure("Startup %s on node %s failed" %(self.CM["Name"], node)) def is_applicable(self): '''StartTest is always applicable''' return 1 # # We don't register StartTest because it's better when called by # another test... # ################################################################### class FlipTest(CTSTest): ################################################################### '''If it's running, stop it. If it's stopped start it. Overthrow the status quo... ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Flip" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'Flip' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("stopped") ret = self.stop(node) type="up->down" # Give the cluster time to recognize it's gone... time.sleep(self.CM["StableTime"]) elif self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("started") ret = self.start(node) type="down->up" else: return self.skipped() self.incr(type) if ret: return self.success() else: return self.failure("%s failure" % type) def is_applicable(self): '''FlipTest is always applicable''' return 1 # Register FlipTest as a good test to run AllTestClasses.append(FlipTest) ################################################################### class RestartTest(CTSTest): ################################################################### '''Stop and restart a node''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Restart" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'restart' test. ''' self.incr("calls") self.incr("node:" + node) ret1 = 1 if self.CM.StataCM(node): self.incr("WasStopped") if not self.start(node): return self.failure("start (setup) failure: "+node) self.starttime=time.time() if not self.stop(node): return self.failure("stop failure: "+node) if not self.start(node): return self.failure("start failure: "+node) return self.success() def is_applicable(self): '''RestartTest is always applicable''' return 1 # Register RestartTest as a good test to run AllTestClasses.append(RestartTest) ################################################################### class StonithTest(CTSTest): ################################################################### '''Reboot a node by whacking it with stonith.''' def __init__(self, cm, timeout=900): CTSTest.__init__(self,cm) self.name="Stonith" self.theystopped = self.CM["Pat:They_dead"] self.allstopped = self.CM["Pat:All_stopped"] - self.usstart = self.CM["Pat:We_started"] - self.themstart = self.CM["Pat:They_started"] + self.usstart = self.CM["Pat:Master_started"] + self.themstart = self.CM["Pat:Slave_started"] self.timeout = timeout self.ssherror = False def _reset(self, node): StonithWorked=False for tries in 1,2,3,4,5: if self.CM.Env.ResetNode(node): StonithWorked=True break return StonithWorked def setup(self, target_node): # nothing to do return 1 def __call__(self, node): '''Perform the 'stonith' test. (whack the node)''' self.incr("calls") stopwatch = 0 rc = 0 if not self.setup(node): return self.failure("Setup failed") # Figure out what log message to look for when/if it goes down # # Any active node needs to notice this one left # NOTE: This wont work if we have multiple partitions stop_patterns = [] for other in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node: stop_patterns.append(self.CM["Pat:They_stopped"] %(other, node)) stopwatch = 1 #self.debug("Checking %s will notice %s left"%(other, node)) if self.CM.ShouldBeStatus[node] == self.CM["down"]: # actually no-one will notice this node die since HA isnt running stopwatch = 0 # Figure out what log message to look for when it comes up if self.CM.upcount() == 1 and self.CM.ShouldBeStatus[node] == self.CM["up"]: uppat = (self.usstart % node) else: uppat = (self.themstart % node) upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat] , timeout=self.timeout) if stopwatch == 1: watch = CTS.LogWatcher(self.CM["LogFileName"], stop_patterns , timeout=self.CM["DeadTime"]+10) watch.setwatch() # Reset (stonith) the node self.CM.debug("Resetting: "+node) StonithWorked = self._reset(node) if not StonithWorked: return self.failure("Stonith didn't work") if self.ssherror == True: self.CM.log("NOTE: Stonith command reported success but node %s did not restart (atd, reboot or ssh error)" % node) return self.success() upwatch.setwatch() # Look() and see if the machine went down if stopwatch == 0: # Allow time for the node to die time.sleep(self.CM["DeadTime"]+10) elif not watch.lookforall(): if watch.unmatched: for regex in watch.unmatched: self.CM.log("Warn: STONITH pattern not found: %s"%regex) # !!no-one!! saw this node die if len(watch.unmatched) == len(stop_patterns): return self.failure("No-one saw %s die" %node) # else: syslog* lost a message # Alas I dont think this check is plausable (beekhof) # # Check it really stopped... #self.CM.ShouldBeStatus[node] = self.CM["down"] #if self.CM.StataCM(node) == 1: # ret1=0 # Look() and see if the machine came back up rc=0 if upwatch.look(): self.CM.debug("Startup pattern found: %s" %uppat) rc=1 else: self.CM.log("Warn: Startup pattern not found: %s" %uppat) # Check it really started... self.CM.ShouldBeStatus[node] = self.CM["up"] if rc == 0 and self.CM.StataCM(node) == 1: rc=1 # wait for the cluster to stabilize self.CM.cluster_stable() if node in self.CM.Env["oprofile"]: self.CM.log("Enabling oprofile on %s" % node) self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init") self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start") # return case processing if rc == 0: return self.failure("Node %s did not restart" %node) else: return self.success() def is_applicable(self): '''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE''' # for v2, stonithd test is a better test to run. if self.CM["Name"] == "linux-ha-v2": return None if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 # Register StonithTest as a good test to run AllTestClasses.append(StonithTest) ################################################################### class StonithdTest(StonithTest): ################################################################### def __init__(self, cm, timeout=600): StonithTest.__init__(self, cm, timeout=600) self.name="Stonithd" self.startall = SimulStartLite(cm) self.start = StartTest(cm) self.stop = StopTest(cm) self.init_node = None def _reset(self, target_node): if len(self.CM.Env["nodes"]) < 2: return self.skipped() StonithWorked = False SshNotWork = 0 for tries in range(1,5): # For some unknown reason, every now and then the ssh plugin just # can't kill the target_node - everything works fine with stonithd # and the plugin, but atd, reboot or ssh (or maybe something else) # doesn't do its job and target_node remains alive. So look for # the indicative messages and bubble-up the error via ssherror watchpats = [] watchpats.append("Initiating ssh-reset") watchpats.append("CRIT: still able to ping") watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+30) watch.setwatch() fail_reasons = [] if self.CM.Env.ResetNode2(self.init_node, target_node, fail_reasons): StonithWorked = True break if watch.lookforall(): SshNotWork = SshNotWork + 1 continue for reason in fail_reasons: self.CM.log(reason) if StonithWorked == False and SshNotWork == tries: StonithWorked = True self.ssherror = True return StonithWorked def setup(self, target_node): if len(self.CM.Env["nodes"]) < 2: return 1 self.init_node = self.CM.Env.RandomNode() while self.init_node == target_node: self.init_node = self.CM.Env.RandomNode() if not self.startall(None): return self.failure("Test setup failed") return 1 def is_applicable(self): if not self.CM["Name"] == "linux-ha-v2": return 0 if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 AllTestClasses.append(StonithdTest) ################################################################### class IPaddrtest(CTSTest): ################################################################### '''Find the machine supporting a particular IP address, and knock it down. [Hint: This code isn't finished yet...] ''' def __init__(self, cm, IPaddrs): CTSTest.__init__(self,cm) self.name="IPaddrtest" self.IPaddrs = IPaddrs self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, IPaddr): ''' Perform the IPaddr test... ''' self.incr("calls") node = self.CM.Env.RandomNode() self.incr("node:" + node) if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") self.start(node) ret1 = self.stop(node) # Give the cluster time to recognize we're gone... time.sleep(self.CM["StableTime"]) ret2 = self.start(node) if not ret1: return self.failure("Could not stop") if not ret2: return self.failure("Could not start") return self.success() def is_applicable(self): '''IPaddrtest is always applicable (but shouldn't be)''' return 1 ################################################################### class StartOnebyOne(CTSTest): ################################################################### '''Start all the nodes ~ one by one''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StartOnebyOne" self.stopall = SimulStopLite(cm) self.start = StartTest(cm) self.ns=CTS.NodeStatus(cm.Env) def __call__(self, dummy): '''Perform the 'StartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Test setup failed") failed=[] self.starttime=time.time() for node in self.CM.Env["nodes"]: if not self.start(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to start: " + repr(failed)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''StartOnebyOne is always applicable''' return 1 # Register StartOnebyOne as a good test to run AllTestClasses.append(StartOnebyOne) ################################################################### class SimulStart(CTSTest): ################################################################### '''Start all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStart" self.stopall = SimulStopLite(cm) self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'SimulStart' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... ret = self.stopall(None) if not ret: return self.failure("Setup failed") self.CM.clear_all_caches() if not self.startall(None): return self.failure("Startall failed") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''SimulStart is always applicable''' return 1 # Register SimulStart as a good test to run AllTestClasses.append(SimulStart) ################################################################### class SimulStop(CTSTest): ################################################################### '''Stop all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStop" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) def __call__(self, dummy): '''Perform the 'SimulStop' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.stopall(None): return self.failure("Stopall failed") return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''SimulStop is always applicable''' return 1 # Register SimulStop as a good test to run AllTestClasses.append(SimulStop) ################################################################### class StopOnebyOne(CTSTest): ################################################################### '''Stop all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StopOnebyOne" self.startall = SimulStartLite(cm) self.stop = StopTest(cm) def __call__(self, dummy): '''Perform the 'StopOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") failed=[] self.starttime=time.time() for node in self.CM.Env["nodes"]: if not self.stop(node): failed.append(node) if len(failed) > 0: return self.failure("Some node failed to stop: " + repr(failed)) self.CM.clear_all_caches() return self.success() def is_applicable(self): '''StopOnebyOne is always applicable''' return 1 # Register StopOnebyOne as a good test to run AllTestClasses.append(StopOnebyOne) ################################################################### class RestartOnebyOne(CTSTest): ################################################################### '''Restart all the nodes in order''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="RestartOnebyOne" self.startall = SimulStartLite(cm) def __call__(self, dummy): '''Perform the 'RestartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... ret = self.startall(None) if not ret: return self.failure("Setup failed") did_fail=[] self.starttime=time.time() self.restart = RestartTest(self.CM) for node in self.CM.Env["nodes"]: if not self.restart(node): did_fail.append(node) if did_fail: return self.failure("Could not restart %d nodes: %s" %(len(did_fail), repr(did_fail))) return self.success() def is_applicable(self): '''RestartOnebyOne is always applicable''' return 1 # Register StopOnebyOne as a good test to run AllTestClasses.append(RestartOnebyOne) ################################################################### class PartialStart(CTSTest): ################################################################### '''Start a node - but tell it to stop before it finishes starting up''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="PartialStart" self.startall = SimulStartLite(cm) self.stopall = SimulStopLite(cm) def __call__(self, node): '''Perform the 'PartialStart' test. ''' self.incr("calls") ret = self.stopall(None) if not ret: return self.failure("Setup failed") # FIXME! This should use the CM class to get the pattern # then it would be applicable in general watchpats = [] watchpats.append("Starting crmd") watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats, timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.StartaCMnoBlock(node) ret = watch.lookforall() if not ret: self.CM.log("Patterns not found: " + repr(watch.unmatched)) return self.failure("Setup of %s failed" % node) ret = self.stopall(None) if not ret: return self.failure("%s did not stop in time" % node) return self.success() def is_applicable(self): '''Partial is always applicable''' if self.CM["Name"] == "linux-ha-v2": return 1 else: return 0 # Register StopOnebyOne as a good test to run AllTestClasses.append(PartialStart) ################################################################### class StandbyTest(CTSTest): ################################################################### '''Put a node in standby mode''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby" self.successpat = self.CM["Pat:StandbyOK"] self.nostandbypat = self.CM["Pat:StandbyNONE"] self.transient = self.CM["Pat:StandbyTRANSIENT"] def __call__(self, node): '''Perform the 'standby' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() if self.CM.upcount() < 2: self.incr("nostandby") pat = self.nostandbypat else: self.incr("standby") pat = self.successpat # # You could make a good argument that the cluster manager # ought to give us good clues on when its a bad time to # switch over to the other side, but heartbeat doesn't... # It could also queue the request. But, heartbeat # doesn't do that either :-) # retrycount=0 while (retrycount < 10): watch = CTS.LogWatcher(self.CM["LogFileName"] , [pat, self.transient] , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.rsh(node, self.CM["Standby"]) match = watch.look() if match: if re.search(self.transient, match): self.incr("retries") time.sleep(2) retrycount=retrycount+1 else: return self.success() else: break # No point in retrying... return self.failure("did not find pattern " + pat) def is_applicable(self): '''StandbyTest is applicable when the CM has a Standby command''' if not self.CM.has_key("Standby"): return None else: #if self.CM.Env.has_key("DoStandby"): #flag=self.CM.Env["DoStandby"] #if type(flag) == types.IntType: #return flag #if not re.match("[yt]", flag, re.I): #return None # # We need to strip off everything after the first blank # cmd=self.CM["Standby"] cmd = cmd.split()[0] if not os.access(cmd, os.X_OK): return None cf = self.CM.cf if not cf.Parameters.has_key("auto_failback"): return None elif cf.Parameters["auto_failback"][0] == "legacy": return None return 1 # Register StandbyTest as a good test to run AllTestClasses.append(StandbyTest) ####################################################################### class StandbyTest2(CTSTest): ####################################################################### '''Standby with CRM of HA release 2''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby2" self.start = StartTest(cm) self.startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources, none resource should be running on the node # set the node to active mode # check resouces, resources should have been migrated back (SHOULD THEY?) def __call__(self, node): self.incr("calls") ret=self.startall(None) if not ret: return self.failure("Start all nodes failed") self.CM.debug("Make sure node %s is active" % node) if self.CM.StandbyStatus(node) != "off": if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.CM.debug("Getting resources running on node %s" % node) rsc_on_node = [] for rsc in self.CM.Resources(): if rsc.IsRunningOn(node): rsc_on_node.append(rsc) self.CM.debug("Setting node %s to standby mode" % node) if not self.CM.SetStandbyMode(node, "on"): return self.failure("can't set node %s to standby mode" % node) time.sleep(30) # Allow time for the update to be applied and cause something self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "on": return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status)) self.CM.debug("Checking resources") for rsc in self.CM.Resources(): if rsc.IsRunningOn(node): return self.failure("%s set to standby, %s is still running on it" % (node, rsc.rid)) self.CM.debug("Setting node %s to active mode" % node) if not self.CM.SetStandbyMode(node, "off"): return self.failure("can't set node %s to active mode" % node) time.sleep(30) # Allow time for the update to be applied and cause something self.CM.cluster_stable() status = self.CM.StandbyStatus(node) if status != "off": return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) self.CM.debug("Checking resources") for rsc in rsc_on_node: if not rsc.IsRunningOn(node): return self.failure("%s set to active but %s is NOT back" % (node, rsc.rid)) return self.success() def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllTestClasses.append(StandbyTest2) ####################################################################### class Fastdetection(CTSTest): ####################################################################### '''Test the time which one node find out the other node is killed very quickly''' def __init__(self,cm,timeout=60): CTSTest.__init__(self, cm) self.name = "DetectionTime" self.they_stopped = self.CM["Pat:They_stopped"] self.timeout = timeout self.start = StartTest(cm) self.startall = SimulStartLite(cm) self.standby = StandbyTest(cm) self.__setitem__("min", 0) self.__setitem__("max", 0) self.__setitem__("totaltime", 0) def __call__(self, node): '''Perform the fastfailureDetection test''' self.incr("calls") ret=self.startall(None) if not ret: return self.failure("Test setup failed") if self.CM.upcount() < 2: return self.skipped() # Make sure they're not holding any resources ret = self.standby(node) if not ret: return ret stoppat = (self.they_stopped % ("", node)) stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout) stopwatch.setwatch() # # This test is CM-specific - FIXME!! # if self.CM.rsh(node, "killall -9 heartbeat")==0: Starttime = os.times()[4] if stopwatch.look(): Stoptime = os.times()[4] # This test is CM-specific - FIXME!! self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") Detectiontime = Stoptime-Starttime detectms = int(Detectiontime*1000+0.5) self.CM.log("...failure detection time: %d ms" % detectms) self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime if self.Stats["min"] == 0: self.Stats["min"] = Detectiontime if Detectiontime > self.Stats["max"]: self.Stats["max"] = Detectiontime if Detectiontime < self.Stats["min"]: self.Stats["min"] = Detectiontime self.CM.ShouldBeStatus[node] = self.CM["down"] self.start(node) return self.success() else: # This test is CM-specific - FIXME!! self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") self.CM.ShouldBeStatus[node] = self.CM["down"] ret=self.start(node) return self.failure("Didn't find the log message") else: return self.failure("Couldn't kill cluster manager") def is_applicable(self): '''This test is applicable when auto_failback != legacy''' return self.standby.is_applicable() # This test is CM-specific - FIXME!! def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ "ccm.*ERROR: ccm_control_process:failure to send protoversion request" , "ccm.*ERROR: Lost connection to heartbeat service. Need to bail out" ] AllTestClasses.append(Fastdetection) ############################################################################## class BandwidthTest(CTSTest): ############################################################################## # Tests should not be cluster-manager-specific # If you need to find out cluster manager configuration to do this, then # it should be added to the generic cluster manager API. '''Test the bandwidth which heartbeat uses''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Bandwidth" self.start = StartTest(cm) self.__setitem__("min",0) self.__setitem__("max",0) self.__setitem__("totalbandwidth",0) self.tempfile = tempfile.mktemp(".cts") self.startall = SimulStartLite(cm) def __call__(self, node): '''Perform the Bandwidth test''' self.incr("calls") if self.CM.upcount()<1: return self.skipped() Path = self.CM.InternalCommConfig() if "ip" not in Path["mediatype"]: return self.skipped() port = Path["port"][0] port = int(port) ret = self.startall(None) if not ret: return self.failure("Test setup failed") time.sleep(5) # We get extra messages right after startup. fstmpfile = "/var/run/band_estimate" dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \ % (port, fstmpfile) rc = self.CM.rsh(node, dumpcmd) if rc == 0: farfile = "root@%s:%s" % (node, fstmpfile) self.CM.rsh.cp(farfile, self.tempfile) Bandwidth = self.countbandwidth(self.tempfile) if not Bandwidth: self.CM.log("Could not compute bandwidth.") return self.success() intband = int(Bandwidth + 0.5) self.CM.log("...bandwidth: %d bits/sec" % intband) self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth if self.Stats["min"] == 0: self.Stats["min"] = Bandwidth if Bandwidth > self.Stats["max"]: self.Stats["max"] = Bandwidth if Bandwidth < self.Stats["min"]: self.Stats["min"] = Bandwidth self.CM.rsh(node, "rm -f %s" % fstmpfile) os.unlink(self.tempfile) return self.success() else: return self.failure("no response from tcpdump command [%d]!" % rc) def countbandwidth(self, file): fp = open(file, "r") fp.seek(0) count = 0 sum = 0 while 1: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count=count+1 linesplit = string.split(line," ") for j in range(len(linesplit)-1): if linesplit[j]=="udp": break if linesplit[j]=="length:": break try: sum = sum + int(linesplit[j+1]) except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T1 = linesplit[0] timesplit = string.split(T1,":") time2split = string.split(timesplit[2],".") time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 break while count < 100: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count = count+1 linessplit = string.split(line," ") for j in range(len(linessplit)-1): if linessplit[j] =="udp": break if linesplit[j]=="length:": break try: sum=int(linessplit[j+1])+sum except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T2 = linessplit[0] timesplit = string.split(T2,":") time2split = string.split(timesplit[2],".") time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 time = time2-time1 if (time <= 0): return 0 return (sum*8)/time def is_applicable(self): '''BandwidthTest is always applicable''' return 0 AllTestClasses.append(BandwidthTest) ########################################################################## class RedundantpathTest(CTSTest): ########################################################################## '''In heartbeat, it has redundant path to communicate between the cluster''' # # Tests should not be cluster-manager specific # One needs to isolate what you need from the cluster manager and then # add a (new) API to do it. # def __init__(self,cm,timeout=60): CTSTest.__init__(self,cm) self.name = "RedundantpathTest" self.timeout = timeout def PathCount(self): '''Return number of communication paths''' Path = self.CM.InternalCommConfig() cf = self.CM.cf eths = [] serials = [] num = 0 for interface in Path["interface"]: if re.search("eth",interface): eths.append(interface) num = num + 1 if re.search("/dev",interface): serials.append(interface) num = num + 1 return (num, eths, serials) def __call__(self,node): '''Perform redundant path test''' self.incr("calls") if self.CM.ShouldBeStatus[node]!=self.CM["up"]: return self.skipped() (num, eths, serials) = self.PathCount() for eth in eths: if self.CM.rsh(node,"ifconfig %s down" % eth)==0: PathDown = "OK" break if PathDown != "OK": for serial in serials: if self.CM.rsh(node,"setserial %s uart none" % serial)==0: PathDown = "OK" break if PathDown != "OK": return self.failure("Cannot break the path") time.sleep(self.timeout) for audit in CTSaudits.AuditList(self.CM): if not audit(): for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.failure("Redundant path fail") for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.success() def is_applicable(self): '''It is applicable when you have more than one connection''' return self.PathCount()[0] > 1 # FIXME!! Why is this one commented out? #AllTestClasses.append(RedundantpathTest) ########################################################################## class DRBDTest(CTSTest): ########################################################################## '''In heartbeat, it provides replicated storage.''' def __init__(self,cm, timeout=10): CTSTest.__init__(self,cm) self.name = "DRBD" self.timeout = timeout def __call__(self, dummy): '''Perform the 'DRBD' test.''' self.incr("calls") for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() # Note: All these special cases with Start/Stop/StatusDRBD # should be reworked to use resource objects instead of # being hardwired to bypass the objects here. for node in self.CM.Env["nodes"]: done=time.time()+self.timeout+1 while (time.time()done: return self.failure("Can't start drbd, please check it") device={} for node in self.CM.Env["nodes"]: device[node]=self.getdevice(node) node = self.CM.Env["nodes"][0] done=time.time()+self.timeout+1 while 1: if (time.time()>done): return self.failure("the drbd could't sync") self.CM.rsh(node,"cp /proc/drbd /var/run >/dev/null 2>&1") if self.CM.rsh.cp("%s:/var/run/drbd" % node,"/var/run"): line = open("/tmp/var/run").readlines()[2] p = line.find("Primary") s1 = line.find("Secondary") s2 = line.rfind("Secondary") if s1!=s2: if self.CM.rsh(node,"drbdsetup %s primary" % device[node]): pass if p!=-1: if p/dev/null" % (self.rid, node)) watch.lookforall() self.CM.cluster_stable() recovernode=self.CM.ResourceLocation(self.rid) if len(recovernode)==1: self.CM.debug("Recovered: %s is running on %s" %(self.rid, recovernode[0])) if not watch.unmatched: return self.success() else: return self.failure("Patterns not found: %s" % repr(watch.unmatched)) elif len(recovernode)==0: return self.failure("%s was not recovered and is inactive" % self.rid) else: return self.failure("%s is now active on more than one node: %s" %(self.rid, str(recovernode))) def is_applicable(self): '''ResourceRecover is applicable only when there are resources running on our cluster and environment is linux-ha-v2''' if self.CM["Name"] == "linux-ha-v2": resourcelist=self.CM.Resources() if len(resourcelist)==0: self.CM.log("No resources on this cluster") return 0 else: return 1 return 0 def errorstoignore(self): '''Return list of errors which should be ignored''' return [ """Updating failcount for %s""" % self.rid, """Unknown operation: fail""", """ERROR: sending stonithRA op to stonithd failed.""", """ERROR: process_lrm_event: LRM operation %s_%s_%d""" % (self.rid, self.action, self.interval), """ERROR: process_graph_event: Action %s_%s_%d initiated outside of a transition""" % (self.rid, self.action, self.interval), ] AllTestClasses.append(ResourceRecover) ################################################################### class ComponentFail(CTSTest): ################################################################### def __init__(self, cm): CTSTest.__init__(self,cm) self.name="ComponentFail" self.startall = SimulStartLite(cm) self.complist = cm.Components() self.patterns = [] self.okerrpatterns = [] def __call__(self, node): '''Perform the 'ComponentFail' test. ''' self.incr("calls") self.patterns = [] self.okerrpatterns = [] # start all nodes ret = self.startall(None) if not ret: return self.failure("Setup failed") if not self.CM.cluster_stable(self.CM["StableTime"]): return self.failure("Setup failed - unstable") node_is_dc = self.CM.is_node_dc(node, None) # select a component to kill chosen = self.CM.Env.RandomGen.choice(self.complist) while chosen.dc_only == 1 and node_is_dc == 0: chosen = self.CM.Env.RandomGen.choice(self.complist) self.CM.log("...component %s (dc=%d,boot=%d)" % (chosen.name, node_is_dc,chosen.triggersreboot)) self.incr(chosen.name) self.patterns.extend(chosen.pats) # Make sure the node goes down and then comes back up if it should reboot... if chosen.triggersreboot: for other in self.CM.Env["nodes"]: if other != node: self.patterns.append(self.CM["Pat:They_stopped"] %(other, node)) - self.patterns.append(self.CM["Pat:They_started"] % node) + self.patterns.append(self.CM["Pat:Slave_started"] % node) + self.patterns.append(self.CM["Pat:Local_started"] % node) # In an ideal world, this next stuff should be in the "chosen" object as a member function if chosen.dc_only: if chosen.triggersreboot: # Sometimes these will be in the log, and sometimes they won't... self.okerrpatterns.append("%s crmd:.*Process %s:.* exited" %(node, chosen.name)) self.okerrpatterns.append("%s crmd:.*I_ERROR.*crmdManagedChildDied" %node) self.okerrpatterns.append("%s crmd:.*The %s subsystem terminated unexpectedly" %(node, chosen.name)) self.okerrpatterns.append("ERROR: Client .* exited with return code") else: self.patterns.append("%s crmd:.*Process %s:.* exited" %(node, chosen.name)) self.patterns.append("%s crmd:.*I_ERROR.*crmdManagedChildDied" %node) self.patterns.append("%s crmd:.*The %s subsystem terminated unexpectedly" %(node, chosen.name)) else: if chosen.triggersreboot: # Sometimes this won't be in the log... self.okerrpatterns.append("%s heartbeat.*%s.*killed by signal 9" %(node, chosen.name)) self.okerrpatterns.append("%s heartbeat.*Respawning client.*%s" %(node, chosen.name)) self.okerrpatterns.append("ERROR: Client .* exited with return code") else: self.patterns.append("%s heartbeat.*%s.*killed by signal 9" %(node, chosen.name)) self.patterns.append("%s heartbeat.*Respawning client.*%s" %(node, chosen.name)) if node_is_dc: self.patterns.extend(chosen.dc_pats) # supply a copy so self.patterns doesnt end up empty tmpPats = [] tmpPats.extend(self.patterns) self.patterns.extend(chosen.badnews_ignore) # set the watch for stable watch = CTS.LogWatcher( self.CM["LogFileName"], tmpPats, self.CM["DeadTime"] + self.CM["StableTime"] + self.CM["StartTime"]) watch.setwatch() # kill the component chosen.kill(node) # check to see Heartbeat noticed matched = watch.lookforall() if not matched: self.CM.log("Patterns not found: " + repr(watch.unmatched)) self.CM.cluster_stable(self.CM["StartTime"]) return self.failure("Didn't find all expected patterns") self.CM.debug("Found: "+ repr(matched)) # now watch it recover... for attempt in (1, 2, 3, 4, 5): self.CM.debug("Waiting for the cluster to recover...") if self.CM.cluster_stable(self.CM["StartTime"]): return self.success() return self.failure("Cluster did not become stable") def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 def errorstoignore(self): '''Return list of errors which should be ignored''' # Note that okerrpatterns refers to the last time we ran this test # The good news is that this works fine for us... self.okerrpatterns.extend(self.patterns) return self.okerrpatterns AllTestClasses.append(ComponentFail) #################################################################### class Split_brainTest2(CTSTest): #################################################################### '''It is used to test split-brain. when the path between the two nodes break check the two nodes both take over the resource''' def __init__(self,cm): CTSTest.__init__(self,cm) self.name = "Split_brain2" self.start = StartTest(cm) self.startall = SimulStartLite(cm) def __call__(self, node): '''Perform split-brain test''' self.incr("calls") ret = self.startall(None) if not ret: return self.failure("Setup failed") count1 = self.CM.Env.RandomGen.randint(1,len(self.CM.Env["nodes"])-1) partition1 = [] while len(partition1) < count1: select = self.CM.Env.RandomGen.choice(self.CM.Env["nodes"]) if not select in partition1: partition1.append(select) partition2 = [] for member in self.CM.Env["nodes"]: if not member in partition1: partition2.append(member) allownodes1 = "" for member in partition1: allownodes1 += member + " " allownodes2 = "" for member in partition2: allownodes2 += member + " " self.CM.log("Partition1: " + str(partition1)) self.CM.log("Partition2: " + str(partition2)) '''isolate nodes, Look for node is dead message''' watchdeadpats = [ ] deadpat = self.CM["Pat:They_dead"] for member in self.CM.Env["nodes"]: thispat = (deadpat % member) watchdeadpats.append(thispat) watchdead = CTS.LogWatcher(self.CM["LogFileName"], watchdeadpats\ , timeout=self.CM["DeadTime"]+60) watchdead.ReturnOnlyMatch() watchdead.setwatch() for member in partition1: if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 : self.CM.savecomm_node(node) if not self.CM.isolate_node(member,allownodes1): return self.failure("Could not isolate the nodes") for member in partition2: if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 : self.CM.savecomm_node(node) if not self.CM.isolate_node(member,allownodes2): return self.failure("Could not isolate the nodes") if not watchdead.lookforall(): for member in self.CM.Env["nodes"]: self.CM.unisolate_node(member) self.CM.log("Patterns not found: " + repr(watchdead.unmatched)) return self.failure("Didn't find the log 'dead' message") dcnum=0 while dcnum < 2: dcnum = 0 for member in self.CM.Env["nodes"]: if self.CM.is_node_dc(member): dcnum += 1 time.sleep(1) ''' Unisolate the node, look for the return partition message and check whether they restart ''' watchpartitionpats = [self.CM["Pat:DC_IDLE"]] partitionpat = self.CM["Pat:Return_partition"] for member in self.CM.Env["nodes"]: thispat = (partitionpat % member) watchpartitionpats.append(thispat) watchpartition = CTS.LogWatcher(self.CM["LogFileName"], watchpartitionpats\ , timeout=self.CM["DeadTime"]+60) watchpartition.setwatch() for member in self.CM.Env["nodes"]: if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 : self.CM.restorecomm_node(node) self.CM.unisolate_node(member) if not watchpartition.lookforall(): self.CM.log("Patterns not found: " + repr(watchpartition.unmatched)) return self.failure("Didn't find return from partition messages") return self.success() def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [ "ERROR:.*Both machines own.*resources" , "ERROR:.*lost a lot of packets!" , "ERROR: Cannot rexmit pkt .*: seqno too low" , "ERROR: Irretrievably lost packet: node" ] #AllTestClasses.append(Split_brainTest2) #################################################################### class MemoryTest(CTSTest): #################################################################### '''Check to see if anyone is leaking memory''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Memory" # self.test = ElectionMemoryTest(cm) self.test = ResourceRecover(cm) self.startall = SimulStartLite(cm) self.before = {} self.after = {} def __call__(self, node): ps_command='''ps -eo ucomm,pid,pmem,tsiz,dsiz,rss,vsize | grep -e ccm -e ha_logd -e cib -e crmd -e lrmd -e tengine -e pengine''' memory_error = [ "", "", "", "Code", "Data", "Resident", "Total" ] ret = self.startall(None) if not ret: return self.failure("Test setup failed") time.sleep(10) for node in self.CM.Env["nodes"]: self.before[node] = {} rsh_pipe = self.CM.rsh.popen(node, ps_command) rsh_pipe.tochild.close() result = rsh_pipe.fromchild.readline() while result: tokens = result.split() self.before[node][tokens[1]] = result result = rsh_pipe.fromchild.readline() rsh_pipe.fromchild.close() self.lastrc = rsh_pipe.wait() # do something... if not self.test(node): return self.failure("Underlying test failed") time.sleep(10) for node in self.CM.Env["nodes"]: self.after[node] = {} rsh_pipe = self.CM.rsh.popen(node, ps_command) rsh_pipe.tochild.close() result = rsh_pipe.fromchild.readline() while result: tokens = result.split() self.after[node][tokens[1]] = result result = rsh_pipe.fromchild.readline() rsh_pipe.fromchild.close() self.lastrc = rsh_pipe.wait() failed_nodes = [] for node in self.CM.Env["nodes"]: failed = 0 for process in self.before[node]: messages = [] before_line = self.before[node][process] after_line = self.after[node][process] if not after_line: self.CM.log("%s %s[%s] exited during the test" %(node, before_tokens[0], before_tokens[1])) continue before_tokens = before_line.split() after_tokens = after_line.split() # 3 : Code size # 4 : Data size # 5 : Resident size # 6 : Total size for index in [ 3, 4, 6 ]: mem_before = int(before_tokens[index]) mem_after = int(after_tokens[index]) mem_diff = mem_after - mem_before mem_allow = mem_before * 0.01 # for now... mem_allow = 0 if mem_diff > mem_allow: failed = 1 messages.append("%s size grew by %dkB (%dkB)" %(memory_error[index], mem_diff, mem_after)) elif mem_diff < 0: messages.append("%s size shrank by %dkB (%dkB)" %(memory_error[index], mem_diff, mem_after)) if len(messages) > 0: self.CM.log("Process %s[%s] on %s: %s" %(before_tokens[0], before_tokens[1], node, repr(messages))) self.CM.debug("%s Before: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB" %(node, before_tokens[0], before_tokens[1], before_tokens[2], before_tokens[3], before_tokens[4], before_tokens[5], before_tokens[6])) self.CM.debug("%s After: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB" %(node, after_tokens[0], after_tokens[1], after_tokens[2], after_tokens[3], after_tokens[4], after_tokens[5], after_tokens[6])) if failed == 1: failed_nodes.append(node) if len(failed_nodes) > 0: return self.failure("Memory leaked on: " + repr(failed_nodes)) return self.success() def errorstoignore(self): '''Return list of errors which should be ignored''' return [ """ERROR: .* LRM operation.*monitor on .*: not running""", """pengine:.*Handling failed """] def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 #AllTestClasses.append(MemoryTest) #################################################################### class ElectionMemoryTest(CTSTest): #################################################################### '''Check to see if anyone is leaking memory''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Election" def __call__(self, node): self.rsh.readaline(node, self.CM["ElectionCmd"]%node) if self.CM.cluster_stable(): return self.success() return self.failure("Cluster not stable") def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): '''Never applicable, only for use by the memory test''' return 0 AllTestClasses.append(ElectionMemoryTest) #################################################################### class SpecialTest1(CTSTest): #################################################################### '''Set up a custom test to cause quorum failure issues for Andrew''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SpecialTest1" self.startall = SimulStartLite(cm) self.restart1 = RestartTest(cm) self.stopall = SimulStopLite(cm) def __call__(self, node): '''Perform the 'SpecialTest1' test for Andrew. ''' self.incr("calls") # Shut down all the nodes... ret = self.stopall(None) if not ret: return ret # Start the selected node ret = self.restart1(node) if not ret: return ret # Start all remaining nodes ret = self.startall(None) return ret def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): return 1 AllTestClasses.append(SpecialTest1) ################################################################### class NearQuorumPointTest(CTSTest): ################################################################### ''' This test brings larger clusters near the quorum point (50%). In addition, it will test doing starts and stops at the same time. Here is how I think it should work: - loop over the nodes and decide randomly which will be up and which will be down Use a 50% probability for each of up/down. - figure out what to do to get into that state from the current state - in parallel, bring up those going up and bring those going down. ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="NearQuorumPoint" def __call__(self, dummy): '''Perform the 'NearQuorumPoint' test. ''' self.incr("calls") startset = [] stopset = [] #decide what to do with each node for node in self.CM.Env["nodes"]: action = self.CM.Env.RandomGen.choice(["start","stop"]) #action = self.CM.Env.RandomGen.choice(["start","stop","no change"]) if action == "start" : startset.append(node) elif action == "stop" : stopset.append(node) self.CM.debug("start nodes:" + repr(startset)) self.CM.debug("stop nodes:" + repr(stopset)) #add search patterns watchpats = [ ] for node in stopset: if self.CM.ShouldBeStatus[node] == self.CM["up"]: watchpats.append(self.CM["Pat:We_stopped"] % node) for node in startset: if self.CM.ShouldBeStatus[node] == self.CM["down"]: - watchpats.append(self.CM["Pat:They_started"] % node) + #watchpats.append(self.CM["Pat:Slave_started"] % node) + watchpats.append(self.CM["Pat:Local_started"] % node) if len(watchpats) == 0: return self.skipped() if len(startset) != 0: watchpats.append(self.CM["Pat:DC_IDLE"]) watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() #begin actions for node in stopset: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.StopaCMnoBlock(node) for node in startset: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.CM.StartaCMnoBlock(node) #get the result if watch.lookforall(): self.CM.cluster_stable() return self.success() self.CM.log("Warn: Patterns not found: " + repr(watch.unmatched)) #get the "bad" nodes upnodes = [] for node in stopset: if self.CM.StataCM(node) == 1: upnodes.append(node) downnodes = [] for node in startset: if self.CM.StataCM(node) == 0: downnodes.append(node) if upnodes == [] and downnodes == []: self.CM.cluster_stable() return self.success() if len(upnodes) > 0: self.CM.log("Warn: Unstoppable nodes: " + repr(upnodes)) if len(downnodes) > 0: self.CM.log("Warn: Unstartable nodes: " + repr(downnodes)) return self.failure() def errorstoignore(self): '''Return list of errors which should be ignored''' return [] def is_applicable(self): if self.CM["Name"] == "linux-ha-v2": return 1 return 0 AllTestClasses.append(NearQuorumPointTest) ################################################################### class BSC_AddResource(CTSTest): ################################################################### '''Add a resource to the cluster''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="AddResource" self.resource_offset = 0 self.cib_cmd="""@sbindir@/cibadmin -C -o %s -X '%s' """ def __call__(self, node): self.resource_offset = self.resource_offset + 1 r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset) start_pat = "crmd.*%s_start_0.*complete" patterns = [] patterns.append(start_pat % r_id) watch = CTS.LogWatcher( self.CM["LogFileName"], patterns, self.CM["DeadTime"]) watch.setwatch() fields = string.split(self.CM.Env["IPBase"], '.') fields[3] = str(int(fields[3])+1) ip = string.join(fields, '.') self.CM.Env["IPBase"] = ip if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip): return self.failure("Make resource %s failed" % r_id) failed = 0 watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.CM.log ("Warn: Pattern not found: %s" % (regex)) failed = 1 if failed: return self.failure("Resource pattern(s) not found") if not self.CM.cluster_stable(self.CM["DeadTime"]): return self.failure("Unstable cluster") return self.success() def make_ip_resource(self, node, id, rclass, type, ip): self.CM.log("Creating %s::%s:%s (%s) on %s" % (rclass,type,id,ip,node)) rsc_xml=""" """ % (id, rclass, type, id, id, ip) node_constraint=""" """ % (id, id, id, id, node) rc = 0 (rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("constraints", node_constraint)) if rc != 0: self.CM.log("Constraint creation failed: %d" % rc) return None (rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("resources", rsc_xml)) if rc != 0: self.CM.log("Resource creation failed: %d" % rc) return None return 1 def is_applicable(self): if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["DoBSC"]: return 1 return None def TestList(cm): result = [] for testclass in AllTestClasses: bound_test = testclass(cm) if bound_test.is_applicable(): result.append(bound_test) return result class SimulStopLite(CTSTest): ################################################################### '''Stop any active nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStopLite" def __call__(self, dummy): '''Perform the 'SimulStopLite' setup work. ''' self.incr("calls") self.CM.debug("Setup: " + self.name) # We ignore the "node" parameter... watchpats = [ ] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("WasStarted") watchpats.append(self.CM["Pat:All_stopped"] % node) if self.CM.Env["use_logd"]: watchpats.append(self.CM["Pat:Logd_stopped"] % node) if len(watchpats) == 0: self.CM.clear_all_caches() return self.skipped() # Stop all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.starttime=time.time() for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.CM.StopaCMnoBlock(node) if watch.lookforall(): self.CM.clear_all_caches() return self.success() did_fail=0 up_nodes = [] for node in self.CM.Env["nodes"]: if self.CM.StataCM(node) == 1: did_fail=1 up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: " + repr(up_nodes)) self.CM.log("Warn: All nodes stopped but CTS didnt detect: " + repr(watch.unmatched)) self.CM.clear_all_caches() return self.failure("Missing log message: "+repr(watch.unmatched)) def is_applicable(self): '''SimulStopLite is a setup test and never applicable''' return 0 ################################################################### class SimulStartLite(CTSTest): ################################################################### '''Start any stopped nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStartLite" def __call__(self, dummy): '''Perform the 'SimulStartList' setup work. ''' self.incr("calls") self.CM.debug("Setup: " + self.name) # We ignore the "node" parameter... watchpats = [ ] + uppat = self.CM["Pat:Slave_started"] + if self.CM.upcount() == 0: + uppat = self.CM["Pat:Local_started"] + for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") - watchpats.append(self.CM["Pat:They_started"] % node) + watchpats.append(uppat % node) if len(watchpats) == 0: return self.skipped() + + watchpats.append(self.CM["Pat:DC_IDLE"]) # Start all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.starttime=time.time() for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.CM.StartaCMnoBlock(node) if watch.lookforall(): for attempt in (1, 2, 3, 4, 5): if self.CM.cluster_stable(): return self.success() return self.failure("Cluster did not stabilize") did_fail=0 unstable = [] for node in self.CM.Env["nodes"]: if self.CM.StataCM(node) == 0: did_fail=1 unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: " + repr(unstable)) unstable = [] for node in self.CM.Env["nodes"]: if not self.CM.node_stable(node): did_fail=1 unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: " + repr(unstable)) self.CM.log("ERROR: All nodes started but CTS didnt detect: " + repr(watch.unmatched)) return self.failure() def is_applicable(self): '''SimulStartLite is a setup test and never applicable''' return 0 ################################################################### class LoggingTest(CTSTest): ################################################################### def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Logging" def __call__(self, dummy): '''Perform the 'Logging' test. ''' self.incr("calls") # Make sure logging is working and we have enough disk space... if not self.CM.TestLogging(): sys.exit(1) if not self.CM.CheckDf(): sys.exit(1) def is_applicable(self): '''ResourceRecover is applicable only when there are resources running on our cluster and environment is linux-ha-v2''' return self.CM.Env["DoBSC"] def errorstoignore(self): '''Return list of errors which should be ignored''' return [] #AllTestClasses.append(LoggingTest)