diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py
index 178ef9288c..acd94b9ae1 100644
--- a/cts/lab/CTSscenarios.py
+++ b/cts/lab/CTSscenarios.py
@@ -1,562 +1,562 @@
""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """

__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

import os
import re
import signal
import sys
import time

from pacemaker._cts.audits import ClusterAudit
from pacemaker._cts.tests.base import CTSTest
from pacemaker._cts.watcher import LogWatcher


class ScenarioComponent(object):

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''Return True if the current ScenarioComponent is applicable
        in the given LabEnvironment given to the constructor.
        '''
        raise ValueError("Abstract Class member (IsApplicable)")

    def SetUp(self, CM):
        '''Set up the given ScenarioComponent'''
        raise ValueError("Abstract Class member (SetUp)")

    def TearDown(self, CM):
        '''Tear down (undo) the given ScenarioComponent'''
        raise ValueError("Abstract Class member (TearDown)")


class Scenario(object):
    (
'''The basic idea of a scenario is that of an ordered list of
ScenarioComponent objects.  Each ScenarioComponent is SetUp() in turn,
and then after the tests have been run, they are torn down using
TearDown() (in reverse order).

A Scenario is applicable to a particular cluster manager iff each
ScenarioComponent is applicable.

A partially set up scenario is torn down if it fails during setup.
''')

    def __init__(self, ClusterManager, Components, Audits, Tests):
        "Initialize the Scenario from the list of ScenarioComponents"

        self.ClusterManager = ClusterManager
        self.Components = Components
        self.Audits = Audits
        self.Tests = Tests

        self.BadNews = None
        self.TestSets = []
        self.Stats = {"success": 0, "failure": 0, "BadNews": 0, "skipped": 0}
        self.Sets = []

        #self.ns=CTS.NodeStatus(self.Env)

        for comp in Components:
            if not issubclass(comp.__class__, ScenarioComponent):
                raise ValueError("Init value must be subclass of ScenarioComponent")

        for audit in Audits:
            if not issubclass(audit.__class__, ClusterAudit):
                raise ValueError("Init value must be subclass of ClusterAudit")

        for test in Tests:
            if not issubclass(test.__class__, CTSTest):
                raise ValueError("Init value must be a subclass of CTSTest")

    def IsApplicable(self):
        '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()'''

        for comp in self.Components:
            if not comp.IsApplicable():
                return None

        return True

    def SetUp(self):
        '''Set up the Scenario. Return TRUE on success.'''

        self.ClusterManager.prepare()
        self.audit()  # Also detects remote/local log config
        self.ClusterManager.ns.wait_for_all_nodes(self.ClusterManager.Env["nodes"])

        self.audit()
        self.ClusterManager.install_support()

        self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
                                  self.ClusterManager.templates.get_patterns("BadNews"),
                                  self.ClusterManager.Env["nodes"],
                                  self.ClusterManager.Env["LogWatcher"],
                                  "BadNews", 0)
        self.BadNews.set_watch()  # Call after we've figured out what type of log watching to do in LogAudit

        j = 0
        while j < len(self.Components):
            if not self.Components[j].SetUp(self.ClusterManager):
                # OOPS! We failed. Tear partial setups down.
                self.audit()
                self.ClusterManager.log("Tearing down partial setup")
                self.TearDown(j)
                return None

            j = j + 1

        self.audit()
        return 1

    def TearDown(self, max=None):
        '''Tear Down the Scenario - in reverse order.'''

        if max == None:
            max = len(self.Components)-1

        j = max
        while j >= 0:
            self.Components[j].TearDown(self.ClusterManager)
            j = j - 1

        self.audit()
        self.ClusterManager.install_support("uninstall")

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if not name in self.Stats:
            self.Stats[name] = 0

        self.Stats[name] = self.Stats[name]+1

    def run(self, Iterations):
        self.ClusterManager.oprofileStart()
        try:
            self.run_loop(Iterations)
            self.ClusterManager.oprofileStop()
        except:
            self.ClusterManager.oprofileStop()
            raise

    def run_loop(self, Iterations):
        raise ValueError("Abstract Class member (run_loop)")

    def run_test(self, test, testcount):
        nodechoice = self.ClusterManager.Env.random_node()

-        ret = 1
+        ret = True
        where = ""
        did_run = 0

        self.ClusterManager.instance_errorstoignore_clear()
        self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]")

        starttime = test.set_timer()
        if not test.setup(nodechoice):
            self.ClusterManager.log("Setup failed")
-            ret = 0
+            ret = False

        elif not test.can_run_now(nodechoice):
            self.ClusterManager.log("Skipped")
            test.skipped()

        else:
            did_run = 1
            ret = test(nodechoice)

        if not test.teardown(nodechoice):
            self.ClusterManager.log("Teardown failed")

            if self.ClusterManager.Env["continue"]:
                answer = "Y"
            else:
                try:
                    answer = input('Continue? [nY]')
                except EOFError as e:
                    answer = "n"

            if answer and answer == "n":
                raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))

-            ret = 0
+            ret = False

        stoptime = time.time()
        self.ClusterManager.oprofileSave(testcount)

        elapsed_time = stoptime - starttime
        test_time = stoptime - test.get_timer()

        if "min_time" not in test.stats:
            test.stats["elapsed_time"] = elapsed_time
            test.stats["min_time"] = test_time
            test.stats["max_time"] = test_time
        else:
            test.stats["elapsed_time"] = test.stats["elapsed_time"] + elapsed_time

            if test_time < test.stats["min_time"]:
                test.stats["min_time"] = test_time

            if test_time > test.stats["max_time"]:
                test.stats["max_time"] = test_time

        if ret:
            self.incr("success")
            test.log_timer()
        else:
            self.incr("failure")
            self.ClusterManager.statall()
            did_run = 1  # Force the test count to be incremented anyway so test extraction works

        self.audit(test.errors_to_ignore)
        return did_run

    def summarize(self):
        self.ClusterManager.log("****************")
        self.ClusterManager.log("Overall Results:" + repr(self.Stats))
        self.ClusterManager.log("****************")

        stat_filter = {
            "calls": 0,
            "failure": 0,
            "skipped": 0,
            "auditfail": 0,
        }

        self.ClusterManager.log("Test Summary")
        for test in self.Tests:
            for key in list(stat_filter.keys()):
                stat_filter[key] = test.stats[key]

            self.ClusterManager.log(("Test %s: " % test.name).ljust(25) + " %s" % repr(stat_filter))

        self.ClusterManager.debug("Detailed Results")
        for test in self.Tests:
            self.ClusterManager.debug(("Test %s: " % test.name).ljust(25) + " %s" % repr(test.stats))

        self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")

    def audit(self, LocalIgnore=[]):
        errcount = 0

        ignorelist = []
        ignorelist.append("CTS:")
        ignorelist.extend(LocalIgnore)
        ignorelist.extend(self.ClusterManager.errorstoignore())
        ignorelist.extend(self.ClusterManager.instance_errorstoignore())

        # This makes sure everything is stabilized before starting...
        failed = 0
        for audit in self.Audits:
            if not audit():
                self.ClusterManager.log("Audit " + audit.name + " FAILED.")
                failed += 1
            else:
                self.ClusterManager.debug("Audit " + audit.name + " passed.")

        while errcount < 1000:
            match = None
            if self.BadNews:
                match = self.BadNews.look(0)

            if match:
                add_err = 1

                for ignore in ignorelist:
                    if add_err == 1 and re.search(ignore, match):
                        add_err = 0

                if add_err == 1:
                    self.ClusterManager.log("BadNews: " + match)
                    self.incr("BadNews")
                    errcount = errcount + 1
            else:
                break
        else:
            if self.ClusterManager.Env["continue"]:
                answer = "Y"
            else:
                try:
                    answer = input('Big problems. Continue? [nY]')
                except EOFError as e:
                    answer = "n"

            if answer and answer == "n":
                self.ClusterManager.log("Shutting down.")
                self.summarize()
                self.TearDown()
                raise ValueError("Looks like we hit a BadNews jackpot!")

        if self.BadNews:
            self.BadNews.end()

        return failed


class AllOnce(Scenario):
    '''Every Test Once'''  # Accessible as __doc__

    def run_loop(self, Iterations):
        testcount = 1
        for test in self.Tests:
            self.run_test(test, testcount)
            testcount += 1


class RandomTests(Scenario):
    '''Random Test Execution'''

    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            test = self.ClusterManager.Env.random_gen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1


class BasicSanity(Scenario):
    '''Basic Cluster Sanity'''

    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            # Scenario has no Environment attribute; reach the environment
            # through the cluster manager as the other scenarios do
            test = self.ClusterManager.Env.random_gen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1


class Sequence(Scenario):
    '''Named Tests in Sequence'''

    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            for test in self.Tests:
                self.run_test(test, testcount)
                testcount += 1


class Boot(Scenario):
    '''Start the Cluster'''

    def run_loop(self, Iterations):
        testcount = 0
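# Illustrative sketch (not part of the diff): how a Scenario is typically
# driven, matching the docstring above. The `cm`, `env`, `audits`, and
# `tests` objects are assumed to come from the CTS lab setup and are
# hypothetical here.
#
#   scenario = RandomTests(cm, [BootCluster(env)], audits, tests)
#   if scenario.IsApplicable():
#       if scenario.SetUp():       # components are set up in list order;
#                                  # a failed SetUp tears partial setups down
#           scenario.run(Iterations=100)
#           scenario.TearDown()    # components are torn down in reverse order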
class BootCluster(ScenarioComponent):
    (
'''BootCluster is the most basic of ScenarioComponents.

This ScenarioComponent simply starts the cluster manager on all the nodes.
It is fairly robust as it waits for all nodes to come up before starting
as they might have been rebooted or crashed for some reason beforehand.
''')

    def __init__(self, Env):
        pass

    def IsApplicable(self):
        '''BootCluster is so generic it is always Applicable'''
        return True

    def SetUp(self, CM):
        '''Basic Cluster Manager startup. Start everything'''

        CM.prepare()

        # Clear out the cobwebs ;-)
        CM.stopall(verbose=True, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all nodes.")
        return CM.startall(verbose=True, quick=True)

    def TearDown(self, CM, force=False):
        '''Tear down the given ScenarioComponent'''

        # Stop the cluster manager everywhere
        CM.log("Stopping Cluster Manager on all nodes")
        return CM.stopall(verbose=True, force=force)


class LeaveBooted(BootCluster):
    def TearDown(self, CM):
        '''Tear down the given ScenarioComponent'''

        # Leave the cluster manager running everywhere
        CM.log("Leaving Cluster running on all nodes")
        return 1


class PingFest(ScenarioComponent):
    (
'''PingFest does a flood ping to each node in the cluster from the test machine.

If the LabEnvironment Parameter PingSize is set, it will be used as the size
of ping packet requested (via the -s option). If it is not set, it defaults
to 1024 bytes.

According to the manual page for ping:
    Outputs packets as fast as they come back or one hundred times per second,
    whichever is more.  For every ECHO_REQUEST sent a period ``.'' is printed,
    while for every ECHO_REPLY received a backspace is printed.
    This provides a rapid display of how many packets are being dropped.
    Only the super-user may use this option.  This can be very hard on a
    network and should be used with caution.
''')

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''PingFests are always applicable ;-)'''

        return True

    def SetUp(self, CM):
        '''Start the PingFest!'''

        self.PingSize = 1024
        if "PingSize" in list(CM.Env.keys()):
            self.PingSize = CM.Env["PingSize"]

        CM.log("Starting %d byte flood pings" % self.PingSize)

        self.PingPids = []
        for node in CM.Env["nodes"]:
            self.PingPids.append(self._pingchild(node))

        CM.log("Ping PIDs: " + repr(self.PingPids))
        return 1

    def TearDown(self, CM):
        '''Stop it right now!  My ears are pinging!!'''

        for pid in self.PingPids:
            if pid != None:
                CM.log("Stopping ping process %d" % pid)
                os.kill(pid, signal.SIGKILL)

    def _pingchild(self, node):
        Args = ["ping", "-qfn", "-s", str(self.PingSize), node]

        sys.stdin.flush()
        sys.stdout.flush()
        sys.stderr.flush()

        pid = os.fork()

        if pid < 0:
            self.Env.log("Cannot fork ping child")
            return None

        if pid > 0:
            return pid

        # Otherwise, we're the child process.
        os.execvp("ping", Args)
        self.Env.log("Cannot execvp ping: " + repr(Args))
        sys.exit(1)
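# Illustrative sketch (not part of the diff): the fork()/execvp() child above
# could equivalently be written with subprocess, which avoids managing raw
# PIDs and exec failures by hand. This is a hedged alternative, not what CTS
# actually ships.
#
#   import subprocess
#
#   def _pingchild(self, node):
#       # -q quiet, -f flood, -n numeric output; -s sets the payload size
#       proc = subprocess.Popen(["ping", "-qfn", "-s", str(self.PingSize), node])
#       return proc.pid  # TearDown() kills it with os.kill(pid, signal.SIGKILL)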
class BasicSanityCheck(ScenarioComponent):
    (
'''
''')

    def IsApplicable(self):
        return self.Env["DoBSC"]

    def SetUp(self, CM):
        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on BSC node(s).")
        return CM.startall()

    def TearDown(self, CM):
        CM.log("Stopping Cluster Manager on BSC node(s).")
        return CM.stopall()


class Benchmark(ScenarioComponent):
    (
'''
''')

    def IsApplicable(self):
        return self.Env["benchmark"]

    def SetUp(self, CM):
        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all node(s).")
        return CM.startall()

    def TearDown(self, CM):
        CM.log("Stopping Cluster Manager on all node(s).")
        return CM.stopall()


class RollingUpgrade(ScenarioComponent):
    (
'''
Test a rolling upgrade between two versions of the stack
''')

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        if not self.Env["rpm-dir"]:
            return None
        if not self.Env["current-version"]:
            return None
        if not self.Env["previous-version"]:
            return None

        return True

    def install(self, node, version):
        target_dir = "/tmp/rpm-%s" % version
        src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)

        self.CM.rsh(node, "mkdir -p %s" % target_dir)
        rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
        self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))

        # ScenarioComponent has no success() helper; report success with a
        # truthy return as the other components do
        return 1

    def upgrade(self, node):
        return self.install(node, self.CM.Env["current-version"])

    def downgrade(self, node):
        return self.install(node, self.CM.Env["previous-version"])

    def SetUp(self, CM):
        print(repr(self)+"prepare")
        # Remember the cluster manager: install()/upgrade()/downgrade() need it
        self.CM = CM
        CM.prepare()

        # Clear out the cobwebs
        CM.stopall(force=True)

        CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])

        for node in self.Env["nodes"]:
            if not self.downgrade(node):
                CM.log("Couldn't downgrade %s" % node)
                return None

        return 1

    def TearDown(self, CM):
        # Stop everything
        CM.log("Stopping Cluster Manager on Upgrade nodes.")
        CM.stopall()

        CM.log("Upgrading all nodes to %s." % self.Env["current-version"])

        for node in self.Env["nodes"]:
            if not self.upgrade(node):
                CM.log("Couldn't upgrade %s" % node)
                return None

        return 1
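# Illustrative sketch (not part of the diff): the minimal contract a new
# ScenarioComponent has to satisfy. The LogMarker class and its messages are
# hypothetical examples.
#
#   class LogMarker(ScenarioComponent):
#       '''Write a marker to the log on setup and teardown'''
#
#       def __init__(self, Env):
#           self.Env = Env
#
#       def IsApplicable(self):
#           return True
#
#       def SetUp(self, CM):
#           CM.log("LogMarker: scenario starting")
#           return 1        # a truthy return means setup succeeded
#
#       def TearDown(self, CM):
#           CM.log("LogMarker: scenario finished")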
diff --git a/python/pacemaker/_cts/tests/base.py b/python/pacemaker/_cts/tests/base.py
index 82de515cfd..b82650987c 100644
--- a/python/pacemaker/_cts/tests/base.py
+++ b/python/pacemaker/_cts/tests/base.py
@@ -1,942 +1,942 @@
""" Base classes for CTS tests """

__all__ = ["CTSTest", "RemoteDriver", "SimulStartLite", "SimulStopLite", "StartTest", "StopTest"]
__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

#
#        SPECIAL NOTE:
#
#        Tests may NOT implement any cluster-manager-specific code in them.
#        EXTEND the ClusterManager object to provide the base capabilities
#        the test needs if you need to do something that the current CM classes
#        do not.  Otherwise you screw up the whole point of the object structure
#        in CTS.
#
#                Thank you.
#

import os
import re
import time
import subprocess
import tempfile

from pacemaker._cts.audits import AuditConstraint, AuditResource
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.patterns import PatternSelector
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.timer import Timer
from pacemaker._cts.watcher import LogWatcher

# Disable various pylint warnings that occur in so many places throughout this
# file it's easiest to just take care of them globally.  This does introduce the
# possibility that we'll miss some other cause of the same warning, but we'll
# just have to be careful.

# pylint doesn't understand that self._rsh is callable.
# pylint: disable=not-callable
# pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object


class CTSTest:
    '''
    A Cluster test.
    We implement the basic set of properties and behaviors for a generic
    cluster test.

    Cluster tests track their own statistics.
    We keep each of the kinds of counts we track as separate {name,value}
    pairs.
    '''

    def __init__(self, cm):
        # pylint: disable=invalid-name

        self.audits = []
        self.name = None
        self.templates = PatternSelector(cm["Name"])

        self.stats = {
            "auditfail": 0,
            "calls": 0,
            "failure": 0,
            "skipped": 0,
            "success": 0
        }

        self._cm = cm
        self._env = EnvFactory().getInstance()
        self._r_o2cb = None
        self._r_ocfs2 = []
        self._rsh = RemoteFactory().getInstance()
        self._logger = LogFactory()
        self._timers = {}

        self.benchmark = True  # which tests to benchmark
        self.failed = False
        self.is_container = False
        self.is_experimental = False
        self.is_loop = False
        self.is_unsafe = False
        self.is_valgrind = False
        self.passed = True

    def log(self, args):
        self._logger.log(args)

    def debug(self, args):
        self._logger.debug(args)

    def log_mark(self, msg):
        self.debug("MARK: test %s %s %d" % (self.name, msg, time.time()))

    def get_timer(self, key="test"):
        try:
            return self._timers[key].start_time
        except KeyError:
            return 0

    def set_timer(self, key="test"):
        if key not in self._timers:
            self._timers[key] = Timer(self._logger, self.name, key)

        self._timers[key].start()
        return self._timers[key].start_time

    def log_timer(self, key="test"):
        if key not in self._timers:
            return

        elapsed = self._timers[key].elapsed
        self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed))
        del self._timers[key]

    def incr(self, name):
        if name not in self.stats:
            self.stats[name] = 0

        self.stats[name] += 1

        # Reset the test passed boolean
        if name == "calls":
            self.passed = True

    def failure(self, reason="none"):
        '''Increment the failure count'''

        self.passed = False
        self.incr("failure")
        self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)

-        return None
+        return False

    def success(self):
        '''Increment the success count'''

        self.incr("success")
-        return 1
+        return True

    def skipped(self):
        '''Increment the skipped count'''

        self.incr("skipped")
-        return 1
+        return True

    def __call__(self, node):
        """ Perform the given test """

        raise NotImplementedError

    def audit(self):
        passed = 1

        for audit in self.audits:
            if not audit():
                self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name))
                self.incr("auditfail")
                passed = 0

        return passed

    def setup(self, node):
        '''Setup the given test'''

        # node is used in subclasses
        # pylint: disable=unused-argument

        return self.success()

    def teardown(self, node):
        '''Tear down the given test'''

        # node is used in subclasses
        # pylint: disable=unused-argument

        return self.success()

    def create_watch(self, patterns, timeout, name=None):
        if not name:
            name = self.name

        return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"],
                          self._env["LogWatcher"], name, timeout)

    def local_badnews(self, prefix, watch, local_ignore=None):
        errcount = 0
        if not prefix:
            prefix = "LocalBadNews:"

        ignorelist = [" CTS: ", prefix]

        if local_ignore:
            ignorelist += local_ignore

        while errcount < 100:
            match = watch.look(0)
            if match:
                add_err = True

                for ignore in ignorelist:
                    if add_err and re.search(ignore, match):
                        add_err = False

                if add_err:
                    self._logger.log("%s %s" % (prefix, match))
                    errcount += 1
            else:
                break
        else:
            self._logger.log("Too many errors!")

        watch.end()
        return errcount

    def is_applicable(self):
        return self.is_applicable_common()

    def is_applicable_common(self):
        '''Return True if we are applicable in the current test configuration'''

        if self.is_loop and not self._env["loop-tests"]:
            return False

        if self.is_unsafe and not self._env["unsafe-tests"]:
            return False

        if self.is_valgrind and not self._env["valgrind-tests"]:
            return False

        if self.is_experimental and not self._env["experimental-tests"]:
            return False

        if self.is_container and not self._env["container-tests"]:
            return False

        if self._env["benchmark"] and not self.benchmark:
            return False

        return True

    def _find_ocfs2_resources(self, node):
        self._r_o2cb = None
        self._r_ocfs2 = []

        (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
        for line in lines:
            if re.search("^Resource", line):
                r = AuditResource(self._cm, line)

                if r.rtype == "o2cb" and r.parent != "NA":
                    self.debug("Found o2cb: %s" % self._r_o2cb)
                    self._r_o2cb = r.parent

            if re.search("^Constraint", line):
                c = AuditConstraint(self._cm, line)

                if c.type == "rsc_colocation" and c.target == self._r_o2cb:
                    self._r_ocfs2.append(c.rsc)

        self.debug("Found ocfs2 filesystems: %s" % self._r_ocfs2)
        return len(self._r_ocfs2)

    def can_run_now(self, node):
        """ Return True if we can meaningfully run right now """

        # node is used in subclasses
        # pylint: disable=unused-argument

        return True

    @property
    def errors_to_ignore(self):
        """ Return list of errors which should be ignored """

        return []
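# Illustrative sketch (not part of the diff): the shape of a concrete test
# built on CTSTest, using the success()/failure() return convention above.
# FlushTest and its use of crm_mon are hypothetical examples.
#
#   class FlushTest(CTSTest):
#       def __init__(self, cm):
#           CTSTest.__init__(self, cm)
#           self.name = "Flush"
#
#       def __call__(self, node):
#           self.incr("calls")
#           (rc, _) = self._rsh(node, "crm_mon -1")
#           if rc != 0:
#               return self.failure("crm_mon failed on %s" % node)
#           return self.success()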
class RemoteDriver(CTSTest):

    def __init__(self, cm):
        CTSTest.__init__(self, cm)

        self.name = "RemoteDriver"

        self._corosync_enabled = False
        self._pacemaker_enabled = False
        self._remote_node = None
        self._remote_rsc = "remote-rsc"
        self._start = StartTest(cm)
        self._startall = SimulStartLite(cm)
        self._stop = StopTest(cm)

        self.reset()

    def reset(self):
        self.failed = False
        self.fail_string = ""

        self._pcmk_started = False
        self._remote_node_added = False
        self._remote_rsc_added = False
        self._remote_use_reconnect_interval = self._env.random_gen.choice([True, False])

    def fail(self, msg):
        """ Mark test as failed. """

        self.failed = True

        # Always log the failure.
        self._logger.log(msg)

        # Use first failure as test status, as it's likely to be most useful.
        if not self.fail_string:
            self.fail_string = msg

    def _get_other_node(self, node):
        for othernode in self._env["nodes"]:
            if othernode == node:
                # we don't want to try and use the cib that we just shutdown.
                # find a cluster node that is not our soon to be remote-node.
                continue

            return othernode

    def _del_rsc(self, node, rsc):
        othernode = self._get_other_node(node)
        (rc, _) = self._rsh(othernode, "crm_resource -D -r %s -t primitive" % rsc)
        if rc != 0:
            self.fail("Removal of resource '%s' failed" % rsc)

    def _add_rsc(self, node, rsc_xml):
        othernode = self._get_other_node(node)
        (rc, _) = self._rsh(othernode, "cibadmin -C -o resources -X '%s'" % rsc_xml)
        if rc != 0:
            self.fail("resource creation failed")

    def _add_primitive_rsc(self, node):
        rsc_xml = """
""" % { "node": self._remote_rsc }

        self._add_rsc(node, rsc_xml)
        if not self.failed:
            self._remote_rsc_added = True

    def _add_connection_rsc(self, node):
        rsc_xml = """
""" % { "node": self._remote_node, "server": node }

        if self._remote_use_reconnect_interval:
            # Set reconnect interval on resource
            rsc_xml += """
""" % self._remote_node

        rsc_xml += """
""" % { "node": self._remote_node }

        self._add_rsc(node, rsc_xml)
        if not self.failed:
            self._remote_node_added = True

    def _disable_services(self, node):
        self._corosync_enabled = self._env.service_is_enabled(node, "corosync")
        if self._corosync_enabled:
            self._env.disable_service(node, "corosync")

        self._pacemaker_enabled = self._env.service_is_enabled(node, "pacemaker")
        if self._pacemaker_enabled:
            self._env.disable_service(node, "pacemaker")

    def _enable_services(self, node):
        if self._corosync_enabled:
            self._env.enable_service(node, "corosync")

        if self._pacemaker_enabled:
            self._env.enable_service(node, "pacemaker")

    def _stop_pcmk_remote(self, node):
        # disable pcmk remote
        for _ in range(10):
            (rc, _) = self._rsh(node, "service pacemaker_remote stop")

            if rc != 0:
                time.sleep(6)
            else:
                break

    def _start_pcmk_remote(self, node):
        for _ in range(10):
            (rc, _) = self._rsh(node, "service pacemaker_remote start")

            if rc != 0:
                time.sleep(6)
            else:
                self._pcmk_started = True
                break

    def _freeze_pcmk_remote(self, node):
        """ Simulate a Pacemaker Remote daemon failure. """

        # We freeze the process.
        self._rsh(node, "killall -STOP pacemaker-remoted")

    def _resume_pcmk_remote(self, node):
        # We resume the process.
        self._rsh(node, "killall -CONT pacemaker-remoted")
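    # Illustrative sketch (not part of the diff): the freeze/resume trick above
    # relies on standard job-control signals. SIGSTOP makes the daemon
    # unresponsive without killing it, so the cluster sees a hung rather than
    # dead peer, and SIGCONT resumes it. A local (hypothetical) equivalent for
    # a process whose PID is known:
    #
    #   import os, signal
    #   os.kill(daemon_pid, signal.SIGSTOP)   # simulate a hang
    #   os.kill(daemon_pid, signal.SIGCONT)   # recover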
    def _start_metal(self, node):
        # Cluster nodes are reused as remote nodes in remote tests. If cluster
        # services were enabled at boot, in case the remote node got fenced, the
        # cluster node would join instead of the expected remote one. Meanwhile
        # pacemaker_remote would not be able to start. Depending on the chances,
        # the situations might not be able to be orchestrated gracefully any more.
        #
        # Temporarily disable any enabled cluster services.
        self._disable_services(node)

        # make sure the resource doesn't already exist for some reason
        self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_rsc)
        self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_node)

        if not self._stop(node):
            self.fail("Failed to shutdown cluster node %s" % node)
            return

        self._start_pcmk_remote(node)

        if not self._pcmk_started:
            self.fail("Failed to start pacemaker_remote on node %s" % node)
            return

        # Convert node to baremetal now that it has shutdown the cluster stack
        pats = [
            self.templates["Pat:RscOpOK"] % ("start", self._remote_node),
            self.templates["Pat:DC_IDLE"]
        ]

        watch = self.create_watch(pats, 120)
        watch.set_watch()

        self._add_connection_rsc(node)

        with Timer(self._logger, self.name, "remoteMetalInit"):
            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns: %s" % watch.unmatched)

    def migrate_connection(self, node):
        if self.failed:
            return

        pats = [
            self.templates["Pat:RscOpOK"] % ("migrate_to", self._remote_node),
            self.templates["Pat:RscOpOK"] % ("migrate_from", self._remote_node),
            self.templates["Pat:DC_IDLE"]
        ]

        watch = self.create_watch(pats, 120)
        watch.set_watch()

        (rc, _) = self._rsh(node, "crm_resource -M -r %s" % self._remote_node, verbose=1)
        if rc != 0:
            self.fail("failed to move remote node connection resource")
            return

        with Timer(self._logger, self.name, "remoteMetalMigrate"):
            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns: %s" % watch.unmatched)
            return

    def fail_rsc(self, node):
        if self.failed:
            return

        watchpats = [
            self.templates["Pat:RscRemoteOpOK"] % ("stop", self._remote_rsc, self._remote_node),
            self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
            self.templates["Pat:DC_IDLE"]
        ]

        watch = self.create_watch(watchpats, 120)
        watch.set_watch()

        self.debug("causing dummy rsc to fail.")

        self._rsh(node, "rm -f /var/run/resource-agents/Dummy*")

        with Timer(self._logger, self.name, "remoteRscFail"):
            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched)

    def fail_connection(self, node):
        if self.failed:
            return

        watchpats = [
            self.templates["Pat:Fencing_ok"] % self._remote_node,
            self.templates["Pat:NodeFenced"] % self._remote_node
        ]

        watch = self.create_watch(watchpats, 120)
        watch.set_watch()

        # freeze the pcmk remote daemon. this will result in fencing
        self.debug("Force stopped active remote node")
        self._freeze_pcmk_remote(node)

        self.debug("Waiting for remote node to be fenced.")

        with Timer(self._logger, self.name, "remoteMetalFence"):
            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns: %s" % watch.unmatched)
            return

        self.debug("Waiting for the remote node to come back up")
        self._cm.ns.wait_for_node(node, 120)

        pats = [ self.templates["Pat:RscOpOK"] % ("start", self._remote_node) ]

        if self._remote_rsc_added:
            pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node))

        # watch for the patterns we just collected (not an empty list)
        watch = self.create_watch(pats, 240)
        watch.set_watch()

        # start the remote node again and watch it integrate back into the cluster.
        self._start_pcmk_remote(node)

        if not self._pcmk_started:
            self.fail("Failed to start pacemaker_remote on node %s" % node)
            return

        self.debug("Waiting for remote node to rejoin cluster after being fenced.")

        with Timer(self._logger, self.name, "remoteMetalRestart"):
            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns: %s" % watch.unmatched)
            return

    def _add_dummy_rsc(self, node):
        if self.failed:
            return

        # verify we can put a resource on the remote node
        pats = [
            self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
            self.templates["Pat:DC_IDLE"]
        ]

        watch = self.create_watch(pats, 120)
        watch.set_watch()

        # Add a resource that must live on remote-node
        self._add_primitive_rsc(node)

        # force that rsc to prefer the remote node.
        (rc, _) = self._cm.rsh(node, "crm_resource -M -r %s -N %s -f" % (self._remote_rsc, self._remote_node), verbose=1)
        if rc != 0:
            self.fail("Failed to place remote resource on remote node.")
            return

        with Timer(self._logger, self.name, "remoteMetalRsc"):
            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns: %s" % watch.unmatched)

    def test_attributes(self, node):
        if self.failed:
            return

        # This verifies permanent attributes can be set on a remote-node. It also
        # verifies the remote-node can edit its own cib node section remotely.
        (rc, line) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % self._remote_node, verbose=1)
        if rc != 0:
            self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line))
            return

        (rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % self._remote_node, verbose=1)
        if rc != 0:
            self.fail("Failed to get remote-node attribute")
            return

        (rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % self._remote_node, verbose=1)
        if rc != 0:
            self.fail("Failed to delete remote-node attribute")
            return

    def cleanup_metal(self, node):
        self._enable_services(node)

        if not self._pcmk_started:
            return

        pats = [ ]

        watch = self.create_watch(pats, 120)
        watch.set_watch()

        if self._remote_rsc_added:
            pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_rsc))

        if self._remote_node_added:
            pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_node))

        with Timer(self._logger, self.name, "remoteMetalCleanup"):
            self._resume_pcmk_remote(node)

            if self._remote_rsc_added:
                # Remove dummy resource added for remote node tests
                self.debug("Cleaning up dummy rsc put on remote node")
                self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_rsc)
                self._del_rsc(node, self._remote_rsc)

            if self._remote_node_added:
                # Remove remote node's connection resource
                self.debug("Cleaning up remote node connection resource")
                self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_node)
                self._del_rsc(node, self._remote_node)

            watch.look_for_all()

        if watch.unmatched:
            self.fail("Unmatched patterns: %s" % watch.unmatched)

        self._stop_pcmk_remote(node)

        self.debug("Waiting for the cluster to recover")
        self._cm.cluster_stable()

        if self._remote_node_added:
            # Remove remote node itself
            self.debug("Cleaning up node entry for remote node")
            self._rsh(self._get_other_node(node), "crm_node --force --remove %s" % self._remote_node)

    def _setup_env(self, node):
        self._remote_node = "remote-%s" % node

        # we are assuming if all nodes have a key, that it is
        # the right key... If any node doesn't have a remote
        # key, we regenerate it everywhere.
        if self._rsh.exists_on_all("/etc/pacemaker/authkey", self._env["nodes"]):
            return

        # create key locally
        (handle, keyfile) = tempfile.mkstemp(".cts")
        os.close(handle)
        subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"],
                              stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # sync key throughout the cluster
        for n in self._env["nodes"]:
            self._rsh(n, "mkdir -p --mode=0750 /etc/pacemaker")
            self._rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % n)
            self._rsh(n, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey")
            self._rsh(n, "chmod 0640 /etc/pacemaker/authkey")

        os.unlink(keyfile)
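    # Illustrative sketch (not part of the diff): the dd invocation above just
    # writes 4096 random bytes into the key file. A pure-Python equivalent
    # (an assumption, not what CTS ships) would be:
    #
    #   with open(keyfile, "wb") as f:
    #       f.write(os.urandom(4096))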
    def is_applicable(self):
        if not self.is_applicable_common():
            return False

        for node in self._env["nodes"]:
            (rc, _) = self._rsh(node, "which pacemaker-remoted >/dev/null 2>&1")
            if rc != 0:
                return False

        return True

    def start_new_test(self, node):
        self.incr("calls")
        self.reset()

        ret = self._startall(None)
        if not ret:
            return self.failure("setup failed: could not start all nodes")

        self._setup_env(node)
        self._start_metal(node)
        self._add_dummy_rsc(node)
        return True

    def __call__(self, node):
        """ Perform the given test """

        raise NotImplementedError

    @property
    def errors_to_ignore(self):
        """ Return list of errors which should be ignored """

        return [
            r"""is running on remote.*which isn't allowed""",
            r"""Connection terminated""",
            r"""Could not send remote"""
        ]


class SimulStartLite(CTSTest):
    '''Start any stopped nodes ~ simultaneously'''

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "SimulStartLite"

    def __call__(self, dummy):
        '''Perform the 'SimulStartLite' setup work.'''

        self.incr("calls")
        self.debug("Setup: %s" % self.name)

        # We ignore the "node" parameter...
        node_list = []
        for node in self._env["nodes"]:
            if self._cm.ShouldBeStatus[node] == "down":
                self.incr("WasStopped")
                node_list.append(node)

        self.set_timer()
        while len(node_list) > 0:
            # Repeat until all nodes come up
            uppat = self.templates["Pat:NonDC_started"]
            if self._cm.upcount() == 0:
                uppat = self.templates["Pat:Local_started"]

            watchpats = [ self.templates["Pat:DC_IDLE"] ]
            for node in node_list:
                watchpats.extend([uppat % node,
                                  self.templates["Pat:InfraUp"] % node,
                                  self.templates["Pat:PacemakerUp"] % node])

            # Start all the nodes - at about the same time...
            watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
            watch.set_watch()

            stonith = self._cm.prepare_fencing_watcher(self.name)

            for node in node_list:
                self._cm.StartaCMnoBlock(node)

            watch.look_for_all()

            node_list = self._cm.fencing_cleanup(self.name, stonith)

            if node_list is None:
                return self.failure("Cluster did not stabilize")

            # Remove node_list messages from watch.unmatched
            for node in node_list:
                self._logger.debug("Dealing with stonith operations for %s" % node_list)

                if watch.unmatched:
                    try:
                        watch.unmatched.remove(uppat % node)
                    except ValueError:
                        self.debug("Already matched: %s" % (uppat % node))

                    try:
                        watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
                    except ValueError:
                        self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))

                    try:
                        watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
                    except ValueError:
                        self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))

            if watch.unmatched:
                for regex in watch.unmatched:
                    self._logger.log("Warn: Startup pattern not found: %s" % regex)

            if not self._cm.cluster_stable():
                return self.failure("Cluster did not stabilize")

        did_fail = False
        unstable = []
        for node in self._env["nodes"]:
            if self._cm.StataCM(node) == 0:
                did_fail = True
                unstable.append(node)

        if did_fail:
            return self.failure("Unstarted nodes exist: %s" % unstable)

        unstable = []
        for node in self._env["nodes"]:
            if not self._cm.node_stable(node):
                did_fail = True
                unstable.append(node)

        if did_fail:
            return self.failure("Unstable cluster nodes exist: %s" % unstable)

        return self.success()

    def is_applicable(self):
        '''SimulStartLite is a setup test and never applicable'''

        return False


class SimulStopLite(CTSTest):
    '''Stop any active nodes ~ simultaneously'''

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "SimulStopLite"

    def __call__(self, dummy):
        '''Perform the 'SimulStopLite' setup work.'''

        self.incr("calls")
        self.debug("Setup: %s" % self.name)

        # We ignore the "node" parameter...
        watchpats = []

        for node in self._env["nodes"]:
            if self._cm.ShouldBeStatus[node] == "up":
                self.incr("WasStarted")
                watchpats.append(self.templates["Pat:We_stopped"] % node)

        if len(watchpats) == 0:
            return self.success()

        # Stop all the nodes - at about the same time...
        watch = self.create_watch(watchpats, self._env["DeadTime"]+10)

        watch.set_watch()
        self.set_timer()
        for node in self._env["nodes"]:
            if self._cm.ShouldBeStatus[node] == "up":
                self._cm.StopaCMnoBlock(node)

        if watch.look_for_all():
            # Make sure they're completely down with no residue
            for node in self._env["nodes"]:
                self._rsh(node, self.templates["StopCmd"])

            return self.success()

        did_fail = False
        up_nodes = []
        for node in self._env["nodes"]:
            if self._cm.StataCM(node) == 1:
                did_fail = True
                up_nodes.append(node)

        if did_fail:
            return self.failure("Active nodes exist: %s" % up_nodes)

        self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched)
        return self.failure("Missing log message: %s " % watch.unmatched)

    def is_applicable(self):
        '''SimulStopLite is a setup test and never applicable'''

        return False
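# Illustrative sketch (not part of the diff): the log-watch idiom the tests
# above use over and over, distilled. Pattern entries are regular expressions,
# and the watcher is armed before the cluster action is triggered so no
# messages are missed. Written as it would appear inside a test method.
#
#   watch = self.create_watch(patterns, self._env["DeadTime"])
#   watch.set_watch()               # arm the watcher first...
#   self._cm.StopaCMnoBlock(node)   # ...then trigger the cluster action
#   watch.look_for_all()            # block until all patterns match or timeout
#   if watch.unmatched:
#       return self.failure("Missing log message: %s" % watch.unmatched)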
class StartTest(CTSTest):
    '''Start (activate) the cluster manager on a node'''

    def __init__(self, cm, debug=None):
        CTSTest.__init__(self, cm)

        self.name = "Start"
        self.debug = debug

    def __call__(self, node):
        '''Perform the 'start' test.'''

        self.incr("calls")

        if self._cm.upcount() == 0:
            self.incr("us")
        else:
            self.incr("them")

        if self._cm.ShouldBeStatus[node] != "down":
            return self.skipped()

        if self._cm.StartaCM(node):
            return self.success()

        return self.failure("Startup %s on node %s failed" % (self._env["Name"], node))


class StopTest(CTSTest):
    '''Stop (deactivate) the cluster manager on a node'''

    def __init__(self, cm):
        CTSTest.__init__(self, cm)
        self.name = "Stop"

    def __call__(self, node):
        '''Perform the 'stop' test.'''

        self.incr("calls")
        if self._cm.ShouldBeStatus[node] != "up":
            return self.skipped()

        # Technically we should always be able to notice ourselves stopping
        patterns = [ self.templates["Pat:We_stopped"] % node ]

        # Any active node needs to notice this one left
        # (note that this won't work if we have multiple partitions)
        for other in self._env["nodes"]:
            if self._cm.ShouldBeStatus[other] == "up" and other != node:
                patterns.append(self.templates["Pat:They_stopped"] % (other, self._cm.key_for_node(node)))

        watch = self.create_watch(patterns, self._env["DeadTime"])
        watch.set_watch()

        if node == self._cm.OurNode:
            self.incr("us")
        else:
            if self._cm.upcount() <= 1:
                self.incr("all")
            else:
                self.incr("them")

        self._cm.StopaCM(node)
        watch.look_for_all()

        failreason = None
        unmatched_str = "||"

        if watch.unmatched:
            (_, output) = self._rsh(node, "/bin/ps axf", verbose=1)
            for line in output:
                self.debug(line)

            (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
            for line in output:
                self.debug(line)

            for regex in watch.unmatched:
                self._logger.log("ERROR: Shutdown pattern not found: %s" % regex)
                unmatched_str += "%s||" % regex

            failreason = "Missing shutdown pattern"

        self._cm.cluster_stable(self._env["DeadTime"])

        if not watch.unmatched or self._cm.upcount() == 0:
            return self.success()

        if len(watch.unmatched) >= self._cm.upcount():
            return self.failure("no match against (%s)" % unmatched_str)

        if failreason is None:
            return self.success()

        return self.failure(failreason)