diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py index 7af5fc8626..1bb8d0be8f 100644 --- a/cts/lab/CTSscenarios.py +++ b/cts/lab/CTSscenarios.py @@ -1,601 +1,598 @@ """ Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = "Copyright 2000-2021 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import sys import time from cts.CTStests import CTSTest from cts.CTSaudits import ClusterAudit from cts.watcher import LogWatcher class ScenarioComponent(object): def __init__(self, Env): self.Env = Env def IsApplicable(self): '''Return True if the current ScenarioComponent is applicable in the given LabEnvironment given to the constructor. ''' raise ValueError("Abstract Class member (IsApplicable)") def SetUp(self, CM): '''Set up the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") def TearDown(self, CM): '''Tear down (undo) the given ScenarioComponent''' raise ValueError("Abstract Class member (Setup)") class Scenario(object): ( '''The basic idea of a scenario is that of an ordered list of ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn, and then after the tests have been run, they are torn down using TearDown() (in reverse order). A Scenario is applicable to a particular cluster manager iff each ScenarioComponent is applicable. A partially set up scenario is torn down if it fails during setup. ''') def __init__(self, ClusterManager, Components, Audits, Tests): "Initialize the Scenario from the list of ScenarioComponents" self.ClusterManager = ClusterManager self.Components = Components self.Audits = Audits self.Tests = Tests self.BadNews = None self.TestSets = [] self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0} self.Sets = [] #self.ns=CTS.NodeStatus(self.Env) for comp in Components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of ScenarioComponent") for audit in Audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be subclass of ClusterAudit") for test in Tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") def IsApplicable(self): ( '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable() ''' ) for comp in self.Components: if not comp.IsApplicable(): return None return True def SetUp(self): '''Set up the Scenario. Return TRUE on success.''' self.ClusterManager.prepare() self.audit() # Also detects remote/local log config - self.ClusterManager.StatsMark(0) self.ClusterManager.ns.WaitForAllNodesToComeUp(self.ClusterManager.Env["nodes"]) self.audit() self.ClusterManager.install_support() self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"], self.ClusterManager.templates.get_patterns("BadNews"), "BadNews", 0, kind=self.ClusterManager.Env["LogWatcher"], hosts=self.ClusterManager.Env["nodes"]) self.BadNews.setwatch() # Call after we've figured out what type of log watching to do in LogAudit j = 0 while j < len(self.Components): if not self.Components[j].SetUp(self.ClusterManager): # OOPS! We failed. Tear partial setups down. self.audit() self.ClusterManager.log("Tearing down partial setup") self.TearDown(j) return None j = j + 1 self.audit() return 1 def TearDown(self, max=None): '''Tear Down the Scenario - in reverse order.''' if max == None: max = len(self.Components)-1 j = max while j >= 0: self.Components[j].TearDown(self.ClusterManager) j = j - 1 self.audit() - self.ClusterManager.StatsExtract() self.ClusterManager.install_support("uninstall") def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def run(self, Iterations): self.ClusterManager.oprofileStart() try: self.run_loop(Iterations) self.ClusterManager.oprofileStop() except: self.ClusterManager.oprofileStop() raise def run_loop(self, Iterations): raise ValueError("Abstract Class member (run_loop)") def run_test(self, test, testcount): nodechoice = self.ClusterManager.Env.random_node() ret = 1 where = "" did_run = 0 - self.ClusterManager.StatsMark(testcount) self.ClusterManager.instance_errorstoignore_clear() self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]") starttime = test.set_timer() if not test.setup(nodechoice): self.ClusterManager.log("Setup failed") ret = 0 elif not test.canrunnow(nodechoice): self.ClusterManager.log("Skipped") test.skipped() else: did_run = 1 ret = test(nodechoice) if not test.teardown(nodechoice): self.ClusterManager.log("Teardown failed") if self.ClusterManager.Env["continue"]: answer = "Y" else: try: answer = input('Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice)) ret = 0 stoptime = time.time() self.ClusterManager.oprofileSave(testcount) elapsed_time = stoptime - starttime test_time = stoptime - test.get_timer() if not test["min_time"]: test["elapsed_time"] = elapsed_time test["min_time"] = test_time test["max_time"] = test_time else: test["elapsed_time"] = test["elapsed_time"] + elapsed_time if test_time < test["min_time"]: test["min_time"] = test_time if test_time > test["max_time"]: test["max_time"] = test_time if ret: self.incr("success") test.log_timer() else: self.incr("failure") self.ClusterManager.statall() did_run = 1 # Force the test count to be incremented anyway so test extraction works self.audit(test.errorstoignore()) return did_run def summarize(self): self.ClusterManager.log("****************") self.ClusterManager.log("Overall Results:" + repr(self.Stats)) self.ClusterManager.log("****************") stat_filter = { "calls":0, "failure":0, "skipped":0, "auditfail":0, } self.ClusterManager.log("Test Summary") for test in self.Tests: for key in list(stat_filter.keys()): stat_filter[key] = test.Stats[key] self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter)) self.ClusterManager.debug("Detailed Results") for test in self.Tests: self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.Stats)) self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def audit(self, LocalIgnore=[]): errcount = 0 ignorelist = [] ignorelist.append("CTS:") ignorelist.extend(LocalIgnore) ignorelist.extend(self.ClusterManager.errorstoignore()) ignorelist.extend(self.ClusterManager.instance_errorstoignore()) # This makes sure everything is stabilized before starting... failed = 0 for audit in self.Audits: if not audit(): self.ClusterManager.log("Audit " + audit.name() + " FAILED.") failed += 1 else: self.ClusterManager.debug("Audit " + audit.name() + " passed.") while errcount < 1000: match = None if self.BadNews: match = self.BadNews.look(0) if match: add_err = 1 for ignore in ignorelist: if add_err == 1 and re.search(ignore, match): add_err = 0 if add_err == 1: self.ClusterManager.log("BadNews: " + match) self.incr("BadNews") errcount = errcount + 1 else: break else: if self.ClusterManager.Env["continue"]: answer = "Y" else: try: answer = input('Big problems. Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": self.ClusterManager.log("Shutting down.") self.summarize() self.TearDown() raise ValueError("Looks like we hit a BadNews jackpot!") if self.BadNews: self.BadNews.end() return failed class AllOnce(Scenario): '''Every Test Once''' # Accessable as __doc__ def run_loop(self, Iterations): testcount = 1 for test in self.Tests: self.run_test(test, testcount) testcount += 1 class RandomTests(Scenario): '''Random Test Execution''' def run_loop(self, Iterations): testcount = 1 while testcount <= Iterations: test = self.ClusterManager.Env.random_gen.choice(self.Tests) self.run_test(test, testcount) testcount += 1 class BasicSanity(Scenario): '''Basic Cluster Sanity''' def run_loop(self, Iterations): testcount = 1 while testcount <= Iterations: test = self.Environment.random_gen.choice(self.Tests) self.run_test(test, testcount) testcount += 1 class Sequence(Scenario): '''Named Tests in Sequence''' def run_loop(self, Iterations): testcount = 1 while testcount <= Iterations: for test in self.Tests: self.run_test(test, testcount) testcount += 1 class Boot(Scenario): '''Start the Cluster''' def run_loop(self, Iterations): testcount = 0 class BootCluster(ScenarioComponent): ( '''BootCluster is the most basic of ScenarioComponents. This ScenarioComponent simply starts the cluster manager on all the nodes. It is fairly robust as it waits for all nodes to come up before starting as they might have been rebooted or crashed for some reason beforehand. ''') def __init__(self, Env): pass def IsApplicable(self): '''BootCluster is so generic it is always Applicable''' return True def SetUp(self, CM): '''Basic Cluster Manager startup. Start everything''' CM.prepare() # Clear out the cobwebs ;-) CM.stopall(verbose=True, force=True) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on all nodes.") return CM.startall(verbose=True, quick=True) def TearDown(self, CM, force=False): '''Set up the given ScenarioComponent''' # Stop the cluster manager everywhere CM.log("Stopping Cluster Manager on all nodes") return CM.stopall(verbose=True, force=force) class LeaveBooted(BootCluster): def TearDown(self, CM): '''Set up the given ScenarioComponent''' # Stop the cluster manager everywhere CM.log("Leaving Cluster running on all nodes") return 1 class PingFest(ScenarioComponent): ( '''PingFest does a flood ping to each node in the cluster from the test machine. If the LabEnvironment Parameter PingSize is set, it will be used as the size of ping packet requested (via the -s option). If it is not set, it defaults to 1024 bytes. According to the manual page for ping: Outputs packets as fast as they come back or one hundred times per second, whichever is more. For every ECHO_REQUEST sent a period ``.'' is printed, while for every ECHO_REPLY received a backspace is printed. This provides a rapid display of how many packets are being dropped. Only the super-user may use this option. This can be very hard on a net- work and should be used with caution. ''' ) def __init__(self, Env): self.Env = Env def IsApplicable(self): '''PingFests are always applicable ;-) ''' return True def SetUp(self, CM): '''Start the PingFest!''' self.PingSize = 1024 if "PingSize" in list(CM.Env.keys()): self.PingSize = CM.Env["PingSize"] CM.log("Starting %d byte flood pings" % self.PingSize) self.PingPids = [] for node in CM.Env["nodes"]: self.PingPids.append(self._pingchild(node)) CM.log("Ping PIDs: " + repr(self.PingPids)) return 1 def TearDown(self, CM): '''Stop it right now! My ears are pinging!!''' for pid in self.PingPids: if pid != None: CM.log("Stopping ping process %d" % pid) os.kill(pid, signal.SIGKILL) def _pingchild(self, node): Args = ["ping", "-qfn", "-s", str(self.PingSize), node] sys.stdin.flush() sys.stdout.flush() sys.stderr.flush() pid = os.fork() if pid < 0: self.Env.log("Cannot fork ping child") return None if pid > 0: return pid # Otherwise, we're the child process. os.execvp("ping", Args) self.Env.log("Cannot execvp ping: " + repr(Args)) sys.exit(1) class PacketLoss(ScenarioComponent): ( ''' It would be useful to do some testing of CTS with a modest amount of packet loss enabled - so we could see that everything runs like it should with a certain amount of packet loss present. ''') def IsApplicable(self): '''always Applicable''' return True def SetUp(self, CM): '''Reduce the reliability of communications''' if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 : return 1 for node in CM.Env["nodes"]: CM.reducecomm_node(node) CM.log("Reduce the reliability of communications") return 1 def TearDown(self, CM): '''Fix the reliability of communications''' if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0 : return 1 for node in CM.Env["nodes"]: CM.unisolate_node(node) CM.log("Fix the reliability of communications") class BasicSanityCheck(ScenarioComponent): ( ''' ''') def IsApplicable(self): return self.Env["DoBSC"] def SetUp(self, CM): CM.prepare() # Clear out the cobwebs self.TearDown(CM) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on BSC node(s).") return CM.startall() def TearDown(self, CM): CM.log("Stopping Cluster Manager on BSC node(s).") return CM.stopall() class Benchmark(ScenarioComponent): ( ''' ''') def IsApplicable(self): return self.Env["benchmark"] def SetUp(self, CM): CM.prepare() # Clear out the cobwebs self.TearDown(CM, force=True) # Now start the Cluster Manager on all the nodes. CM.log("Starting Cluster Manager on all node(s).") return CM.startall() def TearDown(self, CM): CM.log("Stopping Cluster Manager on all node(s).") return CM.stopall() class RollingUpgrade(ScenarioComponent): ( ''' Test a rolling upgrade between two versions of the stack ''') def __init__(self, Env): self.Env = Env def IsApplicable(self): if not self.Env["rpm-dir"]: return None if not self.Env["current-version"]: return None if not self.Env["previous-version"]: return None return True def install(self, node, version): target_dir = "/tmp/rpm-%s" % version src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version) self.CM.rsh(node, "mkdir -p %s" % target_dir) rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir)) self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir)) return self.success() def upgrade(self, node): return self.install(node, self.CM.Env["current-version"]) def downgrade(self, node): return self.install(node, self.CM.Env["previous-version"]) def SetUp(self, CM): print(repr(self)+"prepare") CM.prepare() # Clear out the cobwebs CM.stopall(force=True) CM.log("Downgrading all nodes to %s." % self.Env["previous-version"]) for node in self.Env["nodes"]: if not self.downgrade(node): CM.log("Couldn't downgrade %s" % node) return None return 1 def TearDown(self, CM): # Stop everything CM.log("Stopping Cluster Manager on Upgrade nodes.") CM.stopall() CM.log("Upgrading all nodes to %s." % self.Env["current-version"]) for node in self.Env["nodes"]: if not self.upgrade(node): CM.log("Couldn't upgrade %s" % node) return None return 1 diff --git a/cts/lab/ClusterManager.py b/cts/lab/ClusterManager.py index 2b3ccbb376..79f06140e9 100644 --- a/cts/lab/ClusterManager.py +++ b/cts/lab/ClusterManager.py @@ -1,1085 +1,962 @@ """ ClusterManager class for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = """Copyright 2000-2023 the Pacemaker project contributors. Certain portions by Huang Zhen are copyright 2004 International Business Machines. The version control history for this file may have further details.""" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import time from collections import UserDict from cts.CIB import ConfigFactory from cts.CTS import NodeStatus, Process from cts.CTStests import AuditResource from cts.watcher import LogWatcher from pacemaker.buildoptions import BuildOptions from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory -has_log_stats = {} -log_stats_bin = BuildOptions.DAEMON_DIR + "/cts_log_stats.sh" -log_stats = """ -#!%s -# Tool for generating system load reports while CTS runs - -trap "" 1 - -f=$1; shift -action=$1; shift -base=`basename $0` - -if [ ! -e $f ]; then - echo "Time, Load 1, Load 5, Load 15, Test Marker" > $f -fi - -function killpid() { - if [ -e $f.pid ]; then - kill -9 `cat $f.pid` - rm -f $f.pid - fi -} - -function status() { - if [ -e $f.pid ]; then - kill -0 `cat $f.pid` - return $? - else - return 1 - fi -} - -function start() { - # Is it already running? - if - status - then - return - fi - - echo Active as $$ - echo $$ > $f.pid - - while [ 1 = 1 ]; do - uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f - #top -b -c -n1 | grep -e usr/libexec/pacemaker | grep -v -e grep -e python | head -n 1 | sed s@/usr/libexec/pacemaker/@@ | awk '{print " 0, "$9", "$10", "$12}' | tr '\\n' ',' >> $f - echo 0 >> $f - sleep 5 - done -} - -case $action in - start) - start - ;; - start-bg|bg) - # Use c --ssh -- ./stats.sh file start-bg - nohup $0 $f start >/dev/null 2>&1 > $f - echo " $*" >> $f - start - ;; - *) - echo "Unknown action: $action." - ;; -esac -""" % (BuildOptions.BASH_PATH) - class ClusterManager(UserDict): '''The Cluster Manager class. This is an subclass of the Python dictionary class. (this is because it contains lots of {name,value} pairs, not because it's behavior is that terribly similar to a dictionary in other ways.) This is an abstract class which class implements high-level operations on the cluster and/or its cluster managers. Actual cluster managers classes are subclassed from this type. One of the things we do is track the state we think every node should be in. ''' def __InitialConditions(self): #if os.geteuid() != 0: # raise ValueError("Must Be Root!") None def _finalConditions(self): for key in list(self.keys()): if self[key] == None: raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") def __init__(self): self.Env = EnvFactory().getInstance() self.templates = PatternSelector(self.Env["Name"]) self.__InitialConditions() self.logger = LogFactory() self.TestLoggingLevel=0 self.data = {} self.name = self.Env["Name"] self.rsh = RemoteFactory().getInstance() self.ShouldBeStatus={} self.ns = NodeStatus(self.Env) self.OurNode = os.uname()[1].lower() self.__instance_errorstoignore = [] self.fastfail = 0 self.cib_installed = 0 self.config = None self.cluster_monitor = 0 self.use_short_names = 1 if self.Env["DoBSC"]: del self.templates["Pat:They_stopped"] self._finalConditions() self.check_transitions = 0 self.check_elections = 0 self.CIBsync = {} self.CibFactory = ConfigFactory(self) self.cib = self.CibFactory.createConfig(self.Env["Schema"]) def __getitem__(self, key): if key == "Name": return self.name print("FIXME: Getting %s from %s" % (key, repr(self))) if key in self.data: return self.data[key] return self.templates.get_patterns(key) def __setitem__(self, key, value): print("FIXME: Setting %s=%s on %s" % (key, value, repr(self))) self.data[key] = value def key_for_node(self, node): return node def instance_errorstoignore_clear(self): '''Allows the test scenario to reset instance errors to ignore on each iteration.''' self.__instance_errorstoignore = [] def instance_errorstoignore(self): '''Return list of errors which are 'normal' for a specific test instance''' return self.__instance_errorstoignore def log(self, args): self.logger.log(args) def debug(self, args): self.logger.debug(args) def upcount(self): '''How many nodes are up?''' count = 0 for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": count = count + 1 return count def install_support(self, command="install"): for node in self.Env["nodes"]: self.rsh(node, BuildOptions.DAEMON_DIR + "/cts-support " + command) def prepare_fencing_watcher(self, name): # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced upnode = None if self.HasQuorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonithPats = [] for peer in self.Env["nodes"]: if self.ShouldBeStatus[peer] != "up": stonithPats.append(self.templates["Pat:Fencing_ok"] % peer) stonithPats.append(self.templates["Pat:Fencing_start"] % peer) stonith = LogWatcher(self.Env["LogFileName"], stonithPats, "StartupFencing", 0, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"]) stonith.setwatch() return stonith def fencing_cleanup(self, node, stonith): peer_list = [] peer_state = {} self.debug("Looking for nodes that were fenced as a result of %s starting" % node) # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.HasQuorum(None) if not q and len(self.Env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"]))) return peer_list for n in self.Env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug("looking for: " + repr(stonith.regexes)) shot = stonith.look(0) while shot: line = repr(shot) self.debug("Found: " + line) del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.Env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self.logger.log("ERROR: Unknown stonith match: %s" % line) elif not peer in peer_list: self.debug("Found peer: " + peer) peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) if self.Env["at-boot"]: self.ShouldBeStatus[peer] = "up" else: self.ShouldBeStatus[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while len(stonith.regexes) and shot: line = repr(shot) self.debug("Found: " + line) del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too self.ns.WaitForNodeToComeUp(peer, self.Env["DeadTime"]) # Poll until it comes up if self.Env["at-boot"]: if not self.StataCM(peer): time.sleep(self.Env["StartTime"]) if not self.StataCM(peer): self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) return None return peer_list def StartaCM(self, node, verbose=False): '''Start up the cluster manager on a given node''' if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node)) else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) ret = 1 if not node in self.ShouldBeStatus: self.ShouldBeStatus[node] = "down" if self.ShouldBeStatus[node] != "down": return 1 patterns = [] # Technically we should always be able to notice ourselves starting patterns.append(self.templates["Pat:Local_started"] % node) if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) watch = LogWatcher( self.Env["LogFileName"], patterns, "StartaCM", self.Env["StartTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"]) self.install_config(node) self.ShouldBeStatus[node] = "any" if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): self.logger.log ("%s was already started" % (node)) return 1 stonith = self.prepare_fencing_watcher(node) watch.setwatch() (rc, _) = self.rsh(node, self.templates["StartCmd"]) if rc != 0: self.logger.log ("Warn: Start command failed on node %s" % (node)) self.fencing_cleanup(node, stonith) return None self.ShouldBeStatus[node] = "up" watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) if watch_result and self.cluster_stable(self.Env["DeadTime"]): #self.debug("Found match: "+ repr(watch_result)) self.fencing_cleanup(node, stonith) return 1 elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): self.fencing_cleanup(node, stonith) return 1 self.logger.log ("Warn: Start failed for node %s" % (node)) return None def StartaCMnoBlock(self, node, verbose=False): '''Start up the cluster manager on a given node with none-block mode''' if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node)) else: self.debug("Starting %s on node %s" % (self["Name"], node)) self.install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=False) self.ShouldBeStatus[node] = "up" return 1 def StopaCM(self, node, verbose=False, force=False): '''Stop the cluster manager on a given node''' if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node)) else: self.debug("Stopping %s on node %s" % (self["Name"], node)) if self.ShouldBeStatus[node] != "up" and force == False: return 1 (rc, _) = self.rsh(node, self.templates["StopCmd"]) if rc == 0: # Make sure we can continue even if corosync leaks # fdata-* is the old name #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*") self.ShouldBeStatus[node] = "down" self.cluster_stable(self.Env["DeadTime"]) return 1 else: self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) return None def StopaCMnoBlock(self, node): '''Stop the cluster manager on a given node with none-block mode''' self.debug("Stopping %s on node %s" % (self["Name"], node)) self.rsh(node, self.templates["StopCmd"], synchronous=False) self.ShouldBeStatus[node] = "down" return 1 def RereadCM(self, node): '''Force the cluster manager on a given node to reread its config This may be a no-op on certain cluster managers. ''' (rc, _) = self.rsh(node, self.templates["RereadCmd"]) if rc == 0: return 1 else: self.logger.log ("Could not force %s on node %s to reread its config" % (self["Name"], node)) return None def startall(self, nodelist=None, verbose=False, quick=False): '''Start the cluster manager on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.ShouldBeStatus[node] == "down": self.ns.WaitForAllNodesToComeUp(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... if not self.StartaCM(node, verbose=verbose): return 0 return 1 # Approximation of SimulStartList for --boot watchpats = [ ] watchpats.append(self.templates["Pat:DC_IDLE"]) for node in nodelist: watchpats.append(self.templates["Pat:InfraUp"] % node) watchpats.append(self.templates["Pat:PacemakerUp"] % node) watchpats.append(self.templates["Pat:Local_started"] % node) watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node)) # Start all the nodes - at about the same time... watch = LogWatcher(self.Env["LogFileName"], watchpats, "fast-start", self.Env["DeadTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"]) watch.setwatch() if not self.StartaCM(nodelist[0], verbose=verbose): return 0 for node in nodelist: self.StartaCMnoBlock(node, verbose=verbose) watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) if not self.cluster_stable(): self.logger.log("Cluster did not stabilize") return 0 return 1 def stopall(self, nodelist=None, verbose=False, force=False): '''Stop the cluster managers on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up" or force == True: if not self.StopaCM(node, verbose=verbose, force=force): ret = 0 return ret def rereadall(self, nodelist=None): '''Force the cluster managers on every node in the cluster to reread their config files. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": self.RereadCM(node) def statall(self, nodelist=None): '''Return the status of the cluster managers in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' result = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.StataCM(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): '''isolate the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node != target: rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node)) if rc != 0: self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) return None else: self.debug("Communication cut between %s and %s" % (target, node)) return 1 def unisolate_node(self, target, nodes=None): '''fix the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node != target: restored = 0 # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False) self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False) self.debug("Communication restored between %s and %s" % (target, node)) def reducecomm_node(self,node): '''reduce the communication between the nodes''' (rc, _) = self.rsh(node, self.templates["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"])) if rc == 0: return 1 else: self.logger.log("Could not reduce the communication between the nodes from node: %s" % node) return None def restorecomm_node(self,node): '''restore the saved communication between the nodes''' rc = 0 if float(self.Env["XmitLoss"]) != 0 or float(self.Env["RecvLoss"]) != 0 : (rc, _) = self.rsh(node, self.templates["RestoreCommCmd"]) if rc == 0: return 1 else: self.logger.log("Could not restore the communication between the nodes from node: %s" % node) return None def oprofileStart(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileStart(n) elif node in self.Env["oprofile"]: self.debug("Enabling oprofile on %s" % node) self.rsh(node, "opcontrol --init") self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") self.rsh(node, "opcontrol --start") self.rsh(node, "opcontrol --reset") def oprofileSave(self, test, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileSave(test, n) elif node in self.Env["oprofile"]: self.rsh(node, "opcontrol --dump") self.rsh(node, "opcontrol --save=cts.%d" % test) # Read back with: opreport -l session:cts.0 image:/c* if None: self.rsh(node, "opcontrol --reset") else: self.oprofileStop(node) self.oprofileStart(node) def oprofileStop(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileStop(n) elif node in self.Env["oprofile"]: self.debug("Stopping oprofile on %s" % node) self.rsh(node, "opcontrol --reset") self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") - - def StatsExtract(self): - if not self.Env["stats"]: - return - - for host in self.Env["nodes"]: - log_stats_file = "%s/cts-stats.csv" % BuildOptions.DAEMON_DIR - if host in has_log_stats: - self.rsh(host, '''bash %s %s stop''' % (log_stats_bin, log_stats_file)) - (_, lines) = self.rsh(host, '''cat %s''' % log_stats_file, verbose=1) - self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file)) - - fname = "cts-stats-%d-nodes-%s.csv" % (len(self.Env["nodes"]), host) - print("Extracted stats: %s" % fname) - fd = open(fname, "a") - fd.writelines(lines) - fd.close() - - def StatsMark(self, testnum): - '''Mark the test number in the stats log''' - - global has_log_stats - if not self.Env["stats"]: - return - - for host in self.Env["nodes"]: - log_stats_file = "%s/cts-stats.csv" % BuildOptions.DAEMON_DIR - if not host in has_log_stats: - - global log_stats - global log_stats_bin - script=log_stats - #script = re.sub("\\\\", "\\\\", script) - script = re.sub('\"', '\\\"', script) - script = re.sub("'", "\'", script) - script = re.sub("`", "\`", script) - script = re.sub("\$", "\\\$", script) - - self.debug("Installing %s on %s" % (log_stats_bin, host)) - self.rsh(host, '''echo "%s" > %s''' % (script, log_stats_bin), verbose=0) - self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file)) - has_log_stats[host] = 1 - - # Now mark it - self.rsh(host, '''bash %s %s mark %s''' % (log_stats_bin, log_stats_file, testnum), synchronous=False) - def errorstoignore(self): # At some point implement a more elegant solution that # also produces a report at the end """ Return a list of known error messages that should be ignored """ return self.templates.get_patterns("BadNewsIgnore") def install_config(self, node): if not self.ns.WaitForNodeToComeUp(node): self.log("Node %s is not up." % node) return None if not node in self.CIBsync and self.Env["ClobberCIB"]: self.CIBsync[node] = 1 self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*") # Only install the CIB on the first node, all the other ones will pick it up from there if self.cib_installed == 1: return None self.cib_installed = 1 if self.Env["CIBfilename"] == None: self.log("Installing Generated CIB on node %s" % (node)) self.cib.install(node) else: self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node)) if self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) != 0: raise ValueError("Can not scp file to %s %d"%(node)) self.rsh(node, "chown " + BuildOptions.DAEMON_USER + " " + BuildOptions.CIB_DIR + "/cib.xml") def prepare(self): '''Finish the Initialization process. Prepare to test...''' self.partitions_expected = 1 for node in self.Env["nodes"]: self.ShouldBeStatus[node] = "" if self.Env["experimental-tests"]: self.unisolate_node(node) self.StataCM(node) def test_node_CM(self, node): '''Report the status of the cluster manager on a given node''' watchpats = [ ] watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") watchpats.append(self.templates["Pat:NonDC_started"] % node) watchpats.append(self.templates["Pat:DC_started"] % node) idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, "ClusterIdle", hosts=[node], kind=self.Env["LogWatcher"]) idle_watch.setwatch() (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) if not out: out = "" else: out = out[0].strip() self.debug("Node %s status: '%s'" %(node, out)) if out.find('ok') < 0: if self.ShouldBeStatus[node] == "up": self.log( "Node status for %s is %s but we think it should be %s" % (node, "down", self.ShouldBeStatus[node])) self.ShouldBeStatus[node] = "down" return 0 if self.ShouldBeStatus[node] == "down": self.log( "Node status for %s is %s but we think it should be %s: %s" % (node, "up", self.ShouldBeStatus[node], out)) self.ShouldBeStatus[node] = "up" # check the output first - because syslog-ng loses messages if out.find('S_NOT_DC') != -1: # Up and stable return 2 if out.find('S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug("Warn: Node %s is unstable: %s" % (node, out)) return 1 # Up and stable return 2 # Is the node up or is the node down def StataCM(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) > 0: return 1 return None # Being up and being stable is not the same question... def node_stable(self, node): '''Report the status of the cluster manager on a given node''' if self.test_node_CM(node) == 2: return 1 self.log("Warn: Node %s not stable" % (node)) return None def partition_stable(self, nodes, timeout=None): watchpats = [ ] watchpats.append("Current ping state: S_IDLE") watchpats.append(self.templates["Pat:DC_IDLE"]) self.debug("Waiting for cluster stability...") if timeout == None: timeout = self.Env["DeadTime"] if len(nodes) < 3: self.debug("Cluster is inactive") return 1 idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, "ClusterStable", timeout, hosts=nodes.split(), kind=self.Env["LogWatcher"]) idle_watch.setwatch() for node in nodes.split(): # have each node dump its current state self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) ret = idle_watch.look() while ret: self.debug(ret) for node in nodes.split(): if re.search(node, ret): return 1 ret = idle_watch.look() self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout)) return None def cluster_stable(self, timeout=None, double_check=False): partitions = self.find_partitions() for partition in partitions: if not self.partition_stable(partition, timeout): return None if double_check: # Make sure we are really stable and that all resources, # including those that depend on transient node attributes, # are started if they were going to be time.sleep(5) for partition in partitions: if not self.partition_stable(partition, timeout): return None return 1 def is_node_dc(self, node, status_line=None): rc = 0 if not status_line: (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) if out: status_line = out[0].strip() if not status_line: rc = 0 elif status_line.find('S_IDLE') != -1: rc = 1 elif status_line.find('S_INTEGRATION') != -1: rc = 1 elif status_line.find('S_FINALIZE_JOIN') != -1: rc = 1 elif status_line.find('S_POLICY_ENGINE') != -1: rc = 1 elif status_line.find('S_TRANSITION_ENGINE') != -1: rc = 1 return rc def active_resources(self, node): (_, output) = self.rsh(node, "crm_resource -c", verbose=1) resources = [] for line in output: if re.search("^Resource", line): tmp = AuditResource(self, line) if tmp.type == "primitive" and tmp.host == node: resources.append(tmp.id) return resources def ResourceLocation(self, rid): ResourceNodes = [] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": cmd = self.templates["RscRunning"] % (rid) (rc, lines) = self.rsh(node, cmd) if rc == 127: self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd) for line in lines: self.log("Output: "+line) elif rc == 0: ResourceNodes.append(node) return ResourceNodes def find_partitions(self): ccm_partitions = [] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) if not out: self.log("no partition details for %s" % node) continue partition = out[0].strip() if len(partition) > 2: nodes = partition.split() nodes.sort() partition = ' '.join(nodes) found = 0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug("Adding partition from %s: %s" % (node, partition)) ccm_partitions.append(partition) else: self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) else: self.log("bad partition details for %s" % node) else: self.debug("Node %s is down... skipping" % node) self.debug("Found partitions: %s" % repr(ccm_partitions) ) return ccm_partitions def HasQuorum(self, node_list): # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.Env["nodes"] for node in node_list: if self.ShouldBeStatus[node] == "up": (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) quorum = quorum[0].strip() if quorum.find("1") != -1: return 1 elif quorum.find("0") != -1: return 0 else: self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum) return 0 def Components(self): complist = [] common_ignore = [ "Pending action:", "(ERROR|error): crm_log_message_adv:", "(ERROR|error): MSG: No message to dump", "pending LRM operations at shutdown", "Lost connection to the CIB manager", "Connection to the CIB terminated...", "Sending message to the CIB manager FAILED", "Action A_RECOVER .* not supported", "(ERROR|error): stonithd_op_result_ready: not signed on", "pingd.*(ERROR|error): send_update: Could not send update", "send_ipc_message: IPC Channel to .* is not connected", "unconfirmed_actions: Waiting on .* unconfirmed actions", "cib_native_msgready: Message pending on command channel", r": Performing A_EXIT_1 - forcefully exiting ", r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.", ] stonith_ignore = [ r"Updating failcount for child_DoFencing", r"error.*: Fencer connection failed \(will retry\)", "pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.", ] stonith_ignore.extend(common_ignore) ccm = Process(self, "ccm", triggersreboot=self.fastfail, pats = [ "State transition .* S_RECOVERY", "pacemaker-controld.*Action A_RECOVER .* not supported", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", r"pacemaker-controld.*: Could not recover from internal error", "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", # these status numbers are likely wrong now r"pacemaker-controld.*exited with status 2", r"attrd.*exited with status 1", r"cib.*exited with status 2", # Not if it was fenced # "A new node joined the cluster", # "WARN: determine_online_status: Node .* is unclean", # "Scheduling node .* for fencing", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", # "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE", # "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN", "State transition S_STARTING -> S_PENDING", ], badnews_ignore = common_ignore) based = Process(self, "pacemaker-based", triggersreboot=self.fastfail, pats = [ "State transition .* S_RECOVERY", "Lost connection to the CIB manager", "Connection to the CIB manager terminated", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", r"pacemaker-controld.*: Could not recover from internal error", # these status numbers are likely wrong now r"pacemaker-controld.*exited with status 2", r"attrd.*exited with status 1", ], badnews_ignore = common_ignore) execd = Process(self, "pacemaker-execd", triggersreboot=self.fastfail, pats = [ "State transition .* S_RECOVERY", "LRM Connection failed", "pacemaker-controld.*I_ERROR.*lrm_connection_destroy", "State transition S_STARTING -> S_PENDING", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", r"pacemaker-controld.*: Could not recover from internal error", # this status number is likely wrong now r"pacemaker-controld.*exited with status 2", ], badnews_ignore = common_ignore) controld = Process(self, "pacemaker-controld", triggersreboot=self.fastfail, pats = [ # "WARN: determine_online_status: Node .* is unclean", # "Scheduling node .* for fencing", # "Executing .* fencing operation", # "tengine_stonith_callback: .*result=0", "State transition .* S_IDLE", "State transition S_STARTING -> S_PENDING", ], badnews_ignore = common_ignore) schedulerd = Process(self, "pacemaker-schedulerd", triggersreboot=self.fastfail, pats = [ "State transition .* S_RECOVERY", r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", r"pacemaker-controld.*: Could not recover from internal error", r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed", "pacemaker-controld.*I_ERROR.*save_cib_contents", # this status number is likely wrong now r"pacemaker-controld.*exited with status 2", ], badnews_ignore = common_ignore, dc_only=1) if self.Env["DoFencing"]: complist.append(Process(self, "stoniths", triggersreboot=self.fastfail, dc_pats = [ r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed", "Attempting connection to fencing daemon", ], badnews_ignore = stonith_ignore)) if self.fastfail == 0: ccm.pats.extend([ # these status numbers are likely wrong now r"attrd.*exited with status 1", r"pacemaker-(based|controld).*exited with status 2", ]) based.pats.extend([ # these status numbers are likely wrong now r"attrd.*exited with status 1", r"pacemaker-controld.*exited with status 2", ]) execd.pats.extend([ # these status numbers are likely wrong now r"pacemaker-controld.*exited with status 2", ]) complist.append(ccm) complist.append(based) complist.append(execd) complist.append(controld) complist.append(schedulerd) return complist def StandbyStatus(self, node): (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) if not out: return "off" out = out[0].strip() self.debug("Standby result: "+out) return out # status == "on" : Enter Standby mode # status == "off": Enter Active mode def SetStandbyMode(self, node, status): current_status = self.StandbyStatus(node) cmd = self.templates["StandbyCmd"] % (node, status) self.rsh(node, cmd) return True def AddDummyRsc(self, node, rid): rsc_xml = """ ' '""" % (rid, rid) constraint_xml = """ ' ' """ % (rid, node, node, rid) self.rsh(node, self.templates['CibAddXml'] % (rsc_xml)) self.rsh(node, self.templates['CibAddXml'] % (constraint_xml)) def RemoveDummyRsc(self, node, rid): constraint = "\"//rsc_location[@rsc='%s']\"" % (rid) rsc = "\"//primitive[@id='%s']\"" % (rid) self.rsh(node, self.templates['CibDelXpath'] % constraint) self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index fdfce18695..d10b8aabbc 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,573 +1,572 @@ """ Test environment classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["EnvFactory"] __copyright__ = "Copyright 2014-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse import os import random import socket import sys import time from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory class Environment: def __init__(self, args): self.data = {} self._nodes = [] # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["IPagent"] = "IPaddr2" self["DoFencing"] = True self["ClobberCIB"] = False self["CIBfilename"] = None self["CIBResource"] = False self["LogWatcher"] = "any" self["node-limit"] = 0 self["scenario"] = "random" - self["stats"] = False self.random_gen = random.Random() self.logger = LogFactory() self._seed_random() self.rsh = RemoteFactory().getInstance() self.target = "localhost" self.parse_args(args) if not self["ListTests"]: self.validate() self.discover() def _seed_random(self, seed=None): if not seed: seed = int(time.time()) self["RandSeed"] = seed self.random_gen.seed(str(seed)) def dump(self): keys = [] for key in list(self.data.keys()): keys.append(key) keys.sort() for key in keys: self.logger.debug("Environment["+key+"]:\t"+str(self[key])) def keys(self): return list(self.data.keys()) def has_key(self, key): if key == "nodes": return True return key in self.data def __getitem__(self, key): if str(key) == "0": raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead") if key == "nodes": return self._nodes if key == "Name": return self.get_stack_short() if key in self.data: return self.data[key] return None def __setitem__(self, key, value): if key == "Stack": self.set_stack(value) elif key == "node-limit": self.data[key] = value self.filter_nodes() elif key == "nodes": self._nodes = [] for node in value: # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: n = node.strip() socket.gethostbyname_ex(n) self._nodes.append(n) except: self.logger.log(node+" not found in DNS... aborting") raise self.filter_nodes() else: self.data[key] = value def random_node(self): """ Choose a random node from the cluster """ return self.random_gen.choice(self["nodes"]) def set_stack(self, name): # Normalize stack names if name in ["corosync", "cs", "mcp"]: self.data["Stack"] = "corosync 2+" else: raise ValueError("Unknown stack: "+name) def get_stack_short(self): # Create the Cluster Manager object if "Stack" not in self.data: return "unknown" if self.data["Stack"] == "corosync 2+": return "crm-corosync" LogFactory().log("Unknown stack: "+self["stack"]) raise ValueError("Unknown stack: "+self["stack"]) def detect_syslog(self): # Detect syslog variant if "syslogd" not in self.data: if self["have_systemd"]: # Systemd (_, lines) = self.rsh(self.target, "systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) self["syslogd"] = lines[0].strip() else: # SYS-V (_, lines) = self.rsh(self.target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) self["syslogd"] = lines[0].strip() if "syslogd" not in self.data or not self["syslogd"]: # default self["syslogd"] = "rsyslog" def disable_service(self, node, service): if self["have_systemd"]: # Systemd (rc, _) = self.rsh(node, "systemctl disable %s" % service) return rc # SYS-V (rc, _) = self.rsh(node, "chkconfig %s off" % service) return rc def enable_service(self, node, service): if self["have_systemd"]: # Systemd (rc, _) = self.rsh(node, "systemctl enable %s" % service) return rc # SYS-V (rc, _) = self.rsh(node, "chkconfig %s on" % service) return rc def service_is_enabled(self, node, service): if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self.rsh(node, "systemctl is-enabled %s | grep enabled" % service) return rc == 0 # SYS-V (rc, _) = self.rsh(node, "chkconfig --list | grep -e %s.*on" % service) return rc == 0 def detect_at_boot(self): # Detect if the cluster starts at boot if "at-boot" not in self.data: self["at-boot"] = self.service_is_enabled(self.target, "corosync") \ or self.service_is_enabled(self.target, "pacemaker") def detect_ip_offset(self): # Try to determine an offset for IPaddr resources if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self.rsh(self.target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self.rsh(self.target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) self["IPBase"] = lines[0].strip() if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self.logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self.logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) elif int(self["IPBase"].split('.')[3]) >= 240: self.logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s" % (self["IPBase"], self["IPBase"].split('.')[3])) self["IPBase"] = " fe80::1234:56:7890:1000" self.logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"]) def filter_nodes(self): if self["node-limit"] > 0: if len(self["nodes"]) > self["node-limit"]: self.logger.log("Limiting the number of nodes configured=%d (max=%d)" %(len(self["nodes"]), self["node-limit"])) while len(self["nodes"]) > self["node-limit"]: self["nodes"].pop(len(self["nodes"])-1) def validate(self): if not self["nodes"]: print("No nodes specified!") sys.exit(1) def discover(self): self.target = random.Random().choice(self["nodes"]) exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser if "have_systemd" not in self.data: (rc, _) = self.rsh(self.target, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 self.detect_syslog() self.detect_at_boot() self.detect_ip_offset() def parse_args(self, argv): if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0]) grp1 = parser.add_argument_group("Common options") grp1.add_argument("-g", "--dsh-group", "--group", metavar="GROUP", dest="group", help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)") grp1.add_argument("-l", "--limit-nodes", type=int, default=0, metavar="MAX", help="Only use the first MAX cluster nodes supplied with --nodes") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", metavar="NODES", help="List of cluster nodes separated by whitespace") grp1.add_argument("--stack", default="corosync", metavar="STACK", help="Which cluster stack is installed") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes") grp2.add_argument("--at-boot", "--cluster-starts-at-boot", choices=["1", "0", "yes", "no"], help="Does the cluster software start at boot time?") grp2.add_argument("--facility", "--syslog-facility", default="daemon", metavar="NAME", help="Which syslog facility to log to") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named test") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--bsc", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--container-tests", action="store_true", help="Include pacemaker_remote tests that run in lxc container resources") grp4.add_argument("--experimental-tests", action="store_true", help="Include experimental tests") grp4.add_argument("--loop-minutes", type=int, default=60, help="") grp4.add_argument("--no-loop-tests", action="store_true", help="Don't run looping/time-based tests") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--oprofile", metavar="NODES", help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--qarsh", action="store_true", help="Use QARSH to access nodes instead of SSH") grp4.add_argument("--recv-loss", type=float, default=0.0, metavar="RATE", help="") grp4.add_argument("--schema", metavar="SCHEMA", default="pacemaker-3.0", help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--set", action="append", metavar="ARG", default=[], help="Set key=value pairs (can be specified multiple times)") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--valgrind-procs", metavar="PROCS", default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd", help="Run valgrind against the given space-separated list of processes") grp4.add_argument("--valgrind-tests", action="store_true", help="Include tests using valgrind") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") grp4.add_argument("--xmit-loss", type=float, default=0.0, metavar="RATE", help="") parser.add_argument("iterations", type=int, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. They get a default from the add_argument # calls, only do one thing, and they do not have any side effects. self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["Stack"] = args.stack self["SyslogFacility"] = args.facility self["TruncateLog"] = args.truncate self["at-boot"] = args.at_boot in ["1", "yes"] self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["container-tests"] = args.container_tests self["experimental-tests"] = args.experimental_tests self["iterations"] = args.iterations self["loop-minutes"] = args.loop_minutes self["loop-tests"] = not args.no_loop_tests self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["node-limit"] = args.limit_nodes self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["valgrind-procs"] = args.valgrind_procs self["valgrind-tests"] = args.valgrind_tests self["warn-inactive"] = args.warn_inactive # Nodes and groups are mutually exclusive, so their defaults cannot be # set in their add_argument calls. Additionally, groups does more than # just set a value. Here, set nodes first and then if a group is # specified, override the previous nodes value. if args.nodes: self["nodes"] = args.nodes.split(" ") else: self["nodes"] = [] if args.group: self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group) LogFactory().add_file(self["OutputFile"], "CTS") dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group) if os.path.isfile(dsh_file): self["nodes"] = [] with open(dsh_file, "r") as f: for line in f: l = line.strip().rstrip() if not l.startswith('#'): self["nodes"].append(l) else: print("Unknown DSH group: %s" % args.dsh_group) # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.bsc: self["DoBSC"] = True self["scenario"] = "basic-sanity" if args.cib_filename: self["CIBfilename"] = args.cib_filename else: self["CIBfilename"] = None if args.choose: self["scenario"] = "sequence" self["tests"].append(args.choose) if args.fencing: if args.fencing in ["0", "no"]: self["DoFencing"] = False else: self["DoFencing"] = True if args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % ( os.environ['OS_REGION_NAME'], os.environ['OS_TENANT_NAME'], os.environ['OS_AUTH_URL'], os.environ['OS_USERNAME'], os.environ['OS_PASSWORD'] ) elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % ( os.environ['RHEVM_USERNAME'], os.environ['RHEVM_PASSWORD'], os.environ['RHEVM_SERVER'], os.environ['RHEVM_PORT'], ) if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["LogWatcher"] = "remote" else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.oprofile: self["oprofile"] = args.oprofile.split(" ") else: self["oprofile"] = [] if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True if args.qarsh: self.rsh.enable_qarsh() for kv in args.set: (name, value) = kv.split("=") self[name] = value print("Setting %s = %s" % (name, value)) class EnvFactory: instance = None def getInstance(self, args=None): if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance