Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/cts/lab/CM_common.py b/cts/lab/CM_common.py
index 1cb1249977..cfa73ae2b1 100755
--- a/cts/lab/CM_common.py
+++ b/cts/lab/CM_common.py
@@ -1,468 +1,469 @@
""" Cluster Manager common class for Pacemaker's Cluster Test Suite (CTS)
This was originally the cluster manager class for the Heartbeat stack.
It is retained for use as a base class by other cluster manager classes.
It could be merged into the ClusterManager class directly, but this is
easier.
"""
__copyright__ = """Original Author: Huang Zhen <zhenhltc@cn.ibm.com>
Copyright 2004 International Business Machines
-with later changes copyright 2004-2020 the Pacemaker project contributors.
+with later changes copyright 2004-2021 the Pacemaker project contributors.
The version control history for this file may have further details.
"""
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys
from cts.CTSvars import *
from cts.CTS import *
from cts.CIB import *
from cts.CTStests import AuditResource
from cts.watcher import LogWatcher
+from cts.ClusterManager import ClusterManager
class crm_common(ClusterManager):
# Common cluster-manager behavior shared by the stack-specific CM classes.
def __init__(self, Environment, randseed=None, name=None):
'''Initialize CM state shared by all cluster-manager subclasses.'''
ClusterManager.__init__(self, Environment, randseed=randseed)
# Behavior flags consulted by the CTS tests
self.fastfail = 0
self.cib_installed = 0
self.config = None
self.cluster_monitor = 0
self.use_short_names = 1
if self.Env["DoBSC"]:
# Basic-sanity-check runs are single-node, so peers never stop;
# drop the pattern that would otherwise be waited on.
del self.templates["Pat:They_stopped"]
# Verify subclasses overrode every required key (raises if not)
self._finalConditions()
self.check_transitions = 0
self.check_elections = 0
# Tracks which nodes have had the CIB synchronized to them
self.CIBsync = {}
self.CibFactory = ConfigFactory(self)
self.cib = self.CibFactory.createConfig(self.Env["Schema"])
def errorstoignore(self):
# At some point implement a more elegant solution that
# also produces a report at the end
""" Return a list of known error messages that should be ignored """
return PatternSelector().get_patterns(self.name, "BadNewsIgnore")
def install_config(self, node):
if not self.ns.WaitForNodeToComeUp(node):
self.log("Node %s is not up." % node)
return None
if not node in self.CIBsync and self.Env["ClobberCIB"] == 1:
self.CIBsync[node] = 1
self.rsh(node, "rm -f "+CTSvars.CRM_CONFIG_DIR+"/cib*")
# Only install the CIB on the first node, all the other ones will pick it up from there
if self.cib_installed == 1:
return None
self.cib_installed = 1
if self.Env["CIBfilename"] == None:
self.log("Installing Generated CIB on node %s" % (node))
self.cib.install(node)
else:
self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node))
if 0 != self.rsh.cp(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)):
raise ValueError("Can not scp file to %s %d"%(node))
self.rsh(node, "chown "+CTSvars.CRM_DAEMON_USER+" "+CTSvars.CRM_CONFIG_DIR+"/cib.xml")
def prepare(self):
'''Finish the Initialization process. Prepare to test...'''
self.partitions_expected = 1
for node in self.Env["nodes"]:
self.ShouldBeStatus[node] = ""
if self.Env["experimental-tests"]:
self.unisolate_node(node)
self.StataCM(node)
def test_node_CM(self, node):
'''Report the status of the cluster manager on a given node.

Returns 0 (down), 1 (up but not provably stable) or 2 (up and stable),
and updates self.ShouldBeStatus[node] to match what was observed.
'''
# Arm the log watcher BEFORE querying status so a state line emitted
# in response to the query cannot be missed.
watchpats = [ ]
watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)")
watchpats.append(self.templates["Pat:NonDC_started"] % node)
watchpats.append(self.templates["Pat:DC_started"] % node)
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, "ClusterIdle", hosts=[node], kind=self.Env["LogWatcher"])
idle_watch.setwatch()
out = self.rsh(node, self.templates["StatusCmd"]%node, 1)
self.debug("Node %s status: '%s'" %(node, out))
# No output, or output without 'ok' => the CM is not running there
if not out or (out.find('ok') < 0):
if self.ShouldBeStatus[node] == "up":
self.log(
"Node status for %s is %s but we think it should be %s"
% (node, "down", self.ShouldBeStatus[node]))
self.ShouldBeStatus[node] = "down"
return 0
if self.ShouldBeStatus[node] == "down":
self.log(
"Node status for %s is %s but we think it should be %s: %s"
% (node, "up", self.ShouldBeStatus[node], out))
self.ShouldBeStatus[node] = "up"
# check the output first - because syslog-ng loses messages
if out.find('S_NOT_DC') != -1:
# Up and stable
return 2
if out.find('S_IDLE') != -1:
# Up and stable
return 2
# fall back to syslog-ng and wait
if not idle_watch.look():
# just up
self.debug("Warn: Node %s is unstable: %s" % (node, out))
return 1
# Up and stable
return 2
# Is the node up or is the node down
def StataCM(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_CM(node) > 0:
return 1
return None
# Being up and being stable is not the same question...
def node_stable(self, node):
'''Report the status of the cluster manager on a given node'''
if self.test_node_CM(node) == 2:
return 1
self.log("Warn: Node %s not stable" % (node))
return None
def partition_stable(self, nodes, timeout=None):
'''Wait for one partition (space-separated node-name string) to go idle.

Returns 1 when an idle/DC-idle message is seen from any member within
`timeout` seconds (default: DeadTime), else None.
'''
watchpats = [ ]
watchpats.append("Current ping state: S_IDLE")
watchpats.append(self.templates["Pat:DC_IDLE"])
self.debug("Waiting for cluster stability...")
if timeout == None:
timeout = self.Env["DeadTime"]
# `nodes` is a string; anything shorter than 3 chars cannot name a
# real member, so treat the partition as inactive (trivially stable).
if len(nodes) < 3:
self.debug("Cluster is inactive")
return 1
# Arm the watcher before poking the nodes so no message is missed
idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, "ClusterStable", timeout, hosts=nodes.split(), kind=self.Env["LogWatcher"])
idle_watch.setwatch()
for node in nodes.split():
# have each node dump its current state
self.rsh(node, self.templates["StatusCmd"] % node, 1)
ret = idle_watch.look()
while ret:
self.debug(ret)
# Only accept idle messages coming from a member of this partition
for node in nodes.split():
if re.search(node, ret):
return 1
ret = idle_watch.look()
self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout))
return None
def cluster_stable(self, timeout=None, double_check=False):
partitions = self.find_partitions()
for partition in partitions:
if not self.partition_stable(partition, timeout):
return None
if double_check:
# Make sure we are really stable and that all resources,
# including those that depend on transient node attributes,
# are started if they were going to be
time.sleep(5)
for partition in partitions:
if not self.partition_stable(partition, timeout):
return None
return 1
def is_node_dc(self, node, status_line=None):
rc = 0
if not status_line:
status_line = self.rsh(node, self.templates["StatusCmd"]%node, 1)
if not status_line:
rc = 0
elif status_line.find('S_IDLE') != -1:
rc = 1
elif status_line.find('S_INTEGRATION') != -1:
rc = 1
elif status_line.find('S_FINALIZE_JOIN') != -1:
rc = 1
elif status_line.find('S_POLICY_ENGINE') != -1:
rc = 1
elif status_line.find('S_TRANSITION_ENGINE') != -1:
rc = 1
return rc
def active_resources(self, node):
(rc, output) = self.rsh(node, """crm_resource -c""", None)
resources = []
for line in output:
if re.search("^Resource", line):
tmp = AuditResource(self, line)
if tmp.type == "primitive" and tmp.host == node:
resources.append(tmp.id)
return resources
def ResourceLocation(self, rid):
ResourceNodes = []
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == "up":
cmd = self.templates["RscRunning"] % (rid)
(rc, lines) = self.rsh(node, cmd, None)
if rc == 127:
self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
for line in lines:
self.log("Output: "+line)
elif rc == 0:
ResourceNodes.append(node)
return ResourceNodes
def find_partitions(self):
'''Return the distinct cluster partitions visible from the up nodes.

Each partition is a normalized (sorted, space-separated) string of
member node names; consistent views from several nodes collapse to one
entry.
'''
ccm_partitions = []
for node in self.Env["nodes"]:
if self.ShouldBeStatus[node] == "up":
partition = self.rsh(node, self.templates["PartitionCmd"], 1)
if not partition:
self.log("no partition details for %s" % node)
elif len(partition) > 2:
# Normalize membership order so equal partitions compare equal
nodes = partition.split()
nodes.sort()
partition = ' '.join(nodes)
found = 0
for a_partition in ccm_partitions:
if partition == a_partition:
found = 1
if found == 0:
self.debug("Adding partition from %s: %s" % (node, partition))
ccm_partitions.append(partition)
else:
self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
else:
# Output too short to contain even one node name
self.log("bad partition details for %s" % node)
else:
self.debug("Node %s is down... skipping" % node)
self.debug("Found partitions: %s" % repr(ccm_partitions) )
return ccm_partitions
def HasQuorum(self, node_list):
# If we are auditing a partition, then one side will
# have quorum and the other not.
# So the caller needs to tell us which we are checking
# If no value for node_list is specified... assume all nodes
if not node_list:
node_list = self.Env["nodes"]
for node in node_list:
if self.ShouldBeStatus[node] == "up":
quorum = self.rsh(node, self.templates["QuorumCmd"], 1)
if quorum.find("1") != -1:
return 1
elif quorum.find("0") != -1:
return 0
else:
self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum)
return 0
def Components(self):
'''Build the list of daemon Process objects whose failure CTS can test.

Each Process carries the log patterns expected when that daemon is
killed (`pats`), plus "bad news" patterns to ignore during recovery.
'''
complist = []
# Messages that are normal fallout of killing any daemon
common_ignore = [
"Pending action:",
"(ERROR|error): crm_log_message_adv:",
"(ERROR|error): MSG: No message to dump",
"pending LRM operations at shutdown",
"Lost connection to the CIB manager",
"Connection to the CIB terminated...",
"Sending message to the CIB manager FAILED",
"Action A_RECOVER .* not supported",
"(ERROR|error): stonithd_op_result_ready: not signed on",
"pingd.*(ERROR|error): send_update: Could not send update",
"send_ipc_message: IPC Channel to .* is not connected",
"unconfirmed_actions: Waiting on .* unconfirmed actions",
"cib_native_msgready: Message pending on command channel",
r": Performing A_EXIT_1 - forcefully exiting ",
r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.",
]
# Extra fallout to ignore when the fencer is the victim
stonith_ignore = [
r"Updating failcount for child_DoFencing",
r"error.*: Fencer connection failed \(will retry\)",
"pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.",
]
stonith_ignore.extend(common_ignore)
ccm = Process(self, "ccm", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
"pacemaker-controld.*Action A_RECOVER .* not supported",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
"pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
r"attrd.*exited with status 1",
r"cib.*exited with status 2",
# Not if it was fenced
# "A new node joined the cluster",
# "WARN: determine_online_status: Node .* is unclean",
# "Scheduling Node .* for STONITH",
# "Executing .* fencing operation",
# "tengine_stonith_callback: .*result=0",
# "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE",
# "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN",
"State transition S_STARTING -> S_PENDING",
], badnews_ignore = common_ignore)
based = Process(self, "pacemaker-based", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
"Lost connection to the CIB manager",
"Connection to the CIB manager terminated",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
"pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
r"pacemaker-controld.*: Could not recover from internal error",
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
r"attrd.*exited with status 1",
], badnews_ignore = common_ignore)
execd = Process(self, "pacemaker-execd", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
"LRM Connection failed",
"pacemaker-controld.*I_ERROR.*lrm_connection_destroy",
"State transition S_STARTING -> S_PENDING",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
# this status number is likely wrong now
r"pacemaker-controld.*exited with status 2",
], badnews_ignore = common_ignore)
controld = Process(self, "pacemaker-controld", triggersreboot=self.fastfail,
pats = [
# "WARN: determine_online_status: Node .* is unclean",
# "Scheduling Node .* for STONITH",
# "Executing .* fencing operation",
# "tengine_stonith_callback: .*result=0",
"State transition .* S_IDLE",
"State transition S_STARTING -> S_PENDING",
], badnews_ignore = common_ignore)
schedulerd = Process(self, "pacemaker-schedulerd", triggersreboot=self.fastfail, pats = [
"State transition .* S_RECOVERY",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*: Could not recover from internal error",
r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed",
"pacemaker-controld.*I_ERROR.*save_cib_contents",
# this status number is likely wrong now
r"pacemaker-controld.*exited with status 2",
], badnews_ignore = common_ignore, dc_only=1)
# The fencer is only a test target when fencing is enabled
if self.Env["DoFencing"] == 1 :
complist.append(Process(self, "stoniths", triggersreboot=self.fastfail, dc_pats = [
r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed",
"Attempting connection to fencing daemon",
], badnews_ignore = stonith_ignore))
# Without fastfail, pacemakerd respawns children, producing extra exits
if self.fastfail == 0:
ccm.pats.extend([
# these status numbers are likely wrong now
r"attrd.*exited with status 1",
r"pacemaker-(based|controld).*exited with status 2",
])
based.pats.extend([
# these status numbers are likely wrong now
r"attrd.*exited with status 1",
r"pacemaker-controld.*exited with status 2",
])
execd.pats.extend([
# these status numbers are likely wrong now
r"pacemaker-controld.*exited with status 2",
])
complist.append(ccm)
complist.append(based)
complist.append(execd)
complist.append(controld)
complist.append(schedulerd)
return complist
def StandbyStatus(self, node):
out=self.rsh(node, self.templates["StandbyQueryCmd"] % node, 1)
if not out:
return "off"
out = out[:-1]
self.debug("Standby result: "+out)
return out
# status == "on" : Enter Standby mode
# status == "off": Enter Active mode
def SetStandbyMode(self, node, status):
current_status = self.StandbyStatus(node)
cmd = self.templates["StandbyCmd"] % (node, status)
ret = self.rsh(node, cmd)
return True
def AddDummyRsc(self, node, rid):
'''Create an ocf:pacemaker:Dummy resource `rid` pinned to `node`.'''
# NOTE(review): the "<op .../" below is missing its closing ">" -- this
# looks like paste truncation rather than intentional XML; confirm
# against the upstream source before relying on it.
rsc_xml = """ '<resources>
<primitive class=\"ocf\" id=\"%s\" provider=\"pacemaker\" type=\"Dummy\">
<operations>
<op id=\"%s-interval-10s\" interval=\"10s\" name=\"monitor\"/
</operations>
</primitive>
</resources>'""" % (rid, rid)
# Location constraint pinning the resource to `node` with score INFINITY
constraint_xml = """ '<constraints>
<rsc_location id=\"location-%s-%s\" node=\"%s\" rsc=\"%s\" score=\"INFINITY\"/>
</constraints>'
""" % (rid, node, node, rid)
# Feed both snippets into the CIB via the configured add command
self.rsh(node, self.templates['CibAddXml'] % (rsc_xml))
self.rsh(node, self.templates['CibAddXml'] % (constraint_xml))
def RemoveDummyRsc(self, node, rid):
constraint = "\"//rsc_location[@rsc='%s']\"" % (rid)
rsc = "\"//primitive[@id='%s']\"" % (rid)
self.rsh(node, self.templates['CibDelXpath'] % constraint)
self.rsh(node, self.templates['CibDelXpath'] % rsc)
#######################################################################
#
# A little test code...
#
# Which you are advised to completely ignore...
#
#######################################################################
if __name__ == '__main__':
# No standalone behavior: this module is only ever used as a library.
pass
diff --git a/cts/lab/CTS.py b/cts/lab/CTS.py
index 6d8ead37d2..34f5a686e3 100644
--- a/cts/lab/CTS.py
+++ b/cts/lab/CTS.py
@@ -1,809 +1,185 @@
""" Main classes for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2021 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
-import os
-import re
import sys
import time
import traceback
-from collections import UserDict
-
-from cts.CTSvars import *
from cts.logging import LogFactory
-from cts.watcher import LogWatcher
from cts.remote import RemoteFactory, input_wrapper
from cts.environment import EnvFactory
-from cts.patterns import PatternSelector
-
-has_log_stats = {}
-log_stats_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_stats.sh"
-log_stats = """
-#!%s
-# Tool for generating system load reports while CTS runs
-
-trap "" 1
-
-f=$1; shift
-action=$1; shift
-base=`basename $0`
-
-if [ ! -e $f ]; then
- echo "Time, Load 1, Load 5, Load 15, Test Marker" > $f
-fi
-
-function killpid() {
- if [ -e $f.pid ]; then
- kill -9 `cat $f.pid`
- rm -f $f.pid
- fi
-}
-
-function status() {
- if [ -e $f.pid ]; then
- kill -0 `cat $f.pid`
- return $?
- else
- return 1
- fi
-}
-
-function start() {
- # Is it already running?
- if
- status
- then
- return
- fi
-
- echo Active as $$
- echo $$ > $f.pid
-
- while [ 1 = 1 ]; do
- uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
- #top -b -c -n1 | grep -e usr/libexec/pacemaker | grep -v -e grep -e python | head -n 1 | sed s@/usr/libexec/pacemaker/@@ | awk '{print " 0, "$9", "$10", "$12}' | tr '\\n' ',' >> $f
- echo 0 >> $f
- sleep 5
- done
-}
-
-case $action in
- start)
- start
- ;;
- start-bg|bg)
- # Use c --ssh -- ./stats.sh file start-bg
- nohup $0 $f start >/dev/null 2>&1 </dev/null &
- ;;
- stop)
- killpid
- ;;
- delete)
- killpid
- rm -f $f
- ;;
- mark)
- uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
- echo " $*" >> $f
- start
- ;;
- *)
- echo "Unknown action: $action."
- ;;
-esac
-""" % (CTSvars.BASH_PATH)
class CtsLab(object):
'''This class defines the Lab Environment for the Cluster Test System.
It defines those things which are expected to change from test
environment to test environment for the same cluster manager.
It is where you define the set of nodes that are in your test lab,
what kind of reset mechanism you use, etc.
At this point in time, it is the intent of this class to model static
configuration and/or environmental data about the environment which
doesn't change as the tests proceed.
Well-known names (keys) are an important concept in this class.
The HasMinimalKeys member function knows the minimal set of
well-known names for the class.
The following names are standard (well-known) at this time:
nodes An array of the nodes in the cluster
reset A ResetMechanism object
logger An array of objects that log strings...
CMclass The type of ClusterManager we are running
(This is a class object, not a class instance)
RandSeed Random seed. It is a triple of bytes. (optional)
The CTS code ignores names it doesn't know about/need.
The individual tests have access to this information, and it is
perfectly acceptable to provide hints, tweaks, fine-tuning
directions or other information to the tests through this mechanism.
'''
def __init__(self, args=None):
# Shared singletons: environment, logging, and remote execution
self.Env = EnvFactory().getInstance(args)
self.Scenario = None
self.logger = LogFactory()
self.rsh = RemoteFactory().getInstance()
def dump(self):
self.Env.dump()
def has_key(self, key):
return key in list(self.Env.keys())
def __getitem__(self, key):
return self.Env[key]
def __setitem__(self, key, value):
self.Env[key] = value
def run(self, Scenario, Iterations):
'''Run `Scenario` for `Iterations` iterations.

Returns 0 on full success, the failure count when tests failed, and 1
for setup problems, exceptions, or an iteration shortfall.
'''
if not Scenario:
self.logger.log("No scenario was defined")
return 1
self.logger.log("Cluster nodes: ")
for node in self.Env["nodes"]:
self.logger.log(" * %s" % (node))
if not Scenario.SetUp():
return 1
# Summarize and tear down even when the scenario blows up mid-run
try :
Scenario.run(Iterations)
except :
self.logger.log("Exception by %s" % sys.exc_info()[0])
self.logger.traceback(traceback)
Scenario.summarize()
Scenario.TearDown()
return 1
#ClusterManager.oprofileSave(Iterations)
Scenario.TearDown()
Scenario.summarize()
if Scenario.Stats["failure"] > 0:
return Scenario.Stats["failure"]
elif Scenario.Stats["success"] != Iterations:
# Nothing failed outright, but not every iteration ran
self.logger.log("No failure count but success != requested iterations")
return 1
return 0
def __CheckNode(self, node):
"Raise a ValueError if the given node isn't valid"
# NOTE(review): IsValidNode is not defined anywhere on CtsLab in this
# file -- this private helper appears to be dead code and would raise
# AttributeError if called; confirm before removing.
if not self.IsValidNode(node):
raise ValueError("Invalid node [%s] in CheckNode" % node)
class NodeStatus(object):
# Reachability checks (ping/ssh) for cluster nodes, independent of the CM.
def __init__(self, env):
# Environment is kept for flags such as "docker" and "continue"
self.Env = env
def IsNodeBooted(self, node):
'''Return TRUE if the given node is booted (responds to pings)'''
if self.Env["docker"]:
# Containerized nodes don't answer ping; ask docker instead
return RemoteFactory().getInstance()("localhost", "docker inspect --format {{.State.Running}} %s | grep -q true" % node, silent=True) == 0
return RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, silent=True) == 0
def IsSshdUp(self, node):
rc = RemoteFactory().getInstance()(node, "true", silent=True)
return rc == 0
def WaitForNodeToComeUp(self, node, Timeout=300):
'''Return TRUE when given node comes up, or None/FALSE if timeout'''
timeout = Timeout
anytimeouts = 0
# Poll every 30s; `timeout` counts attempts, not seconds, so the
# worst case is Timeout * 30 seconds of waiting.
while timeout > 0:
if self.IsNodeBooted(node) and self.IsSshdUp(node):
if anytimeouts:
# Fudge to wait for the system to finish coming up
time.sleep(30)
LogFactory().debug("Node %s now up" % node)
return 1
time.sleep(30)
if (not anytimeouts):
LogFactory().debug("Waiting for node %s to come up" % node)
anytimeouts = 1
timeout = timeout - 1
LogFactory().log("%s did not come up within %d tries" % (node, Timeout))
# With --continue we keep going without asking; otherwise prompt the
# operator (EOF on stdin counts as "no").
if self.Env["continue"] == 1:
answer = "Y"
else:
try:
answer = input_wrapper('Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
raise ValueError("%s did not come up within %d tries" % (node, Timeout))
def WaitForAllNodesToComeUp(self, nodes, timeout=300):
'''Return TRUE when all nodes come up, or FALSE if timeout'''
for node in nodes:
if not self.WaitForNodeToComeUp(node, timeout):
return None
return 1
-class ClusterManager(UserDict):
- '''The Cluster Manager class.
- This is an subclass of the Python dictionary class.
- (this is because it contains lots of {name,value} pairs,
- not because it's behavior is that terribly similar to a
- dictionary in other ways.)
-
- This is an abstract class which class implements high-level
- operations on the cluster and/or its cluster managers.
- Actual cluster managers classes are subclassed from this type.
-
- One of the things we do is track the state we think every node should
- be in.
- '''
-
- def __InitialConditions(self):
- #if os.geteuid() != 0:
- # raise ValueError("Must Be Root!")
- None
-
- def _finalConditions(self):
- for key in list(self.keys()):
- if self[key] == None:
- raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.")
-
- def __init__(self, Environment, randseed=None):
- self.Env = EnvFactory().getInstance()
- self.templates = PatternSelector(self.Env["Name"])
- self.__InitialConditions()
- self.logger = LogFactory()
- self.TestLoggingLevel=0
- self.data = {}
- self.name = self.Env["Name"]
-
- self.rsh = RemoteFactory().getInstance()
- self.ShouldBeStatus={}
- self.ns = NodeStatus(self.Env)
- self.OurNode = os.uname()[1].lower()
- self.__instance_errorstoignore = []
-
- def __getitem__(self, key):
- if key == "Name":
- return self.name
-
- print("FIXME: Getting %s from %s" % (key, repr(self)))
- if key in self.data:
- return self.data[key]
-
- return self.templates.get_patterns(self.Env["Name"], key)
-
- def __setitem__(self, key, value):
- print("FIXME: Setting %s=%s on %s" % (key, value, repr(self)))
- self.data[key] = value
-
- def key_for_node(self, node):
- return node
-
- def instance_errorstoignore_clear(self):
- '''Allows the test scenario to reset instance errors to ignore on each iteration.'''
- self.__instance_errorstoignore = []
-
- def instance_errorstoignore(self):
- '''Return list of errors which are 'normal' for a specific test instance'''
- return self.__instance_errorstoignore
-
- def log(self, args):
- self.logger.log(args)
-
- def debug(self, args):
- self.logger.debug(args)
-
- def upcount(self):
- '''How many nodes are up?'''
- count = 0
- for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] == "up":
- count = count + 1
- return count
-
- def install_support(self, command="install"):
- for node in self.Env["nodes"]:
- self.rsh(node, CTSvars.CRM_DAEMON_DIR + "/cts-support " + command)
-
- def prepare_fencing_watcher(self, name):
- # If we don't have quorum now but get it as a result of starting this node,
- # then a bunch of nodes might get fenced
- upnode = None
- if self.HasQuorum(None):
- self.debug("Have quorum")
- return None
-
- if not self.templates["Pat:Fencing_start"]:
- print("No start pattern")
- return None
-
- if not self.templates["Pat:Fencing_ok"]:
- print("No ok pattern")
- return None
-
- stonith = None
- stonithPats = []
- for peer in self.Env["nodes"]:
- if self.ShouldBeStatus[peer] != "up":
- stonithPats.append(self.templates["Pat:Fencing_ok"] % peer)
- stonithPats.append(self.templates["Pat:Fencing_start"] % peer)
-
- stonith = LogWatcher(self.Env["LogFileName"], stonithPats, "StartupFencing", 0, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
- stonith.setwatch()
- return stonith
-
- def fencing_cleanup(self, node, stonith):
- peer_list = []
- peer_state = {}
-
- self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
-
- # If we just started a node, we may now have quorum (and permission to fence)
- if not stonith:
- self.debug("Nothing to do")
- return peer_list
-
- q = self.HasQuorum(None)
- if not q and len(self.Env["nodes"]) > 2:
- # We didn't gain quorum - we shouldn't have shot anyone
- self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"])))
- return peer_list
-
- for n in self.Env["nodes"]:
- peer_state[n] = "unknown"
-
- # Now see if any states need to be updated
- self.debug("looking for: " + repr(stonith.regexes))
- shot = stonith.look(0)
- while shot:
- line = repr(shot)
- self.debug("Found: " + line)
- del stonith.regexes[stonith.whichmatch]
-
- # Extract node name
- for n in self.Env["nodes"]:
- if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
- peer = n
- peer_state[peer] = "complete"
- self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer)
-
- elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
- # TODO: Correctly detect multiple fencing operations for the same host
- peer = n
- peer_state[peer] = "in-progress"
- self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer)
-
- if not peer:
- self.logger.log("ERROR: Unknown stonith match: %s" % line)
-
- elif not peer in peer_list:
- self.debug("Found peer: " + peer)
- peer_list.append(peer)
-
- # Get the next one
- shot = stonith.look(60)
-
- for peer in peer_list:
-
- self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
- if self.Env["at-boot"]:
- self.ShouldBeStatus[peer] = "up"
- else:
- self.ShouldBeStatus[peer] = "down"
-
- if peer_state[peer] == "in-progress":
- # Wait for any in-progress operations to complete
- shot = stonith.look(60)
- while len(stonith.regexes) and shot:
- line = repr(shot)
- self.debug("Found: " + line)
- del stonith.regexes[stonith.whichmatch]
- shot = stonith.look(60)
-
- # Now make sure the node is alive too
- self.ns.WaitForNodeToComeUp(peer, self.Env["DeadTime"])
-
- # Poll until it comes up
- if self.Env["at-boot"]:
- if not self.StataCM(peer):
- time.sleep(self.Env["StartTime"])
-
- if not self.StataCM(peer):
- self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
- return None
-
- return peer_list
-
- def StartaCM(self, node, verbose=False):
-
- '''Start up the cluster manager on a given node'''
- if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node))
- else: self.debug("Starting %s on node %s" % (self.templates["Name"], node))
- ret = 1
-
- if not node in self.ShouldBeStatus:
- self.ShouldBeStatus[node] = "down"
-
- if self.ShouldBeStatus[node] != "down":
- return 1
-
- patterns = []
- # Technically we should always be able to notice ourselves starting
- patterns.append(self.templates["Pat:Local_started"] % node)
- if self.upcount() == 0:
- patterns.append(self.templates["Pat:DC_started"] % node)
- else:
- patterns.append(self.templates["Pat:NonDC_started"] % node)
-
- watch = LogWatcher(
- self.Env["LogFileName"], patterns, "StartaCM", self.Env["StartTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
-
- self.install_config(node)
-
- self.ShouldBeStatus[node] = "any"
- if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
- self.logger.log ("%s was already started" % (node))
- return 1
-
- stonith = self.prepare_fencing_watcher(node)
- watch.setwatch()
-
- if self.rsh(node, self.templates["StartCmd"]) != 0:
- self.logger.log ("Warn: Start command failed on node %s" % (node))
- self.fencing_cleanup(node, stonith)
- return None
-
- self.ShouldBeStatus[node] = "up"
- watch_result = watch.lookforall()
-
- if watch.unmatched:
- for regex in watch.unmatched:
- self.logger.log ("Warn: Startup pattern not found: %s" % (regex))
-
- if watch_result and self.cluster_stable(self.Env["DeadTime"]):
- #self.debug("Found match: "+ repr(watch_result))
- self.fencing_cleanup(node, stonith)
- return 1
-
- elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
- self.fencing_cleanup(node, stonith)
- return 1
-
- self.logger.log ("Warn: Start failed for node %s" % (node))
- return None
-
- def StartaCMnoBlock(self, node, verbose=False):
-
- '''Start up the cluster manager on a given node with none-block mode'''
-
- if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node))
- else: self.debug("Starting %s on node %s" % (self["Name"], node))
-
- self.install_config(node)
- self.rsh(node, self.templates["StartCmd"], synchronous=0)
- self.ShouldBeStatus[node] = "up"
- return 1
-
- def StopaCM(self, node, verbose=False, force=False):
-
- '''Stop the cluster manager on a given node'''
-
- if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node))
- else: self.debug("Stopping %s on node %s" % (self["Name"], node))
-
- if self.ShouldBeStatus[node] != "up" and force == False:
- return 1
-
- if self.rsh(node, self.templates["StopCmd"]) == 0:
- # Make sure we can continue even if corosync leaks
- # fdata-* is the old name
- #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*")
- self.ShouldBeStatus[node] = "down"
- self.cluster_stable(self.Env["DeadTime"])
- return 1
- else:
- self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
-
- return None
-
- def StopaCMnoBlock(self, node):
-
- '''Stop the cluster manager on a given node with none-block mode'''
-
- self.debug("Stopping %s on node %s" % (self["Name"], node))
-
- self.rsh(node, self.templates["StopCmd"], synchronous=0)
- self.ShouldBeStatus[node] = "down"
- return 1
-
- def RereadCM(self, node):
-
- '''Force the cluster manager on a given node to reread its config
- This may be a no-op on certain cluster managers.
- '''
- rc=self.rsh(node, self.templates["RereadCmd"])
- if rc == 0:
- return 1
- else:
- self.logger.log ("Could not force %s on node %s to reread its config"
- % (self["Name"], node))
- return None
-
- def startall(self, nodelist=None, verbose=False, quick=False):
-
- '''Start the cluster manager on every node in the cluster.
- We can do it on a subset of the cluster if nodelist is not None.
- '''
- map = {}
- if not nodelist:
- nodelist = self.Env["nodes"]
-
- for node in nodelist:
- if self.ShouldBeStatus[node] == "down":
- self.ns.WaitForAllNodesToComeUp(nodelist, 300)
-
- if not quick:
- # This is used for "basic sanity checks", so only start one node ...
- if not self.StartaCM(node, verbose=verbose):
- return 0
- return 1
-
- # Approximation of SimulStartList for --boot
- watchpats = [ ]
- watchpats.append(self.templates["Pat:DC_IDLE"])
- for node in nodelist:
- watchpats.append(self.templates["Pat:InfraUp"] % node)
- watchpats.append(self.templates["Pat:PacemakerUp"] % node)
- watchpats.append(self.templates["Pat:Local_started"] % node)
- watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node))
-
- # Start all the nodes - at about the same time...
- watch = LogWatcher(self.Env["LogFileName"], watchpats, "fast-start", self.Env["DeadTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
- watch.setwatch()
-
- if not self.StartaCM(nodelist[0], verbose=verbose):
- return 0
- for node in nodelist:
- self.StartaCMnoBlock(node, verbose=verbose)
-
- watch.lookforall()
- if watch.unmatched:
- for regex in watch.unmatched:
- self.logger.log ("Warn: Startup pattern not found: %s" % (regex))
-
- if not self.cluster_stable():
- self.logger.log("Cluster did not stabilize")
- return 0
-
- return 1
-
- def stopall(self, nodelist=None, verbose=False, force=False):
-
- '''Stop the cluster managers on every node in the cluster.
- We can do it on a subset of the cluster if nodelist is not None.
- '''
-
- ret = 1
- map = {}
- if not nodelist:
- nodelist = self.Env["nodes"]
- for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] == "up" or force == True:
- if not self.StopaCM(node, verbose=verbose, force=force):
- ret = 0
- return ret
-
- def rereadall(self, nodelist=None):
-
- '''Force the cluster managers on every node in the cluster
- to reread their config files. We can do it on a subset of the
- cluster if nodelist is not None.
- '''
-
- map = {}
- if not nodelist:
- nodelist = self.Env["nodes"]
- for node in self.Env["nodes"]:
- if self.ShouldBeStatus[node] == "up":
- self.RereadCM(node)
-
- def statall(self, nodelist=None):
-
- '''Return the status of the cluster managers in the cluster.
- We can do it on a subset of the cluster if nodelist is not None.
- '''
-
- result = {}
- if not nodelist:
- nodelist = self.Env["nodes"]
- for node in nodelist:
- if self.StataCM(node):
- result[node] = "up"
- else:
- result[node] = "down"
- return result
-
- def isolate_node(self, target, nodes=None):
- '''isolate the communication between the nodes'''
- if not nodes:
- nodes = self.Env["nodes"]
-
- for node in nodes:
- if node != target:
- rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node))
- if rc != 0:
- self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
- return None
- else:
- self.debug("Communication cut between %s and %s" % (target, node))
- return 1
-
- def unisolate_node(self, target, nodes=None):
- '''fix the communication between the nodes'''
- if not nodes:
- nodes = self.Env["nodes"]
-
- for node in nodes:
- if node != target:
- restored = 0
-
- # Limit the amount of time we have asynchronous connectivity for
- # Restore both sides as simultaneously as possible
- self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=0)
- self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=0)
- self.debug("Communication restored between %s and %s" % (target, node))
-
- def reducecomm_node(self,node):
- '''reduce the communication between the nodes'''
- rc = self.rsh(node, self.templates["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"]))
- if rc == 0:
- return 1
- else:
- self.logger.log("Could not reduce the communication between the nodes from node: %s" % node)
- return None
-
- def restorecomm_node(self,node):
- '''restore the saved communication between the nodes'''
- rc = 0
- if float(self.Env["XmitLoss"]) != 0 or float(self.Env["RecvLoss"]) != 0 :
- rc = self.rsh(node, self.templates["RestoreCommCmd"]);
- if rc == 0:
- return 1
- else:
- self.logger.log("Could not restore the communication between the nodes from node: %s" % node)
- return None
-
- def oprofileStart(self, node=None):
- if not node:
- for n in self.Env["oprofile"]:
- self.oprofileStart(n)
-
- elif node in self.Env["oprofile"]:
- self.debug("Enabling oprofile on %s" % node)
- self.rsh(node, "opcontrol --init")
- self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
- self.rsh(node, "opcontrol --start")
- self.rsh(node, "opcontrol --reset")
-
- def oprofileSave(self, test, node=None):
- if not node:
- for n in self.Env["oprofile"]:
- self.oprofileSave(test, n)
-
- elif node in self.Env["oprofile"]:
- self.rsh(node, "opcontrol --dump")
- self.rsh(node, "opcontrol --save=cts.%d" % test)
- # Read back with: opreport -l session:cts.0 image:<directory>/c*
- if None:
- self.rsh(node, "opcontrol --reset")
- else:
- self.oprofileStop(node)
- self.oprofileStart(node)
-
- def oprofileStop(self, node=None):
- if not node:
- for n in self.Env["oprofile"]:
- self.oprofileStop(n)
-
- elif node in self.Env["oprofile"]:
- self.debug("Stopping oprofile on %s" % node)
- self.rsh(node, "opcontrol --reset")
- self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
-
-
- def StatsExtract(self):
- if not self.Env["stats"]:
- return
-
- for host in self.Env["nodes"]:
- log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR
- if host in has_log_stats:
- self.rsh(host, '''bash %s %s stop''' % (log_stats_bin, log_stats_file))
- (rc, lines) = self.rsh(host, '''cat %s''' % log_stats_file, stdout=2)
- self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
-
- fname = "cts-stats-%d-nodes-%s.csv" % (len(self.Env["nodes"]), host)
- print("Extracted stats: %s" % fname)
- fd = open(fname, "a")
- fd.writelines(lines)
- fd.close()
-
- def StatsMark(self, testnum):
- '''Mark the test number in the stats log'''
-
- global has_log_stats
- if not self.Env["stats"]:
- return
-
- for host in self.Env["nodes"]:
- log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR
- if not host in has_log_stats:
-
- global log_stats
- global log_stats_bin
- script=log_stats
- #script = re.sub("\\\\", "\\\\", script)
- script = re.sub('\"', '\\\"', script)
- script = re.sub("'", "\'", script)
- script = re.sub("`", "\`", script)
- script = re.sub("\$", "\\\$", script)
-
- self.debug("Installing %s on %s" % (log_stats_bin, host))
- self.rsh(host, '''echo "%s" > %s''' % (script, log_stats_bin), silent=True)
- self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
- has_log_stats[host] = 1
-
- # Now mark it
- self.rsh(host, '''bash %s %s mark %s''' % (log_stats_bin, log_stats_file, testnum), synchronous=0)
-
-
class Component(object):
def kill(self, node):
None
class Process(Component):
def __init__(self, cm, name, process=None, dc_only=0, pats=[], dc_pats=[], badnews_ignore=[], common_ignore=[], triggersreboot=0):
self.name = str(name)
self.dc_only = dc_only
self.pats = pats
self.dc_pats = dc_pats
self.CM = cm
self.badnews_ignore = badnews_ignore
self.badnews_ignore.extend(common_ignore)
self.triggersreboot = triggersreboot
if process:
self.proc = str(process)
else:
self.proc = str(name)
self.KillCmd = "killall -9 " + self.proc
def kill(self, node):
if self.CM.rsh(node, self.KillCmd) != 0:
self.CM.log ("ERROR: Kill %s failed on node %s" % (self.name,node))
return None
return 1
diff --git a/cts/lab/CTSlab.py.in b/cts/lab/CTSlab.py.in
index c81903050d..7bca3bd204 100644
--- a/cts/lab/CTSlab.py.in
+++ b/cts/lab/CTSlab.py.in
@@ -1,127 +1,128 @@
#!@PYTHON@
""" Command-line interface to Pacemaker's Cluster Test Suite (CTS)
"""
-__copyright__ = "Copyright 2001-2020 the Pacemaker project contributors"
+__copyright__ = "Copyright 2001-2021 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys, signal, os
pdir = os.path.dirname(sys.path[0])
sys.path.insert(0, pdir) # So that things work from the source directory
try:
+ from cts.CTS import *
from cts.CTSvars import *
from cts.CM_corosync import *
from cts.CTSaudits import AuditList
from cts.CTStests import TestList
from cts.CTSscenarios import *
from cts.logging import LogFactory
except ImportError as e:
sys.stderr.write("abort: %s\n" % e)
sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" %
' '.join(sys.path))
sys.exit(1)
# These are globals so they can be used by the signal handler.
scenario = None
LogFactory().add_stderr()
def sig_handler(signum, frame) :
LogFactory().log("Interrupted by signal %d"%signum)
if scenario: scenario.summarize()
if signum == 15 :
if scenario: scenario.TearDown()
sys.exit(1)
if __name__ == '__main__':
Environment = CtsLab(sys.argv[1:])
NumIter = Environment["iterations"]
if NumIter is None:
NumIter = 1
Tests = []
# Set the signal handler
signal.signal(15, sig_handler)
signal.signal(10, sig_handler)
# Create the Cluster Manager object
cm = None
if Environment["Stack"] == "corosync 2+":
cm = crm_corosync(Environment)
else:
LogFactory().log("Unknown stack: "+Environment["stack"])
sys.exit(1)
if Environment["TruncateLog"] == 1:
if Environment["OutputFile"] is None:
LogFactory().log("Ignoring truncate request because no output file specified")
else:
LogFactory().log("Truncating %s" % Environment["OutputFile"])
with open(Environment["OutputFile"], "w") as outputfile:
outputfile.truncate(0)
Audits = AuditList(cm)
if Environment["ListTests"] == 1:
Tests = TestList(cm, Audits)
LogFactory().log("Total %d tests"%len(Tests))
for test in Tests :
LogFactory().log(str(test.name));
sys.exit(0)
elif len(Environment["tests"]) == 0:
Tests = TestList(cm, Audits)
else:
Chosen = Environment["tests"]
for TestCase in Chosen:
match = None
for test in TestList(cm, Audits):
if test.name == TestCase:
match = test
if not match:
LogFactory().log("--choose: No applicable/valid tests chosen")
sys.exit(1)
else:
Tests.append(match)
# Scenario selection
if Environment["scenario"] == "basic-sanity":
scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests)
elif Environment["scenario"] == "all-once":
NumIter = len(Tests)
scenario = AllOnce(
cm, [ BootCluster(Environment), PacketLoss(Environment) ], Audits, Tests)
elif Environment["scenario"] == "sequence":
scenario = Sequence(
cm, [ BootCluster(Environment), PacketLoss(Environment) ], Audits, Tests)
elif Environment["scenario"] == "boot":
scenario = Boot(cm, [ LeaveBooted(Environment)], Audits, [])
else:
scenario = RandomTests(
cm, [ BootCluster(Environment), PacketLoss(Environment) ], Audits, Tests)
LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TESTS ")
LogFactory().log("Stack: %s (%s)" % (Environment["Stack"], Environment["Name"]))
LogFactory().log("Schema: %s" % Environment["Schema"])
LogFactory().log("Scenario: %s" % scenario.__doc__)
LogFactory().log("CTS Exerciser: %s" % Environment["cts-exerciser"])
LogFactory().log("CTS Logfile: %s" % Environment["OutputFile"])
LogFactory().log("Random Seed: %s" % Environment["RandSeed"])
LogFactory().log("Syslog variant: %s" % Environment["syslogd"].strip())
LogFactory().log("System log files: %s" % Environment["LogFileName"])
if Environment.has_key("IPBase"):
LogFactory().log("Base IP for resources: %s" % Environment["IPBase"])
LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"])
Environment.dump()
rc = Environment.run(scenario, NumIter)
sys.exit(rc)
diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py
index b411a6a406..750b0b11c6 100644
--- a/cts/lab/CTSscenarios.py
+++ b/cts/lab/CTSscenarios.py
@@ -1,598 +1,602 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS)
"""
-__copyright__ = "Copyright 2000-2020 the Pacemaker project contributors"
+__copyright__ = "Copyright 2000-2021 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
-from cts.CTS import *
+import os
+import re
+import sys
+import time
+
from cts.CTStests import CTSTest
from cts.CTSaudits import ClusterAudit
from cts.watcher import LogWatcher
from cts.remote import input_wrapper
class ScenarioComponent(object):
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
'''Return TRUE if the current ScenarioComponent is applicable
in the given LabEnvironment given to the constructor.
'''
raise ValueError("Abstract Class member (IsApplicable)")
def SetUp(self, CM):
'''Set up the given ScenarioComponent'''
raise ValueError("Abstract Class member (Setup)")
def TearDown(self, CM):
'''Tear down (undo) the given ScenarioComponent'''
raise ValueError("Abstract Class member (Setup)")
class Scenario(object):
(
'''The basic idea of a scenario is that of an ordered list of
ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn,
and then after the tests have been run, they are torn down using TearDown()
(in reverse order).
A Scenario is applicable to a particular cluster manager iff each
ScenarioComponent is applicable.
A partially set up scenario is torn down if it fails during setup.
''')
def __init__(self, ClusterManager, Components, Audits, Tests):
"Initialize the Scenario from the list of ScenarioComponents"
self.ClusterManager = ClusterManager
self.Components = Components
self.Audits = Audits
self.Tests = Tests
self.BadNews = None
self.TestSets = []
self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0}
self.Sets = []
#self.ns=CTS.NodeStatus(self.Env)
for comp in Components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in Audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in Tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def IsApplicable(self):
(
'''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
'''
)
for comp in self.Components:
if not comp.IsApplicable():
return None
return 1
def SetUp(self):
'''Set up the Scenario. Return TRUE on success.'''
self.ClusterManager.prepare()
self.audit() # Also detects remote/local log config
self.ClusterManager.StatsMark(0)
self.ClusterManager.ns.WaitForAllNodesToComeUp(self.ClusterManager.Env["nodes"])
self.audit()
self.ClusterManager.install_support()
self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
self.ClusterManager.templates.get_patterns(
self.ClusterManager.Env["Name"], "BadNews"), "BadNews", 0,
kind=self.ClusterManager.Env["LogWatcher"],
hosts=self.ClusterManager.Env["nodes"])
self.BadNews.setwatch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self.Components):
if not self.Components[j].SetUp(self.ClusterManager):
# OOPS! We failed. Tear partial setups down.
self.audit()
self.ClusterManager.log("Tearing down partial setup")
self.TearDown(j)
return None
j = j + 1
self.audit()
return 1
def TearDown(self, max=None):
'''Tear Down the Scenario - in reverse order.'''
if max == None:
max = len(self.Components)-1
j = max
while j >= 0:
self.Components[j].TearDown(self.ClusterManager)
j = j - 1
self.audit()
self.ClusterManager.StatsExtract()
self.ClusterManager.install_support("uninstall")
def incr(self, name):
'''Increment (or initialize) the value associated with the given name'''
if not name in self.Stats:
self.Stats[name] = 0
self.Stats[name] = self.Stats[name]+1
def run(self, Iterations):
self.ClusterManager.oprofileStart()
try:
self.run_loop(Iterations)
self.ClusterManager.oprofileStop()
except:
self.ClusterManager.oprofileStop()
raise
def run_loop(self, Iterations):
raise ValueError("Abstract Class member (run_loop)")
def run_test(self, test, testcount):
nodechoice = self.ClusterManager.Env.RandomNode()
ret = 1
where = ""
did_run = 0
self.ClusterManager.StatsMark(testcount)
self.ClusterManager.instance_errorstoignore_clear()
self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]")
starttime = test.set_timer()
if not test.setup(nodechoice):
self.ClusterManager.log("Setup failed")
ret = 0
elif not test.canrunnow(nodechoice):
self.ClusterManager.log("Skipped")
test.skipped()
else:
did_run = 1
ret = test(nodechoice)
if not test.teardown(nodechoice):
self.ClusterManager.log("Teardown failed")
if self.ClusterManager.Env["continue"] == 1:
answer = "Y"
else:
try:
answer = input_wrapper('Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = 0
stoptime = time.time()
self.ClusterManager.oprofileSave(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if not test["min_time"]:
test["elapsed_time"] = elapsed_time
test["min_time"] = test_time
test["max_time"] = test_time
else:
test["elapsed_time"] = test["elapsed_time"] + elapsed_time
if test_time < test["min_time"]:
test["min_time"] = test_time
if test_time > test["max_time"]:
test["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self.ClusterManager.statall()
did_run = 1 # Force the test count to be incremented anyway so test extraction works
self.audit(test.errorstoignore())
return did_run
def summarize(self):
self.ClusterManager.log("****************")
self.ClusterManager.log("Overall Results:" + repr(self.Stats))
self.ClusterManager.log("****************")
stat_filter = {
"calls":0,
"failure":0,
"skipped":0,
"auditfail":0,
}
self.ClusterManager.log("Test Summary")
for test in self.Tests:
for key in list(stat_filter.keys()):
stat_filter[key] = test.Stats[key]
self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter))
self.ClusterManager.debug("Detailed Results")
for test in self.Tests:
self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.Stats))
self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, LocalIgnore=[]):
errcount = 0
ignorelist = []
ignorelist.append("CTS:")
ignorelist.extend(LocalIgnore)
ignorelist.extend(self.ClusterManager.errorstoignore())
ignorelist.extend(self.ClusterManager.instance_errorstoignore())
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self.Audits:
if not audit():
self.ClusterManager.log("Audit " + audit.name() + " FAILED.")
failed += 1
else:
self.ClusterManager.debug("Audit " + audit.name() + " passed.")
while errcount < 1000:
match = None
if self.BadNews:
match = self.BadNews.look(0)
if match:
add_err = 1
for ignore in ignorelist:
if add_err == 1 and re.search(ignore, match):
add_err = 0
if add_err == 1:
self.ClusterManager.log("BadNews: " + match)
self.incr("BadNews")
errcount = errcount + 1
else:
break
else:
if self.ClusterManager.Env["continue"] == 1:
answer = "Y"
else:
try:
answer = input_wrapper('Big problems. Continue? [nY]')
except EOFError as e:
answer = "n"
if answer and answer == "n":
self.ClusterManager.log("Shutting down.")
self.summarize()
self.TearDown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self.BadNews:
self.BadNews.end()
return failed
class AllOnce(Scenario):
'''Every Test Once''' # Accessable as __doc__
def run_loop(self, Iterations):
testcount = 1
for test in self.Tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
'''Random Test Execution'''
def run_loop(self, Iterations):
testcount = 1
while testcount <= Iterations:
test = self.ClusterManager.Env.RandomGen.choice(self.Tests)
self.run_test(test, testcount)
testcount += 1
class BasicSanity(Scenario):
'''Basic Cluster Sanity'''
def run_loop(self, Iterations):
testcount = 1
while testcount <= Iterations:
test = self.Environment.RandomGen.choice(self.Tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
'''Named Tests in Sequence'''
def run_loop(self, Iterations):
testcount = 1
while testcount <= Iterations:
for test in self.Tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
'''Start the Cluster'''
def run_loop(self, Iterations):
testcount = 0
class BootCluster(ScenarioComponent):
(
'''BootCluster is the most basic of ScenarioComponents.
This ScenarioComponent simply starts the cluster manager on all the nodes.
It is fairly robust as it waits for all nodes to come up before starting
as they might have been rebooted or crashed for some reason beforehand.
''')
def __init__(self, Env):
pass
def IsApplicable(self):
'''BootCluster is so generic it is always Applicable'''
return 1
def SetUp(self, CM):
'''Basic Cluster Manager startup. Start everything'''
CM.prepare()
# Clear out the cobwebs ;-)
CM.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on all nodes.")
return CM.startall(verbose=True, quick=True)
def TearDown(self, CM, force=False):
'''Set up the given ScenarioComponent'''
# Stop the cluster manager everywhere
CM.log("Stopping Cluster Manager on all nodes")
return CM.stopall(verbose=True, force=force)
class LeaveBooted(BootCluster):
def TearDown(self, CM):
'''Set up the given ScenarioComponent'''
# Stop the cluster manager everywhere
CM.log("Leaving Cluster running on all nodes")
return 1
class PingFest(ScenarioComponent):
(
'''PingFest does a flood ping to each node in the cluster from the test machine.
If the LabEnvironment Parameter PingSize is set, it will be used as the size
of ping packet requested (via the -s option). If it is not set, it defaults
to 1024 bytes.
According to the manual page for ping:
Outputs packets as fast as they come back or one hundred times per
second, whichever is more. For every ECHO_REQUEST sent a period ``.''
is printed, while for every ECHO_REPLY received a backspace is printed.
This provides a rapid display of how many packets are being dropped.
Only the super-user may use this option. This can be very hard on a net-
work and should be used with caution.
''' )
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
'''PingFests are always applicable ;-)
'''
return 1
def SetUp(self, CM):
'''Start the PingFest!'''
self.PingSize = 1024
if "PingSize" in list(CM.Env.keys()):
self.PingSize = CM.Env["PingSize"]
CM.log("Starting %d byte flood pings" % self.PingSize)
self.PingPids = []
for node in CM.Env["nodes"]:
self.PingPids.append(self._pingchild(node))
CM.log("Ping PIDs: " + repr(self.PingPids))
return 1
def TearDown(self, CM):
'''Stop it right now! My ears are pinging!!'''
for pid in self.PingPids:
if pid != None:
CM.log("Stopping ping process %d" % pid)
os.kill(pid, signal.SIGKILL)
def _pingchild(self, node):
Args = ["ping", "-qfn", "-s", str(self.PingSize), node]
sys.stdin.flush()
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid < 0:
self.Env.log("Cannot fork ping child")
return None
if pid > 0:
return pid
# Otherwise, we're the child process.
os.execvp("ping", Args)
self.Env.log("Cannot execvp ping: " + repr(Args))
sys.exit(1)
class PacketLoss(ScenarioComponent):
(
'''
It would be useful to do some testing of CTS with a modest amount of packet loss
enabled - so we could see that everything runs like it should with a certain
amount of packet loss present.
''')
def IsApplicable(self):
'''always Applicable'''
return 1
def SetUp(self, CM):
'''Reduce the reliability of communications'''
if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 :
return 1
for node in CM.Env["nodes"]:
CM.reducecomm_node(node)
CM.log("Reduce the reliability of communications")
return 1
def TearDown(self, CM):
'''Fix the reliability of communications'''
if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 :
return 1
for node in CM.Env["nodes"]:
CM.unisolate_node(node)
CM.log("Fix the reliability of communications")
class BasicSanityCheck(ScenarioComponent):
(
'''
''')
def IsApplicable(self):
return self.Env["DoBSC"]
def SetUp(self, CM):
CM.prepare()
# Clear out the cobwebs
self.TearDown(CM)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on BSC node(s).")
return CM.startall()
def TearDown(self, CM):
CM.log("Stopping Cluster Manager on BSC node(s).")
return CM.stopall()
class Benchmark(ScenarioComponent):
(
'''
''')
def IsApplicable(self):
return self.Env["benchmark"]
def SetUp(self, CM):
CM.prepare()
# Clear out the cobwebs
self.TearDown(CM, force=True)
# Now start the Cluster Manager on all the nodes.
CM.log("Starting Cluster Manager on all node(s).")
return CM.startall()
def TearDown(self, CM):
CM.log("Stopping Cluster Manager on all node(s).")
return CM.stopall()
class RollingUpgrade(ScenarioComponent):
(
'''
Test a rolling upgrade between two versions of the stack
''')
def __init__(self, Env):
self.Env = Env
def IsApplicable(self):
if not self.Env["rpm-dir"]:
return None
if not self.Env["current-version"]:
return None
if not self.Env["previous-version"]:
return None
return 1
def install(self, node, version):
target_dir = "/tmp/rpm-%s" % version
src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)
rc = self.CM.rsh(node, "mkdir -p %s" % target_dir)
rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
rc = self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))
return self.success()
def upgrade(self, node):
return self.install(node, self.CM.Env["current-version"])
def downgrade(self, node):
return self.install(node, self.CM.Env["previous-version"])
def SetUp(self, CM):
print(repr(self)+"prepare")
CM.prepare()
# Clear out the cobwebs
CM.stopall(force=True)
CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])
for node in self.Env["nodes"]:
if not self.downgrade(node):
CM.log("Couldn't downgrade %s" % node)
return None
return 1
def TearDown(self, CM):
# Stop everything
CM.log("Stopping Cluster Manager on Upgrade nodes.")
CM.stopall()
CM.log("Upgrading all nodes to %s." % self.Env["current-version"])
for node in self.Env["nodes"]:
if not self.upgrade(node):
CM.log("Couldn't upgrade %s" % node)
return None
return 1
diff --git a/cts/lab/CTS.py b/cts/lab/ClusterManager.py
similarity index 77%
copy from cts/lab/CTS.py
copy to cts/lab/ClusterManager.py
index 6d8ead37d2..824fe709a0 100644
--- a/cts/lab/CTS.py
+++ b/cts/lab/ClusterManager.py
@@ -1,809 +1,630 @@
-""" Main classes for Pacemaker's Cluster Test Suite (CTS)
+""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS)
"""
__copyright__ = "Copyright 2000-2021 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import os
import re
-import sys
import time
-import traceback
from collections import UserDict
from cts.CTSvars import *
+from cts.CTS import NodeStatus
from cts.logging import LogFactory
from cts.watcher import LogWatcher
-from cts.remote import RemoteFactory, input_wrapper
+from cts.remote import RemoteFactory
from cts.environment import EnvFactory
from cts.patterns import PatternSelector
has_log_stats = {}
log_stats_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_stats.sh"
log_stats = """
#!%s
# Tool for generating system load reports while CTS runs
trap "" 1
f=$1; shift
action=$1; shift
base=`basename $0`
if [ ! -e $f ]; then
echo "Time, Load 1, Load 5, Load 15, Test Marker" > $f
fi
function killpid() {
if [ -e $f.pid ]; then
kill -9 `cat $f.pid`
rm -f $f.pid
fi
}
function status() {
if [ -e $f.pid ]; then
kill -0 `cat $f.pid`
return $?
else
return 1
fi
}
function start() {
# Is it already running?
if
status
then
return
fi
echo Active as $$
echo $$ > $f.pid
while [ 1 = 1 ]; do
uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
#top -b -c -n1 | grep -e usr/libexec/pacemaker | grep -v -e grep -e python | head -n 1 | sed s@/usr/libexec/pacemaker/@@ | awk '{print " 0, "$9", "$10", "$12}' | tr '\\n' ',' >> $f
echo 0 >> $f
sleep 5
done
}
case $action in
start)
start
;;
start-bg|bg)
# Use c --ssh -- ./stats.sh file start-bg
nohup $0 $f start >/dev/null 2>&1 </dev/null &
;;
stop)
killpid
;;
delete)
killpid
rm -f $f
;;
mark)
uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
echo " $*" >> $f
start
;;
*)
echo "Unknown action: $action."
;;
esac
""" % (CTSvars.BASH_PATH)
-class CtsLab(object):
- '''This class defines the Lab Environment for the Cluster Test System.
- It defines those things which are expected to change from test
- environment to test environment for the same cluster manager.
-
- It is where you define the set of nodes that are in your test lab
- what kind of reset mechanism you use, etc.
-
- This class is derived from a UserDict because we hold many
- different parameters of different kinds, and this provides
- a uniform and extensible interface useful for any kind of
- communication between the user/administrator/tester and CTS.
-
- At this point in time, it is the intent of this class to model static
- configuration and/or environmental data about the environment which
- doesn't change as the tests proceed.
-
- Well-known names (keys) are an important concept in this class.
- The HasMinimalKeys member function knows the minimal set of
- well-known names for the class.
-
- The following names are standard (well-known) at this time:
-
- nodes An array of the nodes in the cluster
- reset A ResetMechanism object
- logger An array of objects that log strings...
- CMclass The type of ClusterManager we are running
- (This is a class object, not a class instance)
- RandSeed Random seed. It is a triple of bytes. (optional)
-
- The CTS code ignores names it doesn't know about/need.
- The individual tests have access to this information, and it is
- perfectly acceptable to provide hints, tweaks, fine-tuning
- directions or other information to the tests through this mechanism.
- '''
-
- def __init__(self, args=None):
- self.Env = EnvFactory().getInstance(args)
- self.Scenario = None
- self.logger = LogFactory()
- self.rsh = RemoteFactory().getInstance()
-
- def dump(self):
- self.Env.dump()
-
- def has_key(self, key):
- return key in list(self.Env.keys())
-
- def __getitem__(self, key):
- return self.Env[key]
-
- def __setitem__(self, key, value):
- self.Env[key] = value
-
- def run(self, Scenario, Iterations):
- if not Scenario:
- self.logger.log("No scenario was defined")
- return 1
-
- self.logger.log("Cluster nodes: ")
- for node in self.Env["nodes"]:
- self.logger.log(" * %s" % (node))
-
- if not Scenario.SetUp():
- return 1
-
- try :
- Scenario.run(Iterations)
- except :
- self.logger.log("Exception by %s" % sys.exc_info()[0])
- self.logger.traceback(traceback)
-
- Scenario.summarize()
- Scenario.TearDown()
- return 1
-
- #ClusterManager.oprofileSave(Iterations)
- Scenario.TearDown()
-
- Scenario.summarize()
- if Scenario.Stats["failure"] > 0:
- return Scenario.Stats["failure"]
-
- elif Scenario.Stats["success"] != Iterations:
- self.logger.log("No failure count but success != requested iterations")
- return 1
-
- return 0
-
- def __CheckNode(self, node):
- "Raise a ValueError if the given node isn't valid"
-
- if not self.IsValidNode(node):
- raise ValueError("Invalid node [%s] in CheckNode" % node)
-
-class NodeStatus(object):
- def __init__(self, env):
- self.Env = env
-
- def IsNodeBooted(self, node):
- '''Return TRUE if the given node is booted (responds to pings)'''
- if self.Env["docker"]:
- return RemoteFactory().getInstance()("localhost", "docker inspect --format {{.State.Running}} %s | grep -q true" % node, silent=True) == 0
-
- return RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, silent=True) == 0
-
- def IsSshdUp(self, node):
- rc = RemoteFactory().getInstance()(node, "true", silent=True)
- return rc == 0
-
- def WaitForNodeToComeUp(self, node, Timeout=300):
- '''Return TRUE when given node comes up, or None/FALSE if timeout'''
- timeout = Timeout
- anytimeouts = 0
- while timeout > 0:
- if self.IsNodeBooted(node) and self.IsSshdUp(node):
- if anytimeouts:
- # Fudge to wait for the system to finish coming up
- time.sleep(30)
- LogFactory().debug("Node %s now up" % node)
- return 1
-
- time.sleep(30)
- if (not anytimeouts):
- LogFactory().debug("Waiting for node %s to come up" % node)
-
- anytimeouts = 1
- timeout = timeout - 1
-
- LogFactory().log("%s did not come up within %d tries" % (node, Timeout))
- if self.Env["continue"] == 1:
- answer = "Y"
- else:
- try:
- answer = input_wrapper('Continue? [nY]')
- except EOFError as e:
- answer = "n"
- if answer and answer == "n":
- raise ValueError("%s did not come up within %d tries" % (node, Timeout))
-
- def WaitForAllNodesToComeUp(self, nodes, timeout=300):
- '''Return TRUE when all nodes come up, or FALSE if timeout'''
-
- for node in nodes:
- if not self.WaitForNodeToComeUp(node, timeout):
- return None
- return 1
-
-
class ClusterManager(UserDict):
    '''The Cluster Manager class.
    This is a subclass of the Python dictionary class.
    (This is because it contains lots of {name,value} pairs,
    not because its behavior is all that similar to a
    dictionary in other ways.)
    This is an abstract class which implements high-level
    operations on the cluster and/or its cluster managers.
    Actual cluster manager classes are subclassed from this type.
    One of the things we do is track the state we think every node should
    be in.
    '''

    def __InitialConditions(self):
        # Start-up sanity checks. The root check below was deliberately
        # commented out, leaving this method a no-op placeholder.
        #if os.geteuid() != 0:
        # raise ValueError("Must Be Root!")
        None

    def _finalConditions(self):
        # Verify that the subclass overrode every declared key: a value of
        # None means a key was declared but never given a real value.
        for key in list(self.keys()):
            if self[key] == None:
                raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.")
def __init__(self, Environment, randseed=None):
    # NOTE(review): Environment and randseed are accepted for backward
    # compatibility but are not referenced here; the singleton factories
    # below are used instead.
    self.Env = EnvFactory().getInstance()
    self.templates = PatternSelector(self.Env["Name"])
    self.__InitialConditions()
    self.logger = LogFactory()
    self.TestLoggingLevel=0
    # Ad-hoc key/value storage backing the legacy dict-style access
    self.data = {}
    self.name = self.Env["Name"]
    self.rsh = RemoteFactory().getInstance()
    # Maps node name -> "up"/"down": the state we believe each node is in
    self.ShouldBeStatus={}
    self.ns = NodeStatus(self.Env)
    self.OurNode = os.uname()[1].lower()
    # Per-test-instance list of log patterns to ignore; reset each iteration
    self.__instance_errorstoignore = []
def __getitem__(self, key):
    '''Look a value up: "Name" is special-cased, then instance data,
    then the pattern templates for this cluster manager.
    '''
    if key == "Name":
        return self.name

    # Flag legacy dictionary-style access so it can be cleaned up later
    print("FIXME: Getting %s from %s" % (key, repr(self)))

    try:
        return self.data[key]
    except KeyError:
        return self.templates.get_patterns(self.Env["Name"], key)
def __setitem__(self, key, value):
    # Flag legacy dictionary-style writes so they can be cleaned up later
    print("FIXME: Setting %s=%s on %s" % (key, value, repr(self)))
    self.data[key] = value

def key_for_node(self, node):
    # Identity mapping by default; subclasses may translate a node name
    # into whatever key their tooling expects.
    return node
def instance_errorstoignore_clear(self):
    '''Allows the test scenario to reset instance errors to ignore on each iteration.'''
    self.__instance_errorstoignore = []

def instance_errorstoignore(self):
    '''Return list of errors which are 'normal' for a specific test instance'''
    return self.__instance_errorstoignore

def log(self, args):
    # Convenience wrapper around the shared logger
    self.logger.log(args)

def debug(self, args):
    # Convenience wrapper around the shared logger's debug channel
    self.logger.debug(args)
def upcount(self):
    '''Return the number of cluster nodes we believe are currently up.'''
    return sum(1 for member in self.Env["nodes"]
               if self.ShouldBeStatus[member] == "up")
def install_support(self, command="install"):
    '''Run the cts-support tool with the given subcommand on every node.'''
    support_cmd = CTSvars.CRM_DAEMON_DIR + "/cts-support " + command
    for node in self.Env["nodes"]:
        self.rsh(node, support_cmd)
def prepare_fencing_watcher(self, name):
    '''Return a LogWatcher primed for fencing messages, or None.

    If we don't have quorum now but get it as a result of starting this
    node, then a bunch of nodes might get fenced; prepare to notice that.
    NOTE(review): the "name" parameter is unused here — confirm callers.
    '''
    # If we don't have quorum now but get it as a result of starting this node,
    # then a bunch of nodes might get fenced
    # NOTE(review): upnode is assigned but never used
    upnode = None
    if self.HasQuorum(None):
        self.debug("Have quorum")
        return None

    if not self.templates["Pat:Fencing_start"]:
        print("No start pattern")
        return None

    if not self.templates["Pat:Fencing_ok"]:
        print("No ok pattern")
        return None

    stonith = None
    stonithPats = []
    # Collect "fencing started" and "fencing succeeded" patterns for every
    # node we currently believe to be down
    for peer in self.Env["nodes"]:
        if self.ShouldBeStatus[peer] != "up":
            stonithPats.append(self.templates["Pat:Fencing_ok"] % peer)
            stonithPats.append(self.templates["Pat:Fencing_start"] % peer)

    stonith = LogWatcher(self.Env["LogFileName"], stonithPats, "StartupFencing", 0, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
    stonith.setwatch()
    return stonith
def fencing_cleanup(self, node, stonith):
    '''Process fencing events observed while "node" was starting.

    Consumes matches from the watcher returned by prepare_fencing_watcher(),
    updates ShouldBeStatus for every peer that was shot, waits for
    in-progress fencing to finish, and returns the list of fenced peers
    (None if a fenced peer failed to restart when it should have).
    '''
    peer_list = []
    # Maps node -> "unknown" / "in-progress" / "complete"
    peer_state = {}

    self.debug("Looking for nodes that were fenced as a result of %s starting" % node)

    # If we just started a node, we may now have quorum (and permission to fence)
    if not stonith:
        self.debug("Nothing to do")
        return peer_list

    q = self.HasQuorum(None)
    if not q and len(self.Env["nodes"]) > 2:
        # We didn't gain quorum - we shouldn't have shot anyone
        self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"])))
        return peer_list

    for n in self.Env["nodes"]:
        peer_state[n] = "unknown"

    # Now see if any states need to be updated
    self.debug("looking for: " + repr(stonith.regexes))
    shot = stonith.look(0)
    while shot:
        line = repr(shot)
        self.debug("Found: " + line)
        del stonith.regexes[stonith.whichmatch]

        # Extract node name
        # NOTE(review): "peer" can carry over from a previous loop
        # iteration if the current match sets neither branch — confirm
        # whether that is intended.
        for n in self.Env["nodes"]:
            if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
                peer = n
                peer_state[peer] = "complete"
                self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer)

            elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
                # TODO: Correctly detect multiple fencing operations for the same host
                peer = n
                peer_state[peer] = "in-progress"
                self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer)

        if not peer:
            self.logger.log("ERROR: Unknown stonith match: %s" % line)

        elif not peer in peer_list:
            self.debug("Found peer: " + peer)
            peer_list.append(peer)

        # Get the next one
        shot = stonith.look(60)

    for peer in peer_list:

        self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
        # Whether the peer rejoins automatically depends on whether the
        # cluster starts at boot on these machines
        if self.Env["at-boot"]:
            self.ShouldBeStatus[peer] = "up"
        else:
            self.ShouldBeStatus[peer] = "down"

        if peer_state[peer] == "in-progress":
            # Wait for any in-progress operations to complete
            shot = stonith.look(60)
            while len(stonith.regexes) and shot:
                line = repr(shot)
                self.debug("Found: " + line)
                del stonith.regexes[stonith.whichmatch]
                shot = stonith.look(60)

        # Now make sure the node is alive too
        self.ns.WaitForNodeToComeUp(peer, self.Env["DeadTime"])

        # Poll until it comes up
        if self.Env["at-boot"]:
            if not self.StataCM(peer):
                time.sleep(self.Env["StartTime"])

            if not self.StataCM(peer):
                self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
                return None

    return peer_list
def StartaCM(self, node, verbose=False):
    '''Start up the cluster manager on a given node.

    Returns 1 on success (including "already started"), None on failure.
    Also accounts for any fencing triggered by the node (re)joining.
    '''
    if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node))
    else: self.debug("Starting %s on node %s" % (self.templates["Name"], node))
    # NOTE(review): ret is assigned but never used
    ret = 1

    if not node in self.ShouldBeStatus:
        self.ShouldBeStatus[node] = "down"

    if self.ShouldBeStatus[node] != "down":
        return 1

    patterns = []
    # Technically we should always be able to notice ourselves starting
    patterns.append(self.templates["Pat:Local_started"] % node)
    # The first node up becomes the DC; later nodes join it
    if self.upcount() == 0:
        patterns.append(self.templates["Pat:DC_started"] % node)
    else:
        patterns.append(self.templates["Pat:NonDC_started"] % node)

    watch = LogWatcher(
        self.Env["LogFileName"], patterns, "StartaCM", self.Env["StartTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])

    self.install_config(node)

    # "any" because we cannot know the true state until we probe below
    self.ShouldBeStatus[node] = "any"
    if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
        self.logger.log ("%s was already started" % (node))
        return 1

    stonith = self.prepare_fencing_watcher(node)
    watch.setwatch()

    if self.rsh(node, self.templates["StartCmd"]) != 0:
        self.logger.log ("Warn: Start command failed on node %s" % (node))
        self.fencing_cleanup(node, stonith)
        return None

    self.ShouldBeStatus[node] = "up"
    watch_result = watch.lookforall()

    if watch.unmatched:
        for regex in watch.unmatched:
            self.logger.log ("Warn: Startup pattern not found: %s" % (regex))

    if watch_result and self.cluster_stable(self.Env["DeadTime"]):
        #self.debug("Found match: "+ repr(watch_result))
        self.fencing_cleanup(node, stonith)
        return 1

    # Fall back to a direct status probe if the log patterns were missed
    elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
        self.fencing_cleanup(node, stonith)
        return 1

    self.logger.log ("Warn: Start failed for node %s" % (node))
    return None
def StartaCMnoBlock(self, node, verbose=False):
    '''Start the cluster manager on a node without waiting for it to come up.'''
    message = "Starting %s on node %s" % (self["Name"], node)
    if verbose:
        self.logger.log(message)
    else:
        self.debug(message)

    self.install_config(node)
    # Fire the start command asynchronously and assume it will succeed
    self.rsh(node, self.templates["StartCmd"], synchronous=0)
    self.ShouldBeStatus[node] = "up"
    return 1
def StopaCM(self, node, verbose=False, force=False):
    '''Stop the cluster manager on a given node.

    Returns 1 on success (or when the node is already believed down and
    force is off), None if the stop command failed.
    '''
    message = "Stopping %s on node %s" % (self["Name"], node)
    if verbose:
        self.logger.log(message)
    else:
        self.debug(message)

    if self.ShouldBeStatus[node] != "up" and force == False:
        return 1

    if self.rsh(node, self.templates["StopCmd"]) != 0:
        self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
        return None

    self.ShouldBeStatus[node] = "down"
    self.cluster_stable(self.Env["DeadTime"])
    return 1
def StopaCMnoBlock(self, node):
    '''Stop the cluster manager on a given node without waiting for completion.'''
    self.debug("Stopping %s on node %s" % (self["Name"], node))
    # Fire the stop command asynchronously and assume it will succeed
    self.rsh(node, self.templates["StopCmd"], synchronous=0)
    self.ShouldBeStatus[node] = "down"
    return 1
def RereadCM(self, node):
    '''Force the cluster manager on a given node to reread its config
    This may be a no-op on certain cluster managers.
    '''
    if self.rsh(node, self.templates["RereadCmd"]) == 0:
        return 1

    self.logger.log ("Could not force %s on node %s to reread its config"
        % (self["Name"], node))
    return None
def startall(self, nodelist=None, verbose=False, quick=False):
    '''Start the cluster manager on every node in the cluster.
    We can do it on a subset of the cluster if nodelist is not None.

    Returns 1 on success, 0 on failure. With quick=False only a single
    node is started (used for basic sanity checks); with quick=True all
    nodes are started near-simultaneously and watched via the logs.
    '''
    if not nodelist:
        nodelist = self.Env["nodes"]

    # Make sure the machines themselves are booted before we try to start
    for node in nodelist:
        if self.ShouldBeStatus[node] == "down":
            self.ns.WaitForAllNodesToComeUp(nodelist, 300)

    if not quick:
        # This is used for "basic sanity checks", so only start one node ...
        # NOTE(review): this uses the leftover loop variable, i.e. the last
        # node iterated above rather than nodelist[0] — preserved as-is.
        if not self.StartaCM(node, verbose=verbose):
            return 0
        return 1

    # Approximation of SimulStartList for --boot
    watchpats = [ ]
    watchpats.append(self.templates["Pat:DC_IDLE"])
    for node in nodelist:
        watchpats.append(self.templates["Pat:InfraUp"] % node)
        watchpats.append(self.templates["Pat:PacemakerUp"] % node)
        watchpats.append(self.templates["Pat:Local_started"] % node)
        watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node))

    # Start all the nodes - at about the same time...
    watch = LogWatcher(self.Env["LogFileName"], watchpats, "fast-start", self.Env["DeadTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"])
    watch.setwatch()

    # The first node must come up synchronously so there is a DC to join
    if not self.StartaCM(nodelist[0], verbose=verbose):
        return 0

    for node in nodelist:
        self.StartaCMnoBlock(node, verbose=verbose)

    watch.lookforall()
    if watch.unmatched:
        for regex in watch.unmatched:
            self.logger.log ("Warn: Startup pattern not found: %s" % (regex))

    if not self.cluster_stable():
        self.logger.log("Cluster did not stabilize")
        return 0

    return 1
def stopall(self, nodelist=None, verbose=False, force=False):
    '''Stop the cluster managers on every node in the cluster.
    We can do it on a subset of the cluster if nodelist is not None.
    '''
    all_stopped = 1
    if not nodelist:
        nodelist = self.Env["nodes"]

    # As in the original, every known node is considered, not just nodelist
    for node in self.Env["nodes"]:
        if self.ShouldBeStatus[node] == "up" or force == True:
            if not self.StopaCM(node, verbose=verbose, force=force):
                all_stopped = 0

    return all_stopped
def rereadall(self, nodelist=None):
    '''Force the cluster managers on every node in the cluster
    to reread their config files. We can do it on a subset of the
    cluster if nodelist is not None.
    '''
    if not nodelist:
        nodelist = self.Env["nodes"]

    # As in the original, every known node is considered, not just nodelist
    for node in self.Env["nodes"]:
        if self.ShouldBeStatus[node] == "up":
            self.RereadCM(node)
def statall(self, nodelist=None):
    '''Return the status of the cluster managers in the cluster.
    We can do it on a subset of the cluster if nodelist is not None.
    '''
    if not nodelist:
        nodelist = self.Env["nodes"]

    return dict((node, "up" if self.StataCM(node) else "down")
                for node in nodelist)
def isolate_node(self, target, nodes=None):
    '''Cut communication from target to every other node; 1 on success.'''
    if not nodes:
        nodes = self.Env["nodes"]

    for peer in nodes:
        if peer == target:
            continue

        rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(peer))
        if rc != 0:
            self.logger.log("Could not break the communication between %s and %s: %d" % (target, peer, rc))
            return None

        self.debug("Communication cut between %s and %s" % (target, peer))

    return 1
def unisolate_node(self, target, nodes=None):
    '''Restore communication between target and the given nodes.'''
    if not nodes:
        nodes = self.Env["nodes"]

    for peer in nodes:
        if peer == target:
            continue

        # Limit the amount of time we have asynchronous connectivity for
        # Restore both sides as simultaneously as possible
        self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(peer), synchronous=0)
        self.rsh(peer, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=0)
        self.debug("Communication restored between %s and %s" % (target, peer))
def reducecomm_node(self,node):
    '''Degrade communication on a node using the configured loss rates.'''
    command = self.templates["ReduceCommCmd"] % (self.Env["XmitLoss"], self.Env["RecvLoss"])
    if self.rsh(node, command) == 0:
        return 1

    self.logger.log("Could not reduce the communication between the nodes from node: %s" % node)
    return None
def restorecomm_node(self,node):
    '''Undo reducecomm_node; only acts when a loss rate is non-zero.'''
    rc = 0
    if float(self.Env["XmitLoss"]) != 0 or float(self.Env["RecvLoss"]) != 0 :
        rc = self.rsh(node, self.templates["RestoreCommCmd"])

    if rc == 0:
        return 1

    self.logger.log("Could not restore the communication between the nodes from node: %s" % node)
    return None
def oprofileStart(self, node=None):
    '''Start oprofile on one node, or on every node configured for it.'''
    if not node:
        # Fan out to every node listed in the "oprofile" environment setting
        for n in self.Env["oprofile"]:
            self.oprofileStart(n)

    elif node in self.Env["oprofile"]:
        self.debug("Enabling oprofile on %s" % node)
        self.rsh(node, "opcontrol --init")
        self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
        self.rsh(node, "opcontrol --start")
        self.rsh(node, "opcontrol --reset")

def oprofileSave(self, test, node=None):
    '''Dump and save oprofile data for the given test number.'''
    if not node:
        for n in self.Env["oprofile"]:
            self.oprofileSave(test, n)

    elif node in self.Env["oprofile"]:
        self.rsh(node, "opcontrol --dump")
        self.rsh(node, "opcontrol --save=cts.%d" % test)
        # Read back with: opreport -l session:cts.0 image:<directory>/c*
        # NOTE(review): "if None:" is always false, so the reset branch is
        # dead code and the profiler is always restarted instead.
        if None:
            self.rsh(node, "opcontrol --reset")
        else:
            self.oprofileStop(node)
            self.oprofileStart(node)

def oprofileStop(self, node=None):
    '''Shut oprofile down on one node, or on every configured node.'''
    if not node:
        for n in self.Env["oprofile"]:
            self.oprofileStop(n)

    elif node in self.Env["oprofile"]:
        self.debug("Stopping oprofile on %s" % node)
        self.rsh(node, "opcontrol --reset")
        self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
def StatsExtract(self):
    '''Collect the accumulated stats CSV from every node into local files.

    No-op unless stats collection was enabled in the environment. For each
    node: stop the collector (if it was installed), read the remote CSV,
    delete it, and append the contents to a local per-node file.
    '''
    if not self.Env["stats"]:
        return

    for host in self.Env["nodes"]:
        log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR
        if host in has_log_stats:
            self.rsh(host, '''bash %s %s stop''' % (log_stats_bin, log_stats_file))
        (rc, lines) = self.rsh(host, '''cat %s''' % log_stats_file, stdout=2)
        self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))

        fname = "cts-stats-%d-nodes-%s.csv" % (len(self.Env["nodes"]), host)
        print("Extracted stats: %s" % fname)
        # Context manager ensures the file is closed even if writelines fails
        # (the original leaked the handle on error)
        with open(fname, "a") as fd:
            fd.writelines(lines)
def StatsMark(self, testnum):
    '''Mark the test number in the stats log'''
    global has_log_stats
    if not self.Env["stats"]:
        return

    for host in self.Env["nodes"]:
        log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR
        if not host in has_log_stats:
            global log_stats
            global log_stats_bin

            # First use on this host: install the stats helper script,
            # escaping shell metacharacters so it survives the echo below.
            # NOTE(review): the "'" and "`" substitutions replace each
            # character with itself ("\'" == "'", "\`" == "`") and so are
            # effectively no-ops — confirm intent.
            script=log_stats
            #script = re.sub("\\\\", "\\\\", script)
            script = re.sub('\"', '\\\"', script)
            script = re.sub("'", "\'", script)
            script = re.sub("`", "\`", script)
            script = re.sub("\$", "\\\$", script)
            self.debug("Installing %s on %s" % (log_stats_bin, host))
            self.rsh(host, '''echo "%s" > %s''' % (script, log_stats_bin), silent=True)
            self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
            has_log_stats[host] = 1

        # Now mark it
        self.rsh(host, '''bash %s %s mark %s''' % (log_stats_bin, log_stats_file, testnum), synchronous=0)
-
-
-class Component(object):
- def kill(self, node):
- None
-
-
-class Process(Component):
- def __init__(self, cm, name, process=None, dc_only=0, pats=[], dc_pats=[], badnews_ignore=[], common_ignore=[], triggersreboot=0):
- self.name = str(name)
- self.dc_only = dc_only
- self.pats = pats
- self.dc_pats = dc_pats
- self.CM = cm
- self.badnews_ignore = badnews_ignore
- self.badnews_ignore.extend(common_ignore)
- self.triggersreboot = triggersreboot
-
- if process:
- self.proc = str(process)
- else:
- self.proc = str(name)
- self.KillCmd = "killall -9 " + self.proc
-
- def kill(self, node):
- if self.CM.rsh(node, self.KillCmd) != 0:
- self.CM.log ("ERROR: Kill %s failed on node %s" % (self.name,node))
- return None
- return 1
diff --git a/cts/lab/Makefile.am b/cts/lab/Makefile.am
index 697e845ea0..2303a1b3a0 100644
--- a/cts/lab/Makefile.am
+++ b/cts/lab/Makefile.am
@@ -1,38 +1,39 @@
#
# Copyright 2001-2021 the Pacemaker project contributors
#
# The version control history for this file may have further details.
#
# This source code is licensed under the GNU General Public License version 2
# or later (GPLv2+) WITHOUT ANY WARRANTY.
#
MAINTAINERCLEANFILES = Makefile.in
noinst_SCRIPTS = cluster_test \
OCFIPraTest.py
# Commands intended to be run only via other commands
halibdir = $(CRM_DAEMON_DIR)
dist_halib_SCRIPTS = cts-log-watcher
ctslibdir = $(pythondir)/cts
ctslib_PYTHON = __init__.py \
CIB.py \
cib_xml.py \
+ ClusterManager.py \
CM_common.py \
CM_corosync.py \
CTS.py \
CTSaudits.py \
CTSscenarios.py \
CTStests.py \
environment.py \
logging.py \
patterns.py \
remote.py \
watcher.py
nodist_ctslib_PYTHON = CTSvars.py
ctsdir = $(datadir)/$(PACKAGE)/tests/cts
cts_SCRIPTS = CTSlab.py \
cts

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jul 8, 6:40 PM (2 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2002741
Default Alt Text
(100 KB)

Event Timeline