diff --git a/cts/CIB.py.in b/cts/CIB.py.in
index 75b5918f75..7132c14e5b 100644
--- a/cts/CIB.py.in
+++ b/cts/CIB.py.in
@@ -1,247 +1,247 @@
#!@PYTHON@
'''CTS: Cluster Testing System: CIB generator
'''
__copyright__='''
Author: Jia Ming Pan <jmltc@cn.ibm.com>
Copyright (C) 2006 International Business Machines
'''
from UserDict import UserDict
import sys, time, types, syslog, os, struct, string, signal, traceback
from CTS import ClusterManager
from CM_hb import HeartbeatCM
class CIB:
cib_option_template = '''
<cluster_property_set id="cib-bootstrap-options">
<attributes>
<nvpair id="cib-bootstrap-options-default-action-timeout" name="default-action-timeout" value="10s"/>
<nvpair id="cib-bootstrap-options-cluster-delay" name="cluster-delay" value="60s"/>
<nvpair id="cib-bootstrap-options-symmetric-cluster" name="symmetric-cluster" value="true"/>
<nvpair id="cib-bootstrap-options-stop-orphan-resources" name="stop-orphan-resources" value="true"/>
<nvpair id="cib-bootstrap-options-stop-orphan-actions" name="stop-orphan-actions" value="true"/>
<nvpair id="cib-bootstrap-options-remove-after-stop" name="remove-after-stop" value="false"/>
<nvpair id="cib-bootstrap-options-is-managed-default" name="is-managed-default" value="true"/>
<nvpair id="cib-bootstrap-options-no-quorum-policy" name="no-quorum-policy" value="stop"/>
<nvpair id="cib-bootstrap-options-stonith-action" name="stonith-action" value="reboot"/>
<nvpair id="cib-bootstrap-options-stonith-enabled" name="stonith-enabled" value="%d"/>
<nvpair id="cib-bootstrap-options-pe-error-series-max" name="pe-error-series-max" value="-1"/>
<nvpair id="cib-bootstrap-options-pe-warn-series-max" name="pe-warn-series-max" value="-1"/>
<nvpair id="cib-bootstrap-options-pe-input-series-max" name="pe-input-series-max" value="-1"/>
<nvpair id="cib-bootstrap-options-default-resource-stickiness" name="default-resource-stickiness" value="0"/>
<nvpair id="cib-bootstrap-options-default-resource-failure-stickiness" name="default-resource-failure-stickiness" value="0"/>
<nvpair id="cib-bootstrap-options-shutdown-escalation" name="shutdown-escalation" value="5min"/>
- <!-- *** For CTS testing only *** _NEVER_ make this the default -->
- <nvpair id="cib-bootstrap-optionsstartup-fencing" name="startup-fencing" value="false"/>
+ <!-- *** For CTS testing only *** _NEVER_ make this the default -->
+ <nvpair id="cib-bootstrap-optionsstartup-fencing" name="startup-fencing" value="false"/>
</attributes>
</cluster_property_set>'''
ipaddr_template = '''
<primitive id="%s" class="ocf" type="IPaddr" provider="heartbeat">
<operations>
<op id="%s-mon" name="monitor" interval="5s"/>
</operations>
<instance_attributes id="%s">
<attributes>
<nvpair id="%s-ip" name="ip" value="%s"/>
</attributes>
</instance_attributes>
</primitive> '''
hb_ipaddr_template = '''
<primitive id="%s" class="heartbeat" type="IPaddr">
<operations>
<op id="%s-mon" name="monitor" interval="5s"/>
</operations>
<instance_attributes id="%s">
<attributes>
<nvpair id="%s-ip" name="1" value="%s"/>
</attributes>
</instance_attributes>
</primitive> '''
lsb_resource = '''
<primitive id="lsb_dummy" class="lsb" type="@libdir@/heartbeat/cts/LSBDummy" provider="heartbeat">
<operations>
<op id="ocf_lsb_monitor" name="monitor" interval="5s"/>
</operations>
</primitive> '''
dummy_resource_template = '''
<primitive id="%s" class="ocf" type="Dummy" provider="heartbeat">
<operations>
<op id="%s-mon" name="monitor" interval="10s"/>
</operations>
<instance_attributes id="%s">
<attributes>
<nvpair id="%s-migrate" name="allow_migrate" value="1"/>
</attributes>
</instance_attributes>
</primitive> '''
clustermon_resource_template = '''
<primitive id="cluster_mon" class="ocf" type="ClusterMon" provider="heartbeat">
<operations>
<op id="cluster_mon-1" name="monitor" interval="5s" prereq="nothing"/>
<op id="cluster_mon-2" name="start" prereq="nothing"/>
</operations>
<instance_attributes id="ClusterMon">
<attributes>
<nvpair id="cluster_mon-1" name="htmlfile" value="/suse/abeekhof/Export/cluster.html"/>
<nvpair id="cluster_mon-2" name="update" value="10"/>
<nvpair id="cluster_mon-3" name="extra_options" value="-n -r"/>
<nvpair id="cluster_mon-4" name="user" value="abeekhof"/>
</attributes>
</instance_attributes>
</primitive> '''
clustermon_location_constraint = '''
<rsc_location id="run_cluster_mon" rsc="cluster_mon">
<rule id="cant_run_cluster_mon" score="-INFINITY" boolean_op="and">
<expression id="mon_expr" attribute="#is_dc" operation="eq" value="false"/>
</rule>
</rsc_location> '''
master_slave_resource = '''
<master_slave id="master_rsc_1">
<instance_attributes id="master_rsc">
<attributes>
<nvpair id="clone_max_1" name="clone_max" value="%d"/>
<nvpair id="clone_node_max_2" name="clone_node_max" value="%d"/>
<nvpair id="master_max_3" name="master_max" value="%d"/>
<nvpair id="master_node_max_4" name="master_node_max" value="%d"/>
</attributes>
</instance_attributes>
<primitive id="ocf_msdummy" class="ocf" type="Stateful" provider="heartbeat">
<operations>
<op id="ocf_msdummy_monitor" name="monitor" interval="5s"/>
<op id="ocf_msdummy_monitor_master" name="monitor" interval="6s" role="Master"/>
</operations>
</primitive>
</master_slave>'''
resource_group_template = '''<group id="group-1">%s %s %s</group>'''
per_node_constraint_template = '''
<rsc_location id="run_%s" rsc="%s">
<rule id="pref_run_%s" score="100" boolean_op="and">
<expression id="%s_loc_expr" attribute="#uname" operation="eq" value="%s"/>
</rule>
</rsc_location> '''
stonith_resource_template = """
<clone id="DoFencing" globally_unique="false">
<instance_attributes id="fencing">
<attributes>
<nvpair id="DoFencing-1" name="clone_node_max" value="1"/>
</attributes>
</instance_attributes>
<primitive id="child_DoFencing" class="stonith" type="%s">
<operations>
<op id="DoFencing-1" name="monitor" interval="60s" prereq="nothing" timeout="300s"/>
<op id="DoFencing-2" name="start" prereq="nothing"/>
</operations>
<instance_attributes id="fencing-child">
<attributes>
<nvpair id="child_DoFencing-1" name="%s" value="%s"/>
</attributes>
</instance_attributes>
</primitive>
</clone>"""
cib_template ='''
<cib cib_feature_revision="1" have_quorum="false" ignore_dtd="false">
<configuration>
<crm_config> %s
</crm_config>
<nodes/>
<resources> %s
</resources>
<constraints> %s
</constraints>
</configuration>
<status/>
</cib> '''
def NextIP(self):
fields = string.split(self.CM.Env["IPBase"], '.')
fields[3] = str(int(fields[3])+1)
ip = string.join(fields, '.')
self.CM.Env["IPBase"]=ip
return ip
def __init__(self, CM):
self.CM = CM
#make up crm config
cib_options = self.cib_option_template % CM.Env["DoFencing"]
#create resources and their constraints
resources = ""
constraints = ""
if self.CM.Env["DoBSC"] == 1:
- cib_options = cib_options + '''
+ cib_options = cib_options + '''
<cluster_property_set id="bsc-options">
<attributes>
- <nvpair id="bsc-options-ident-string" name="ident-string" value="Linux-HA TEST configuration file - REMOVEME!!"/>
+ <nvpair id="bsc-options-ident-string" name="ident-string" value="Linux-HA TEST configuration file - REMOVEME!!"/>
</attributes>
</cluster_property_set>'''
if self.CM.Env["CIBResource"] != 1:
- # generate cib
- self.cts_cib = self.cib_template % (cib_options, resources, constraints)
- return
-
- if self.CM.cluster_monitor == 1:
- resources += self.clustermon_resource_template
- constraints += self.clustermon_location_constraint
-
- ip1=self.NextIP()
- ip2=self.NextIP()
- ip3=self.NextIP()
+ # generate cib
+ self.cts_cib = self.cib_template % (cib_options, resources, constraints)
+ return
+
+ if self.CM.cluster_monitor == 1:
+ resources += self.clustermon_resource_template
+ constraints += self.clustermon_location_constraint
+
+ ip1=self.NextIP()
+ ip2=self.NextIP()
+ ip3=self.NextIP()
ip1_rsc = self.ipaddr_template % (ip1, ip1, ip1, ip1, ip1)
ip2_rsc = self.hb_ipaddr_template % (ip2, ip2, ip2, ip2, ip2)
ip3_rsc = self.ipaddr_template % (ip3, ip3, ip3, ip3, ip3)
- resources += self.resource_group_template % (ip1_rsc, ip2_rsc, ip3_rsc)
+ resources += self.resource_group_template % (ip1_rsc, ip2_rsc, ip3_rsc)
- # lsb resource
- resources += self.lsb_resource
+ # lsb resource
+ resources += self.lsb_resource
# Mirgator
resources += self.dummy_resource_template % \
("migrator", "migrator", "migrator", "migrator")
- # per node resource
- fields = string.split(self.CM.Env["IPBase"], '.')
- for node in self.CM.Env["nodes"]:
- ip = self.NextIP()
- per_node_resources = self.ipaddr_template % \
- ("rsc_"+node, "rsc_"+node, "rsc_"+node, "rsc_"+node, ip)
-
- per_node_constraint = self.per_node_constraint_template % \
- ("rsc_"+node, "rsc_"+node, "rsc_"+node, "rsc_"+node, node)
-
- resources += per_node_resources
- constraints += per_node_constraint
+ # per node resource
+ fields = string.split(self.CM.Env["IPBase"], '.')
+ for node in self.CM.Env["nodes"]:
+ ip = self.NextIP()
+ per_node_resources = self.ipaddr_template % \
+ ("rsc_"+node, "rsc_"+node, "rsc_"+node, "rsc_"+node, ip)
+
+ per_node_constraint = self.per_node_constraint_template % \
+ ("rsc_"+node, "rsc_"+node, "rsc_"+node, "rsc_"+node, node)
+
+ resources += per_node_resources
+ constraints += per_node_constraint
- # fencing resource
- nodelist = ""
- len = 0
- for node in self.CM.Env["nodes"]:
- nodelist += node + " "
- len = len + 1
+ # fencing resource
+ nodelist = ""
+ len = 0
+ for node in self.CM.Env["nodes"]:
+ nodelist += node + " "
+ len = len + 1
stonith_resource = self.stonith_resource_template % \
(self.CM.Env["reset"].stonithtype, self.CM.Env["reset"].configName, self.CM.Env["reset"].configValue)
- resources += stonith_resource
-
- #master slave resource
- resources += self.master_slave_resource % (2*len, 2, len, 1)
+ resources += stonith_resource
+
+ #master slave resource
+ resources += self.master_slave_resource % (2*len, 2, len, 1)
- # generate cib
+ # generate cib
self.cts_cib = self.cib_template % (cib_options, resources, constraints)
def cib(self):
return self.cts_cib
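How the generator above works, in isolation: every resource, constraint and the surrounding <cib> document are produced by plain '%' substitution into the class-level XML template strings, and NextIP() just bumps the last octet of Env["IPBase"]. A minimal standalone sketch of that pattern (the templates below are trimmed stand-ins, not the exact strings from CIB.py.in):

# Sketch only -- simplified templates, not the patch's exact strings.
ipaddr_template = '''
      <primitive id="%s" class="ocf" type="IPaddr" provider="heartbeat">
        <instance_attributes id="%s">
          <attributes>
            <nvpair id="%s-ip" name="ip" value="%s"/>
          </attributes>
        </instance_attributes>
      </primitive>'''

cib_template = '''
<cib>
 <configuration>
   <crm_config>%s</crm_config>
   <resources>%s</resources>
   <constraints>%s</constraints>
 </configuration>
 <status/>
</cib>'''

def next_ip(base):
    # Same idea as CIB.NextIP(): increment the last octet of the dotted quad.
    fields = base.split('.')
    fields[3] = str(int(fields[3]) + 1)
    return '.'.join(fields)

ip = next_ip("10.0.0.10")                      # -> "10.0.0.11"
resources = ipaddr_template % (ip, ip, ip, ip)
print(cib_template % ("", resources, ""))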
diff --git a/cts/CM_LinuxHAv2.py.in b/cts/CM_LinuxHAv2.py.in
index 40cd92bab0..f934f9e7b9 100755
--- a/cts/CM_LinuxHAv2.py.in
+++ b/cts/CM_LinuxHAv2.py.in
@@ -1,599 +1,599 @@
#!@PYTHON@
'''CTS: Cluster Testing System: LinuxHA v2 dependent modules...
'''
__copyright__='''
Author: Huang Zhen <zhenhltc@cn.ibm.com>
Copyright (C) 2004 International Business Machines
Additional Audits, Revised Start action, Default Configuration:
Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import os,sys,CTS,CTSaudits,CTStests, warnings
from CTS import *
from CM_hb import HeartbeatCM
from CTSaudits import ClusterAudit
from CTStests import *
from CIB import *
try:
from xml.dom.minidom import *
except ImportError:
sys.__stdout__.write("Python module xml.dom.minidom not found\n")
sys.__stdout__.write("Please install python-xml or similar before continuing\n")
sys.__stdout__.flush()
sys.exit(1)
#######################################################################
#
# LinuxHA v2 dependent modules
#
#######################################################################
class LinuxHAv2(HeartbeatCM):
'''
The linux-ha version 2 cluster manager class.
It implements the things we need to talk to and manipulate
linux-ha version 2 clusters
'''
def __init__(self, Environment, randseed=None):
HeartbeatCM.__init__(self, Environment, randseed=randseed)
- self.clear_cache = 0
+ self.clear_cache = 0
self.cib_installed = 0
self.config = None
self.cluster_monitor = 0
self.use_short_names = 1
self.update({
"Name" : "linux-ha-v2",
"DeadTime" : 300,
"StartTime" : 300, # Max time to start up
"StableTime" : 30,
"StartCmd" : "@INITDIR@/heartbeat@INIT_EXT@ start > /dev/null 2>&1",
"StopCmd" : "@INITDIR@/heartbeat@INIT_EXT@ stop > /dev/null 2>&1",
"ElectionCmd" : "@sbindir@/crmadmin -E %s",
"StatusCmd" : "@sbindir@/crmadmin -S %s 2>/dev/null",
"EpocheCmd" : "@sbindir@/ccm_tool -e",
"QuorumCmd" : "@sbindir@/ccm_tool -q",
"CibQuery" : "@sbindir@/cibadmin -Ql",
"ParitionCmd" : "@sbindir@/ccm_tool -p",
"IsRscRunning" : "@libdir@/heartbeat/lrmadmin -E %s monitor 0 0 EVERYTIME 2>/dev/null|grep return",
"ExecuteRscOp" : "@libdir@/heartbeat/lrmadmin -n %s -E %s %s 0 %d EVERYTIME 2>/dev/null",
"CIBfile" : "%s:@HA_VARLIBDIR@/heartbeat/crm/cib.xml",
"TmpDir" : "/tmp",
"BreakCommCmd2" : "@libdir@/heartbeat/TestHeartbeatComm break-communication %s>/dev/null 2>&1",
"IsIPAddrRscRunning" : "",
"StandbyCmd" : "@sbindir@/crm_standby -U %s -v %s 2>/dev/null",
"UUIDQueryCmd" : "@sbindir@/crmadmin -N",
"StandbyQueryCmd" : "@sbindir@/crm_standby -GQ -U %s 2>/dev/null",
# Patterns to look for in the log files for various occasions...
"Pat:DC_IDLE" : "crmd.*State transition.*-> S_IDLE",
# This wont work if we have multiple partitions
# Use: "Pat:They_started" : "%s crmd:.*State transition.*-> S_NOT_DC",
"Pat:They_started" : "Updating node state to member for %s",
"Pat:We_started" : "%s crmd:.* State transition.*-> S_IDLE",
"Pat:We_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete",
"Pat:Logd_stopped" : "%s logd:.*Exiting write process",
"Pat:They_stopped" : "%s crmd:.*LOST:.* %s ",
"Pat:All_stopped" : "heartbeat.*%s.*Heartbeat shutdown complete",
"Pat:They_dead" : "node %s.*: is dead",
"Pat:TransitionComplete" : "Transition status: Complete: complete",
# Bad news Regexes. Should never occur.
"BadRegexes" : (
r"ERROR:",
r"CRIT:",
r"Shutting down\.",
r"Forcing shutdown\.",
r"Timer I_TERMINATE just popped",
r"input=I_ERROR",
r"input=I_FAIL",
r"input=I_INTEGRATED cause=C_TIMER_POPPED",
r"input=I_FINALIZED cause=C_TIMER_POPPED",
r"input=I_ERROR",
r", exiting\.",
r"WARN.*Ignoring HA message.*vote.*not in our membership list",
r"pengine.*Attempting recovery of resource",
r"tengine.*is taking more than 2x its timeout",
r"Confirm not received from",
r"Welcome reply not received from",
r"Attempting to schedule .* after a stop",
r"Resource .* was active at shutdown",
r"duplicate entries for call_id",
r"Search terminated:",
r"No need to invoke the TE",
r":global_timer_callback",
r"Updating failcount for ",
r"Faking parameter digest creation",
r"Parameters to .* action changed:",
r"apply_xml_diff: Diff application failed!",
),
})
del self["Standby"]
if self.Env["DoBSC"]:
del self["Pat:They_stopped"]
del self["Pat:Logd_stopped"]
self.Env["use_logd"] = 0
self.check_transitions = 0
self.check_elections = 0
self.CIBsync = {}
self.default_cts_cib=CIB(self).cib()
self.debug(self.default_cts_cib)
def errorstoignore(self):
# At some point implement a more elegant solution that
# also produces a report at the end
'''Return list of errors which are known and very noisey should be ignored'''
if 1:
return [
"crmadmin:",
"ERROR: Message hist queue is filling up"
]
return []
def install_config(self, node):
if not self.ns.WaitForNodeToComeUp(node):
self.log("Node %s is not up." % node)
return None
if not self.CIBsync.has_key(node) and self.Env["ClobberCIB"] == 1:
self.CIBsync[node] = 1
self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml")
self.rsh.remote_py(node, "os", "system", "rm -f @HA_VARLIBDIR@/heartbeat/crm/cib.xml.sig")
# Only install the CIB on the first node, all the other ones will pick it up from there
if self.cib_installed == 1:
return None
self.cib_installed = 1
if self.Env["CIBfilename"] == None:
self.debug("Installing Generated CIB on node %s" %(node))
warnings.filterwarnings("ignore")
cib_file=os.tmpnam()
warnings.resetwarnings()
os.system("rm -f "+cib_file)
self.debug("Creating new CIB for " + node + " in: " + cib_file)
os.system("echo \'" + self.default_cts_cib + "\' > " + cib_file)
if 0!=self.rsh.echo_cp(None, cib_file, node, "@HA_VARLIBDIR@/heartbeat/crm/cib.xml"):
raise ValueError("Can not create CIB on %s "%node)
os.system("rm -f "+cib_file)
else:
self.debug("Installing CIB (%s) on node %s" %(self.Env["CIBfilename"], node))
if 0!=self.rsh.cp(self.Env["CIBfilename"], "root@" + (self["CIBfile"]%node)):
raise ValueError("Can not scp file to %s "%node)
self.rsh.remote_py(node, "os", "system", "chown @HA_CCMUSER@ @HA_VARLIBDIR@/heartbeat/crm/cib.xml")
def prepare(self):
'''Finish the Initialization process. Prepare to test...'''
for node in self.Env["nodes"]:
self.ShouldBeStatus[node] = ""
self.StataCM(node)
def test_node_CM(self, node):
'''Report the status of the cluster manager on a given node'''
watchpats = [ ]
watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)")
watchpats.append(self["Pat:They_started"]%node)
idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats)
idle_watch.setwatch()
out=self.rsh.readaline(node, self["StatusCmd"]%node)
ret= (string.find(out, 'ok') != -1)
self.debug("Node %s status: %s" %(node, out))
if not ret:
if self.ShouldBeStatus[node] == self["up"]:
self.log(
"Node status for %s is %s but we think it should be %s"
%(node, self["down"], self.ShouldBeStatus[node]))
self.ShouldBeStatus[node]=self["down"]
return 0
if self.ShouldBeStatus[node] == self["down"]:
self.log(
"Node status for %s is %s but we think it should be %s: %s"
%(node, self["up"], self.ShouldBeStatus[node], out))
self.ShouldBeStatus[node]=self["up"]
# check the output first - because syslog-ng looses messages
if string.find(out, 'S_NOT_DC') != -1:
# Up and stable
return 2
if string.find(out, 'S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" %(node, out))
return 1
# Up and stable
return 2
# Is the node up or is the node down
def StataCM(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_CM(node) > 0:
return 1
return None
# Being up and being stable is not the same question...
def node_stable(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_CM(node) == 2:
return 1
self.log("Warn: Node %s not stable" %(node))
return None
def cluster_stable(self, timeout=None):
watchpats = [ ]
watchpats.append("Current ping state: S_IDLE")
watchpats.append(self["Pat:DC_IDLE"])
if timeout == None:
timeout = self["DeadTime"]
idle_watch = CTS.LogWatcher(self["LogFileName"], watchpats, timeout)
idle_watch.setwatch()
any_up = 0
for node in self.Env["nodes"]:
# have each node dump its current state
if self.ShouldBeStatus[node] == self["up"]:
self.rsh.readaline(node, (self["StatusCmd"] %node) )
any_up = 1
if any_up == 0 or idle_watch.look():
return 1
self.log("Warn: Cluster Master not IDLE")
return None
def is_node_dc(self, node, status_line=None):
rc = 0
if not status_line:
status_line = self.rsh.readaline(node, self["StatusCmd"]%node)
if not status_line:
rc = 0
elif string.find(status_line, 'S_IDLE') != -1:
rc = 1
elif string.find(status_line, 'S_INTEGRATION') != -1:
rc = 1
elif string.find(status_line, 'S_FINALIZE_JOIN') != -1:
rc = 1
elif string.find(status_line, 'S_POLICY_ENGINE') != -1:
rc = 1
elif string.find(status_line, 'S_TRANSITION_ENGINE') != -1:
rc = 1
if rc == 1:
self.debug("%s _is_ the DC" % node)
return rc
def active_resources(self, node):
(rc, output) = self.rsh.remote_py(
node, "os", "system", """@sbindir@/crm_mon -1 | grep "Started %s" """ % node)
resources = []
for line in output:
fields = line.split()
resources.append(fields[0])
return resources
def ResourceOp(self, resource, op, node, interval=0, app="lrmadmin"):
'''
Execute an operation on a resource
'''
self.rsh.readaline(node, self["ExecuteRscOp"]
% (app, resource, op, interval))
return self.rsh.lastrc
def ResourceLocation(self, rid):
ResourceNodes = []
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == self["up"]:
if self.ResourceOp(rid, "monitor", node) == 0:
ResourceNodes.append(node)
return ResourceNodes
def isolate_node(self, node, allowlist):
'''isolate the communication between the nodes'''
rc = self.rsh(node, self["BreakCommCmd2"]%allowlist)
if rc == 0:
return 1
else:
self.log("Could not break the communication from node: %s",node)
return None
def Configuration(self):
if self.config:
return self.config.getElementsByTagName('configuration')[0]
warnings.filterwarnings("ignore")
cib_file=os.tmpnam()
warnings.resetwarnings()
os.system("rm -f "+cib_file)
if self.Env["ClobberCIB"] == 1:
if self.Env["CIBfilename"] == None:
self.debug("Creating new CIB in: " + cib_file)
os.system("echo \'"+ self.default_cts_cib +"\' > "+ cib_file)
else:
os.system("cp "+self.Env["CIBfilename"]+" "+cib_file)
else:
if 0 != self.rsh.echo_cp(
self.Env["nodes"][0], "@HA_VARLIBDIR@/heartbeat/crm/cib.xml", None, cib_file):
raise ValueError("Can not copy file to %s, maybe permission denied"%cib_file)
self.config = parse(cib_file)
os.remove(cib_file)
return self.config.getElementsByTagName('configuration')[0]
def Resources(self):
ResourceList = []
#read resources in cib
configuration = self.Configuration()
resources = configuration.getElementsByTagName('resources')[0]
rscs = configuration.getElementsByTagName('primitive')
incs = configuration.getElementsByTagName('clone')
groups = configuration.getElementsByTagName('group')
for rsc in rscs:
if rsc in resources.childNodes:
ResourceList.append(HAResource(self,rsc))
for grp in groups:
for rsc in rscs:
if rsc in grp.childNodes:
if self.use_short_names:
resource = HAResource(self,rsc)
else:
resource = HAResource(self,rsc,grp.getAttribute('id'))
ResourceList.append(resource)
for inc in incs:
max = 0
inc_name = inc.getAttribute("id")
instance_attributes = inc.getElementsByTagName('instance_attributes')[0]
attributes = instance_attributes.getElementsByTagName('attributes')[0]
nvpairs = attributes.getElementsByTagName('nvpair')
for nvpair in nvpairs:
if nvpair.getAttribute("name") == "clone_max":
max = int(nvpair.getAttribute("value"))
inc_rsc = inc.getElementsByTagName('primitive')[0]
for i in range(0,max):
rsc = HAResource(self,inc_rsc)
rsc.inc_no = i
rsc.inc_name = inc_name
rsc.inc_max = max
if self.use_short_names:
rsc.rid = rsc.rid + ":%d"%i
else:
rsc.rid = inc_name+":"+rsc.rid + ":%d"%i
rsc.Instance = rsc.rid
ResourceList.append(rsc)
return ResourceList
def ResourceGroups(self):
GroupList = []
#read resources in cib
configuration = self.Configuration()
groups = configuration.getElementsByTagName('group')
rscs = configuration.getElementsByTagName('primitive')
for grp in groups:
group = []
GroupList.append(group)
for rsc in rscs:
if rsc in grp.childNodes:
if self.use_short_names:
resource = HAResource(self,rsc)
else:
resource = HAResource(self,rsc,grp.getAttribute('id'))
group.append(resource)
return GroupList
def Dependencies(self):
DependencyList = []
#read dependency in cib
configuration=self.Configuration()
constraints=configuration.getElementsByTagName('constraints')[0]
rsc_to_rscs=configuration.getElementsByTagName('rsc_to_rsc')
for node in rsc_to_rscs:
dependency = {}
dependency["id"]=node.getAttribute('id')
dependency["from"]=node.getAttribute('from')
dependency["to"]=node.getAttribute('to')
dependency["type"]=node.getAttribute('type')
dependency["strength"]=node.getAttribute('strength')
DependencyList.append(dependency)
return DependencyList
def find_partitions(self):
ccm_partitions = []
for node in self.Env["nodes"]:
self.debug("Retrieving partition details for %s" %node)
if self.ShouldBeStatus[node] == self["up"]:
partition = self.rsh.readaline(node, self["ParitionCmd"])
if not partition:
self.log("no partition details for %s" %node)
elif len(partition) > 2:
partition = partition[:-1]
found=0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug("Adding partition from %s: %s" %(node, partition))
ccm_partitions.append(partition)
else:
self.log("bad partition details for %s" %node)
return ccm_partitions
def HasQuorum(self, node_list):
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
node_list = self.Env["nodes"]
for node in node_list:
if self.ShouldBeStatus[node] == self["up"]:
quorum = self.rsh.readaline(node, self["QuorumCmd"])
if string.find(quorum, "1") != -1:
return 1
elif string.find(quorum, "0") != -1:
return 0
else:
self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum)
return 0
def Components(self):
complist = [Process("lrmd",self),Process("crmd",self)]
if self.Env["DoFencing"] == 1 :
complist.append(Process("stonithd",self))
complist.append(Process("heartbeat",self))
return complist
def NodeUUID(self, node):
lines = self.rsh.readlines(node, self["UUIDQueryCmd"])
for line in lines:
self.debug("UUIDLine:"+ line)
m = re.search(r'%s.+\((.+)\)' % node, line)
if m:
return m.group(1)
return ""
def StandbyStatus(self, node):
out=self.rsh.readaline(node, self["StandbyQueryCmd"]%node)
if not out:
return "off"
out = out[:-1]
self.debug("Standby result: "+out)
return out
# status == "on" : Enter Standby mode
# status == "off": Enter Active mode
def SetStandbyMode(self, node, status):
current_status = self.StandbyStatus(node)
cmd = self["StandbyCmd"] % (node, status)
ret = self.rsh(node, cmd)
return True
class HAResource(Resource):
def __init__(self, cm, node, group=None):
'''
Get information from xml node
'''
if group == None :
self.rid = str(node.getAttribute('id'))
else :
self.rid = group + ":" + str(node.getAttribute('id'))
self.rclass = str(node.getAttribute('class'))
self.rtype = str(node.getAttribute('type'))
self.inc_name = None
self.inc_no = -1
self.inc_max = -1
self.rparameters = {}
nvpairs = []
list = node.getElementsByTagName('instance_attributes')
if len(list) > 0:
attributes = list[0]
list = attributes.getElementsByTagName('attributes')
if len(list) > 0:
parameters = list[0]
nvpairs = parameters.getElementsByTagName('nvpair')
for nvpair in nvpairs:
name=nvpair.getAttribute('name')
value=nvpair.getAttribute('value')
self.rparameters[name]=value
# This should normally be called first... FIXME!
Resource.__init__(self, cm, self.rtype, self.rid)
# resources that dont need quorum will have:
# <op name="start" prereq="nothing"/>
ops = node.getElementsByTagName('op')
for op in ops:
if op.getAttribute('name') == "start" and op.getAttribute('prereq') == "nothing":
self.needs_quorum = 0
def IsRunningOn(self, nodename):
'''
This member function returns true if our resource is running
on the given node in the cluster.
We call the status operation for the resource script.
'''
rc = self.CM.ResourceOp(self.rid, "monitor", nodename)
return (rc == 0)
def RunningNodes(self):
return self.CM.ResourceLocation(self.rid)
def Start(self, nodename):
'''
This member function starts or activates the resource.
'''
return self.CM.ResourceOp(self.rid, "start", nodename)
def Stop(self, nodename):
'''
This member function stops or deactivates the resource.
'''
return self.CM.ResourceOp(self.rid, "stop", nodename)
def IsWorkingCorrectly(self, nodename):
return self.IsRunningOn(nodename)
#######################################################################
#
# A little test code...
#
# Which you are advised to completely ignore...
#
#######################################################################
if __name__ == '__main__':
pass
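For context (not part of the patch): Configuration(), Resources() and HAResource.__init__ above all work by loading the CIB with xml.dom.minidom and walking it with getElementsByTagName()/getAttribute(). A standalone illustration of that walk, against a hand-written XML fragment rather than real CTS output:

# Sketch only -- the XML below is a made-up fragment, not cibadmin output.
from xml.dom.minidom import parseString

cib_xml = '''<cib><configuration><resources>
  <primitive id="rsc_ip" class="ocf" type="IPaddr" provider="heartbeat">
    <instance_attributes id="rsc_ip-ia">
      <attributes>
        <nvpair id="rsc_ip-ip" name="ip" value="10.0.0.11"/>
      </attributes>
    </instance_attributes>
  </primitive>
</resources></configuration></cib>'''

configuration = parseString(cib_xml).getElementsByTagName('configuration')[0]
for rsc in configuration.getElementsByTagName('primitive'):
    rid = str(rsc.getAttribute('id'))
    params = {}
    for nvpair in rsc.getElementsByTagName('nvpair'):
        params[str(nvpair.getAttribute('name'))] = str(nvpair.getAttribute('value'))
    print("%s %s" % (rid, params))   # rsc_ip {'ip': '10.0.0.11'}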
diff --git a/cts/CTS.py.in b/cts/CTS.py.in
index 9d044b65f2..5df3fd76fa 100755
--- a/cts/CTS.py.in
+++ b/cts/CTS.py.in
@@ -1,1185 +1,1185 @@
#!@PYTHON@
'''CTS: Cluster Testing System: Main module
Classes related to testing high-availability clusters...
Lots of things are implemented.
Lots of things are not implemented.
We have many more ideas of what to do than we've implemented.
'''
__copyright__='''
Copyright (C) 2000, 2001 Alan Robertson <alanr@unix.sh>
Licensed under the GNU GPL.
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import types, string, select, sys, time, re, os, struct, os, signal
import base64, pickle, binascii
from UserDict import UserDict
from syslog import *
from popen2 import Popen3
class RemoteExec:
'''This is an abstract remote execution class. It runs a command on another
machine - somehow. The somehow is up to us. This particular
class uses ssh.
Most of the work is done by fork/exec of ssh or scp.
'''
def __init__(self):
# -n: no stdin, -x: no X11
self.Command = "@SSH@ -l root -n -x"
# -f: ssh to background
self.CommandnoBlock = "@SSH@ -f -l root -n -x"
# -B: batch mode, -q: no stats (quiet)
self.CpCommand = "@SCP@ -B -q"
self.OurNode=string.lower(os.uname()[1])
def setcmd(self, rshcommand):
'''Set the name of the remote shell command'''
self.Command = rshcommand
def _fixcmd(self, cmd):
return re.sub("\'", "'\\''", cmd)
def _cmd(self, *args):
'''Compute the string that will run the given command on the
given remote system'''
args= args[0]
sysname = args[0]
command = args[1]
#print "sysname: %s, us: %s" % (sysname, self.OurNode)
if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost":
ret = command
else:
ret = self.Command + " " + sysname + " '" + self._fixcmd(command) + "'"
#print ("About to run %s\n" % ret)
return ret
def _cmd_noblock(self, *args):
'''Compute the string that will run the given command on the
given remote system'''
args= args[0]
sysname = args[0]
command = args[1]
#print "sysname: %s, us: %s" % (sysname, self.OurNode)
if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost":
ret = command + " &"
else:
ret = self.CommandnoBlock + " " + sysname + " '" + self._fixcmd(command) + "'"
#print ("About to run %s\n" % ret)
return ret
def __call__(self, *args):
'''Run the given command on the given remote system
If you call this class like a function, this is the function that gets
called. It just runs it roughly as though it were a system() call
on the remote machine. The first argument is name of the machine to
run it on.
'''
count=0;
rc = 0;
while count < 3:
rc = os.system(self._cmd(args))
if rc == 0: return rc
print "Retrying command %s" % self._cmd(args)
count=count+1
return rc
def popen(self, *args):
'''popen the given remote command on the remote system.
As in __call__, the first argument is name of the machine to run it on.
'''
#print "Now running %s\n" % self._cmd(args)
return Popen3(self._cmd(args), None)
def readaline(self, *args):
'''Run a command on the remote machine and capture 1 line of
stdout from the given remote command
As in __call__, the first argument is name of the machine to run it on.
'''
p = self.popen(args[0], args[1])
p.tochild.close()
result = p.fromchild.readline()
p.fromchild.close()
self.lastrc = p.wait()
return result
def readlines(self, *args):
p = self.popen(args[0], args[1])
p.tochild.close()
result = p.fromchild.readlines()
p.fromchild.close()
self.lastrc = p.wait()
return result
def cp(self, *args):
'''Perform a remote copy'''
cpstring=self.CpCommand
for arg in args:
cpstring = cpstring + " \'" + arg + "\'"
count=0;
rc = 0;
for i in range(3):
rc = os.system(cpstring)
if rc == 0:
return rc
print "Retrying command %s" % cpstring
return rc
def echo_cp(self, src_host, src_file, dest_host, dest_file):
'''Perform a remote copy via echo'''
(rc, lines) = self.remote_py(src_host, "os", "system", "cat %s" % src_file)
if rc != 0:
print "Copy of %s:%s failed" % (src_host, src_file)
elif dest_host == None:
fd = open(dest_file, "w")
fd.writelines(lines)
fd.close()
else:
big_line=""
for line in lines:
big_line = big_line + line
(rc, lines) = self.remote_py(dest_host, "os", "system", "echo '%s' > %s" % (big_line, dest_file))
return rc
def noBlock(self, *args):
'''Perform a remote execution without waiting for it to finish'''
sshnoBlock = self._cmd_noblock(args)
count=0;
rc = 0;
for i in range(3):
rc = os.system(sshnoBlock)
if rc == 0:
return rc
print "Retrying command %s" % sshnoBlock
return rc
def remote_py(self, node, module, func, *args):
'''Execute a remote python function
If the call success, lastrc == 0 and return result.
If the call fail, lastrc == 1 and return the reason (string)
'''
encode_args = binascii.b2a_base64(pickle.dumps(args))
encode_cmd = string.join(["@hb_libdir@/cts/CTSproxy.py",module,func,encode_args])
#print "%s: %s.%s %s" % (node, module, func, repr(args))
result = self.readlines(node, encode_cmd)
if result != None:
result.pop()
if self.lastrc == 0:
last_line=""
if result != None:
array_len = len(result)
if array_len > 0:
last_line=result.pop()
#print "result: %s" % repr(last_line)
return pickle.loads(binascii.a2b_base64(last_line)), result
return -1, result
class LogWatcher:
'''This class watches logs for messages that fit certain regular
expressions. Watching logs for events isn't the ideal way
to do business, but it's better than nothing :-)
On the other hand, this class is really pretty cool ;-)
The way you use this class is as follows:
Construct a LogWatcher object
Call setwatch() when you want to start watching the log
Call look() to scan the log looking for the patterns
'''
def __init__(self, log, regexes, timeout=10, debug=None):
'''This is the constructor for the LogWatcher class. It takes a
log name to watch, and a list of regular expressions to watch for."
'''
# Validate our arguments. Better sooner than later ;-)
for regex in regexes:
assert re.compile(regex)
self.regexes = regexes
self.filename = log
self.debug=debug
self.whichmatch = -1
self.unmatched = None
if self.debug:
print "Debug now on for for log", log
self.Timeout = int(timeout)
self.returnonlymatch = None
if not os.access(log, os.R_OK):
raise ValueError("File [" + log + "] not accessible (r)")
def setwatch(self, frombeginning=None):
'''Mark the place to start watching the log from.
'''
self.file = open(self.filename, "r")
self.size = os.path.getsize(self.filename)
if not frombeginning:
self.file.seek(0,2)
def ReturnOnlyMatch(self, onlymatch=1):
'''Mark the place to start watching the log from.
'''
self.returnonlymatch = onlymatch
def look(self, timeout=None):
'''Examine the log looking for the given patterns.
It starts looking from the place marked by setwatch().
This function looks in the file in the fashion of tail -f.
It properly recovers from log file truncation, but not from
removing and recreating the log. It would be nice if it
recovered from this as well :-)
We return the first line which matches any of our patterns.
'''
last_line=None
first_line=None
if timeout == None: timeout = self.Timeout
done=time.time()+timeout+1
if self.debug:
print "starting search: timeout=%d" % timeout
for regex in self.regexes:
print "Looking for regex: ", regex
while (timeout <= 0 or time.time() <= done):
newsize=os.path.getsize(self.filename)
if self.debug > 4: print "newsize = %d" % newsize
if newsize < self.size:
# Somebody truncated the log!
if self.debug: print "Log truncated!"
self.setwatch(frombeginning=1)
continue
if newsize > self.file.tell():
line=self.file.readline()
if self.debug > 2: print "Looking at line:", line
if line:
last_line=line
if not first_line:
first_line=line
if self.debug: print "First line: "+ line
which=-1
for regex in self.regexes:
which=which+1
if self.debug > 3: print "Comparing line to ", regex
#matchobj = re.search(string.lower(regex), string.lower(line))
matchobj = re.search(regex, line)
if matchobj:
self.whichmatch=which
if self.returnonlymatch:
return matchobj.group(self.returnonlymatch)
else:
if self.debug: print "Returning line"
return line
newsize=os.path.getsize(self.filename)
if self.file.tell() == newsize:
if timeout > 0:
time.sleep(0.025)
else:
if self.debug: print "End of file"
if self.debug: print "Last line: "+last_line
return None
if self.debug: print "Timeout"
if self.debug: print "Last line: "+last_line
return None
def lookforall(self, timeout=None):
'''Examine the log looking for ALL of the given patterns.
It starts looking from the place marked by setwatch().
We return when the timeout is reached, or when we have found
ALL of the regexes that were part of the watch
'''
if timeout == None: timeout = self.Timeout
save_regexes = self.regexes
returnresult = []
while (len(self.regexes) > 0):
oneresult = self.look(timeout)
if not oneresult:
self.unmatched = self.regexes
self.regexes = save_regexes
return None
returnresult.append(oneresult)
del self.regexes[self.whichmatch]
self.unmatched = None
self.regexes = save_regexes
return returnresult
# In case we ever want multiple regexes to match a single line...
#- del self.regexes[self.whichmatch]
#+ tmp_regexes = self.regexes
#+ self.regexes = []
#+ which = 0
#+ for regex in tmp_regexes:
#+ matchobj = re.search(regex, oneresult)
#+ if not matchobj:
#+ self.regexes.append(regex)
class NodeStatus:
def __init__(self, Env):
self.Env = Env
self.rsh = RemoteExec()
def IsNodeBooted(self, node):
'''Return TRUE if the given node is booted (responds to pings)'''
return os.system("@PING@ -nq -c1 @PING_TIMEOUT_OPT@ %s >/dev/null 2>&1" % node) == 0
def IsSshdUp(self, node):
return self.rsh(node, "true") == 0;
def WaitForNodeToComeUp(self, node, Timeout=300):
'''Return TRUE when given node comes up, or None/FALSE if timeout'''
timeout=Timeout
anytimeouts=0
while timeout > 0:
if self.IsNodeBooted(node) and self.IsSshdUp(node):
if anytimeouts:
# Fudge to wait for the system to finish coming up
time.sleep(30)
self.Env.log("Node %s now up" % node)
return 1
time.sleep(1)
if (not anytimeouts):
self.Env.log("Waiting for node %s to come up" % node)
anytimeouts=1
timeout = timeout - 1
self.Env.log("%s did not come up within %d tries" % (node, Timeout))
def WaitForAllNodesToComeUp(self, nodes, timeout=300):
'''Return TRUE when all nodes come up, or FALSE if timeout'''
for node in nodes:
if not self.WaitForNodeToComeUp(node, timeout):
return None
return 1
class ClusterManager(UserDict):
'''The Cluster Manager class.
This is an subclass of the Python dictionary class.
(this is because it contains lots of {name,value} pairs,
not because it's behavior is that terribly similar to a
dictionary in other ways.)
This is an abstract class which class implements high-level
operations on the cluster and/or its cluster managers.
Actual cluster managers classes are subclassed from this type.
One of the things we do is track the state we think every node should
be in.
'''
def __InitialConditions(self):
#if os.geteuid() != 0:
# raise ValueError("Must Be Root!")
None
def _finalConditions(self):
for key in self.keys():
if self[key] == None:
raise ValueError("Improper derivation: self[" + key
+ "] must be overridden by subclass.")
def __init__(self, Environment, randseed=None):
self.Env = Environment
self.__InitialConditions()
- self.clear_cache = 0
+ self.clear_cache = 0
self.data = {
"up" : "up", # Status meaning up
"down" : "down", # Status meaning down
"StonithCmd" : "@sbindir@/stonith -t baytech -p '10.10.10.100 admin admin' %s",
"DeadTime" : 30, # Max time to detect dead node...
"StartTime" : 90, # Max time to start up
#
# These next values need to be overridden in the derived class.
#
"Name" : None,
"StartCmd" : None,
"StopCmd" : None,
"StatusCmd" : None,
"RereadCmd" : None,
"StartDRBDCmd" : None,
"StopDRBDCmd" : None,
"StatusDRBDCmd" : None,
"DRBDCheckconf" : None,
"BreakCommCmd" : None,
"FixCommCmd" : None,
"TestConfigDir" : None,
"LogFileName" : None,
"Pat:We_started" : None,
"Pat:They_started" : None,
"Pat:We_stopped" : None,
"Pat:They_stopped" : None,
"BadRegexes" : None, # A set of "bad news" regexes
# to apply to the log
}
self.rsh = RemoteExec()
self.ShouldBeStatus={}
self.OurNode=string.lower(os.uname()[1])
self.ShouldBeStatus={}
self.ns = NodeStatus(self.Env)
def errorstoignore(self):
'''Return list of errors which are 'normal' and should be ignored'''
return []
def log(self, args):
self.Env.log(args)
def debug(self, args):
self.Env.debug(args)
def prepare(self):
'''Finish the Initialization process. Prepare to test...'''
for node in self.Env["nodes"]:
if self.StataCM(node):
self.ShouldBeStatus[node]=self["up"]
else:
self.ShouldBeStatus[node]=self["down"]
def upcount(self):
'''How many nodes are up?'''
count=0
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node]==self["up"]:
count=count+1
return count
def TruncLogs(self):
'''Truncate the log for the cluster manager so we can start clean'''
if self["LogFileName"] != None:
os.system("cp /dev/null " + self["LogFileName"])
def install_config(self, node):
return None
def clear_all_caches(self):
- if self.clear_cache:
- for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] == self["down"]:
- self.debug("Removing cache file on: "+node)
- self.rsh.remote_py(node, "os", "system",
- "rm -f @HA_VARLIBDIR@/heartbeat/hostcache")
- else:
- self.debug("NOT Removing cache file on: "+node)
+ if self.clear_cache:
+ for node in self.Env["nodes"]:
+ if self.ShouldBeStatus[node] == self["down"]:
+ self.debug("Removing cache file on: "+node)
+ self.rsh.remote_py(node, "os", "system",
+ "rm -f @HA_VARLIBDIR@/heartbeat/hostcache")
+ else:
+ self.debug("NOT Removing cache file on: "+node)
def StartaCM(self, node):
'''Start up the cluster manager on a given node'''
self.debug("Starting %s on node %s" %(self["Name"], node))
if not self.ShouldBeStatus.has_key(node):
self.ShouldBeStatus[node] = self["down"]
if self.ShouldBeStatus[node] != self["down"]:
return 1
patterns = []
# Technically we should always be able to notice ourselves starting
if self.upcount() == 0:
patterns.append(self["Pat:We_started"] % node)
else:
patterns.append(self["Pat:They_started"] % node)
watch = LogWatcher(
self["LogFileName"], patterns, timeout=self["StartTime"]+10)
watch.setwatch()
self.install_config(node)
self.ShouldBeStatus[node] = "any"
if self.StataCM(node) and self.cluster_stable(self["DeadTime"]):
self.log ("%s was already started" %(node))
return 1
# Clear out the host cache so autojoin can be exercised
if self.clear_cache:
self.debug("Removing cache file on: "+node)
self.rsh.remote_py(node, "os", "system",
"rm -f @HA_VARLIBDIR@/heartbeat/hostcache")
if self.rsh(node, self["StartCmd"]) != 0:
self.log ("Warn: Start command failed on node %s" %(node))
return None
self.ShouldBeStatus[node]=self["up"]
watch_result = watch.lookforall()
if watch.unmatched:
for regex in watch.unmatched:
self.log ("Warn: Startup pattern not found: %s" %(regex))
if watch_result:
#self.debug("Found match: "+ repr(watch_result))
self.cluster_stable(self["DeadTime"])
return 1
if self.StataCM(node) and self.cluster_stable(self["DeadTime"]):
return 1
self.log ("Warn: Start failed for node %s" %(node))
return None
def StartaCMnoBlock(self, node):
'''Start up the cluster manager on a given node with none-block mode'''
self.debug("Starting %s on node %s" %(self["Name"], node))
# Clear out the host cache so autojoin can be exercised
if self.clear_cache:
self.debug("Removing cache file on: "+node)
self.rsh.remote_py(node, "os", "system",
"rm -f @HA_VARLIBDIR@/heartbeat/hostcache")
self.rsh.noBlock(node, self["StartCmd"])
self.ShouldBeStatus[node]=self["up"]
return 1
def StopaCM(self, node):
'''Stop the cluster manager on a given node'''
self.debug("Stopping %s on node %s" %(self["Name"], node))
if self.ShouldBeStatus[node] != self["up"]:
return 1
if self.rsh(node, self["StopCmd"]) == 0:
self.ShouldBeStatus[node]=self["down"]
self.cluster_stable(self["DeadTime"])
return 1
else:
self.log ("Could not stop %s on node %s" %(self["Name"], node))
return None
def StopaCMnoBlock(self, node):
'''Stop the cluster manager on a given node with none-block mode'''
self.debug("Stopping %s on node %s" %(self["Name"], node))
self.rsh.noBlock(node, self["StopCmd"])
self.ShouldBeStatus[node]=self["down"]
return 1
def cluster_stable(self, timeout = None):
time.sleep(self["StableTime"])
return 1
def node_stable(self, node):
return 1
def RereadCM(self, node):
'''Force the cluster manager on a given node to reread its config
This may be a no-op on certain cluster managers.
'''
rc=self.rsh(node, self["RereadCmd"])
if rc == 0:
return 1
else:
self.log ("Could not force %s on node %s to reread its config"
% (self["Name"], node))
return None
def StataCM(self, node):
'''Report the status of the cluster manager on a given node'''
out=self.rsh.readaline(node, self["StatusCmd"])
ret= (string.find(out, 'stopped') == -1)
try:
if ret:
if self.ShouldBeStatus[node] == self["down"]:
self.log(
"Node status for %s is %s but we think it should be %s"
% (node, self["up"], self.ShouldBeStatus[node]))
else:
if self.ShouldBeStatus[node] == self["up"]:
self.log(
"Node status for %s is %s but we think it should be %s"
% (node, self["down"], self.ShouldBeStatus[node]))
except KeyError: pass
if ret: self.ShouldBeStatus[node]=self["up"]
else: self.ShouldBeStatus[node]=self["down"]
return ret
def startall(self, nodelist=None):
'''Start the cluster manager on every node in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
ret = 1
map = {}
if not nodelist:
nodelist=self.Env["nodes"]
for node in nodelist:
if self.ShouldBeStatus[node] == self["down"]:
if not self.StartaCM(node):
ret = 0
return ret
def stopall(self, nodelist=None):
'''Stop the cluster managers on every node in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
ret = 1
map = {}
if not nodelist:
nodelist=self.Env["nodes"]
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == self["up"]:
if not self.StopaCM(node):
ret = 0
return ret
def rereadall(self, nodelist=None):
'''Force the cluster managers on every node in the cluster
to reread their config files. We can do it on a subset of the
cluster if nodelist is not None.
'''
map = {}
if not nodelist:
nodelist=self.Env["nodes"]
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == self["up"]:
self.RereadCM(node)
def statall(self, nodelist=None):
'''Return the status of the cluster managers in the cluster.
We can do it on a subset of the cluster if nodelist is not None.
'''
result={}
if not nodelist:
nodelist=self.Env["nodes"]
for node in nodelist:
if self.StataCM(node):
result[node] = self["up"]
else:
result[node] = self["down"]
return result
def isolate_node(self, node):
'''isolate the communication between the nodes'''
rc = self.rsh(node, self["BreakCommCmd"])
if rc == 0:
return 1
else:
self.log("Could not break the communication between the nodes frome node: %s" % node)
return None
def unisolate_node(self, node):
'''fix the communication between the nodes'''
rc = self.rsh(node, self["FixCommCmd"])
if rc == 0:
return 1
else:
self.log("Could not fix the communication between the nodes from node: %s" % node)
return None
def reducecomm_node(self,node):
'''reduce the communication between the nodes'''
rc = self.rsh(node, self["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"]))
if rc == 0:
return 1
else:
self.log("Could not reduce the communication between the nodes from node: %s" % node)
return None
def savecomm_node(self,node):
'''save current the communication between the nodes'''
rc = 0
if float(self.Env["XmitLoss"])!=0 or float(self.Env["RecvLoss"])!=0 :
rc = self.rsh(node, self["SaveFileCmd"]);
if rc == 0:
return 1
else:
self.log("Could not save the communication between the nodes from node: %s" % node)
return None
def restorecomm_node(self,node):
'''restore the saved communication between the nodes'''
rc = 0
if float(self.Env["XmitLoss"])!=0 or float(self.Env["RecvLoss"])!=0 :
rc = self.rsh(node, self["RestoreCommCmd"]);
if rc == 0:
return 1
else:
self.log("Could not restore the communication between the nodes from node: %s" % node)
return None
def SyncTestConfigs(self):
'''Synchronize test configurations throughout the cluster.
This one's a no-op for FailSafe, since it does that by itself.
'''
fromdir=self["TestConfigDir"]
if not os.access(fromdir, os.F_OK | os.R_OK | os.W_OK):
raise ValueError("Directory [" + fromdir + "] not accessible (rwx)")
for node in self.Env["nodes"]:
if node == self.OurNode: continue
self.log("Syncing test configurations on " + node)
# Perhaps I ought to use rsync...
self.rsh.cp("-r", fromdir, node + ":" + fromdir)
def SetClusterConfig(self, configpath="default", nodelist=None):
'''Activate the named test configuration throughout the cluster.
It would be useful to implement this :-)
'''
pass
return 1
def ResourceGroups(self):
"Return a list of resource type/instance pairs for the cluster"
raise ValueError("Abstract Class member (ResourceGroups)")
def InternalCommConfig(self):
"Return a list of paths: each patch consists of a tuple"
raise ValueError("Abstract Class member (InternalCommConfig)")
def HasQuorum(self, node_list):
"Return TRUE if the cluster currently has quorum"
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
raise ValueError("Abstract Class member (HasQuorum)")
def Components(self):
raise ValueError("Abstract Class member (Components)")
def TestLogging(self):
patterns= []
prefix="Test message from "
for node in self.Env["nodes"]:
patterns.append(prefix + node)
watch = LogWatcher(self["LogFileName"], patterns, 30 + len(self.Env["nodes"]))
watch.setwatch()
logpri = self.Env["logfacility"] + ".info"
for node in self.Env["nodes"]:
cmd="logger -p %s %s%s" % (logpri, prefix, node)
if self.rsh.noBlock(node, cmd) != 0:
self.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
watch_result = watch.lookforall()
if watch.unmatched:
self.log("ERROR: Remote logging is not working. Please fix before continuing.")
for regex in watch.unmatched:
self.log ("ERROR: Test message [%s] not found in logs." % (regex))
return None
return 1
def CheckDf(self):
dfcmd="df -k /var/log | tail -1 | tr -s ' ' | cut -d' ' -f2"
dfmin=500000
result=1
for node in self.Env["nodes"]:
dfout=self.rsh.readaline(node, dfcmd)
if not dfout:
self.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
else:
try:
idfout = int(dfout)
except (ValueError, TypeError):
self.log("Warning: df output from %s was invalid [%s]" % (node, dfout))
else:
if idfout == 0:
self.log("CRIT: Completely out of log disk space on %s" % node)
result=None
elif idfout <= 500:
self.log("WARN: Low on log disk space (%d Mbytes) on %s" % (idfout, node))
return result
class Resource:
'''
This is an HA resource (not a resource group).
A resource group is just an ordered list of Resource objects.
'''
def __init__(self, cm, rsctype=None, instance=None):
self.CM = cm
self.ResourceType = rsctype
self.Instance = instance
self.needs_quorum = 1
def Type(self):
return self.ResourceType
def Instance(self, nodename):
return self.Instance
def IsRunningOn(self, nodename):
'''
This member function returns true if our resource is running
on the given node in the cluster.
It is analagous to the "status" operation on SystemV init scripts and
heartbeat scripts. FailSafe calls it the "exclusive" operation.
'''
raise ValueError("Abstract Class member (IsRunningOn)")
return None
def IsWorkingCorrectly(self, nodename):
'''
This member function returns true if our resource is operating
correctly on the given node in the cluster.
Heartbeat does not require this operation, but it might be called
the Monitor operation, which is what FailSafe calls it.
For remotely monitorable resources (like IP addresses), they *should*
be monitored remotely for testing.
'''
raise ValueError("Abstract Class member (IsWorkingCorrectly)")
return None
def Start(self, nodename):
'''
This member function starts or activates the resource.
'''
raise ValueError("Abstract Class member (Start)")
return None
def Stop(self, nodename):
'''
This member function stops or deactivates the resource.
'''
raise ValueError("Abstract Class member (Stop)")
return None
def __repr__(self):
if (self.Instance and len(self.Instance) > 1):
return "{" + self.ResourceType + "::" + self.Instance + "}"
else:
return "{" + self.ResourceType + "}"
class Component:
def kill(self, node):
None
class Process(Component):
def __init__(self, name, cm):
self.name = str(name)
self.CM = cm
self.KillCmd = "killall -9 " + self.name
def kill(self, node):
if self.CM.rsh(node, self.KillCmd) != 0:
self.log ("Warn: Kill %s failed on node %s" %(name,node))
return None
return 1
class ScenarioComponent:
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
'''Return TRUE if the current ScenarioComponent is applicable
in the given LabEnvironment given to the constructor.
'''
raise ValueError("Abstract Class member (IsApplicable)")
def SetUp(self, CM):
'''Set up the given ScenarioComponent'''
raise ValueError("Abstract Class member (Setup)")
def TearDown(self, CM):
'''Tear down (undo) the given ScenarioComponent'''
raise ValueError("Abstract Class member (Setup)")
class Scenario:
(
'''The basic idea of a scenario is that of an ordered list of
ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn,
and then after the tests have been run, they are torn down using TearDown()
(in reverse order).
A Scenario is applicable to a particular cluster manager iff each
ScenarioComponent is applicable.
A partially set up scenario is torn down if it fails during setup.
''')
def __init__(self, Components):
"Initialize the Scenario from the list of ScenarioComponents"
for comp in Components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of"
" ScenarioComponent")
self.Components = Components
def IsApplicable(self):
(
'''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
'''
)
for comp in self.Components:
if not comp.IsApplicable():
return None
return 1
def SetUp(self, CM):
'''Set up the Scenario. Return TRUE on success.'''
j=0
while j < len(self.Components):
if not self.Components[j].SetUp(CM):
# OOPS! We failed. Tear partial setups down.
CM.log("Tearing down partial setup")
self.TearDown(CM, j)
return None
j=j+1
return 1
def TearDown(self, CM, max=None):
'''Tear Down the Scenario - in reverse order.'''
if max == None:
max = len(self.Components)-1
j=max
while j >= 0:
self.Components[j].TearDown(CM)
j=j-1
class InitClusterManager(ScenarioComponent):
(
'''InitClusterManager is the most basic of ScenarioComponents.
This ScenarioComponent simply starts the cluster manager on all the nodes.
It is fairly robust as it waits for all nodes to come up before starting
as they might have been rebooted or crashed for some reason beforehand.
''')
def __init__(self, Env):
pass
def IsApplicable(self):
'''InitClusterManager is so generic it is always Applicable'''
return 1
def SetUp(self, CM):
'''Basic Cluster Manager startup. Start everything'''
CM.prepare()
# Clear out the cobwebs ;-)
self.TearDown(CM)
for node in CM.Env["nodes"]:
CM.rsh(node, CM["DelFileCommCmd"]+ "; true")
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on all nodes.")
return CM.startall()
def TearDown(self, CM):
'''Set up the given ScenarioComponent'''
# Stop the cluster manager everywhere
CM.log("Stopping Cluster Manager on all nodes")
return CM.stopall()
class PingFest(ScenarioComponent):
(
'''PingFest does a flood ping to each node in the cluster from the test machine.
If the LabEnvironment Parameter PingSize is set, it will be used as the size
of ping packet requested (via the -s option). If it is not set, it defaults
to 1024 bytes.
According to the manual page for ping:
Outputs packets as fast as they come back or one hundred times per
second, whichever is more. For every ECHO_REQUEST sent a period ``.''
is printed, while for every ECHO_REPLY received a backspace is printed.
This provides a rapid display of how many packets are being dropped.
Only the super-user may use this option. This can be very hard on a net-
work and should be used with caution.
''' )
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
'''PingFests are always applicable ;-)
'''
return 1
def SetUp(self, CM):
'''Start the PingFest!'''
self.PingSize=1024
if CM.Env.has_key("PingSize"):
self.PingSize=CM.Env["PingSize"]
CM.log("Starting %d byte flood pings" % self.PingSize)
self.PingPids=[]
for node in CM.Env["nodes"]:
self.PingPids.append(self._pingchild(node))
CM.log("Ping PIDs: " + repr(self.PingPids))
return 1
def TearDown(self, CM):
'''Stop it right now! My ears are pinging!!'''
for pid in self.PingPids:
if pid != None:
CM.log("Stopping ping process %d" % pid)
os.kill(pid, signal.SIGKILL)
def _pingchild(self, node):
Args = ["ping", "-qfn", "-s", str(self.PingSize), node]
sys.stdin.flush()
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid < 0:
self.Env.log("Cannot fork ping child")
return None
if pid > 0:
return pid
# Otherwise, we're the child process.
os.execvp("ping", Args)
self.Env.log("Cannot execvp ping: " + repr(Args))
sys.exit(1)
class PacketLoss(ScenarioComponent):
(
'''
It would be useful to do some testing of CTS with a modest amount of packet loss
enabled - so we could see that everything runs like it should with a certain
amount of packet loss present.
''')
def IsApplicable(self):
'''always Applicable'''
return 1
def SetUp(self, CM):
'''Reduce the reliability of communications'''
if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 :
return 1
for node in CM.Env["nodes"]:
CM.reducecomm_node(node)
CM.log("Reduce the reliability of communications")
return 1
def TearDown(self, CM):
'''Fix the reliability of communications'''
if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 :
return 1
for node in CM.Env["nodes"]:
CM.unisolate_node(node)
CM.log("Fix the reliability of communications")
class BasicSanityCheck(ScenarioComponent):
(
'''
''')
def IsApplicable(self):
return self.Env["DoBSC"]
def SetUp(self, CM):
CM.prepare()
# Clear out the cobwebs
self.TearDown(CM)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on BSC node(s).")
return CM.startall()
def TearDown(self, CM):
CM.log("Stopping Cluster Manager on BSC node(s).")
return CM.stopall()
diff --git a/cts/CTSaudits.py.in b/cts/CTSaudits.py.in
index 2e35a2001f..d6edfa87a4 100755
--- a/cts/CTSaudits.py.in
+++ b/cts/CTSaudits.py.in
@@ -1,672 +1,672 @@
#!@PYTHON@
'''CTS: Cluster Testing System: Audit module
'''
__copyright__='''
Copyright (C) 2000, 2001,2005 Alan Robertson <alanr@unix.sh>
Licensed under the GNU GPL.
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import time, os, popen2, string, re
import CTS
import os
import popen2
class ClusterAudit:
def __init__(self, cm):
self.CM = cm
def __call__(self):
raise ValueError("Abstract Class member (__call__)")
def is_applicable(self):
'''Return TRUE if we are applicable in the current test configuration'''
raise ValueError("Abstract Class member (is_applicable)")
return 1
def name(self):
raise ValueError("Abstract Class member (name)")
AllAuditClasses = [ ]
class ResourceAudit(ClusterAudit):
def name(self):
return "ResourceAudit"
def _doauditRsc(self, resource):
ResourceNodes = []
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
if resource.IsRunningOn(node):
ResourceNodes.append(node)
return ResourceNodes
def _doaudit(self):
'''Check to see if all resources are running in exactly one place
in the cluster.
We also verify that the members of a resource group are all
running on the same node in the cluster,
and we monitor that they are all running "properly".
'''
Fatal = 0
result = []
# Thought: use self.CM.find_partitions() and make this audit
# aware of partitions. Since in a split cluster one
# partition may have quorum (and permission to run resources)
# and the other not.
Groups = self.CM.ResourceGroups()
for group in Groups:
GrpServedBy = None
lastResource = None
for resource in group:
#
# _doauditRsc returns the set of nodes serving
# the given resource. This is normally a single node.
#
ResourceNodes = self._doauditRsc(resource)
# Is the resource served without quorum present?
if not self.CM.HasQuorum(None) and len(ResourceNodes) != 0 and resource.needs_quorum:
result.append("Resource " + repr(resource)
+ " active without Quorum: "
+ repr(ResourceNodes))
# Is the resource served at all?
elif len(ResourceNodes) == 0 and self.CM.HasQuorum(None):
result.append("Resource " + repr(resource)
+ " not served anywhere.")
# Is the resource served too many times?
elif len(ResourceNodes) > 1:
result.append("Resource " + repr(resource)
+ " served too many times: "
+ repr(ResourceNodes))
self.CM.log("Resource " + repr(resource)
+ " served too many times: "
+ repr(ResourceNodes))
Fatal = 1
elif GrpServedBy == None:
GrpServedBy = ResourceNodes
# Are all the members of the Rsc Grp served by the same node?
elif GrpServedBy != ResourceNodes:
result.append("Resource group resources" + repr(resource)
+ " running on different nodes: "
+ repr(ResourceNodes)+" vs "+repr(GrpServedBy)
+ "(otherRsc = " + repr(lastResource) + ")")
self.CM.log("Resource group resources" + repr(resource)
+ " running on different nodes: "
+ repr(ResourceNodes)+" vs "+repr(GrpServedBy)
+ "(otherRsc = " + repr(lastResource) + ")")
Fatal = 1
if self.CM.Env.has_key("SuppressMonitoring") and \
self.CM.Env["SuppressMonitoring"]:
continue
# Is the resource working correctly ?
if not Fatal and len(ResourceNodes) == 1:
beforearpchild = popen2.Popen3("date;/sbin/arp -n|cut -c1-15,26-50,75-"
, None)
beforearpchild.tochild.close() # /dev/null
if not resource.IsWorkingCorrectly(ResourceNodes[0]):
afterarpchild = popen2.Popen3("/sbin/arp -n|cut -c1-15,26-50,75-"
, None)
afterarpchild.tochild.close() # /dev/null
result.append("Resource " + repr(resource)
+ " not operating properly."
+ " Resource is running on " + ResourceNodes[0]);
Fatal = 1
self.CM.log("ARP table before failure ========");
for line in beforearpchild.fromchild.readlines():
self.CM.log(line)
self.CM.log("ARP table after failure ========");
for line in afterarpchild.fromchild.readlines():
self.CM.log(line)
self.CM.log("End of ARP tables ========");
try:
beforearpchild.wait()
afterarpchild.wait()
except OSError: pass
afterarpchild.fromchild.close()
beforearpchild.fromchild.close()
lastResource = resource
if (Fatal):
result.insert(0, "FATAL") # Kludgy.
return result
def __call__(self):
#
# Audit the resources. Since heartbeat doesn't really
# know when resource acquisition is complete, we will
# poll until things get stable.
#
# Having a resource duplicately implemented is a Fatal Error
# with no tolerance granted.
#
audresult = self._doaudit()
#
# Probably the constant below should be a CM parameter.
# Then it could be 0 for FailSafe.
# Of course, it really depends on what resources
# you have in the test suite, and how long it takes
# for them to settle.
# Recently, we've changed heartbeat so we know better when
# resource acquisition is done.
#
audcount=5;
while(audcount > 0):
audresult = self._doaudit()
if (len(audresult) <= 0 or audresult[0] == "FATAL"):
audcount=0
else:
audcount = audcount - 1
if (audcount > 0):
time.sleep(1)
if (len(audresult) > 0):
self.CM.log("Fatal Audit error: " + repr(audresult))
return (len(audresult) == 0)
def is_applicable(self):
if self.CM["Name"] == "heartbeat":
return 1
return 0
class HAResourceAudit(ClusterAudit):
def __init__(self, cm):
self.CM = cm
def _RscRunningNodes(self, resource):
ResourceNodes = []
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
if resource.IsRunningOn(node):
ResourceNodes.append(node)
return ResourceNodes
def __call__(self):
passed = 1
NodeofRsc = {}
NumofInc = {}
MaxofInc = {}
self.CM.debug("Do Audit HAResourceAudit")
#Calculate the count of active nodes
up_count = 0;
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
up_count += 1
#Make sure the resources are running on one and only one node
Resources = self.CM.Resources()
for resource in Resources :
RunningNodes = self._RscRunningNodes(resource)
NodeofRsc[resource.rid]=RunningNodes
if resource.inc_name == None:
#Is the resource served without quorum present?
if not self.CM.HasQuorum(None) and len(RunningNodes) != 0 and resource.needs_quorum:
self.CM.log("Resource " + repr(resource)
+ " active without Quorum: "
+ repr(RunningNodes))
passed = 0
#Is the resource served at all?
elif len(RunningNodes) == 0 and self.CM.HasQuorum(None):
self.CM.log("Resource " + repr(resource)
+ " not served anywhere.")
passed = 0
# Is the resource served too many times?
elif len(RunningNodes) > 1:
self.CM.log("Resource " + repr(resource)
+ " served too many times: "
+ repr(RunningNodes))
passed = 0
else:
if not NumofInc.has_key(resource.inc_name):
NumofInc[resource.inc_name]=0
MaxofInc[resource.inc_name]=resource.inc_max
running = 1
#Is the resource served without quorum present?
if not self.CM.HasQuorum(None) and len(RunningNodes) != 0 and resource.needs_quorum == 1:
self.CM.log("Resource " + repr(resource)
+ " active without Quorum: "
+ repr(RunningNodes))
passed = 0
#Is the resource served at all?
elif len(RunningNodes) == 0 :
running = 0
# Is the resource served too many times?
elif len(RunningNodes) > 1:
self.CM.log("Resource " + repr(resource)
+ " served too many times: "
+ repr(RunningNodes))
passed = 0
if running:
NumofInc[resource.inc_name] += 1
if self.CM.HasQuorum(None):
for inc_name in NumofInc.keys():
if NumofInc[inc_name] != min(up_count, MaxofInc[inc_name]):
passed = 0
self.CM.log("Cloned resource "+ str(inc_name)
- +" has "+ str(NumofInc[inc_name])
- +" active instances (max: "
- + str(MaxofInc[inc_name])
- +", active nodes: "+ str(up_count) + ")")
-
+ +" has "+ str(NumofInc[inc_name])
+ +" active instances (max: "
+ + str(MaxofInc[inc_name])
+ +", active nodes: "+ str(up_count) + ")")
+
Groups = self.CM.ResourceGroups()
for group in Groups :
- group_printed = 0
- first_rsc = group[0].rid
+ group_printed = 0
+ first_rsc = group[0].rid
RunningNodes = NodeofRsc[first_rsc]
for rsc in group :
if RunningNodes != NodeofRsc[rsc.rid]:
passed = 0
- if group_printed == 0:
- group_printed = 1
- self.CM.log("Group audit failed for: %s" % repr(group))
- if not NodeofRsc[first_rsc] or len(NodeofRsc[first_rsc]) == 0:
- self.CM.log("* %s not running" % first_rsc)
- else:
- self.CM.log("* %s running on %s"
- %(first_rsc, repr(NodeofRsc[first_rsc])))
-
- if not NodeofRsc[rsc.rid] or len(NodeofRsc[rsc.rid]) == 0:
- self.CM.log("* %s not running" % rsc.rid)
- else:
- self.CM.log("* %s running on %s"
- %(rsc.rid, repr(NodeofRsc[rsc.rid])))
+ if group_printed == 0:
+ group_printed = 1
+ self.CM.log("Group audit failed for: %s" % repr(group))
+ if not NodeofRsc[first_rsc] or len(NodeofRsc[first_rsc]) == 0:
+ self.CM.log("* %s not running" % first_rsc)
+ else:
+ self.CM.log("* %s running on %s"
+ %(first_rsc, repr(NodeofRsc[first_rsc])))
+
+ if not NodeofRsc[rsc.rid] or len(NodeofRsc[rsc.rid]) == 0:
+ self.CM.log("* %s not running" % rsc.rid)
+ else:
+ self.CM.log("* %s running on %s"
+ %(rsc.rid, repr(NodeofRsc[rsc.rid])))
# Make sure the resources with "must","placement" constraint
# are running on the same node
Dependancies = self.CM.Dependencies()
for dependency in Dependancies:
if dependency["type"] == "placement" and dependency["strength"] == "must":
if NodeofRsc[dependency["from"]] != NodeofRsc[dependency["to"]]:
print dependency["from"] + " and " + dependency["to"] + " should be run on same node"
passed = 0
return passed
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["ResCanStop"] == 0:
return 1
return 0
def name(self):
return "HAResourceAudit"
class CrmdStateAudit(ClusterAudit):
def __init__(self, cm):
self.CM = cm
self.Stats = {"calls":0
, "success":0
, "failure":0
, "skipped":0
, "auditfail":0}
def has_key(self, key):
return self.Stats.has_key(key)
def __setitem__(self, key, value):
self.Stats[key] = value
def __getitem__(self, key):
return self.Stats[key]
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not self.Stats.has_key(name):
self.Stats[name]=0
self.Stats[name] = self.Stats[name]+1
def __call__(self):
passed = 1
up_are_down = 0
down_are_up = 0
unstable_list = []
self.CM.debug("Do Audit %s"%self.name())
for node in self.CM.Env["nodes"]:
- should_be = self.CM.ShouldBeStatus[node]
- rc = self.CM.test_node_CM(node)
- if rc > 0:
- if should_be == self.CM["down"]:
+ should_be = self.CM.ShouldBeStatus[node]
+ rc = self.CM.test_node_CM(node)
+ if rc > 0:
+ if should_be == self.CM["down"]:
down_are_up = down_are_up + 1
- if rc == 1:
- unstable_list.append(node)
- elif should_be == self.CM["up"]:
- up_are_down = up_are_down + 1
+ if rc == 1:
+ unstable_list.append(node)
+ elif should_be == self.CM["up"]:
+ up_are_down = up_are_down + 1
if len(unstable_list) > 0:
passed = 0
self.CM.log("Cluster is not stable: %d (of %d): %s"
%(len(unstable_list), self.CM.upcount(), repr(unstable_list)))
if up_are_down > 0:
passed = 0
self.CM.log("%d (of %d) nodes expected to be up were down."
%(up_are_down, len(self.CM.Env["nodes"])))
if down_are_up > 0:
passed = 0
self.CM.log("%d (of %d) nodes expected to be down were up."
%(down_are_up, len(self.CM.Env["nodes"])))
return passed
def name(self):
return "CrmdStateAudit"
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
class CIBAudit(ClusterAudit):
def __init__(self, cm):
self.CM = cm
self.Stats = {"calls":0
, "success":0
, "failure":0
, "skipped":0
, "auditfail":0}
def has_key(self, key):
return self.Stats.has_key(key)
def __setitem__(self, key, value):
self.Stats[key] = value
def __getitem__(self, key):
return self.Stats[key]
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not self.Stats.has_key(name):
self.Stats[name]=0
self.Stats[name] = self.Stats[name]+1
def __call__(self):
self.CM.debug("Do Audit %s"%self.name())
passed = 1
ccm_partitions = self.CM.find_partitions()
if len(ccm_partitions) == 0:
- self.CM.debug("\tNo partitions to audit")
+ self.CM.debug("\tNo partitions to audit")
return 1
for partition in ccm_partitions:
- self.CM.debug("\tAuditing CIB consistency for: %s" %partition)
+ self.CM.debug("\tAuditing CIB consistency for: %s" %partition)
partition_passed = 0
if self.audit_cib_contents(partition) == 0:
passed = 0
return passed
def audit_cib_contents(self, hostlist):
- passed = 1
- first_host = None
- first_host_xml = ""
- partition_hosts = hostlist.split()
- for a_host in partition_hosts:
- if first_host == None:
- first_host = a_host
- first_host_xml = self.store_remote_cib(a_host)
- #self.CM.debug("Retrieved CIB: %s" % first_host_xml)
- else:
- a_host_xml = self.store_remote_cib(a_host)
- diff_cmd="@sbindir@/crm_diff -c -VV -f -N \'%s\' -O '%s'" % (a_host_xml, first_host_xml)
-
- infile, outfile, errfile = os.popen3(diff_cmd)
- diff_lines = outfile.readlines()
- for line in diff_lines:
- if not re.search("<diff/>", line):
- passed = 0
- self.CM.log("CibDiff[%s-%s]: %s"
- % (first_host, a_host, line))
- else:
- self.CM.debug("CibDiff[%s-%s] Ignoring: %s"
- % (first_host, a_host, line))
-
- diff_lines = errfile.readlines()
- for line in diff_lines:
- passed = 0
- self.CM.log("CibDiff[%s-%s] ERROR: %s"
- % (first_host, a_host, line))
-
- return passed
-
+ passed = 1
+ first_host = None
+ first_host_xml = ""
+ partition_hosts = hostlist.split()
+ for a_host in partition_hosts:
+ if first_host == None:
+ first_host = a_host
+ first_host_xml = self.store_remote_cib(a_host)
+ #self.CM.debug("Retrieved CIB: %s" % first_host_xml)
+ else:
+ a_host_xml = self.store_remote_cib(a_host)
+ diff_cmd="@sbindir@/crm_diff -c -VV -f -N \'%s\' -O '%s'" % (a_host_xml, first_host_xml)
+
+ infile, outfile, errfile = os.popen3(diff_cmd)
+ diff_lines = outfile.readlines()
+ for line in diff_lines:
+ if not re.search("<diff/>", line):
+ passed = 0
+ self.CM.log("CibDiff[%s-%s]: %s"
+ % (first_host, a_host, line))
+ else:
+ self.CM.debug("CibDiff[%s-%s] Ignoring: %s"
+ % (first_host, a_host, line))
+
+ diff_lines = errfile.readlines()
+ for line in diff_lines:
+ passed = 0
+ self.CM.log("CibDiff[%s-%s] ERROR: %s"
+ % (first_host, a_host, line))
+
+ return passed
+
def store_remote_cib(self, node):
- combined = ""
- first_line = 1
- extra_debug = 0
+ combined = ""
+ first_line = 1
+ extra_debug = 0
#self.CM.debug("\tRetrieving CIB from: %s" % node)
lines = self.CM.rsh.readlines(node, self.CM["CibQuery"])
- if extra_debug:
- self.CM.debug("Start Cib[%s]" % node)
+ if extra_debug:
+ self.CM.debug("Start Cib[%s]" % node)
for line in lines:
- combined = combined + line[:-1]
- if first_line:
+ combined = combined + line[:-1]
+ if first_line:
self.CM.debug("[Cib]" + line)
- first_line = 0
- elif extra_debug:
+ first_line = 0
+ elif extra_debug:
self.CM.debug("[Cib]" + line)
- if extra_debug:
- self.CM.debug("End Cib[%s]" % node)
+ if extra_debug:
+ self.CM.debug("End Cib[%s]" % node)
- #self.CM.debug("Complete CIB: %s" % combined)
- return combined
+ #self.CM.debug("Complete CIB: %s" % combined)
+ return combined
def name(self):
return "CibAudit"
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
class PartitionAudit(ClusterAudit):
def __init__(self, cm):
self.CM = cm
self.Stats = {"calls":0
, "success":0
, "failure":0
, "skipped":0
, "auditfail":0}
self.NodeEpoche={}
self.NodeState={}
self.NodeQuorum={}
def has_key(self, key):
return self.Stats.has_key(key)
def __setitem__(self, key, value):
self.Stats[key] = value
def __getitem__(self, key):
return self.Stats[key]
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not self.Stats.has_key(name):
self.Stats[name]=0
self.Stats[name] = self.Stats[name]+1
def __call__(self):
self.CM.debug("Do Audit %s"%self.name())
passed = 1
ccm_partitions = self.CM.find_partitions()
if ccm_partitions == None or len(ccm_partitions) == 0:
return 1
if len(ccm_partitions) > 1:
self.CM.log("Warn: %d cluster partitions detected:" %len(ccm_partitions))
for partition in ccm_partitions:
self.CM.log("\t %s" %partition)
for partition in ccm_partitions:
partition_passed = 0
if self.audit_partition(partition) == 0:
passed = 0
return passed
def trim_string(self, avalue):
if not avalue:
return None
if len(avalue) > 1:
return avalue[:-1]
def trim2int(self, avalue):
if not avalue:
return None
if len(avalue) > 1:
return int(avalue[:-1])
def audit_partition(self, partition):
passed = 1
dc_found = []
dc_allowed_list = []
lowest_epoche = None
node_list = partition.split()
self.CM.debug("Auditing partition: %s" %(partition))
for node in node_list:
if self.CM.ShouldBeStatus[node] != self.CM["up"]:
self.CM.log("Warn: Node %s appeared out of nowhere" %(node))
self.CM.ShouldBeStatus[node] = self.CM["up"]
- # not in itself a reason to fail the audit (not what we're
- # checking for in this audit)
-
- self.NodeState[node] = self.CM.rsh.readaline(
- node, self.CM["StatusCmd"]%node)
- self.NodeEpoche[node] = self.CM.rsh.readaline(
- node, self.CM["EpocheCmd"])
- self.NodeQuorum[node] = self.CM.rsh.readaline(
- node, self.CM["QuorumCmd"])
-
- self.NodeState[node] = self.trim_string(self.NodeState[node])
- self.NodeEpoche[node] = self.trim2int(self.NodeEpoche[node])
- self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node])
-
- if not self.NodeEpoche[node]:
- self.CM.log("Warn: Node %s dissappeared: cant determin epoche" %(node))
+ # not in itself a reason to fail the audit (not what we're
+ # checking for in this audit)
+
+ self.NodeState[node] = self.CM.rsh.readaline(
+ node, self.CM["StatusCmd"]%node)
+ self.NodeEpoche[node] = self.CM.rsh.readaline(
+ node, self.CM["EpocheCmd"])
+ self.NodeQuorum[node] = self.CM.rsh.readaline(
+ node, self.CM["QuorumCmd"])
+
+ self.NodeState[node] = self.trim_string(self.NodeState[node])
+ self.NodeEpoche[node] = self.trim2int(self.NodeEpoche[node])
+ self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node])
+
+ if not self.NodeEpoche[node]:
+ self.CM.log("Warn: Node %s dissappeared: cant determin epoche" %(node))
self.CM.ShouldBeStatus[node] = self.CM["down"]
- # not in itself a reason to fail the audit (not what we're
- # checking for in this audit)
+ # not in itself a reason to fail the audit (not what we're
+ # checking for in this audit)
elif lowest_epoche == None or self.NodeEpoche[node] < lowest_epoche:
lowest_epoche = self.NodeEpoche[node]
- if not lowest_epoche:
- self.CM.log("Lowest epoche not determined in %s" % (partition))
- passed = 0
+ if not lowest_epoche:
+ self.CM.log("Lowest epoche not determined in %s" % (partition))
+ passed = 0
for node in node_list:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
if self.CM.is_node_dc(node, self.NodeState[node]):
dc_found.append(node)
if self.NodeEpoche[node] == lowest_epoche:
self.CM.debug("%s: OK" % node)
- elif not self.NodeEpoche[node]:
+ elif not self.NodeEpoche[node]:
self.CM.debug("Check on %s ignored: no node epoche" % node)
elif not lowest_epoche:
self.CM.debug("Check on %s ignored: no lowest epoche" % node)
- else:
+ else:
self.CM.log("DC %s is not the oldest node (%d vs. %d)"
%(node, self.NodeEpoche[node], lowest_epoche))
passed = 0
if len(dc_found) == 0:
- self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)"
- %(len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
+ self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)"
+ %(len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
elif len(dc_found) > 1:
- self.CM.log("%d DCs (%s) found in cluster partition: %s"
- %(len(dc_found), str(dc_found), str(node_list)))
- passed = 0
+ self.CM.log("%d DCs (%s) found in cluster partition: %s"
+ %(len(dc_found), str(dc_found), str(node_list)))
+ passed = 0
elif self.CM.Env["CIBResource"] == 1 and self.NodeQuorum[dc_found[0]] == "1":
self.CM.debug("%s: %s" % (dc_found[0], self.NodeQuorum[dc_found[0]]))
Resources = self.CM.Resources()
for node in node_list:
- if self.CM.ShouldBeStatus[node] == self.CM["up"]:
- for resource in Resources:
- if resource.rid == "rsc_"+node:
- if resource.IsRunningOn(node) == 0:
- self.CM.log("Node %s is not running its own resource" %(node))
- passed = 0
+ if self.CM.ShouldBeStatus[node] == self.CM["up"]:
+ for resource in Resources:
+ if resource.rid == "rsc_"+node:
+ if resource.IsRunningOn(node) == 0:
+ self.CM.log("Node %s is not running its own resource" %(node))
+ passed = 0
elif self.CM.Env["CIBResource"] == 1:
# no quorum means no resource management
self.CM.debug("Not auditing resources - no quorum")
if passed == 0:
for node in node_list:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
self.CM.log("epoche %s : %s"
%(self.NodeEpoche[node], self.NodeState[node]))
return passed
def name(self):
return "PartitionAudit"
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
AllAuditClasses.append(CrmdStateAudit)
AllAuditClasses.append(PartitionAudit)
AllAuditClasses.append(ResourceAudit)
AllAuditClasses.append(HAResourceAudit)
AllAuditClasses.append(CIBAudit)
def AuditList(cm):
result = []
for auditclass in AllAuditClasses:
result.append(auditclass(cm))
return result
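# Editor's sketch (not part of the original file): how the audit objects
# returned by AuditList() are typically consumed.  Each audit is callable
# and returns a true value on success, mirroring RandomTests.audit() in
# CTStests.py.in; "cm" is assumed to be a ClusterManager instance.
def run_all_audits(cm):
    failures = 0
    for audit in AuditList(cm):
        if audit.is_applicable() and not audit():
            cm.log("Audit " + audit.name() + " FAILED.")
            failures = failures + 1
    return failures == 0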
diff --git a/cts/CTSlab.py.in b/cts/CTSlab.py.in
index 1a73f24a08..b05317e5ff 100755
--- a/cts/CTSlab.py.in
+++ b/cts/CTSlab.py.in
@@ -1,823 +1,823 @@
#!@PYTHON@
'''CTS: Cluster Testing System: Lab environment module
'''
__copyright__='''
Copyright (C) 2001,2005 Alan Robertson <alanr@unix.sh>
Licensed under the GNU GPL.
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from UserDict import UserDict
import sys, time, types, string, syslog, random, os, string, signal, traceback
from CTS import ClusterManager
from CM_hb import HeartbeatCM
from CTStests import BSC_AddResource
from socket import gethostbyname_ex
tests = None
cm = None
old_handler = None
def sig_handler(signum, frame) :
if cm != None:
cm.log("Interrupted by signal %d"%signum)
if signum == 10 and tests != None :
tests.summarize()
if signum == 15 :
sys.exit(1)
class ResetMechanism:
def reset(self, node):
raise ValueError("Abstract class member (reset)")
class Stonith(ResetMechanism):
def __init__(self, sttype="external/ssh", pName=None, pValue=None
, path="@sbindir@/stonith"):
self.pathname=path
self.configName=pName
self.configValue=pValue
self.stonithtype=sttype
def reset(self, node):
if self.configValue == None :
config=node
else:
config=self.configValue
cmdstring = "%s -t '%s' -p '%s' '%s' 2>/dev/null" % (self.pathname
, self.stonithtype, config, node)
return (os.system(cmdstring) == 0)
class Stonithd(ResetMechanism):
def __init__(self, nodes, sttype = 'external/ssh'):
self.sttype = sttype
self.nodes = nodes
self.query_cmd_pat = '@libdir@/heartbeat/stonithdtest/apitest 0 %s 20000 0'
self.reset_cmd_pat = '@libdir@/heartbeat/stonithdtest/apitest 1 %s 20000 0'
self.poweron_cmd_pat = '@libdir@/heartbeat/stonithdtest/apitest 2 %s 20000 0'
self.poweroff_cmd_pat= '@libdir@/heartbeat/stonithdtest/apitest 3 %s 20000 0'
self.lrmd_add_pat = '@libdir@/heartbeat/lrmadmin -A %s stonith ' + sttype + ' NULL hostlist=%s'
self.lrmd_start_pat = '@libdir@/heartbeat/lrmadmin -E %s start 0 0 EVERYTIME'
self.lrmd_stop_pat = '@libdir@/heartbeat/lrmadmin -E %s stop 0 0 EVERYTIME'
self.lrmd_del_pat = '@libdir@/heartbeat/lrmadmin -D %s'
self.rsc_id = 'my_stonithd_id'
self.command = "@SSH@ -l root -n -x"
self.command_noblock = "@SSH@ -f -l root -n -x"
self.stonithd_started_nodes = []
self.fail_reason = ''
def _remote_exec(self, node, cmnd):
return (os.system("%s %s %s > /dev/null" % (self.command, node, cmnd)) == 0)
def _remote_readlines(self, node, cmnd):
f = os.popen("%s %s %s" % (self.command, node, cmnd))
return f.readlines()
def _stonithd_started(self, node):
return node in self.stonithd_started_nodes
def _start_stonithd(self, node, hosts):
hostlist = string.join(hosts, ',')
lrmd_add_cmd = self.lrmd_add_pat % (self.rsc_id, hostlist)
ret = self._remote_exec(node, lrmd_add_cmd)
if not ret:return ret
lrmd_start_cmd = self.lrmd_start_pat % self.rsc_id
ret = self._remote_exec(node, lrmd_start_cmd)
if not ret:return ret
self.stonithd_started_nodes.append(node)
return 1
def _stop_stonithd(self, node):
lrmd_stop_cmd = self.lrmd_stop_pat % self.rsc_id
ret = self._remote_exec(node, lrmd_stop_cmd)
if not ret:return ret
lrmd_del_cmd = self.lrmd_del_pat % self.rsc_id
ret = self._remote_exec(node, lrmd_del_cmd)
if not ret:return ret
self.stonithd_started_nodes.remove(node)
return 1
def _do_stonith(self, init_node, target_node, action):
stonithd_started = self._stonithd_started(init_node)
if not stonithd_started:
ret = self._start_stonithd(init_node, [target_node])
if not ret:
self.fail_reason = "failed to start stonithd on node %s" % init_node
return ret
command = ""
if action == "RESET":
command = self.reset_cmd_pat % target_node
elif action == "POWEROFF":
command = self.poweroff_cmd_pat % target_node
elif action == "POWERON":
command = self.poweron_cmd_pat % target_node
else:
self.fail_reason = "unknown opration type %s" % action
return 0
lines = self._remote_readlines(init_node, command)
result = "".join(lines)
if not stonithd_started:
self._stop_stonithd(init_node)
index = result.find("result=0")
if index == -1:
self.fail_reason = "unexpected stonithd status: %s" % result
return 0
return 1
# Should we randomly choose a node as init_node here if init_node not specified?
def reset(self, init_node, target_node):
return self._do_stonith(init_node, target_node, "RESET")
def poweron(self, init_node, target_node):
return self._do_stonith(init_node, target_node, "POWERON")
def poweroff(self, init_node, target_node):
return self._do_stonith(init_node, target_node, "POWEROFF")
class Logger:
TimeFormat = "%b %d %H:%M:%S\t"
def __call__(self, lines):
raise ValueError("Abstract class member (__call__)")
def write(self, line):
return self(line.rstrip())
def writelines(self, lines):
for s in lines:
self.write(s)
return 1
def flush(self):
return 1
def isatty(self):
return None
class SysLog(Logger):
# http://docs.python.org/lib/module-syslog.html
defaultsource="CTS"
defaultfacility= syslog.LOG_LOCAL7
map = {
- "kernel": syslog.LOG_KERN,
- "user": syslog.LOG_USER,
- "mail": syslog.LOG_MAIL,
- "daemon": syslog.LOG_MAIL,
- "auth": syslog.LOG_AUTH,
- "lpr": syslog.LOG_LPR,
- "news": syslog.LOG_NEWS,
- "uucp": syslog.LOG_UUCP,
- "cron": syslog.LOG_CRON,
- "local0": syslog.LOG_LOCAL0,
- "local1": syslog.LOG_LOCAL1,
- "local2": syslog.LOG_LOCAL2,
- "local3": syslog.LOG_LOCAL3,
- "local4": syslog.LOG_LOCAL4,
- "local5": syslog.LOG_LOCAL5,
- "local6": syslog.LOG_LOCAL6,
- "local7": syslog.LOG_LOCAL7,
+ "kernel": syslog.LOG_KERN,
+ "user": syslog.LOG_USER,
+ "mail": syslog.LOG_MAIL,
+ "daemon": syslog.LOG_MAIL,
+ "auth": syslog.LOG_AUTH,
+ "lpr": syslog.LOG_LPR,
+ "news": syslog.LOG_NEWS,
+ "uucp": syslog.LOG_UUCP,
+ "cron": syslog.LOG_CRON,
+ "local0": syslog.LOG_LOCAL0,
+ "local1": syslog.LOG_LOCAL1,
+ "local2": syslog.LOG_LOCAL2,
+ "local3": syslog.LOG_LOCAL3,
+ "local4": syslog.LOG_LOCAL4,
+ "local5": syslog.LOG_LOCAL5,
+ "local6": syslog.LOG_LOCAL6,
+ "local7": syslog.LOG_LOCAL7,
}
def __init__(self, labinfo):
if labinfo.has_key("syslogsource"):
self.source=labinfo["syslogsource"]
else:
self.source=SysLog.defaultsource
if labinfo.has_key("SyslogFacility"):
self.facility=labinfo["SyslogFacility"]
if SysLog.map.has_key(self.facility):
self.facility=SysLog.map[self.facility]
else:
self.facility=SysLog.defaultfacility
syslog.openlog(self.source, 0, self.facility)
def setfacility(self, facility):
self.facility = facility
if SysLog.map.has_key(self.facility):
self.facility=SysLog.map[self.facility]
syslog.closelog()
syslog.openlog(self.source, 0, self.facility)
def __call__(self, lines):
if isinstance(lines, types.StringType):
syslog.syslog(lines)
else:
for line in lines:
syslog.syslog(line)
def name(self):
return "Syslog"
class StdErrLog(Logger):
def __init__(self, labinfo):
pass
def __call__(self, lines):
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
if isinstance(lines, types.StringType):
sys.__stderr__.writelines([t, lines, "\n"])
else:
for line in lines:
sys.__stderr__.writelines([t, line, "\n"])
sys.__stderr__.flush()
def name(self):
return "StdErrLog"
class FileLog(Logger):
def __init__(self, labinfo, filename=None):
if filename == None:
filename=labinfo["LogFileName"]
self.logfile=filename
import os
self.hostname = os.uname()[1]+" "
self.source = "CTS: "
def __call__(self, lines):
fd = open(self.logfile, "a")
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
if isinstance(lines, types.StringType):
fd.writelines([t, self.hostname, self.source, lines, "\n"])
else:
for line in lines:
fd.writelines([t, self.hostname, self.source, line, "\n"])
fd.close()
def name(self):
return "FileLog"
class CtsLab(UserDict):
'''This class defines the Lab Environment for the Cluster Test System.
It defines those things which are expected to change from test
environment to test environment for the same cluster manager.
It is where you define the set of nodes that are in your test lab,
what kind of reset mechanism you use, etc.
This class is derived from a UserDict because we hold many
different parameters of different kinds, and this provides
a uniform and extensible interface useful for any kind of
communication between the user/administrator/tester and CTS.
At this point in time, it is the intent of this class to model static
configuration and/or environmental data about the environment which
doesn't change as the tests proceed.
Well-known names (keys) are an important concept in this class.
The HasMinimalKeys member function knows the minimal set of
well-known names for the class.
The following names are standard (well-known) at this time:
nodes An array of the nodes in the cluster
reset A ResetMechanism object
logger An array of objects that log strings...
CMclass The type of ClusterManager we are running
(This is a class object, not a class instance)
RandSeed Random seed. It is a triple of bytes. (optional)
HAdir Base directory for HA installation
The CTS code ignores names it doesn't know about/need.
The individual tests have access to this information, and it is
perfectly acceptable to provide hints, tweaks, fine-tuning
directions or other information to the tests through this mechanism.
'''
def __init__(self, nodes):
self.data = {}
self["nodes"] = nodes
self.MinimalKeys=["nodes", "reset", "logger", "CMclass", "HAdir"]
def HasMinimalKeys(self):
'Return TRUE if our object has the minimal set of keys/values in it'
result = 1
for key in self.MinimalKeys:
if not self.has_key(key):
result = None
return result
def SupplyDefaults(self):
if not self.has_key("logger"):
self["logger"] = (SysLog(self), StdErrLog(self))
if not self.has_key("reset"):
self["reset"] = Stonith()
if not self.has_key("CMclass"):
self["CMclass"] = HeartbeatCM
if not self.has_key("LogFileName"):
self["LogFileName"] = "/var/log/ha-log"
if not self.has_key("logfacility"):
LogFacility = "local7"
#
# Now set up our random number generator...
#
self.RandomGen = random.Random()
# Get a random seed for the random number generator.
if self.has_key("RandSeed"):
randseed = self["RandSeed"]
self.log("Random seed is: " + str(randseed))
self.RandomGen.seed(str(randseed))
else:
randseed = int(time.time())
self.log("Random seed is: " + str(randseed))
self.RandomGen.seed(str(randseed))
def log(self, args):
"Log using each of the supplied logging methods"
for logfcn in self._logfunctions:
logfcn(string.strip(args))
def debug(self, args):
"Log using each of the supplied logging methods"
for logfcn in self._logfunctions:
if logfcn.name() != "StdErrLog":
logfcn("debug: %s" % string.strip(args))
def __setitem__(self, key, value):
'''Since this function gets called whenever we modify the
dictionary (object), we can (and do) validate those keys that we
know how to validate. For the most part, we know how to validate
the "MinimalKeys" elements.
'''
#
# List of nodes in the system
#
if key == "nodes":
self.Nodes = {}
for node in value:
# I don't think I need the IP address, etc. but this validates
# the node name against /etc/hosts and/or DNS, so it's a
# GoodThing(tm).
try:
self.Nodes[node] = gethostbyname_ex(node)
except:
print node+" not found in DNS... aborting"
raise
#
# Reset Mechanism
#
elif key == "reset":
if not issubclass(value.__class__, ResetMechanism):
raise ValueError("'reset' Value must be a subclass"
" of ResetMechanism")
#
# List of Logging Mechanism(s)
#
elif key == "logger":
if len(value) < 1:
raise ValueError("Must have at least one logging mechanism")
for logger in value:
if not callable(logger):
raise ValueError("'logger' elements must be callable")
self._logfunctions = value
#
# Cluster Manager Class
#
elif key == "CMclass":
if not issubclass(value, ClusterManager):
raise ValueError("'CMclass' must be a subclass of"
" ClusterManager")
#
# Initial Random seed...
#
#elif key == "RandSeed":
# if len(value) != 3:
# raise ValueError("'Randseed' must be a 3-element list/tuple")
# for elem in value:
# if not isinstance(elem, types.IntType):
# raise ValueError("'Randseed' list must all be ints")
self.data[key] = value
def IsValidNode(self, node):
'Return TRUE if the given node is valid'
return self.Nodes.has_key(node)
def __CheckNode(self, node):
"Raise a ValueError if the given node isn't valid"
if not self.IsValidNode(node):
raise ValueError("Invalid node [%s] in CheckNode" % node)
def RandomNode(self):
'''Choose a random node from the cluster'''
return self.RandomGen.choice(self["nodes"])
def ResetNode(self, node):
"Reset a node, (normally) using a hardware mechanism"
self.__CheckNode(node)
return self["reset"].reset(node)
def ResetNode2(self, init_node, target_node, reasons):
self.__CheckNode(target_node)
stonithd = Stonithd(self["nodes"])
ret = stonithd.reset(init_node, target_node)
if not ret:
reasons.append(stonithd.fail_reason)
return ret
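# Editor's sketch (not part of the original file): the minimal CtsLab
# setup.  Node names must resolve via DNS or /etc/hosts, otherwise
# __setitem__ above aborts; "hadir" is whatever @sysconfdir@/ha.d
# expands to on the test machine.
def make_minimal_lab(node_list, hadir):
    env = CtsLab(node_list)
    env["HAdir"] = hadir
    env.SupplyDefaults()          # fills in reset, logger, CMclass, LogFileName
    assert env.HasMinimalKeys()
    return env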
def usage(arg):
print "Illegal argument " + arg
print "usage: " + sys.argv[0] \
+ " --directory config-directory" \
+ " -D config-directory" \
+ " --logfile system-logfile-name" \
+ " --trunc (truncate logfile before starting)" \
+ " -L system-logfile-name" \
+ " --limit-nodes maxnumnodes" \
+ " --xmit-loss lost-rate(0.0-1.0)" \
+ " --recv-loss lost-rate(0.0-1.0)" \
+ " --suppressmonitoring" \
+ " --syslog-facility syslog-facility" \
+ " --facility syslog-facility" \
+ " --choose testcase-name" \
+ " --test-ip-base ip" \
+ " --oprofile \"whitespace separated list of nodes to oprofile\"" \
+ " (-2 |"\
+ " -v2 |"\
+ " --crm |"\
+ " --classic)"\
+ " (--populate-resources | -r)" \
+ " --resource-can-stop" \
+ " --stonith (1 | 0 | yes | no)" \
+ " --stonith-type type" \
+ " --stonith-args name=value" \
+ " --standby (1 | 0 | yes | no)" \
+ " --fencing (1 | 0 | yes | no)" \
+ " --suppress_cib_writes (1 | 0 | yes | no)" \
+ " -lstests" \
+ " --seed" \
+ " [number-of-iterations]"
sys.exit(1)
#
# A little test code...
#
if __name__ == '__main__':
from CTSaudits import AuditList
from CTStests import TestList,RandomTests
from CTS import Scenario, InitClusterManager, PingFest, PacketLoss, BasicSanityCheck
import CM_hb
HAdir = "@sysconfdir@/ha.d"
LogFile = "/var/log/ha-log-local7"
DoStonith = 1
DoStandby = 1
DoFencing = 1
NumIter = 500
SuppressMonitoring = None
Version = 1
CIBfilename = None
CIBResource = 0
ClobberCIB = 0
LimitNodes = 0
TestCase = None
LogFacility = None
TruncateLog = 0
ResCanStop = 0
XmitLoss = "0.0"
RecvLoss = "0.0"
IPBase = "127.0.0.10"
SuppressCib = 1
DoBSC = 0
ListTests = 0
HaveSeed = 0
oprofile = None
StonithType = "ssh"
StonithParams = None
StonithParams = "hostlist=dynamic".split('=')
#
# The values of the rest of the parameters are now properly derived from
# the configuration files.
#
# Stonith is configurable because it's slow, I have a few machines which
# don't reboot very reliably, and it can cause mild damage to your machine if
# you're using a real power switch.
#
# Standby is configurable because the test is very heartbeat specific
# and I haven't written the code to set it properly yet. Patches are
# being accepted...
# Set the signal handler
signal.signal(15, sig_handler)
signal.signal(10, sig_handler)
# Process arguments...
skipthis=None
args=sys.argv[1:]
for i in range(0, len(args)):
if skipthis:
skipthis=None
continue
elif args[i] == "-D" or args[i] == "--directory":
skipthis=1
HAdir = args[i+1]
elif args[i] == "-l" or args[i] == "--limit-nodes":
skipthis=1
LimitNodes = int(args[i+1])
elif args[i] == "-r" or args[i] == "--populate-resources":
CIBResource = 1
elif args[i] == "-L" or args[i] == "--logfile":
skipthis=1
LogFile = args[i+1]
elif args[i] == "--test-ip-base":
skipthis=1
IPBase = args[i+1]
elif args[i] == "--oprofile":
skipthis=1
oprofile = args[i+1].split(' ')
elif args[i] == "--trunc":
TruncateLog=1
elif args[i] == "-v2":
Version=2
elif args[i] == "-lstests":
ListTests=1
elif args[i] == "--stonith":
skipthis=1
if args[i+1] == "1" or args[i+1] == "yes":
DoStonith=1
elif args[i+1] == "0" or args[i+1] == "no":
DoStonith=0
else:
usage(args[i+1])
elif args[i] == "--stonith-type":
StonithType = args[i+1]
skipthis=1
elif args[i] == "--stonith-args":
StonithParams = args[i+1].split('=')
skipthis=1
elif args[i] == "--suppress-cib-writes":
skipthis=1
if args[i+1] == "1" or args[i+1] == "yes":
SuppressCib=1
elif args[i+1] == "0" or args[i+1] == "no":
SuppressCib=0
else:
usage(args[i+1])
elif args[i] == "--bsc":
- DoBSC=1
+ DoBSC=1
elif args[i] == "--standby":
skipthis=1
if args[i+1] == "1" or args[i+1] == "yes":
DoStandby=1
elif args[i+1] == "0" or args[i+1] == "no":
DoStandby=0
else:
usage(args[i+1])
elif args[i] == "--fencing":
skipthis=1
if args[i+1] == "1" or args[i+1] == "yes":
DoFencing=1
elif args[i+1] == "0" or args[i+1] == "no":
DoFencing=0
else:
usage(args[i+1])
elif args[i] == "--suppressmonitoring":
SuppressMonitoring = 1
elif args[i] == "--resource-can-stop":
ResCanStop = 1
elif args[i] == "-2" or args[i] == "--crm":
Version = 2
elif args[i] == "-1" or args[i] == "--classic":
Version = 1
elif args[i] == "--clobber-cib" or args[i] == "-c":
ClobberCIB = 1
elif args[i] == "--cib-filename":
skipthis=1
CIBfilename = args[i+1]
elif args[i] == "--xmit-loss":
try:
float(args[i+1])
except ValueError:
print ("--xmit-loss parameter should be float")
usage(args[i+1])
skipthis=1
XmitLoss = args[i+1]
elif args[i] == "--recv-loss":
try:
float(args[i+1])
except ValueError:
print ("--recv-loss parameter should be float")
usage(args[i+1])
skipthis=1
RecvLoss = args[i+1]
elif args[i] == "--choose":
skipthis=1
TestCase = args[i+1]
elif args[i] == "--syslog-facility" or args[i] == "--facility":
skipthis=1
LogFacility = args[i+1]
elif args[i] == "--seed":
skipthis=1
Seed=args[i+1]
HaveSeed = 1
else:
NumIter=int(args[i])
if not oprofile:
- oprofile = []
+ oprofile = []
#
# This reading of HBconfig here is ugly, and I suppose ought to
# be done by the Cluster manager. This would probably mean moving the
# list of cluster nodes into the ClusterManager class. A good thought
# for our Copious Spare Time in the future...
#
config = CM_hb.HBConfig(HAdir)
node_list = config.Parameters["node"]
if DoBSC:
NumIter = 2
Version = 2
while len(node_list) > 1:
node_list.pop(len(node_list)-1)
if LogFacility == None:
if config.Parameters.has_key("logfacility"):
LogFacility = config.Parameters["logfacility"][0]
else:
LogFacility = "local7"
if LimitNodes > 0:
if len(node_list) > LimitNodes:
print("Limiting the number of nodes configured=%d (max=%d)"
%(len(node_list), LimitNodes))
while len(node_list) > LimitNodes:
node_list.pop(len(node_list)-1)
if StonithParams[0] == "hostlist":
StonithParams[1] = string.join(node_list, " ")
# alt_list = []
# for node in node_list:
# alt_list.append(string.lower(node))
# node_list = alt_list
Environment = CtsLab(node_list)
Environment["HAdir"] = HAdir
Environment["ClobberCIB"] = ClobberCIB
Environment["CIBfilename"] = CIBfilename
Environment["CIBResource"] = CIBResource
Environment["LogFileName"] = LogFile
Environment["DoStonith"] = DoStonith
Environment["SyslogFacility"] = LogFacility
Environment["DoStandby"] = DoStandby
Environment["DoFencing"] = DoFencing
Environment["ResCanStop"] = ResCanStop
Environment["SuppressMonitoring"] = SuppressMonitoring
Environment["XmitLoss"] = XmitLoss
Environment["RecvLoss"] = RecvLoss
Environment["IPBase"] = IPBase
Environment["SuppressCib"] = SuppressCib
Environment["DoBSC"] = 0
Environment["use_logd"] = 0
Environment["logfacility"] = LogFacility
Environment["oprofile"] = oprofile
if config.Parameters.has_key("use_logd"):
Environment["use_logd"] = 1
if Version == 2:
from CM_LinuxHAv2 import LinuxHAv2
Environment['CMclass']=LinuxHAv2
if HaveSeed:
Environment["RandSeed"] = Seed
Environment["reset"] = Stonith(sttype=StonithType, pName=StonithParams[0], pValue=StonithParams[1])
if DoBSC:
Environment["DoBSC"] = 1
Environment["ClobberCIB"] = 1
Environment["CIBResource"] = 0
Environment["logger"] = (FileLog(Environment), StdErrLog(Environment))
scenario = Scenario([ BasicSanityCheck(Environment) ])
else:
scenario = Scenario(
[ InitClusterManager(Environment), PacketLoss(Environment)])
Environment.SupplyDefaults()
# Your basic start up the world type of test scenario...
#scenario = Scenario(
#[ InitClusterManager(Environment)
#, PingFest(Environment)])
# Create the Cluster Manager object
cm = Environment['CMclass'](Environment)
if TruncateLog:
cm.log("Truncating %s" % LogFile)
lf = open(LogFile, "w");
if lf != None:
lf.truncate(0)
lf.close()
cm.log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TESTS ")
cm.log("HA configuration directory: " + Environment["HAdir"])
cm.log("System log files: " + Environment["LogFileName"])
cm.log("Enable Stonith: %d" % Environment["DoStonith"])
cm.log("Enable Fencing: %d" % Environment["DoFencing"])
cm.log("Enable Standby: %d" % Environment["DoStandby"])
cm.log("Enable Resources: %d" % Environment["CIBResource"])
if Environment.has_key("SuppressMonitoring") \
and Environment["SuppressMonitoring"]:
cm.log("Resource Monitoring is disabled")
cm.log("Cluster nodes: ")
for node in config.Parameters["node"]:
- (rc, lines) = cm.rsh.remote_py(node, "os", "system",
- "@sbindir@/crm_uuid")
+ (rc, lines) = cm.rsh.remote_py(node, "os", "system",
+ "@sbindir@/crm_uuid")
if not lines:
cm.log(" * %s: __undefined_uuid__" % node)
else:
- out=lines[0]
+ out=lines[0]
out = out[:-1]
cm.log(" * %s: %s" % (node, out))
Audits = AuditList(cm)
Tests = []
if Environment["DoBSC"]:
test = BSC_AddResource(cm)
Tests.append(test)
elif TestCase != None:
for test in TestList(cm):
if test.name == TestCase:
Tests.append(test)
if Tests == []:
usage("--choose: No applicable/valid tests chosen")
else:
Tests = TestList(cm)
if ListTests == 1 :
cm.log("Total %d tests"%len(Tests))
for test in Tests :
cm.log(str(test.name));
sys.exit(0)
tests = RandomTests(scenario, cm, Tests, Audits)
Environment.RandomTests = tests
try :
overall, detailed = tests.run(NumIter)
except :
cm.Env.log("Exception by %s" % sys.exc_info()[0])
for logmethod in Environment["logger"]:
traceback.print_exc(50, logmethod)
tests.summarize()
if tests.Stats["failure"] > 0:
sys.exit(tests.Stats["failure"])
elif tests.Stats["success"] != NumIter:
cm.Env.log("No failure count but success != requested iterations")
sys.exit(1)
diff --git a/cts/CTSproxy.py.in b/cts/CTSproxy.py.in
index 1d1572db7a..9581040048 100644
--- a/cts/CTSproxy.py.in
+++ b/cts/CTSproxy.py.in
@@ -1,63 +1,63 @@
#!@PYTHON@
''' proxy on remote node for remote python call
'''
__copyright__='''
Author: Huang Zhen <zhenhltc@cn.ibm.com>
Copyright (C) 2005 International Business Machines
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
import sys, pickle, base64, binascii
# check the number of arguments
if len(sys.argv) != 4 :
- print "usage %s module function args"%sys.argv[0]
- sys.exit(1)
+ print "usage %s module function args"%sys.argv[0]
+ sys.exit(1)
# import the module
try :
- module = __import__(sys.argv[1], globals(), locals(), [])
+ module = __import__(sys.argv[1], globals(), locals(), [])
except ImportError:
- print "can not find module %s"%sys.argv[1]
- sys.exit(1)
+ print "can not find module %s"%sys.argv[1]
+ sys.exit(1)
# find the function
try :
- func = getattr(module,sys.argv[2])
-except AttributeError:
- print "can not find function %s"%sys.argv[2]
- sys.exit(1)
+ func = getattr(module,sys.argv[2])
+except AttributeError:
+ print "can not find function %s"%sys.argv[2]
+ sys.exit(1)
# unpack the arguments of functions
try :
- args = pickle.loads(binascii.a2b_base64(sys.argv[3]))
-except IndexError:
- print "can not unpickle args %s"%sys.argv[3]
- sys.exit(1)
-
+ args = pickle.loads(binascii.a2b_base64(sys.argv[3]))
+except IndexError:
+ print "can not unpickle args %s"%sys.argv[3]
+ sys.exit(1)
+
# call the function and return packed result
try :
- result = apply(func,args)
- print binascii.b2a_base64(pickle.dumps(result))
- sys.exit(0)
+ result = apply(func,args)
+ print binascii.b2a_base64(pickle.dumps(result))
+ sys.exit(0)
except TypeError:
- print "parameter error"
- sys.exit(1)
+ print "parameter error"
+ sys.exit(1)
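# Editor's sketch (not part of the original file): how a caller would
# package arguments for this proxy.  The argument tuple is pickled and
# base64-encoded to match the a2b_base64/pickle.loads pair above; the
# command string and the "CTSproxy.py" invocation in the comment are
# only illustrative.
import pickle, binascii
packed = binascii.b2a_base64(pickle.dumps(("ls -l /tmp",))).strip()
# The remote side is then invoked roughly as:
#   python CTSproxy.py os system <packed>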
diff --git a/cts/CTStests.py.in b/cts/CTStests.py.in
index 3174514213..22028226b9 100644
--- a/cts/CTStests.py.in
+++ b/cts/CTStests.py.in
@@ -1,2450 +1,2450 @@
#!@PYTHON@
'''CTS: Cluster Testing System: Tests module
There are a few things we want to do here:
'''
__copyright__='''
Copyright (C) 2000, 2001 Alan Robertson <alanr@unix.sh>
Licensed under the GNU GPL.
Add ResourceRecover testcase Zhao Kai <zhaokai@cn.ibm.com>
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
-# SPECIAL NOTE:
+# SPECIAL NOTE:
#
-# Tests may NOT implement any cluster-manager-specific code in them.
-# EXTEND the ClusterManager object to provide the base capabilities
-# the test needs if you need to do something that the current CM classes
-# do not. Otherwise you screw up the whole point of the object structure
-# in CTS.
+# Tests may NOT implement any cluster-manager-specific code in them.
+# EXTEND the ClusterManager object to provide the base capabilities
+# the test needs if you need to do something that the current CM classes
+# do not. Otherwise you screw up the whole point of the object structure
+# in CTS.
#
-# Thank you.
+# Thank you.
#
import CTS
from CM_hb import HBConfig
import CTSaudits
import time, os, re, types, string, tempfile, sys
from CTSaudits import *
from stat import *
# List of all class objects for tests which we ought to
# consider running.
class RandomTests:
'''
A collection of tests which are run at random.
'''
def __init__(self, scenario, cm, tests, Audits):
self.CM = cm
self.Env = cm.Env
self.Scenario = scenario
self.Tests = []
self.Audits = []
self.ns=CTS.NodeStatus(self.Env)
for test in tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
if test.is_applicable():
self.Tests.append(test)
if not scenario.IsApplicable():
raise ValueError("Scenario not applicable in"
" given Environment")
self.Stats = {"success":0, "failure":0, "BadNews":0}
self.IndividualStats= {}
for audit in Audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be a subclass of ClusterAudit")
if audit.is_applicable():
self.Audits.append(audit)
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not self.Stats.has_key(name):
self.Stats[name]=0
self.Stats[name] = self.Stats[name]+1
def audit(self, BadNews, test):
errcount=0
BadNewsDebug=0
#BadNews.debug=1
while errcount < 1000:
if BadNewsDebug: print "Looking for BadNews"
match=BadNews.look(0)
if match:
if BadNewsDebug: print "BadNews found: "+match
add_err = 1
ignorelist = []
if test:
ignorelist=test.errorstoignore()
ignorelist.append(" CTS: ")
ignorelist.append("BadNews:")
for ignore in ignorelist:
if re.search(ignore, match):
if BadNewsDebug: print "Ignoring:"+match+" (pattern: "+ignore+")"
add_err = 0
if add_err == 1:
ignorelist=self.CM.errorstoignore()
for ignore in ignorelist:
if re.search(ignore, match):
if BadNewsDebug: print "Ignoring:"+match+" (pattern: "+ignore+")"
add_err = 0
if add_err == 1:
self.CM.log("BadNews: " + match)
self.incr("BadNews")
errcount=errcount+1
else:
break
else:
self.CM.log("Big problems. Shutting down.")
self.CM.stopall()
self.summarize()
raise ValueError("Looks like we hit the jackpot! :-)")
for audit in self.Audits:
if not audit():
self.CM.log("Audit " + audit.name() + " FAILED.")
self.incr("auditfail")
if test:
test.incr("auditfail")
def summarize(self):
self.CM.log("****************")
self.CM.log("Overall Results:" + repr(self.Stats))
self.CM.log("****************")
self.CM.log("Detailed Results")
for test in self.Tests:
self.CM.log("Test %s: \t%s" %(test.name, repr(test.Stats)))
self.CM.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def run(self, max=1):
(
'''
Set up the given scenario, then run the selected tests at
random for the selected number of iterations.
''')
BadNews=CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"]
, timeout=0)
BadNews.setwatch()
self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"])
- for node in self.CM.Env["nodes"]:
- if node in self.CM.Env["oprofile"]:
+ for node in self.CM.Env["nodes"]:
+ if node in self.CM.Env["oprofile"]:
self.CM.log("Enabling oprofile on %s" % node)
self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init")
self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start")
if not self.Scenario.SetUp(self.CM):
return None
for node in self.CM.Env["nodes"]:
- if node in self.CM.Env["oprofile"]:
+ if node in self.CM.Env["oprofile"]:
self.CM.rsh.remote_py(
node, "os", "system", "opcontrol --save=cts.setup")
testcount=1
time.sleep(30)
# This makes sure everything is stabilized before starting...
self.audit(BadNews, None)
while testcount <= max:
test = self.Env.RandomGen.choice(self.Tests)
# Some tests want a node as an argument.
nodechoice = self.Env.RandomNode()
#logsize = os.stat(self.CM["LogFileName"])[ST_SIZE]
#self.CM.log("Running test %s (%s) \t[%d : %d]"
# % (test.name, nodechoice, testcount, logsize))
self.CM.log("Running test %s (%s) \t[%d]"
% (test.name, nodechoice, testcount))
testcount = testcount + 1
starttime=time.time()
test.starttime=starttime
ret=test(nodechoice)
- for node in self.CM.Env["nodes"]:
- if node in self.CM.Env["oprofile"]:
+ for node in self.CM.Env["nodes"]:
+ if node in self.CM.Env["oprofile"]:
self.CM.rsh.remote_py(
node, "os", "system",
"opcontrol --save=cts.%d" % (testcount-1))
if ret:
self.incr("success")
else:
self.incr("failure")
self.CM.log("Test %s (%s) \t[FAILED]" %(test.name,nodechoice))
# Better get the current info from the cluster...
self.CM.statall()
# Make sure logging is working and we have enough disk space...
if not self.CM.Env["DoBSC"]:
if not self.CM.TestLogging():
sys.exit(1)
if not self.CM.CheckDf():
sys.exit(1)
stoptime=time.time()
elapsed_time = stoptime - starttime
test_time = stoptime - test.starttime
if not test.has_key("min_time"):
test["elapsed_time"] = elapsed_time
test["min_time"] = test_time
test["max_time"] = test_time
else:
test["elapsed_time"] = test["elapsed_time"] + elapsed_time
if test_time < test["min_time"]:
test["min_time"] = test_time
if test_time > test["max_time"]:
test["max_time"] = test_time
self.audit(BadNews, test)
self.Scenario.TearDown(self.CM)
- for node in self.CM.Env["nodes"]:
- if node in self.CM.Env["oprofile"]:
+ for node in self.CM.Env["nodes"]:
+ if node in self.CM.Env["oprofile"]:
self.CM.log("Disabling oprofile on %s" % node)
self.CM.rsh.remote_py(node, "os", "system", "opcontrol --shutdown")
self.audit(BadNews, None)
for test in self.Tests:
self.IndividualStats[test.name] = test.Stats
return self.Stats, self.IndividualStats
AllTestClasses = [ ]
class CTSTest:
'''
A Cluster test.
We implement the basic set of properties and behaviors for a generic
cluster test.
Cluster tests track their own statistics.
We keep each of the kinds of counts we track as separate {name,value}
pairs.
'''
def __init__(self, cm):
#self.name="the unnamed test"
self.Stats = {"calls":0
, "success":0
, "failure":0
, "skipped":0
, "auditfail":0}
# if not issubclass(cm.__class__, ClusterManager):
# raise ValueError("Must be a ClusterManager object")
self.CM = cm
self.timeout=120
self.starttime=0
def has_key(self, key):
return self.Stats.has_key(key)
def __setitem__(self, key, value):
self.Stats[key] = value
def __getitem__(self, key):
return self.Stats[key]
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not self.Stats.has_key(name):
self.Stats[name]=0
self.Stats[name] = self.Stats[name]+1
def failure(self, reason="none"):
'''Increment the failure count'''
self.incr("failure")
self.CM.log("Test " + self.name + " failed [reason:" + reason + "]")
return None
def success(self):
'''Increment the success count'''
self.incr("success")
return 1
def skipped(self):
'''Increment the skipped count'''
self.incr("skipped")
return 1
def __call__(self, node):
'''Perform the given test'''
raise ValueError("Abstract Class member (__call__)")
self.incr("calls")
return self.failure()
def is_applicable(self):
'''Return TRUE if we are applicable in the current test configuration'''
raise ValueError("Abstract Class member (is_applicable)")
return 1
def canrunnow(self):
'''Return TRUE if we can meaningfully run right now'''
return 1
def errorstoignore(self):
'''Return list of errors which are 'normal' and should be ignored'''
return []
###################################################################
class StopTest(CTSTest):
###################################################################
'''Stop (deactivate) the cluster manager on a node'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name="Stop"
self.uspat = self.CM["Pat:We_stopped"]
self.thempat = self.CM["Pat:They_stopped"]
def __call__(self, node):
'''Perform the 'stop' test. '''
self.incr("calls")
if self.CM.ShouldBeStatus[node] != self.CM["up"]:
return self.skipped()
patterns = []
# Technically we should always be able to notice ourselves stopping
patterns.append(self.CM["Pat:We_stopped"] % node)
if self.CM.Env["use_logd"]:
patterns.append(self.CM["Pat:Logd_stopped"] % node)
# Any active node needs to notice this one left
# NOTE: This won't work if we have multiple partitions
for other in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node:
patterns.append(self.CM["Pat:They_stopped"] %(other, node))
#self.debug("Checking %s will notice %s left"%(other, node))
watch = CTS.LogWatcher(
self.CM["LogFileName"], patterns, self.CM["DeadTime"])
watch.setwatch()
if node == self.CM.OurNode:
self.incr("us")
else:
if self.CM.upcount() <= 1:
self.incr("all")
else:
self.incr("them")
self.CM.StopaCM(node)
watch_result = watch.lookforall()
UnmatchedList = "||"
if watch.unmatched:
for regex in watch.unmatched:
self.CM.log ("Warn: Shutdown pattern not found: %s" % (regex))
UnmatchedList += regex + "||";
self.CM.cluster_stable(self.CM["DeadTime"])
# because syslog loses so many messages we can only really fail
# the stop if _none_ of the CCM peers notice the node leave
if not watch.unmatched or self.CM.upcount() == 0:
return self.success()
elif len(watch.unmatched) >= self.CM.upcount():
return self.failure("no match against (%s)" % UnmatchedList)
return self.success()
#
# We don't register StopTest because it's better when called by
# another test...
#
###################################################################
class StartTest(CTSTest):
###################################################################
'''Start (activate) the cluster manager on a node'''
def __init__(self, cm, debug=None):
CTSTest.__init__(self,cm)
self.name="start"
self.debug = debug
self.uspat = self.CM["Pat:We_started"]
self.thempat = self.CM["Pat:They_started"]
def __call__(self, node):
'''Perform the 'start' test. '''
self.incr("calls")
if self.CM.upcount() == 0:
self.incr("us")
else:
self.incr("them")
if self.CM.ShouldBeStatus[node] != self.CM["down"]:
return self.skipped()
elif self.CM.StartaCM(node):
return self.success()
else:
return self.failure("Startup %s on node %s failed"
%(self.CM["Name"], node))
def is_applicable(self):
'''StartTest is always applicable'''
return 1
#
# We don't register StartTest because it's better when called by
# another test...
#
###################################################################
class FlipTest(CTSTest):
###################################################################
'''If it's running, stop it. If it's stopped, start it.
Overthrow the status quo...
'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="Flip"
self.start = StartTest(cm)
self.stop = StopTest(cm)
def __call__(self, node):
'''Perform the 'Flip' test. '''
self.incr("calls")
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
self.incr("stopped")
ret = self.stop(node)
type="up->down"
# Give the cluster time to recognize it's gone...
time.sleep(self.CM["StableTime"])
elif self.CM.ShouldBeStatus[node] == self.CM["down"]:
self.incr("started")
ret = self.start(node)
type="down->up"
else:
return self.skipped()
self.incr(type)
if ret:
return self.success()
else:
return self.failure("%s failure" % type)
def is_applicable(self):
'''FlipTest is always applicable'''
return 1
# Register FlipTest as a good test to run
AllTestClasses.append(FlipTest)
###################################################################
class RestartTest(CTSTest):
###################################################################
'''Stop and restart a node'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="Restart"
self.start = StartTest(cm)
self.stop = StopTest(cm)
def __call__(self, node):
'''Perform the 'restart' test. '''
self.incr("calls")
self.incr("node:" + node)
ret1 = 1
if self.CM.StataCM(node):
self.incr("WasStopped")
if not self.start(node):
return self.failure("start (setup) failure: "+node)
self.starttime=time.time()
if not self.stop(node):
return self.failure("stop failure: "+node)
if not self.start(node):
return self.failure("start failure: "+node)
return self.success()
def is_applicable(self):
'''RestartTest is always applicable'''
return 1
# Register RestartTest as a good test to run
AllTestClasses.append(RestartTest)
###################################################################
class StonithTest(CTSTest):
###################################################################
'''Reboot a node by whacking it with stonith.'''
def __init__(self, cm, timeout=900):
CTSTest.__init__(self,cm)
self.name="Stonith"
self.theystopped = self.CM["Pat:They_dead"]
self.allstopped = self.CM["Pat:All_stopped"]
self.usstart = self.CM["Pat:We_started"]
self.themstart = self.CM["Pat:They_started"]
self.timeout = timeout
self.ssherror = False
def _reset(self, node):
StonithWorked=False
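# Retry the reset up to five times; a single success is enough.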
for tries in 1,2,3,4,5:
if self.CM.Env.ResetNode(node):
StonithWorked=True
break
return StonithWorked
def setup(self, target_node):
# nothing to do
return 1
def __call__(self, node):
'''Perform the 'stonith' test. (whack the node)'''
self.incr("calls")
stopwatch = 0
rc = 0
if not self.setup(node):
return self.failure("Setup failed")
# Figure out what log message to look for when/if it goes down
#
# Any active node needs to notice this one left
# NOTE: This won't work if we have multiple partitions
stop_patterns = []
for other in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[other] == self.CM["up"] and other != node:
stop_patterns.append(self.CM["Pat:They_stopped"] %(other, node))
stopwatch = 1
#self.debug("Checking %s will notice %s left"%(other, node))
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
# actually no-one will notice this node die since HA isn't running
stopwatch = 0
# Figure out what log message to look for when it comes up
if self.CM.upcount() == 1 and self.CM.ShouldBeStatus[node] == self.CM["up"]:
uppat = (self.usstart % node)
else:
uppat = (self.themstart % node)
upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat]
, timeout=self.timeout)
if stopwatch == 1:
watch = CTS.LogWatcher(self.CM["LogFileName"], stop_patterns
, timeout=self.CM["DeadTime"]+10)
watch.setwatch()
# Reset (stonith) the node
self.CM.debug("Resetting: "+node)
StonithWorked = self._reset(node)
if not StonithWorked:
return self.failure("Stonith didn't work")
if self.ssherror == True:
self.CM.log("NOTE: Stonith command reported success but node %s did not restart (atd, reboot or ssh error)" % node)
return self.success()
upwatch.setwatch()
# Look() and see if the machine went down
if stopwatch == 0:
# Allow time for the node to die
time.sleep(self.CM["DeadTime"]+10)
elif not watch.lookforall():
if watch.unmatched:
for regex in watch.unmatched:
self.CM.log("Warn: STONITH pattern not found: %s"%regex)
# !!no-one!! saw this node die
if len(watch.unmatched) == len(stop_patterns):
return self.failure("No-one saw %s die" %node)
# else: syslog* lost a message
# Alas I don't think this check is plausible (beekhof)
#
# Check it really stopped...
#self.CM.ShouldBeStatus[node] = self.CM["down"]
#if self.CM.StataCM(node) == 1:
# ret1=0
# Look() and see if the machine came back up
rc=0
if upwatch.look():
self.CM.debug("Startup pattern found: %s" %uppat)
rc=1
else:
self.CM.log("Warn: Startup pattern not found: %s" %uppat)
# Check it really started...
self.CM.ShouldBeStatus[node] = self.CM["up"]
if rc == 0 and self.CM.StataCM(node) == 1:
rc=1
# wait for the cluster to stabilize
self.CM.cluster_stable()
- if node in self.CM.Env["oprofile"]:
- self.CM.log("Enabling oprofile on %s" % node)
- self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init")
- self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start")
+ if node in self.CM.Env["oprofile"]:
+ self.CM.log("Enabling oprofile on %s" % node)
+ self.CM.rsh.remote_py(node, "os", "system", "opcontrol --init")
+ self.CM.rsh.remote_py(node, "os", "system", "opcontrol --start")
# return case processing
if rc == 0:
return self.failure("Node %s did not restart" %node)
else:
return self.success()
def is_applicable(self):
'''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE'''
# for v2, stonithd test is a better test to run.
if self.CM["Name"] == "linux-ha-v2":
return None
if self.CM.Env.has_key("DoStonith"):
return self.CM.Env["DoStonith"]
return 1
# Register StonithTest as a good test to run
AllTestClasses.append(StonithTest)
###################################################################
class StonithdTest(StonithTest):
###################################################################
def __init__(self, cm, timeout=600):
StonithTest.__init__(self, cm, timeout=600)
self.name="Stonithd"
self.startall = SimulStartLite(cm)
self.start = StartTest(cm)
self.stop = StopTest(cm)
self.init_node = None
def _reset(self, target_node):
if len(self.CM.Env["nodes"]) < 2:
return self.skipped()
StonithWorked = False
SshNotWork = 0
for tries in range(1,5):
# For some unknown reason, every now and then the ssh plugin just
# can't kill the target_node - everything works fine with stonithd
# and the plugin, but atd, reboot or ssh (or maybe something else)
# doesn't do its job and target_node remains alive. So look for
# the indicative messages and bubble-up the error via ssherror
watchpats = []
watchpats.append("Initiating ssh-reset")
watchpats.append("CRIT: still able to ping")
watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
, timeout=self.CM["DeadTime"]+30)
watch.setwatch()
fail_reasons = []
if self.CM.Env.ResetNode2(self.init_node, target_node, fail_reasons):
StonithWorked = True
break
if watch.lookforall():
SshNotWork = SshNotWork + 1
continue
for reason in fail_reasons:
self.CM.log(reason)
if StonithWorked == False and SshNotWork == tries:
StonithWorked = True
self.ssherror = True
return StonithWorked
def setup(self, target_node):
if len(self.CM.Env["nodes"]) < 2:
return 1
self.init_node = self.CM.Env.RandomNode()
while self.init_node == target_node:
self.init_node = self.CM.Env.RandomNode()
if not self.startall(None):
return self.failure("Test setup failed")
return 1
def is_applicable(self):
if not self.CM["Name"] == "linux-ha-v2":
return 0
if self.CM.Env.has_key("DoStonith"):
return self.CM.Env["DoStonith"]
return 1
AllTestClasses.append(StonithdTest)
###################################################################
class IPaddrtest(CTSTest):
###################################################################
'''Find the machine supporting a particular IP address, and knock it down.
[Hint: This code isn't finished yet...]
'''
def __init__(self, cm, IPaddrs):
CTSTest.__init__(self,cm)
self.name="IPaddrtest"
self.IPaddrs = IPaddrs
self.start = StartTest(cm)
self.stop = StopTest(cm)
def __call__(self, IPaddr):
'''
Perform the IPaddr test...
'''
self.incr("calls")
node = self.CM.Env.RandomNode()
self.incr("node:" + node)
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
self.incr("WasStopped")
self.start(node)
ret1 = self.stop(node)
# Give the cluster time to recognize we're gone...
time.sleep(self.CM["StableTime"])
ret2 = self.start(node)
if not ret1:
return self.failure("Could not stop")
if not ret2:
return self.failure("Could not start")
return self.success()
def is_applicable(self):
'''IPaddrtest is always applicable (but shouldn't be)'''
return 1
###################################################################
class StartOnebyOne(CTSTest):
###################################################################
'''Start all the nodes ~ one by one'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="StartOnebyOne"
self.stopall = SimulStopLite(cm)
self.start = StartTest(cm)
self.ns=CTS.NodeStatus(cm.Env)
def __call__(self, dummy):
'''Perform the 'StartOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Test setup failed")
failed=[]
self.starttime=time.time()
for node in self.CM.Env["nodes"]:
if not self.start(node):
failed.append(node)
if len(failed) > 0:
return self.failure("Some node failed to start: " + repr(failed))
return self.success()
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
def is_applicable(self):
'''StartOnebyOne is always applicable'''
return 1
# Register StartOnebyOne as a good test to run
AllTestClasses.append(StartOnebyOne)
###################################################################
class SimulStart(CTSTest):
###################################################################
'''Start all the nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="SimulStart"
self.stopall = SimulStopLite(cm)
self.startall = SimulStartLite(cm)
def __call__(self, dummy):
'''Perform the 'SimulStart' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return self.failure("Setup failed")
self.CM.clear_all_caches()
if not self.startall(None):
return self.failure("Startall failed")
return self.success()
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
def is_applicable(self):
'''SimulStart is always applicable'''
return 1
# Register SimulStart as a good test to run
AllTestClasses.append(SimulStart)
class SimulStop(CTSTest):
###################################################################
'''Stop all the nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="SimulStop"
self.startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, dummy):
'''Perform the 'SimulStop' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
if not self.stopall(None):
return self.failure("Stopall failed")
return self.success()
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
def is_applicable(self):
'''SimulStop is always applicable'''
return 1
# Register SimulStop as a good test to run
AllTestClasses.append(SimulStop)
class StopOnebyOne(CTSTest):
###################################################################
'''Stop all the nodes in order'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="StopOnebyOne"
self.startall = SimulStartLite(cm)
self.stop = StopTest(cm)
def __call__(self, dummy):
'''Perform the 'StopOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
failed=[]
self.starttime=time.time()
for node in self.CM.Env["nodes"]:
if not self.stop(node):
failed.append(node)
if len(failed) > 0:
return self.failure("Some node failed to stop: " + repr(failed))
self.CM.clear_all_caches()
return self.success()
def is_applicable(self):
'''StopOnebyOne is always applicable'''
return 1
# Register StopOnebyOne as a good test to run
AllTestClasses.append(StopOnebyOne)
class RestartOnebyOne(CTSTest):
###################################################################
'''Restart all the nodes in order'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="RestartOnebyOne"
self.startall = SimulStartLite(cm)
def __call__(self, dummy):
'''Perform the 'RestartOnebyOne' test. '''
self.incr("calls")
# We ignore the "node" parameter...
# Start up all the nodes...
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
did_fail=[]
self.starttime=time.time()
self.restart = RestartTest(self.CM)
for node in self.CM.Env["nodes"]:
if not self.restart(node):
did_fail.append(node)
if did_fail:
return self.failure("Could not restart %d nodes: %s"
%(len(did_fail), repr(did_fail)))
return self.success()
def is_applicable(self):
'''RestartOnebyOne is always applicable'''
return 1
# Register RestartOnebyOne as a good test to run
AllTestClasses.append(RestartOnebyOne)
class PartialStart(CTSTest):
###################################################################
'''Start a node - but tell it to stop before it finishes starting up'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="PartialStart"
self.startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, node):
'''Perform the 'PartialStart' test. '''
self.incr("calls")
ret = self.stopall(None)
if not ret:
return self.failure("Setup failed")
watchpats = []
watchpats.append("Starting crmd")
watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats,
timeout=self.CM["DeadTime"]+10)
watch.setwatch()
self.CM.StartaCMnoBlock(node)
ret = watch.lookforall()
if not ret:
self.CM.log("Patterns not found: " + repr(watch.unmatched))
return self.failure("Setup of %s failed" % node)
ret = self.stopall(None)
if not ret:
return self.failure("%s did not stop in time" % node)
return self.success()
def is_applicable(self):
'''PartialStart is always applicable'''
return 1
# Register PartialStart as a good test to run
AllTestClasses.append(PartialStart)
###################################################################
class StandbyTest(CTSTest):
###################################################################
'''Put a node in standby mode'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="standby"
self.successpat = self.CM["Pat:StandbyOK"]
self.nostandbypat = self.CM["Pat:StandbyNONE"]
self.transient = self.CM["Pat:StandbyTRANSIENT"]
def __call__(self, node):
'''Perform the 'standby' test. '''
self.incr("calls")
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
return self.skipped()
if self.CM.upcount() < 2:
self.incr("nostandby")
pat = self.nostandbypat
else:
self.incr("standby")
pat = self.successpat
#
# You could make a good argument that the cluster manager
# ought to give us good clues on when it's a bad time to
# switch over to the other side, but heartbeat doesn't...
# It could also queue the request. But, heartbeat
# doesn't do that either :-)
#
retrycount=0
while (retrycount < 10):
watch = CTS.LogWatcher(self.CM["LogFileName"]
, [pat, self.transient]
, timeout=self.CM["DeadTime"]+10)
watch.setwatch()
self.CM.rsh(node, self.CM["Standby"])
match = watch.look()
if match:
if re.search(self.transient, match):
self.incr("retries")
time.sleep(2)
retrycount=retrycount+1
else:
return self.success()
else:
break # No point in retrying...
return self.failure("did not find pattern " + pat)
def is_applicable(self):
'''StandbyTest is applicable when the CM has a Standby command'''
if not self.CM.has_key("Standby"):
return None
else:
#if self.CM.Env.has_key("DoStandby"):
#flag=self.CM.Env["DoStandby"]
#if type(flag) == types.IntType:
#return flag
#if not re.match("[yt]", flag, re.I):
#return None
#
# We need to strip off everything after the first blank
#
cmd=self.CM["Standby"]
cmd = cmd.split()[0]
if not os.access(cmd, os.X_OK):
return None
cf = self.CM.cf
if not cf.Parameters.has_key("auto_failback"):
return None
elif cf.Parameters["auto_failback"][0] == "legacy":
return None
return 1
# Register StandbyTest as a good test to run
AllTestClasses.append(StandbyTest)
#######################################################################
class StandbyTest2(CTSTest):
#######################################################################
'''Standby with CRM of HA release 2'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="standby2"
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
# make sure the node is active
# set the node to standby mode
# check resources; no resources should be running on the node
# set the node to active mode
# check resources; resources should have been migrated back (SHOULD THEY?)
def __call__(self, node):
self.incr("calls")
ret=self.startall(None)
if not ret:
return self.failure("Start all nodes failed")
self.CM.debug("Make sure node %s is active" % node)
if self.CM.StandbyStatus(node) != "off":
if not self.CM.SetStandbyMode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
self.CM.cluster_stable()
status = self.CM.StandbyStatus(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.CM.debug("Getting resources running on node %s" % node)
rsc_on_node = []
for rsc in self.CM.Resources():
if rsc.IsRunningOn(node):
rsc_on_node.append(rsc)
self.CM.debug("Setting node %s to standby mode" % node)
if not self.CM.SetStandbyMode(node, "on"):
return self.failure("can't set node %s to standby mode" % node)
time.sleep(30) # Allow time for the update to be applied and cause something
self.CM.cluster_stable()
status = self.CM.StandbyStatus(node)
if status != "on":
return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status))
self.CM.debug("Checking resources")
for rsc in self.CM.Resources():
if rsc.IsRunningOn(node):
return self.failure("%s set to standby, %s is still running on it" % (node, rsc.rid))
self.CM.debug("Setting node %s to active mode" % node)
if not self.CM.SetStandbyMode(node, "off"):
return self.failure("can't set node %s to active mode" % node)
time.sleep(30) # Allow time for the update to be applied and cause something
self.CM.cluster_stable()
status = self.CM.StandbyStatus(node)
if status != "off":
return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
self.CM.debug("Checking resources")
for rsc in rsc_on_node:
if not rsc.IsRunningOn(node):
return self.failure("%s set to active but %s is NOT back" % (node, rsc.rid))
return self.success()
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
AllTestClasses.append(StandbyTest2)
#######################################################################
class Fastdetection(CTSTest):
#######################################################################
'''Test how quickly one node detects that another node has been killed'''
def __init__(self,cm,timeout=60):
CTSTest.__init__(self, cm)
self.name = "DetectionTime"
self.they_stopped = self.CM["Pat:They_stopped"]
self.timeout = timeout
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
self.standby = StandbyTest(cm)
self.__setitem__("min", 0)
self.__setitem__("max", 0)
self.__setitem__("totaltime", 0)
def __call__(self, node):
'''Perform the fast failure detection test'''
self.incr("calls")
ret=self.startall(None)
if not ret:
return self.failure("Test setup failed")
if self.CM.upcount() < 2:
return self.skipped()
# Make sure they're not holding any resources
ret = self.standby(node)
if not ret:
return ret
stoppat = (self.they_stopped % ("", node))
stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout)
stopwatch.setwatch()
#
# This test is CM-specific - FIXME!!
#
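# Detection time is measured as wall-clock time (os.times()[4]) from the moment
# heartbeat is killed until the "stopped" pattern shows up in the log.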
if self.CM.rsh(node, "killall -9 heartbeat")==0:
Starttime = os.times()[4]
if stopwatch.look():
Stoptime = os.times()[4]
# This test is CM-specific - FIXME!!
self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true")
Detectiontime = Stoptime-Starttime
detectms = int(Detectiontime*1000+0.5)
self.CM.log("...failure detection time: %d ms" % detectms)
self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime
if self.Stats["min"] == 0:
self.Stats["min"] = Detectiontime
if Detectiontime > self.Stats["max"]:
self.Stats["max"] = Detectiontime
if Detectiontime < self.Stats["min"]:
self.Stats["min"] = Detectiontime
self.CM.ShouldBeStatus[node] = self.CM["down"]
self.start(node)
return self.success()
else:
# This test is CM-specific - FIXME!!
self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true")
self.CM.ShouldBeStatus[node] = self.CM["down"]
ret=self.start(node)
return self.failure("Didn't find the log message")
else:
return self.failure("Couldn't kill cluster manager")
def is_applicable(self):
'''This test is applicable when auto_failback != legacy'''
return self.standby.is_applicable()
# This test is CM-specific - FIXME!!
def errorstoignore(self):
'''Return list of errors which are 'normal' and should be ignored'''
return [ "ccm.*ERROR: ccm_control_process:failure to send protoversion request"
, "ccm.*ERROR: Lost connection to heartbeat service. Need to bail out"
]
AllTestClasses.append(Fastdetection)
##############################################################################
class BandwidthTest(CTSTest):
##############################################################################
# Tests should not be cluster-manager-specific
# If you need to find out cluster manager configuration to do this, then
# it should be added to the generic cluster manager API.
'''Test the bandwidth which heartbeat uses'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name = "Bandwidth"
self.start = StartTest(cm)
self.__setitem__("min",0)
self.__setitem__("max",0)
self.__setitem__("totalbandwidth",0)
self.tempfile = tempfile.mktemp(".cts")
self.startall = SimulStartLite(cm)
def __call__(self, node):
'''Perform the Bandwidth test'''
self.incr("calls")
if self.CM.upcount()<1:
return self.skipped()
Path = self.CM.InternalCommConfig()
if "ip" not in Path["mediatype"]:
return self.skipped()
port = Path["port"][0]
port = int(port)
ret = self.startall(None)
if not ret:
return self.failure("Test setup failed")
time.sleep(5) # We get extra messages right after startup.
fstmpfile = "/var/run/band_estimate"
dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
% (port, fstmpfile)
rc = self.CM.rsh(node, dumpcmd)
if rc == 0:
farfile = "root@%s:%s" % (node, fstmpfile)
self.CM.rsh.cp(farfile, self.tempfile)
Bandwidth = self.countbandwidth(self.tempfile)
if not Bandwidth:
self.CM.log("Could not compute bandwidth.")
return self.success()
intband = int(Bandwidth + 0.5)
self.CM.log("...bandwidth: %d bits/sec" % intband)
self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth
if self.Stats["min"] == 0:
self.Stats["min"] = Bandwidth
if Bandwidth > self.Stats["max"]:
self.Stats["max"] = Bandwidth
if Bandwidth < self.Stats["min"]:
self.Stats["min"] = Bandwidth
self.CM.rsh(node, "rm -f %s" % fstmpfile)
os.unlink(self.tempfile)
return self.success()
else:
return self.failure("no response from tcpdump command [%d]!" % rc)
def countbandwidth(self, file):
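# Rough estimate: sum the reported lengths of ~100 heartbeat UDP packets from the
# tcpdump capture and divide by the time between the first and last packet,
# giving bits per second.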
fp = open(file, "r")
fp.seek(0)
count = 0
sum = 0
while 1:
line = fp.readline()
if not line:
return None
if re.search("udp",line) or re.search("UDP,", line):
count=count+1
linesplit = string.split(line," ")
for j in range(len(linesplit)-1):
if linesplit[j]=="udp": break
if linesplit[j]=="length:": break
try:
sum = sum + int(linesplit[j+1])
except ValueError:
self.CM.log("Invalid tcpdump line: %s" % line)
return None
T1 = linesplit[0]
timesplit = string.split(T1,":")
time2split = string.split(timesplit[2],".")
time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001
break
while count < 100:
line = fp.readline()
if not line:
return None
if re.search("udp",line) or re.search("UDP,", line):
count = count+1
linessplit = string.split(line," ")
for j in range(len(linessplit)-1):
if linessplit[j] =="udp": break
if linessplit[j]=="length:": break
try:
sum=int(linessplit[j+1])+sum
except ValueError:
self.CM.log("Invalid tcpdump line: %s" % line)
return None
T2 = linessplit[0]
timesplit = string.split(T2,":")
time2split = string.split(timesplit[2],".")
time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001
time = time2-time1
if (time <= 0):
return 0
return (sum*8)/time
def is_applicable(self):
'''BandwidthTest is always applicable'''
return 0
AllTestClasses.append(BandwidthTest)
##########################################################################
class RedundantpathTest(CTSTest):
##########################################################################
'''Test heartbeat's redundant communication paths: break one path and check that the cluster still passes its audits.'''
#
# Tests should not be cluster-manager specific
# One needs to isolate what you need from the cluster manager and then
# add a (new) API to do it.
#
def __init__(self,cm,timeout=60):
CTSTest.__init__(self,cm)
self.name = "RedundantpathTest"
self.timeout = timeout
def PathCount(self):
'''Return number of communication paths'''
Path = self.CM.InternalCommConfig()
cf = self.CM.cf
eths = []
serials = []
num = 0
for interface in Path["interface"]:
if re.search("eth",interface):
eths.append(interface)
num = num + 1
if re.search("/dev",interface):
serials.append(interface)
num = num + 1
return (num, eths, serials)
def __call__(self,node):
'''Perform redundant path test'''
self.incr("calls")
if self.CM.ShouldBeStatus[node]!=self.CM["up"]:
return self.skipped()
(num, eths, serials) = self.PathCount()
PathDown = "NO"
for eth in eths:
if self.CM.rsh(node,"ifconfig %s down" % eth)==0:
PathDown = "OK"
break
if PathDown != "OK":
for serial in serials:
if self.CM.rsh(node,"setserial %s uart none" % serial)==0:
PathDown = "OK"
break
if PathDown != "OK":
return self.failure("Cannot break the path")
time.sleep(self.timeout)
for audit in CTSaudits.AuditList(self.CM):
if not audit():
for eth in eths:
self.CM.rsh(node,"ifconfig %s up" % eth)
for serial in serials:
self.CM.rsh(node,"setserial %s uart 16550" % serial)
return self.failure("Redundant path fail")
for eth in eths:
self.CM.rsh(node,"ifconfig %s up" % eth)
for serial in serials:
self.CM.rsh(node,"setserial %s uart 16550" % serial)
return self.success()
def is_applicable(self):
'''It is applicable when you have more than one connection'''
return self.PathCount()[0] > 1
# FIXME!! Why is this one commented out?
#AllTestClasses.append(RedundantpathTest)
##########################################################################
class DRBDTest(CTSTest):
##########################################################################
'''Test DRBD, which provides replicated storage under heartbeat.'''
def __init__(self,cm, timeout=10):
CTSTest.__init__(self,cm)
self.name = "DRBD"
self.timeout = timeout
def __call__(self, dummy):
'''Perform the 'DRBD' test.'''
self.incr("calls")
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
return self.skipped()
# Note: All these special cases with Start/Stop/StatusDRBD
# should be reworked to use resource objects instead of
# being hardwired to bypass the objects here.
for node in self.CM.Env["nodes"]:
done=time.time()+self.timeout+1
while (time.time()<done):
line=self.CM.rsh.readaline(node,self.CM["StatusDRBDCmd"])
if re.search("running",line):
break
else:
self.CM.rsh(node,self.CM["StartDRBDCmd"])
time.sleep(1)
if time.time()>done:
return self.failure("Can't start drbd, please check it")
device={}
for node in self.CM.Env["nodes"]:
device[node]=self.getdevice(node)
node = self.CM.Env["nodes"][0]
done=time.time()+self.timeout+1
while 1:
if (time.time()>done):
return self.failure("the drbd could't sync")
self.CM.rsh(node,"cp /proc/drbd /var/run >/dev/null 2>&1")
if self.CM.rsh.cp("%s:/var/run/drbd" % node,"/var/run"):
line = open("/tmp/var/run").readlines()[2]
p = line.find("Primary")
s1 = line.find("Secondary")
s2 = line.rfind("Secondary")
if s1!=s2:
if self.CM.rsh(node,"drbdsetup %s primary" % device[node]):
pass
if p!=-1:
if p<s1:
primarynode = node
secondarynode = self.CM.Env["nodes"][1]
break
else:
if s1!=-1:
primarynode = self.CM.Env["nodes"][1]
secondarynode = node
break
time.sleep(1)
self.CM.rsh(secondarynode, self.CM["StopCmd"])
self.CM.rsh(primarynode, self.CM["StopCmd"])
line1 = self.CM.rsh.readaline(node,"md5sum %s" % device[primarynode])
line2 = self.CM.rsh.readaline(node,"md5sum %s" % device[secondarynode])
self.CM.rsh(primarynode,self.CM["StartCmd"])
self.CM.rsh(secondarynode,self.CM["StartCmd"])
if string.split(line1," ")[0] != string.split(line2," ")[0]:
return self.failure("DRBD did not replicate the data correctly")
return self.success()
def getdevice(self,node):
device=None
if self.CM.rsh(node,self.CM["DRBDCheckconf"])==0:
self.CM.rsh.cp("%s:/var/run/drbdconf" % node, "/var/run")
lines=open("/var/run/drbdconf","r")
for line in lines:
if line.find("%s:device" % node)!=-1:
device=string.split(line," ")[8]
break
return device
def is_applicable(self):
'''DRBD is applicable when there are drbd devices'''
for group in self.CM.ResourceGroups():
for resource in group:
if resource.Type() == "datadisk":
return 1
return None
AllTestClasses.append(DRBDTest)
####################################################################
class Split_brainTest(CTSTest):
####################################################################
'''Test split-brain: when the path between the nodes breaks,
check that both sides take over the resources'''
def __init__(self,cm):
CTSTest.__init__(self,cm)
self.name = "Split_brain"
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
def __call__(self, node):
'''Perform split-brain test'''
self.incr("calls")
ret = self.startall(None)
if not ret:
return self.failure("Test setup failed")
'''Isolate the node and look for the "node is dead" messages'''
watchstoppats = [ ]
stoppat = self.CM["Pat:They_stopped"]
for member in self.CM.Env["nodes"]:
if member != node:
thispat = (stoppat % (node,member))
watchstoppats.append(thispat)
thatpat = (stoppat % (member,node))
watchstoppats.append(thatpat)
watchstop = CTS.LogWatcher(self.CM["LogFileName"], watchstoppats\
, timeout=self.CM["DeadTime"]+60)
watchstop.ReturnOnlyMatch()
watchstop.setwatch()
if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 :
self.CM.savecomm_node(node)
if not self.CM.isolate_node(node):
return self.failure("Could not isolate the nodes")
if not watchstop.lookforall():
self.CM.unisolate_node(node)
self.CM.log("Patterns not found: " + repr(watchstop.unmatched))
return self.failure("Didn't find the log 'dead' message")
'''
Unisolate the node, look for the return partition message
and check whether they restart
'''
watchpartitionpats = [ ]
partitionpat = self.CM["Pat:Return_partition"]
watchstartpats = [ ]
startpat = self.CM["Pat:We_started"]
for member in self.CM.Env["nodes"]:
thispat = (partitionpat % member)
thatpat = (startpat % member)
watchpartitionpats.append(thispat)
watchstartpats.append(thatpat)
watchpartition = CTS.LogWatcher(self.CM["LogFileName"], watchpartitionpats\
, timeout=self.CM["DeadTime"]+60)
watchstart = CTS.LogWatcher(self.CM["LogFileName"], watchstartpats\
, timeout=self.CM["DeadTime"]+60)
watchstart.ReturnOnlyMatch()
watchpartition.setwatch()
watchstart.setwatch()
self.CM.unisolate_node(node)
if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 :
self.CM.restorecomm_node(node)
if not watchpartition.lookforall():
self.CM.log("Patterns not found: " + repr(watchpartition.unmatched))
return self.failure("Didn't find return from partition messages")
if not watchstart.lookforall():
self.CM.log("Patterns not found: " + repr(watchstart.unmatched))
return self.failure("Both nodes didn't restart")
return self.success()
def is_applicable(self):
'''Split_brain is applicable for 1.X'''
if self.CM["Name"] == "heartbeat":
return 1
return 0
#
# FIXME!! This test has hard-coded cluster-manager-specific things in it!!
#
def errorstoignore(self):
'''Return list of errors which are 'normal' and should be ignored'''
return [ "ERROR:.*Both machines own.*resources"
, "ERROR:.*lost a lot of packets!"
, "ERROR: Cannot rexmit pkt .*: seqno too low"
, "ERROR: Irretrievably lost packet: node"
, "CRIT: Cluster node .* returning after partition"
]
AllTestClasses.append(Split_brainTest)
###################################################################
class ResourceRecover(CTSTest):
###################################################################
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="ResourceRecover"
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
self.max=30
self.rid=None
self.action = "fail"
# make sure the interval is greater than 0 so failcount is updated
self.interval = 60000
def __call__(self, node):
'''Perform the 'ResourceRecover' test. '''
self.incr("calls")
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
resourcelist = self.CM.active_resources(node)
# if there are no active resources, skip the test
if len(resourcelist)==0:
self.CM.log("No active resources on %s" % node)
return self.skipped()
self.rid = self.CM.Env.RandomGen.choice(resourcelist)
self.CM.debug("Shooting %s..." % self.rid)
pats = []
pats.append("crmd.* Performing op=%s_stop_0" % self.rid)
pats.append("crmd.* Performing op=%s_start_0" % self.rid)
pats.append("crmd.* LRM operation %s_start_0.*complete" % self.rid)
pats.append("Updating failcount for %s on .* after .* %s"
% (self.rid, self.action))
watch = CTS.LogWatcher(self.CM["LogFileName"], pats, timeout=60)
watch.setwatch()
# fail a resource by calling an action it doesn't support
self.CM.rsh.remote_py(node, "os", "system",
"@sbindir@/crm_resource -F -r %s -H %s &>/dev/null" % (self.rid, node))
watch.lookforall()
self.CM.cluster_stable()
recovernode=self.CM.ResourceLocation(self.rid)
if len(recovernode)==1:
self.CM.debug("Recovered: %s is running on %s"
%(self.rid, recovernode[0]))
if not watch.unmatched:
return self.success()
else:
return self.failure("Patterns not found: %s"
% repr(watch.unmatched))
elif len(recovernode)==0:
return self.failure("%s was not recovered and is inactive"
% self.rid)
else:
return self.failure("%s is now active on more than one node: %s"
%(self.rid, str(recovernode)))
def is_applicable(self):
'''ResourceRecover is applicable only when resources are running
on the cluster and the cluster manager is linux-ha-v2'''
if self.CM["Name"] == "linux-ha-v2":
resourcelist=self.CM.Resources()
if len(resourcelist)==0:
self.CM.log("No resources on this cluster")
return 0
else:
return 1
return 0
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return [ """Updating failcount for %s""" % self.rid,
"""Unknown operation: fail""",
"""ERROR: sending stonithRA op to stonithd failed.""",
"""ERROR: process_lrm_event: LRM operation %s_%s_%d""" % (self.rid, self.action, self.interval),
"""ERROR: process_graph_event: Action %s_%s_%d initiated outside of a transition""" % (self.rid, self.action, self.interval),
]
AllTestClasses.append(ResourceRecover)
###################################################################
class ComponentFail(CTSTest):
###################################################################
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="ComponentFail"
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
self.stopall = SimulStopLite(cm)
self.complist = cm.Components()
self.theystart = cm["Pat:They_started"]
def __call__(self, node):
'''Perform the 'ComponentFail' test. '''
self.incr("calls")
# start all nodes
if not self.CM.cluster_stable():
self.stopall(None)
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
# select a component to kill
component = self.CM.Env.RandomGen.choice(self.complist)
self.CM.log("choose %s to kill"%component.name)
patterns = []
patterns.append("%s heartbeat.*Respawning.*%s" %(node, component.name))
patterns.append(self.theystart%node)
# set the watch for stable
watch = CTS.LogWatcher(
self.CM["LogFileName"], patterns, self.CM["DeadTime"]+10)
watch.setwatch()
# kill the component
component.kill(node)
# check to see Heartbeat noticed
match = watch.look()
if match:
self.CM.log("Found match: %s"%(match))
# now watch it recover...
if self.CM.cluster_stable():
return self.success()
else:
self.failure("Cluster not stable")
else:
return self.failure("Heartbeat didnt notice %s die" %component)
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return ["""heartbeat.*killed by signal 9""", """heartbeat.*Respawning"""]
#AllTestClasses.append(ComponentFail)
####################################################################
class Split_brainTest2(CTSTest):
####################################################################
'''Test split-brain: when the path between the partitions breaks,
check that both sides take over the resources'''
def __init__(self,cm):
CTSTest.__init__(self,cm)
self.name = "Split_brain2"
self.start = StartTest(cm)
self.startall = SimulStartLite(cm)
def __call__(self, node):
'''Perform split-brain test'''
self.incr("calls")
ret = self.startall(None)
if not ret:
return self.failure("Setup failed")
count1 = self.CM.Env.RandomGen.randint(1,len(self.CM.Env["nodes"])-1)
partition1 = []
while len(partition1) < count1:
select = self.CM.Env.RandomGen.choice(self.CM.Env["nodes"])
if not select in partition1:
partition1.append(select)
partition2 = []
for member in self.CM.Env["nodes"]:
if not member in partition1:
partition2.append(member)
allownodes1 = ""
for member in partition1:
allownodes1 += member + " "
allownodes2 = ""
for member in partition2:
allownodes2 += member + " "
self.CM.log("Partition1: " + str(partition1))
self.CM.log("Partition2: " + str(partition2))
'''Isolate the nodes and look for the "node is dead" messages'''
watchdeadpats = [ ]
deadpat = self.CM["Pat:They_dead"]
for member in self.CM.Env["nodes"]:
thispat = (deadpat % member)
watchdeadpats.append(thispat)
watchdead = CTS.LogWatcher(self.CM["LogFileName"], watchdeadpats\
, timeout=self.CM["DeadTime"]+60)
watchdead.ReturnOnlyMatch()
watchdead.setwatch()
for member in partition1:
if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 :
self.CM.savecomm_node(node)
if not self.CM.isolate_node(member,allownodes1):
return self.failure("Could not isolate the nodes")
for member in partition2:
if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 :
self.CM.savecomm_node(node)
if not self.CM.isolate_node(member,allownodes2):
return self.failure("Could not isolate the nodes")
if not watchdead.lookforall():
for member in self.CM.Env["nodes"]:
self.CM.unisolate_node(member)
self.CM.log("Patterns not found: " + repr(watchdead.unmatched))
return self.failure("Didn't find the log 'dead' message")
dcnum=0
while dcnum < 2:
dcnum = 0
for member in self.CM.Env["nodes"]:
if self.CM.is_node_dc(member):
dcnum += 1
time.sleep(1)
'''
Unisolate the node, look for the return partition message
and check whether they restart
'''
watchpartitionpats = [self.CM["Pat:DC_IDLE"]]
partitionpat = self.CM["Pat:Return_partition"]
for member in self.CM.Env["nodes"]:
thispat = (partitionpat % member)
watchpartitionpats.append(thispat)
watchpartition = CTS.LogWatcher(self.CM["LogFileName"], watchpartitionpats\
, timeout=self.CM["DeadTime"]+60)
watchpartition.setwatch()
for member in self.CM.Env["nodes"]:
if float(self.CM.Env["XmitLoss"])!=0 or float(self.CM.Env["RecvLoss"])!=0 :
self.CM.restorecomm_node(node)
self.CM.unisolate_node(member)
if not watchpartition.lookforall():
self.CM.log("Patterns not found: " + repr(watchpartition.unmatched))
return self.failure("Didn't find return from partition messages")
return self.success()
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
def errorstoignore(self):
'''Return list of errors which are 'normal' and should be ignored'''
return [ "ERROR:.*Both machines own.*resources"
, "ERROR:.*lost a lot of packets!"
, "ERROR: Cannot rexmit pkt .*: seqno too low"
, "ERROR: Irretrievably lost packet: node"
]
#AllTestClasses.append(Split_brainTest2)
####################################################################
class MemoryTest(CTSTest):
####################################################################
'''Check to see if anyone is leaking memory'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="Memory"
# self.test = ElectionMemoryTest(cm)
self.test = ResourceRecover(cm)
self.startall = SimulStartLite(cm)
self.before = {}
self.after = {}
def __call__(self, node):
ps_command='''ps -eo ucomm,pid,pmem,tsiz,dsiz,rss,vsize | grep -e ccm -e ha_logd -e cib -e crmd -e lrmd -e tengine -e pengine'''
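# Snapshot the ps output for the cluster daemons before and after the underlying
# test, keyed by PID, so the per-process memory columns can be compared afterwards.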
memory_error = [
"", "", "",
"Code",
"Data",
"Resident",
"Total"
]
ret = self.startall(None)
if not ret:
return self.failure("Test setup failed")
time.sleep(10)
for node in self.CM.Env["nodes"]:
self.before[node] = {}
rsh_pipe = self.CM.rsh.popen(node, ps_command)
rsh_pipe.tochild.close()
result = rsh_pipe.fromchild.readline()
while result:
tokens = result.split()
self.before[node][tokens[1]] = result
result = rsh_pipe.fromchild.readline()
rsh_pipe.fromchild.close()
self.lastrc = rsh_pipe.wait()
# do something...
if not self.test(node):
return self.failure("Underlying test failed")
time.sleep(10)
for node in self.CM.Env["nodes"]:
self.after[node] = {}
rsh_pipe = self.CM.rsh.popen(node, ps_command)
rsh_pipe.tochild.close()
result = rsh_pipe.fromchild.readline()
while result:
tokens = result.split()
self.after[node][tokens[1]] = result
result = rsh_pipe.fromchild.readline()
rsh_pipe.fromchild.close()
self.lastrc = rsh_pipe.wait()
failed_nodes = []
for node in self.CM.Env["nodes"]:
failed = 0
for process in self.before[node]:
messages = []
before_line = self.before[node][process]
before_tokens = before_line.split()
after_line = self.after[node].get(process)
if not after_line:
self.CM.log("%s %s[%s] exited during the test"
%(node, before_tokens[0], before_tokens[1]))
continue
after_tokens = after_line.split()
# 3 : Code size
# 4 : Data size
# 5 : Resident size
# 6 : Total size
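# Note: resident size (index 5) is not included in the comparison below.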
for index in [ 3, 4, 6 ]:
mem_before = int(before_tokens[index])
mem_after = int(after_tokens[index])
mem_diff = mem_after - mem_before
mem_allow = mem_before * 0.01
# for now...
mem_allow = 0
if mem_diff > mem_allow:
failed = 1
messages.append("%s size grew by %dkB (%dkB)"
%(memory_error[index], mem_diff, mem_after))
elif mem_diff < 0:
messages.append("%s size shrank by %dkB (%dkB)"
%(memory_error[index], mem_diff, mem_after))
if len(messages) > 0:
self.CM.log("Process %s[%s] on %s: %s"
%(before_tokens[0], before_tokens[1], node,
repr(messages)))
self.CM.debug("%s Before: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB"
%(node, before_tokens[0], before_tokens[1],
before_tokens[2], before_tokens[3],
before_tokens[4], before_tokens[5],
before_tokens[6]))
self.CM.debug("%s After: %s[%s] (%s%%):\tcode=%skB, data=%skB, resident=%skB, total=%skB"
%(node, after_tokens[0], after_tokens[1],
after_tokens[2], after_tokens[3],
after_tokens[4], after_tokens[5],
after_tokens[6]))
if failed == 1:
failed_nodes.append(node)
if len(failed_nodes) > 0:
return self.failure("Memory leaked on: " + repr(failed_nodes))
return self.success()
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return [ """ERROR: .* LRM operation.*monitor on .*: not running""",
"""pengine:.*Handling failed """]
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
#AllTestClasses.append(MemoryTest)
####################################################################
class ElectionMemoryTest(CTSTest):
####################################################################
'''Check to see if anyone is leaking memory'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="Election"
def __call__(self, node):
self.CM.rsh.readaline(node, self.CM["ElectionCmd"]%node)
if self.CM.cluster_stable():
return self.success()
return self.failure("Cluster not stable")
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
def is_applicable(self):
'''Never applicable, only for use by the memory test'''
return 0
AllTestClasses.append(ElectionMemoryTest)
####################################################################
class SpecialTest1(CTSTest):
####################################################################
'''Set up a custom test to cause quorum failure issues for Andrew'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="SpecialTest1"
self.startall = SimulStartLite(cm)
self.restart1 = RestartTest(cm)
self.stopall = SimulStopLite(cm)
def __call__(self, node):
'''Perform the 'SpecialTest1' test for Andrew. '''
self.incr("calls")
# Shut down all the nodes...
ret = self.stopall(None)
if not ret:
return ret
# Start the selected node
ret = self.restart1(node)
if not ret:
return ret
# Start all remaining nodes
ret = self.startall(None)
return ret
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
def is_applicable(self):
return 1
AllTestClasses.append(SpecialTest1)
###################################################################
class NearQuorumPointTest(CTSTest):
###################################################################
'''
This test brings larger clusters near the quorum point (50%).
In addition, it will test doing starts and stops at the same time.
Here is how I think it should work:
- loop over the nodes and decide randomly which will be up and which
will be down. Use a 50% probability for each of up/down.
- figure out what to do to get into that state from the current state
- in parallel, bring up those going up and bring those going down.
'''
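# Illustrative example only: with nodes n1..n5 a run might pick
# startset=[n1, n3] and stopset=[n2, n4, n5], then issue the corresponding
# non-blocking start/stop calls back to back and watch the logs for the results.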
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="NearQuorumPoint"
def __call__(self, dummy):
'''Perform the 'NearQuorumPoint' test. '''
self.incr("calls")
startset = []
stopset = []
#decide what to do with each node
for node in self.CM.Env["nodes"]:
action = self.CM.Env.RandomGen.choice(["start","stop"])
#action = self.CM.Env.RandomGen.choice(["start","stop","no change"])
if action == "start" :
startset.append(node)
elif action == "stop" :
stopset.append(node)
self.CM.debug("start nodes:" + repr(startset))
self.CM.debug("stop nodes:" + repr(stopset))
#add search patterns
watchpats = [ ]
for node in stopset:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
watchpats.append(self.CM["Pat:We_stopped"] % node)
for node in startset:
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
watchpats.append(self.CM["Pat:They_started"] % node)
if len(watchpats) == 0:
return self.skipped()
watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
, timeout=self.CM["DeadTime"]+10)
watch.setwatch()
#begin actions
for node in stopset:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
self.CM.StopaCMnoBlock(node)
for node in startset:
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
self.CM.StartaCMnoBlock(node)
#get the result
if watch.lookforall():
self.CM.cluster_stable()
return self.success()
self.CM.log("Warn: Patterns not found: " + repr(watch.unmatched))
#get the "bad" nodes
upnodes = []
for node in stopset:
if self.CM.StataCM(node) == 1:
upnodes.append(node)
downnodes = []
for node in startset:
if self.CM.StataCM(node) == 0:
downnodes.append(node)
if upnodes == [] and downnodes == []:
self.CM.cluster_stable()
return self.success()
if len(upnodes) > 0:
self.CM.log("Warn: Unstoppable nodes: " + repr(upnodes))
if len(downnodes) > 0:
self.CM.log("Warn: Unstartable nodes: " + repr(downnodes))
return self.failure()
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2":
return 1
return 0
AllTestClasses.append(NearQuorumPointTest)
###################################################################
class BSC_AddResource(CTSTest):
###################################################################
'''Add a resource to the cluster'''
def __init__(self, cm):
CTSTest.__init__(self, cm)
self.name="AddResource"
self.resource_offset = 0
self.cib_cmd="""@sbindir@/cibadmin -C -o %s -X '%s' """
def __call__(self, node):
self.resource_offset = self.resource_offset + 1
r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset)
start_pat = "crmd.*%s_start_0.*complete"
patterns = []
patterns.append(start_pat % r_id)
watch = CTS.LogWatcher(
self.CM["LogFileName"], patterns, self.CM["DeadTime"])
watch.setwatch()
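# Allocate the next address by bumping the final octet of IPBase, e.g.
# 10.0.0.10 -> 10.0.0.11 (example addresses only).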
fields = string.split(self.CM.Env["IPBase"], '.')
fields[3] = str(int(fields[3])+1)
ip = string.join(fields, '.')
self.CM.Env["IPBase"] = ip
if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip):
return self.failure("Make resource %s failed" % r_id)
failed = 0
watch_result = watch.lookforall()
if watch.unmatched:
for regex in watch.unmatched:
self.CM.log ("Warn: Pattern not found: %s" % (regex))
failed = 1
if failed:
return self.failure("Resource pattern(s) not found")
if not self.CM.cluster_stable(self.CM["DeadTime"]):
return self.failure("Unstable cluster")
return self.success()
def make_ip_resource(self, node, id, rclass, type, ip):
self.CM.log("Creating %s::%s:%s (%s) on %s" % (rclass,type,id,ip,node))
rsc_xml="""
<primitive id="%s" class="%s" type="%s" provider="heartbeat">
<instance_attributes id="%s"><attributes>
<nvpair id="%s" name="ip" value="%s"/>
</attributes></instance_attributes>
</primitive>""" % (id, rclass, type, id, id, ip)
node_constraint="""
<rsc_location id="run_%s" rsc="%s">
<rule id="pref_run_%s" score="100">
<expression id="%s_loc_expr" attribute="#uname" operation="eq" value="%s"/>
</rule>
</rsc_location>""" % (id, id, id, id, node)
rc = 0
(rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("constraints", node_constraint))
if rc != 0:
self.CM.log("Constraint creation failed: %d" % rc)
return None
(rc, lines) = self.CM.rsh.remote_py(node, "os", "system", self.cib_cmd % ("resources", rsc_xml))
if rc != 0:
self.CM.log("Resource creation failed: %d" % rc)
return None
return 1
def is_applicable(self):
if self.CM["Name"] == "linux-ha-v2" and self.CM.Env["DoBSC"]:
return 1
return None
def TestList(cm):
result = []
for testclass in AllTestClasses:
bound_test = testclass(cm)
if bound_test.is_applicable():
result.append(bound_test)
return result
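# Illustrative usage only: a test scenario would typically build its pool with
# something like self.Tests = TestList(cm) and then pick randomly from it each
# iteration, as the random-test loop earlier in this file does.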
class SimulStopLite(CTSTest):
###################################################################
'''Stop any active nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="SimulStopLite"
def __call__(self, dummy):
'''Perform the 'SimulStopLite' setup work. '''
self.incr("calls")
self.CM.debug("Setup: " + self.name)
# We ignore the "node" parameter...
watchpats = [ ]
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
self.incr("WasStarted")
watchpats.append(self.CM["Pat:All_stopped"] % node)
if self.CM.Env["use_logd"]:
watchpats.append(self.CM["Pat:Logd_stopped"] % node)
if len(watchpats) == 0:
self.CM.clear_all_caches()
return self.skipped()
# Stop all the nodes - at about the same time...
watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
, timeout=self.CM["DeadTime"]+10)
watch.setwatch()
self.starttime=time.time()
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["up"]:
self.CM.StopaCMnoBlock(node)
if watch.lookforall():
self.CM.clear_all_caches()
return self.success()
did_fail=0
up_nodes = []
for node in self.CM.Env["nodes"]:
if self.CM.StataCM(node) == 1:
did_fail=1
up_nodes.append(node)
if did_fail:
return self.failure("Active nodes exist: " + repr(up_nodes))
self.CM.log("Warn: All nodes stopped but CTS didnt detect: "
+ repr(watch.unmatched))
self.CM.clear_all_caches()
return self.success()
def is_applicable(self):
'''SimulStopLite is a setup test and never applicable'''
return 0
###################################################################
class SimulStartLite(CTSTest):
###################################################################
'''Start any stopped nodes ~ simultaneously'''
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="SimulStartLite"
def __call__(self, dummy):
'''Perform the 'SimulStartLite' setup work. '''
self.incr("calls")
self.CM.debug("Setup: " + self.name)
# We ignore the "node" parameter...
watchpats = [ ]
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
self.incr("WasStopped")
watchpats.append(self.CM["Pat:They_started"] % node)
if len(watchpats) == 0:
return self.skipped()
# Start all the nodes - at about the same time...
watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats
, timeout=self.CM["DeadTime"]+10)
watch.setwatch()
self.starttime=time.time()
for node in self.CM.Env["nodes"]:
if self.CM.ShouldBeStatus[node] == self.CM["down"]:
self.CM.StartaCMnoBlock(node)
if watch.lookforall():
for attempt in (1, 2, 3, 4, 5):
if self.CM.cluster_stable():
return self.success()
return self.failure("Cluster did not stabilize")
did_fail=0
unstable = []
for node in self.CM.Env["nodes"]:
if self.CM.StataCM(node) == 0:
did_fail=1
unstable.append(node)
if did_fail:
return self.failure("Unstarted nodes exist: " + repr(unstable))
unstable = []
for node in self.CM.Env["nodes"]:
if not self.CM.node_stable(node):
did_fail=1
unstable.append(node)
if did_fail:
return self.failure("Unstable cluster nodes exist: "
+ repr(unstable))
self.CM.log("Warn: All nodes started but CTS didnt detect: "
+ repr(watch.unmatched))
return self.success()
def is_applicable(self):
'''SimulStartLite is a setup test and never applicable'''
return 0
###################################################################
class LoggingTest(CTSTest):
###################################################################
def __init__(self, cm):
CTSTest.__init__(self,cm)
self.name="Logging"
def __call__(self, dummy):
'''Perform the 'Logging' test. '''
self.incr("calls")
# Make sure logging is working and we have enough disk space...
if not self.CM.TestLogging():
sys.exit(1)
if not self.CM.CheckDf():
sys.exit(1)
def is_applicable(self):
'''Logging is applicable only when running the basic sanity checks (DoBSC)'''
return self.CM.Env["DoBSC"]
def errorstoignore(self):
'''Return list of errors which should be ignored'''
return []
#AllTestClasses.append(LoggingTest)