diff --git a/cts/CM_hb.py.in b/cts/CM_hb.py.in index 0cff75b759..7cd1762462 100755 --- a/cts/CM_hb.py.in +++ b/cts/CM_hb.py.in @@ -1,637 +1,638 @@ #!@PYTHON@ '''CTS: Cluster Testing System: heartbeat dependent modules... Classes related to testing high-availability clusters... Lots of things are implemented. Lots of things are not implemented. We have many more ideas of what to do than we've implemented. ''' __copyright__=''' Copyright (C) 2000,2001 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. from CTS import * class HeartbeatCM(ClusterManager): ''' The heartbeat cluster manager class. It implements the things we need to talk to and manipulate heartbeat clusters ''' def __init__(self, Environment, randseed=None): self.ResourceDirs = ["@sysconfdir@/ha.d/resource.d", "@sysconfdir@/rc.d/init.d", "@sysconfdir@/rc.d/"] self.ResourceFile = Environment["HAdir"] + "/haresources" self.ConfigFile = Environment["HAdir"]+ "/ha.cf" ClusterManager.__init__(self, Environment, randseed=randseed) self.update({ "Name" : "heartbeat", "DeadTime" : 30, "StartCmd" : "ulimit -c unlimited; MALLOC_CHECK_=2; export MALLOC_CHECK_; @libdir@/heartbeat/heartbeat -d 2>/dev/null", "StopCmd" : "@libdir@/heartbeat/heartbeat -k", "StatusCmd" : "@libdir@/heartbeat/heartbeat -s", "RereadCmd" : "@libdir@/heartbeat/heartbeat -r", "StartDRBDCmd" : "@sysconfdir@/init.d/drbd start >/dev/null 2>&1", "StopDRBDCmd" : "@sysconfdir@/init.d/drbd stop", "StatusDRBDCmd" : "@sysconfdir@/init.d/drbd status", "DRBDCheckconf" : "@sysconfdir@/init.d/drbd checkconfig >/tmp/drbdconf 2>&1", "BreakCommCmd" : "@libdir@/heartbeat/TestHeartbeatComm break-communication >/dev/null 2>&1", "FixCommCmd" : "@libdir@/heartbeat/TestHeartbeatComm fix-communication >/dev/null 2>&1", + "DelFileCommCmd" : "/usr/lib/heartbeat/TestHeartbeatComm delete-testingfile >/dev/null 2>&1", "IPaddrCmd" : "@sysconfdir@/ha.d/resource.d/IPaddr %s status", "Standby" : "@libdir@/heartbeat/hb_standby >/dev/null 2>&1", "TestConfigDir" : "@sysconfdir@/ha.d/testconfigs", "LogFileName" : Environment["LogFileName"], # Patterns to look for in the log files for various occasions... "Pat:We_started" : " (%s) .* Initial resource acquisition complete", "Pat:They_started" : " (%s) .* Initial resource acquisition complete", "Pat:We_stopped" : "Heartbeat shutdown complete", "Pat:They_stopped" : "node (%s).*: is dead", "Pat:All_stopped" : " (%s).*heartbeat.*Heartbeat shutdown complete", "Pat:StandbyOK" : "Standby resource acquisition done", "Pat:StandbyNONE" : "No reply to standby request", "Pat:StandbyTRANSIENT" : "standby message.*ignored.*in flux", "Pat:Return_partition" : "Cluster node %s returning after partition", # Bad news Regexes. Should never occur. "BadRegexes" : ( r"Shutting down\.", r"Forcing shutdown\.", r"Both machines own .* resources!", r"No one owns .* resources!", r", exiting\.", r"ERROR:", r"CRIT.*:", ), }) self.cf=HBConfig(Environment["HAdir"]) self._finalConditions() def SetClusterConfig(self, configpath="default", nodelist=None): '''Activate the named test configuration throughout the cluster. This code is specialized to heartbeat. ''' rc=1 Command=''' cd %s%s%s; : cd to test configuration directory for j in * do if [ -f "@sysconfdir@/ha.d/$j" ]; then if cmp $j @sysconfdir@/ha.d/$j >/dev/null 2>&1; then : Config file $j is already up to correct. else echo "Touching $j" cp $j @sysconfdir@/ha.d/$j fi fi done ''' % (self["TestConfigDir"], os.sep, configpath) if nodelist == None: nodelist=self.Env["nodes"] for node in nodelist: if not self.rsh(node, Command): rc=None self.rereadall() return rc def ResourceGroups(self): ''' Return the list of resources groups defined in this configuration. This code is specialized to heartbeat. We make the assumption that the resource file on the local machine is the same as that of a cluster member. We aren't necessarily a member of the cluster (In fact, we usually aren't). ''' RscGroups=[] file = open(self.ResourceFile, "r") while (1): line = file.readline() if line == "": break idx=string.find(line, '#') if idx >= 0: line=line[:idx] if line == "": continue line = string.strip(line) # Is this wrong? tokens = re.split("[ \t]+", line) # Ignore the default server for this resource group del tokens[0] Group=[] for token in tokens: if token != "": idx=string.find(token, "::") if idx > 0: tuple=string.split(token, "::") else: # # Is this an IPaddr default resource type? # if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$" , token): tuple=["IPaddr", token] else: tuple = [token, None] Resource = self.hbResource(tuple[0], tuple[1]) Group.append(Resource) RscGroups.append(Group) file.close() return RscGroups def InternalCommConfig(self): ''' Return a list of communication paths. Each path consists of a tuple like this: mediatype serial | ip interface/dev name eth0 | /dev/ttyS0... protocol tcp?? | udp | None port Number | None ''' Path = {"mediatype" : None, "interface": None, "protocol" : None, "port": None} cf =HBConfig() for cfp in cf.Parameters: if cfp == "serial": if Path["mediatype"] == None: Path["mediatype"] = ["serial"] else: Path["mediatype"].append("serial") if Path["interface"] == None: Path["interface"] = cf.Parameters["serial"] else: for serial in cf.Parameters["serial"]: Path["interface"].append(serial) if cfp == "bcast" or cfp == "mcast" or cfp == "ucast" : if Path["mediatype"] == None: Path["mediatype"] = ["ip"] else: Path["mediatype"].append("ip") if cfp == "bcast": interfaces = cf.Parameters[cfp] if cfp == "ucast": interfaces = [cf.Parameters[cfp][0]] if cfp == "mcast": Path["port"] = [cf.Parameters[cfp][0][2]] Path["protocol"] = "udp" interfaces = [cf.Parameters[cfp][0][0]] if Path["interface"] == None: Path["interface"] = interfaces else: for interface in interfaces: if interface not in Path["interface"]: Path["interface"].append(interface) if cfp == "udpport": Path["port"] = cf.Parameters["udpport"] Path["protocol"] = ["udp"] return Path def HasQuorum(self): ( '''Return TRUE if the cluster currently has quorum. According to current heartbeat code this means one node is up. ''') return self.upcount() >= 1 def hbResource(self, type, instance): ''' Our job is to create the right kind of resource. For most resources, we just create an HBResource object, but for IP addresses, we create an HBipResource instead. Some other types of resources may also be added as special cases. ''' if type == "IPaddr": return HBipResource(self, type, instance) return HBResource(self, type, instance) class HBResource(Resource): def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. We call the status operation for the resource script. ''' return self._ResourceOperation("status", "OK|running", nodename) def _ResourceOperation(self, operation, pattern, nodename): ''' We call the requested operation for the resource script. We don't care what kind of operation we were called to do particularly. When we were created, we were bound to a cluster manager, which has its own remote execution method (which we use here). ''' if self.Instance == None: instance = "" else: instance = self.Instance Rlist = 'LIST="' for dir in self.CM.ResourceDirs: Rlist = Rlist + " " + dir Rlist = Rlist + '"; ' Script= Rlist + ''' T="''' + self.ResourceType + '''"; I="''' + instance + '''"; for dir in $LIST; do if [ -f "$dir/$T" -a -x "$dir/$T" ] then "$dir/$T" $I ''' + operation + ''' exit $? fi done 2>&1; exit 1;''' #print "Running " + Script + "\n" line = self.CM.rsh.readaline(nodename, Script) if operation == "status": if re.search(pattern, line): return 1 return self.CM.rsh.lastrc == 0 def Start(self, nodename): ''' This member function starts or activates the resource. ''' return self._ResourceOperation("start", None, nodename) def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' return self._ResourceOperation("stop", None, nodename) # def IsWorkingCorrectly(self, nodename): # "We default to returning TRUE for this one..." # if self.Instance == None: # self.CM.log("Faking out: " + self.ResourceType) # else: # self.CM.log("Faking out: " + self.ResourceType + self.Instance) # return 1 def IsWorkingCorrectly(self, nodename): return self._ResourceOperation("monitor", "OK", nodename) class HBipResource(HBResource): ''' We are a specialized IP address resource which knows how to test to see if our resource type is actually being served. We are cheat and run the IPaddr resource script on the current machine, because it's a more interesting case. ''' def IsWorkingCorrectly(self, nodename): return self._ResourceOperation("monitor", "OK", self.CM.OurNode) # # A heartbeat configuration class... # It reads and parses the heartbeat config # files # class HBConfig: # Which options have multiple words on the line? MultiTokenKeywords = {"mcast" : None , "stonith_host": None} def __init__(self, configdir="/etc/ha.d"): self.Parameters = {} self.ResourceGroups = {} self._ReadConfig(os.path.join(configdir, "ha.cf")) FirstUp_NodeSelection() LastUp_NodeSelection() no_failback = NoAutoFailbackPolicy() auto_failback = AutoFailbackPolicy() # # We allow each resource group to have its own failover/back # policies # if self.Parameters.has_key("nice_failback") \ and self.Parameters["nice_failback"] == "on": HBConfig.DefaultFailbackPolicy = no_failback elif self.Parameters.has_key("auto_failback") \ and self.Parameters["auto_failback"] == "off": HBConfig.DefaultFailbackPolicy = no_failback else: HBConfig.DefaultFailbackPolicy = auto_failback HBConfig.DefaultNodeSelectionPolicy = NodeSelectionPolicies["FirstUp"] self._ReadResourceGroups(os.path.join(configdir, "haresources")) # Read ha.cf config file def _ReadConfig(self, ConfigFile): self.ConfigPath = ConfigFile; fp = open(ConfigFile) while 1: line=fp.readline() if not line: return line = re.sub("#.*", "", line) line = string.rstrip(line) if len(line) < 1: continue tokens = line.split() key = tokens[0] values = tokens[1:] if HBConfig.MultiTokenKeywords.has_key(key): # group items from this line together, and separate # from the items on other lines values = [values] if self.Parameters.has_key(key): self.Parameters[key].append(values[0]) else: self.Parameters[key] = values # Read a line from the haresources file... # - allow for \ continuations... def _GetRscLine(self, fp): linesofar = None continuation=1 while continuation: continuation = 0 line=fp.readline() if not line: break line = re.sub("#.*", "", line) if line[len(line)-2] == "\\": line = line[0:len(line)-2] + "\n" continuation=1 if linesofar == None: linesofar = line else: linesofar = linesofar + line return linesofar def _ReadResourceGroups(self, RscFile): self.RscPath = RscFile; fp = open(RscFile) thisline = "" while 1: line=self._GetRscLine(fp) if not line: return line = line.strip() if len(line) < 1: continue tokens = line.split() node = tokens[0] resources = tokens[1:] rscargs=[] for resource in resources: name=resource.split("::", 1) if len(name) > 1: args=name[1].split("::") else: args=None name = name[0] rscargs.append(Resource(name, args)) name = tokens[0] + "__" + tokens[1] assert not self.ResourceGroups.has_key(name) # # Create the resource group # self.ResourceGroups[name] = ResourceGroup(name \ , rscargs , node.split(",") # Provide default value , HBConfig.DefaultNodeSelectionPolicy , HBConfig.DefaultFailbackPolicy) # # Return the list of nodes in the cluster... # def nodes(self): result = self.Parameters["node"] result.sort() return result class ClusterState: pass class ResourceGroup: def __init__(self, name, resourcelist, possiblenodes , nodeselection_policy, failback_policy): self.name = name self.resourcelist = resourcelist self.possiblenodes = possiblenodes self.nodeselection_policy = nodeselection_policy self.failback_policy = failback_policy self.state = None self.attributes = {} self.history = [] def __str__(self): result = string.join(self.possiblenodes, ",") for rsc in self.resourcelist: result = result + " " + str(rsc) return result class Resource: def __init__(self, name, arguments=None): self.name = name self.arguments = arguments def __str__(self): result = self.name try: for arg in self.arguments: result = result + "::" + arg except TypeError: pass return result ####################################################################### # # Base class defining policies for where we put resources # when we're starting, or when a failure has occurred... # ####################################################################### NodeSelectionPolicies = {} class NodeSelectionPolicy: def __init__(self, name): self.name = name NodeSelectionPolicies[name] = self def name(self): return self.name # # nodenames: the list of nodes eligible to run this resource # ResourceGroup: the group to be started... # ClusterState: Cluster state information # def SelectNode(self, nodenames, ResourceGroup, ClusterState): return None # # Choose the first node in the list... # class FirstUp_NodeSelection(NodeSelectionPolicy): def __init__(self): NodeSelectionPolicy.__init__(self, "FirstUp") def SelectNode(self, nodenames, ResourceGroup, ClusterState): return nodenames[0] # # Choose the last node in the list... # (kind of a dumb example) # class LastUp_NodeSelection(NodeSelectionPolicy): def __init__(self): NodeSelectionPolicy.__init__(self, "LastUp") def SelectNode(self, nodenames, ResourceGroup, ClusterState): return nodenames[len(nodenames)-1] ####################################################################### # # Failback policies... # # Where to locate a resource group when an eligible node rejoins # the cluster... # ####################################################################### FailbackPolicies = {} class FailbackPolicy: def __init__(self, name): self.name = name FailbackPolicies[name] = self def name(self): return self.name # # currentnode: The node the service is currently on # returningnode: The node which just rejoined # eligiblenodes: Permitted nodes which are up # SelectionPolicy: the normal NodeSelectionPolicy # Cluster state information... # def SelectNewNode(self, currentnode, returningnode, eligiblenodes , SelectionPolicy, ResourceGroup, ClusterState): return None # # This FailbackPolicy is like "normal failback" in heartbeat # class AutoFailbackPolicy(FailbackPolicy): def __init__(self): FailbackPolicy.__init__(self, "failback") def SelectNewNode(self, currentnode, returningnode, eligiblenodes , SelectionPolicy, ResourceGroup, ClusterState): # Select where it should run based on current normal policy # just as though we were starting it for the first time. return SelectionPolicy(eligiblenodes, ResourceGroup, ClusterState) # # This FailbackPolicy is like "nice failback" in heartbeat # class NoAutoFailbackPolicy(FailbackPolicy): def __init__(self): FailbackPolicy.__init__(self, "failuresonly") def SelectNewNode(self, currentnode, returningnode, eligiblenodes , SelectionPolicy, ResourceGroup): # Always leave the resource where it is... return currentnode ####################################################################### # # A little test code... # # Which you are advised to completely ignore... # ####################################################################### if __name__ == '__main__': FirstUp_NodeSelection() LastUp_NodeSelection() no_failback = NoAutoFailbackPolicy() auto_failback = AutoFailbackPolicy() cf=HBConfig("/etc/ha.d") print "Cluster configuration:\n" print "Nodes:", cf.nodes(), "\n" print "Config Parameters:", cf.Parameters, "\n" for groupname in cf.ResourceGroups.keys(): print "Resource Group %s:\n\t%s\n" % (groupname, cf.ResourceGroups[groupname])