diff --git a/cts/lab/CM_corosync.py b/cts/lab/CM_corosync.py index dce7e98fd8..9a79e46d0a 100644 --- a/cts/lab/CM_corosync.py +++ b/cts/lab/CM_corosync.py @@ -1,60 +1,60 @@ """ Corosync-specific class for Pacemaker's Cluster Test Suite (CTS) """ __copyright__ = "Copyright 2007-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" -from cts.ClusterManager import ClusterManager - from pacemaker._cts.CTS import Process +from pacemaker._cts.clustermanager import ClusterManager from pacemaker._cts.patterns import PatternSelector class crm_corosync(ClusterManager): ''' Corosync version 2 cluster manager class ''' def __init__(self, name=None): if not name: name="crm-corosync" ClusterManager.__init__(self) self.fullcomplist = {} self.templates = PatternSelector(self.name) - def Components(self): + @property + def components(self): complist = [] if not len(list(self.fullcomplist.keys())): for c in [ "pacemaker-based", "pacemaker-controld", "pacemaker-attrd", "pacemaker-execd", "pacemaker-fenced" ]: self.fullcomplist[c] = Process( self, c, pats = self.templates.get_component(c), badnews_ignore = self.templates.get_component("%s-ignore" % c) + self.templates.get_component("common-ignore")) # the scheduler uses dc_pats instead of pats self.fullcomplist["pacemaker-schedulerd"] = Process( self, "pacemaker-schedulerd", dc_pats = self.templates.get_component("pacemaker-schedulerd"), badnews_ignore = self.templates.get_component("pacemaker-schedulerd-ignore") + self.templates.get_component("common-ignore")) # add (or replace) extra components self.fullcomplist["corosync"] = Process( self, "corosync", pats = self.templates.get_component("corosync"), badnews_ignore = self.templates.get_component("corosync-ignore") + self.templates.get_component("common-ignore") ) # Processes running under valgrind can't be shot with "killall -9 processname", # so don't include them in the returned list - vgrind = self.Env["valgrind-procs"].split() + vgrind = self.env["valgrind-procs"].split() for key in list(self.fullcomplist.keys()): - if self.Env["valgrind-tests"]: + if self.env["valgrind-tests"]: if key in vgrind: self.log("Filtering %s from the component list as it is being profiled by valgrind" % key) continue - if key == "pacemaker-fenced" and not self.Env["DoFencing"]: + if key == "pacemaker-fenced" and not self.env["DoFencing"]: continue complist.append(self.fullcomplist[key]) return complist diff --git a/cts/lab/ClusterManager.py b/cts/lab/ClusterManager.py deleted file mode 100644 index 4f199fc694..0000000000 --- a/cts/lab/ClusterManager.py +++ /dev/null @@ -1,935 +0,0 @@ -""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS) -""" - -__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors. -Certain portions by Huang Zhen are copyright 2004 -International Business Machines. 
The version control history for this file -may have further details.""" -__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" - -import os -import re -import time - -from collections import UserDict - -from pacemaker.buildoptions import BuildOptions -from pacemaker._cts.CTS import NodeStatus, Process -from pacemaker._cts.audits import AuditResource -from pacemaker._cts.cib import ConfigFactory -from pacemaker._cts.environment import EnvFactory -from pacemaker._cts.logging import LogFactory -from pacemaker._cts.patterns import PatternSelector -from pacemaker._cts.remote import RemoteFactory -from pacemaker._cts.watcher import LogWatcher - -class ClusterManager(UserDict): - '''The Cluster Manager class. - This is an subclass of the Python dictionary class. - (this is because it contains lots of {name,value} pairs, - not because it's behavior is that terribly similar to a - dictionary in other ways.) - - This is an abstract class which class implements high-level - operations on the cluster and/or its cluster managers. - Actual cluster managers classes are subclassed from this type. - - One of the things we do is track the state we think every node should - be in. - ''' - - def __InitialConditions(self): - #if os.geteuid() != 0: - # raise ValueError("Must Be Root!") - None - - def _finalConditions(self): - for key in list(self.keys()): - if self[key] == None: - raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") - - def __init__(self): - self.Env = EnvFactory().getInstance() - self.templates = PatternSelector(self.Env["Name"]) - self.__InitialConditions() - self.logger = LogFactory() - self.TestLoggingLevel=0 - self.data = {} - self.name = self.Env["Name"] - - self.rsh = RemoteFactory().getInstance() - self.ShouldBeStatus={} - self.ns = NodeStatus(self.Env) - self.OurNode = os.uname()[1].lower() - self.__instance_errorstoignore = [] - - self.cib_installed = 0 - self.config = None - self.use_short_names = 1 - - self._finalConditions() - - self.check_transitions = 0 - self.check_elections = 0 - self.CIBsync = {} - self.CibFactory = ConfigFactory(self) - self.cib = self.CibFactory.create_config(self.Env["Schema"]) - - def __getitem__(self, key): - if key == "Name": - return self.name - - print("FIXME: Getting %s from %s" % (key, repr(self))) - if key in self.data: - return self.data[key] - - return self.templates.get_patterns(key) - - def __setitem__(self, key, value): - print("FIXME: Setting %s=%s on %s" % (key, value, repr(self))) - self.data[key] = value - - def key_for_node(self, node): - return node - - def instance_errorstoignore_clear(self): - '''Allows the test scenario to reset instance errors to ignore on each iteration.''' - self.__instance_errorstoignore = [] - - def instance_errorstoignore(self): - '''Return list of errors which are 'normal' for a specific test instance''' - return self.__instance_errorstoignore - - def log(self, args): - self.logger.log(args) - - def debug(self, args): - self.logger.debug(args) - - def upcount(self): - '''How many nodes are up?''' - count = 0 - for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up": - count = count + 1 - return count - - def install_support(self, command="install"): - for node in self.Env["nodes"]: - self.rsh(node, BuildOptions.DAEMON_DIR + "/cts-support " + command) - - def prepare_fencing_watcher(self, name): - # If we don't have quorum now but get it as a result of starting this node, - # then a bunch of nodes might get fenced - upnode = 
None - if self.HasQuorum(None): - self.debug("Have quorum") - return None - - if not self.templates["Pat:Fencing_start"]: - print("No start pattern") - return None - - if not self.templates["Pat:Fencing_ok"]: - print("No ok pattern") - return None - - stonith = None - stonithPats = [] - for peer in self.Env["nodes"]: - if self.ShouldBeStatus[peer] != "up": - stonithPats.append(self.templates["Pat:Fencing_ok"] % peer) - stonithPats.append(self.templates["Pat:Fencing_start"] % peer) - - stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0) - stonith.set_watch() - return stonith - - def fencing_cleanup(self, node, stonith): - peer_list = [] - peer_state = {} - - self.debug("Looking for nodes that were fenced as a result of %s starting" % node) - - # If we just started a node, we may now have quorum (and permission to fence) - if not stonith: - self.debug("Nothing to do") - return peer_list - - q = self.HasQuorum(None) - if not q and len(self.Env["nodes"]) > 2: - # We didn't gain quorum - we shouldn't have shot anyone - self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"]))) - return peer_list - - for n in self.Env["nodes"]: - peer_state[n] = "unknown" - - # Now see if any states need to be updated - self.debug("looking for: " + repr(stonith.regexes)) - shot = stonith.look(0) - while shot: - line = repr(shot) - self.debug("Found: " + line) - del stonith.regexes[stonith.whichmatch] - - # Extract node name - for n in self.Env["nodes"]: - if re.search(self.templates["Pat:Fencing_ok"] % n, shot): - peer = n - peer_state[peer] = "complete" - self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer) - - elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): - # TODO: Correctly detect multiple fencing operations for the same host - peer = n - peer_state[peer] = "in-progress" - self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer) - - if not peer: - self.logger.log("ERROR: Unknown stonith match: %s" % line) - - elif not peer in peer_list: - self.debug("Found peer: " + peer) - peer_list.append(peer) - - # Get the next one - shot = stonith.look(60) - - for peer in peer_list: - - self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) - if self.Env["at-boot"]: - self.ShouldBeStatus[peer] = "up" - else: - self.ShouldBeStatus[peer] = "down" - - if peer_state[peer] == "in-progress": - # Wait for any in-progress operations to complete - shot = stonith.look(60) - while len(stonith.regexes) and shot: - line = repr(shot) - self.debug("Found: " + line) - del stonith.regexes[stonith.whichmatch] - shot = stonith.look(60) - - # Now make sure the node is alive too - self.ns.wait_for_node(peer, self.Env["DeadTime"]) - - # Poll until it comes up - if self.Env["at-boot"]: - if not self.StataCM(peer): - time.sleep(self.Env["StartTime"]) - - if not self.StataCM(peer): - self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) - return None - - return peer_list - - def StartaCM(self, node, verbose=False): - - '''Start up the cluster manager on a given node''' - if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node)) - else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) - ret = 1 - - if not node in self.ShouldBeStatus: - self.ShouldBeStatus[node] = "down" - - if self.ShouldBeStatus[node] != "down": - return 1 - - patterns = [] - # Technically we 
should always be able to notice ourselves starting - patterns.append(self.templates["Pat:Local_started"] % node) - if self.upcount() == 0: - patterns.append(self.templates["Pat:DC_started"] % node) - else: - patterns.append(self.templates["Pat:NonDC_started"] % node) - - watch = LogWatcher( - self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10) - - self.install_config(node) - - self.ShouldBeStatus[node] = "any" - if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): - self.logger.log ("%s was already started" % (node)) - return 1 - - stonith = self.prepare_fencing_watcher(node) - watch.set_watch() - - (rc, _) = self.rsh(node, self.templates["StartCmd"]) - if rc != 0: - self.logger.log ("Warn: Start command failed on node %s" % (node)) - self.fencing_cleanup(node, stonith) - return None - - self.ShouldBeStatus[node] = "up" - watch_result = watch.look_for_all() - - if watch.unmatched: - for regex in watch.unmatched: - self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) - - if watch_result and self.cluster_stable(self.Env["DeadTime"]): - #self.debug("Found match: "+ repr(watch_result)) - self.fencing_cleanup(node, stonith) - return 1 - - elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): - self.fencing_cleanup(node, stonith) - return 1 - - self.logger.log ("Warn: Start failed for node %s" % (node)) - return None - - def StartaCMnoBlock(self, node, verbose=False): - - '''Start up the cluster manager on a given node with none-block mode''' - - if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node)) - else: self.debug("Starting %s on node %s" % (self["Name"], node)) - - self.install_config(node) - self.rsh(node, self.templates["StartCmd"], synchronous=False) - self.ShouldBeStatus[node] = "up" - return 1 - - def StopaCM(self, node, verbose=False, force=False): - - '''Stop the cluster manager on a given node''' - - if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node)) - else: self.debug("Stopping %s on node %s" % (self["Name"], node)) - - if self.ShouldBeStatus[node] != "up" and force == False: - return 1 - - (rc, _) = self.rsh(node, self.templates["StopCmd"]) - if rc == 0: - # Make sure we can continue even if corosync leaks - # fdata-* is the old name - #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*") - self.ShouldBeStatus[node] = "down" - self.cluster_stable(self.Env["DeadTime"]) - return 1 - else: - self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) - - return None - - def StopaCMnoBlock(self, node): - - '''Stop the cluster manager on a given node with none-block mode''' - - self.debug("Stopping %s on node %s" % (self["Name"], node)) - - self.rsh(node, self.templates["StopCmd"], synchronous=False) - self.ShouldBeStatus[node] = "down" - return 1 - - def RereadCM(self, node): - - '''Force the cluster manager on a given node to reread its config - This may be a no-op on certain cluster managers. - ''' - (rc, _) = self.rsh(node, self.templates["RereadCmd"]) - if rc == 0: - return 1 - else: - self.logger.log ("Could not force %s on node %s to reread its config" - % (self["Name"], node)) - return None - - def startall(self, nodelist=None, verbose=False, quick=False): - - '''Start the cluster manager on every node in the cluster. - We can do it on a subset of the cluster if nodelist is not None. 
- ''' - map = {} - if not nodelist: - nodelist = self.Env["nodes"] - - for node in nodelist: - if self.ShouldBeStatus[node] == "down": - self.ns.wait_for_all_nodes(nodelist, 300) - - if not quick: - # This is used for "basic sanity checks", so only start one node ... - if not self.StartaCM(node, verbose=verbose): - return 0 - return 1 - - # Approximation of SimulStartList for --boot - watchpats = [ ] - watchpats.append(self.templates["Pat:DC_IDLE"]) - for node in nodelist: - watchpats.append(self.templates["Pat:InfraUp"] % node) - watchpats.append(self.templates["Pat:PacemakerUp"] % node) - watchpats.append(self.templates["Pat:Local_started"] % node) - watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node)) - - # Start all the nodes - at about the same time... - watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10) - watch.set_watch() - - if not self.StartaCM(nodelist[0], verbose=verbose): - return 0 - for node in nodelist: - self.StartaCMnoBlock(node, verbose=verbose) - - watch.look_for_all() - if watch.unmatched: - for regex in watch.unmatched: - self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) - - if not self.cluster_stable(): - self.logger.log("Cluster did not stabilize") - return 0 - - return 1 - - def stopall(self, nodelist=None, verbose=False, force=False): - - '''Stop the cluster managers on every node in the cluster. - We can do it on a subset of the cluster if nodelist is not None. - ''' - - ret = 1 - map = {} - if not nodelist: - nodelist = self.Env["nodes"] - for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up" or force == True: - if not self.StopaCM(node, verbose=verbose, force=force): - ret = 0 - return ret - - def rereadall(self, nodelist=None): - - '''Force the cluster managers on every node in the cluster - to reread their config files. We can do it on a subset of the - cluster if nodelist is not None. - ''' - - map = {} - if not nodelist: - nodelist = self.Env["nodes"] - for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up": - self.RereadCM(node) - - def statall(self, nodelist=None): - - '''Return the status of the cluster managers in the cluster. - We can do it on a subset of the cluster if nodelist is not None. 
- ''' - - result = {} - if not nodelist: - nodelist = self.Env["nodes"] - for node in nodelist: - if self.StataCM(node): - result[node] = "up" - else: - result[node] = "down" - return result - - def isolate_node(self, target, nodes=None): - '''isolate the communication between the nodes''' - if not nodes: - nodes = self.Env["nodes"] - - for node in nodes: - if node != target: - (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node)) - if rc != 0: - self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) - return None - else: - self.debug("Communication cut between %s and %s" % (target, node)) - return 1 - - def unisolate_node(self, target, nodes=None): - '''fix the communication between the nodes''' - if not nodes: - nodes = self.Env["nodes"] - - for node in nodes: - if node != target: - restored = 0 - - # Limit the amount of time we have asynchronous connectivity for - # Restore both sides as simultaneously as possible - self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False) - self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False) - self.debug("Communication restored between %s and %s" % (target, node)) - - def oprofileStart(self, node=None): - if not node: - for n in self.Env["oprofile"]: - self.oprofileStart(n) - - elif node in self.Env["oprofile"]: - self.debug("Enabling oprofile on %s" % node) - self.rsh(node, "opcontrol --init") - self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") - self.rsh(node, "opcontrol --start") - self.rsh(node, "opcontrol --reset") - - def oprofileSave(self, test, node=None): - if not node: - for n in self.Env["oprofile"]: - self.oprofileSave(test, n) - - elif node in self.Env["oprofile"]: - self.rsh(node, "opcontrol --dump") - self.rsh(node, "opcontrol --save=cts.%d" % test) - # Read back with: opreport -l session:cts.0 image:/c* - if None: - self.rsh(node, "opcontrol --reset") - else: - self.oprofileStop(node) - self.oprofileStart(node) - - def oprofileStop(self, node=None): - if not node: - for n in self.Env["oprofile"]: - self.oprofileStop(n) - - elif node in self.Env["oprofile"]: - self.debug("Stopping oprofile on %s" % node) - self.rsh(node, "opcontrol --reset") - self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") - - def errorstoignore(self): - # At some point implement a more elegant solution that - # also produces a report at the end - """ Return a list of known error messages that should be ignored """ - return self.templates.get_patterns("BadNewsIgnore") - - def install_config(self, node): - if not self.ns.wait_for_node(node): - self.log("Node %s is not up." 
% node) - return None - - if not node in self.CIBsync and self.Env["ClobberCIB"]: - self.CIBsync[node] = 1 - self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*") - - # Only install the CIB on the first node, all the other ones will pick it up from there - if self.cib_installed == 1: - return None - - self.cib_installed = 1 - if self.Env["CIBfilename"] == None: - self.log("Installing Generated CIB on node %s" % (node)) - self.cib.install(node) - - else: - self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node)) - if self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) != 0: - raise ValueError("Can not scp file to %s %d"%(node)) - - self.rsh(node, "chown " + BuildOptions.DAEMON_USER + " " + BuildOptions.CIB_DIR + "/cib.xml") - - def prepare(self): - '''Finish the Initialization process. Prepare to test...''' - - self.partitions_expected = 1 - for node in self.Env["nodes"]: - self.ShouldBeStatus[node] = "" - if self.Env["experimental-tests"]: - self.unisolate_node(node) - self.StataCM(node) - - def test_node_CM(self, node): - '''Report the status of the cluster manager on a given node''' - - watchpats = [ ] - watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") - watchpats.append(self.templates["Pat:NonDC_started"] % node) - watchpats.append(self.templates["Pat:DC_started"] % node) - idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle") - idle_watch.set_watch() - - (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) - - if not out: - out = "" - else: - out = out[0].strip() - - self.debug("Node %s status: '%s'" %(node, out)) - - if out.find('ok') < 0: - if self.ShouldBeStatus[node] == "up": - self.log( - "Node status for %s is %s but we think it should be %s" - % (node, "down", self.ShouldBeStatus[node])) - self.ShouldBeStatus[node] = "down" - return 0 - - if self.ShouldBeStatus[node] == "down": - self.log( - "Node status for %s is %s but we think it should be %s: %s" - % (node, "up", self.ShouldBeStatus[node], out)) - - self.ShouldBeStatus[node] = "up" - - # check the output first - because syslog-ng loses messages - if out.find('S_NOT_DC') != -1: - # Up and stable - return 2 - if out.find('S_IDLE') != -1: - # Up and stable - return 2 - - # fall back to syslog-ng and wait - if not idle_watch.look(): - # just up - self.debug("Warn: Node %s is unstable: %s" % (node, out)) - return 1 - - # Up and stable - return 2 - - # Is the node up or is the node down - def StataCM(self, node): - '''Report the status of the cluster manager on a given node''' - - if self.test_node_CM(node) > 0: - return 1 - return None - - # Being up and being stable is not the same question... 
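# ...and it matters to callers: test_node_CM() returns 0 (down), 1 (up but
# not yet stable) or 2 (up and stable), which StataCM() and node_stable()
# collapse into booleans. A sketch of polling until a node settles --
# wait_stable() and its timeouts are illustrative only, not part of this class:
#
#   import time
#
#   def wait_stable(cm, node, timeout=60, interval=5):
#       deadline = time.time() + timeout
#       while time.time() < deadline:
#           rc = cm.test_node_CM(node)
#           if rc == 2:
#               return True       # up and stable
#           if rc == 0:
#               return False      # down; report rather than keep waiting
#           time.sleep(interval)  # rc == 1: still joining or electing
#       return False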
- def node_stable(self, node): - '''Report the status of the cluster manager on a given node''' - - if self.test_node_CM(node) == 2: - return 1 - self.log("Warn: Node %s not stable" % (node)) - return None - - def partition_stable(self, nodes, timeout=None): - watchpats = [ ] - watchpats.append("Current ping state: S_IDLE") - watchpats.append(self.templates["Pat:DC_IDLE"]) - self.debug("Waiting for cluster stability...") - - if timeout == None: - timeout = self.Env["DeadTime"] - - if len(nodes) < 3: - self.debug("Cluster is inactive") - return 1 - - idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout) - idle_watch.set_watch() - - for node in nodes.split(): - # have each node dump its current state - self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) - - ret = idle_watch.look() - while ret: - self.debug(ret) - for node in nodes.split(): - if re.search(node, ret): - return 1 - ret = idle_watch.look() - - self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout)) - return None - - def cluster_stable(self, timeout=None, double_check=False): - partitions = self.find_partitions() - - for partition in partitions: - if not self.partition_stable(partition, timeout): - return None - - if double_check: - # Make sure we are really stable and that all resources, - # including those that depend on transient node attributes, - # are started if they were going to be - time.sleep(5) - for partition in partitions: - if not self.partition_stable(partition, timeout): - return None - - return 1 - - def is_node_dc(self, node, status_line=None): - rc = 0 - - if not status_line: - (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) - - if out: - status_line = out[0].strip() - - if not status_line: - rc = 0 - elif status_line.find('S_IDLE') != -1: - rc = 1 - elif status_line.find('S_INTEGRATION') != -1: - rc = 1 - elif status_line.find('S_FINALIZE_JOIN') != -1: - rc = 1 - elif status_line.find('S_POLICY_ENGINE') != -1: - rc = 1 - elif status_line.find('S_TRANSITION_ENGINE') != -1: - rc = 1 - - return rc - - def active_resources(self, node): - (_, output) = self.rsh(node, "crm_resource -c", verbose=1) - resources = [] - for line in output: - if re.search("^Resource", line): - tmp = AuditResource(self, line) - if tmp.type == "primitive" and tmp.host == node: - resources.append(tmp.id) - return resources - - def ResourceLocation(self, rid): - ResourceNodes = [] - for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up": - - cmd = self.templates["RscRunning"] % (rid) - (rc, lines) = self.rsh(node, cmd) - - if rc == 127: - self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" 
% cmd) - for line in lines: - self.log("Output: "+line) - elif rc == 0: - ResourceNodes.append(node) - - return ResourceNodes - - def find_partitions(self): - ccm_partitions = [] - - for node in self.Env["nodes"]: - if self.ShouldBeStatus[node] == "up": - (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) - - if not out: - self.log("no partition details for %s" % node) - continue - - partition = out[0].strip() - - if len(partition) > 2: - nodes = partition.split() - nodes.sort() - partition = ' '.join(nodes) - - found = 0 - for a_partition in ccm_partitions: - if partition == a_partition: - found = 1 - if found == 0: - self.debug("Adding partition from %s: %s" % (node, partition)) - ccm_partitions.append(partition) - else: - self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) - - else: - self.log("bad partition details for %s" % node) - else: - self.debug("Node %s is down... skipping" % node) - - self.debug("Found partitions: %s" % repr(ccm_partitions) ) - return ccm_partitions - - def HasQuorum(self, node_list): - # If we are auditing a partition, then one side will - # have quorum and the other not. - # So the caller needs to tell us which we are checking - # If no value for node_list is specified... assume all nodes - if not node_list: - node_list = self.Env["nodes"] - - for node in node_list: - if self.ShouldBeStatus[node] == "up": - (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) - quorum = quorum[0].strip() - - if quorum.find("1") != -1: - return 1 - elif quorum.find("0") != -1: - return 0 - else: - self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum) - - return 0 - - def Components(self): - complist = [] - common_ignore = [ - "Pending action:", - "(ERROR|error): crm_log_message_adv:", - "(ERROR|error): MSG: No message to dump", - "pending LRM operations at shutdown", - "Lost connection to the CIB manager", - "Connection to the CIB terminated...", - "Sending message to the CIB manager FAILED", - "Action A_RECOVER .* not supported", - "(ERROR|error): stonithd_op_result_ready: not signed on", - "pingd.*(ERROR|error): send_update: Could not send update", - "send_ipc_message: IPC Channel to .* is not connected", - "unconfirmed_actions: Waiting on .* unconfirmed actions", - "cib_native_msgready: Message pending on command channel", - r": Performing A_EXIT_1 - forcefully exiting ", - r"Resource .* was active at shutdown. 
You may ignore this error if it is unmanaged.", - ] - - stonith_ignore = [ - r"Updating failcount for child_DoFencing", - r"error.*: Fencer connection failed \(will retry\)", - "pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.", - ] - - stonith_ignore.extend(common_ignore) - - ccm = Process(self, "ccm", pats = [ - "State transition .* S_RECOVERY", - "pacemaker-controld.*Action A_RECOVER .* not supported", - r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", - r"pacemaker-controld.*: Could not recover from internal error", - "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", - # these status numbers are likely wrong now - r"pacemaker-controld.*exited with status 2", - r"attrd.*exited with status 1", - r"cib.*exited with status 2", - -# Not if it was fenced -# "A new node joined the cluster", - -# "WARN: determine_online_status: Node .* is unclean", -# "Scheduling node .* for fencing", -# "Executing .* fencing operation", -# "tengine_stonith_callback: .*result=0", -# "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE", -# "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN", - "State transition S_STARTING -> S_PENDING", - ], badnews_ignore = common_ignore) - - based = Process(self, "pacemaker-based", pats = [ - "State transition .* S_RECOVERY", - "Lost connection to the CIB manager", - "Connection to the CIB manager terminated", - r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", - "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", - r"pacemaker-controld.*: Could not recover from internal error", - # these status numbers are likely wrong now - r"pacemaker-controld.*exited with status 2", - r"attrd.*exited with status 1", - ], badnews_ignore = common_ignore) - - execd = Process(self, "pacemaker-execd", pats = [ - "State transition .* S_RECOVERY", - "LRM Connection failed", - "pacemaker-controld.*I_ERROR.*lrm_connection_destroy", - "State transition S_STARTING -> S_PENDING", - r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", - r"pacemaker-controld.*: Could not recover from internal error", - # this status number is likely wrong now - r"pacemaker-controld.*exited with status 2", - ], badnews_ignore = common_ignore) - - controld = Process(self, "pacemaker-controld", - pats = [ -# "WARN: determine_online_status: Node .* is unclean", -# "Scheduling node .* for fencing", -# "Executing .* fencing operation", -# "tengine_stonith_callback: .*result=0", - "State transition .* S_IDLE", - "State transition S_STARTING -> S_PENDING", - ], badnews_ignore = common_ignore) - - schedulerd = Process(self, "pacemaker-schedulerd", pats = [ - "State transition .* S_RECOVERY", - r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", - r"pacemaker-controld.*: Could not recover from internal error", - r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed", - "pacemaker-controld.*I_ERROR.*save_cib_contents", - # this status number is likely wrong now - r"pacemaker-controld.*exited with status 2", - ], badnews_ignore = common_ignore, dc_only=True) - - if self.Env["DoFencing"]: - complist.append(Process(self, "stoniths", dc_pats = [ - r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed", - "Attempting connection to fencing daemon", - ], badnews_ignore = stonith_ignore)) - - ccm.pats.extend([ - # these status numbers are likely wrong now - r"attrd.*exited with status 1", - r"pacemaker-(based|controld).*exited with status 2", - ]) - based.pats.extend([ - # these status numbers are likely wrong now - r"attrd.*exited with 
status 1", - r"pacemaker-controld.*exited with status 2", - ]) - execd.pats.extend([ - # these status numbers are likely wrong now - r"pacemaker-controld.*exited with status 2", - ]) - - complist.append(ccm) - complist.append(based) - complist.append(execd) - complist.append(controld) - complist.append(schedulerd) - - return complist - - def StandbyStatus(self, node): - (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) - if not out: - return "off" - out = out[0].strip() - self.debug("Standby result: "+out) - return out - - # status == "on" : Enter Standby mode - # status == "off": Enter Active mode - def SetStandbyMode(self, node, status): - current_status = self.StandbyStatus(node) - cmd = self.templates["StandbyCmd"] % (node, status) - self.rsh(node, cmd) - return True - - def AddDummyRsc(self, node, rid): - rsc_xml = """ ' - - - - - '""" % (rid, rid) - constraint_xml = """ ' - - ' - """ % (rid, node, node, rid) - - self.rsh(node, self.templates['CibAddXml'] % (rsc_xml)) - self.rsh(node, self.templates['CibAddXml'] % (constraint_xml)) - - def RemoveDummyRsc(self, node, rid): - constraint = "\"//rsc_location[@rsc='%s']\"" % (rid) - rsc = "\"//primitive[@id='%s']\"" % (rid) - - self.rsh(node, self.templates['CibDelXpath'] % constraint) - self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/cts/lab/Makefile.am b/cts/lab/Makefile.am index 0f4f5721fd..88c771a45a 100644 --- a/cts/lab/Makefile.am +++ b/cts/lab/Makefile.am @@ -1,26 +1,25 @@ # # Copyright 2001-2023 the Pacemaker project contributors # # The version control history for this file may have further details. # # This source code is licensed under the GNU General Public License version 2 # or later (GPLv2+) WITHOUT ANY WARRANTY. # MAINTAINERCLEANFILES = Makefile.in noinst_SCRIPTS = cluster_test \ OCFIPraTest.py # Commands intended to be run only via other commands halibdir = $(CRM_DAEMON_DIR) dist_halib_SCRIPTS = cts-log-watcher ctslibdir = $(pythondir)/cts ctslib_PYTHON = __init__.py \ - ClusterManager.py \ CM_corosync.py ctsdir = $(datadir)/$(PACKAGE)/tests/cts cts_SCRIPTS = CTSlab.py \ cts diff --git a/python/pacemaker/_cts/Makefile.am b/python/pacemaker/_cts/Makefile.am index 0a94f75bb7..f20726874a 100644 --- a/python/pacemaker/_cts/Makefile.am +++ b/python/pacemaker/_cts/Makefile.am @@ -1,33 +1,34 @@ # # Copyright 2023 the Pacemaker project contributors # # The version control history for this file may have further details. # # This source code is licensed under the GNU General Public License version 2 # or later (GPLv2+) WITHOUT ANY WARRANTY. 
# MAINTAINERCLEANFILES = Makefile.in pkgpythondir = $(pythondir)/$(PACKAGE)/_cts pkgpython_PYTHON = CTS.py \ __init__.py \ audits.py \ cib.py \ cibxml.py \ + clustermanager.py \ corosync.py \ environment.py \ errors.py \ input.py \ logging.py \ network.py \ patterns.py \ process.py \ remote.py \ scenarios.py \ test.py \ timer.py \ watcher.py SUBDIRS = tests diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py index e814f81906..4584867e01 100644 --- a/python/pacemaker/_cts/audits.py +++ b/python/pacemaker/_cts/audits.py @@ -1,1029 +1,1029 @@ """ Auditing classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time import uuid from pacemaker.buildoptions import BuildOptions from pacemaker._cts.input import should_continue from pacemaker._cts.watcher import LogKind, LogWatcher class ClusterAudit: """ The base class for various kinds of auditors. Specific audit implementations should be built on top of this one. Audits can do all kinds of checks on the system. The basic interface for callers is the `__call__` method, which returns True if the audit passes and False if it fails. """ def __init__(self, cm): """ Create a new ClusterAudit instance Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.name = None def __call__(self): raise NotImplementedError def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. This method must be implemented by all subclasses. """ raise NotImplementedError def log(self, args): """ Log a message """ self._cm.log("audit: %s" % args) def debug(self, args): """ Log a debug message """ self._cm.debug("audit: %s" % args) class LogAudit(ClusterAudit): """ Audit each cluster node to verify that some logging system is usable. This is done by logging a unique test message and then verifying that we can read back that test message using logging tools. 
""" def __init__(self, cm): """ Create a new LogAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "LogAudit" def _restart_cluster_logging(self, nodes=None): """ Restart logging on the given nodes, or all if none are given """ if not nodes: - nodes = self._cm.Env["nodes"] + nodes = self._cm.env["nodes"] self._cm.debug("Restarting logging on: %r" % nodes) for node in nodes: - if self._cm.Env["have_systemd"]: + if self._cm.env["have_systemd"]: (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket") if rc != 0: self._cm.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service") if rc != 0: self._cm.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) - (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.Env["syslogd"]) + (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"]) if rc != 0: - self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.Env["syslogd"], node)) + self._cm.log ("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node)) def _create_watcher(self, patterns, kind): """ Create a new LogWatcher instance for the given patterns """ - watch = LogWatcher(self._cm.Env["LogFileName"], patterns, - self._cm.Env["nodes"], kind, "LogAudit", 5, + watch = LogWatcher(self._cm.env["LogFileName"], patterns, + self._cm.env["nodes"], kind, "LogAudit", 5, silent=True) watch.set_watch() return watch def _test_logging(self): """ Perform the log audit """ patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} - for node in self._cm.Env["nodes"]: + for node in self._cm.env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) - watch_pref = self._cm.Env["LogWatcher"] + watch_pref = self._cm.env["LogWatcher"] if watch_pref == LogKind.ANY: kinds = [ LogKind.FILE ] - if self._cm.Env["have_systemd"]: + if self._cm.env["have_systemd"]: kinds += [ LogKind.JOURNAL ] kinds += [ LogKind.REMOTE_FILE ] for k in kinds: watch[k] = self._create_watcher(patterns, k) self._cm.log("Logging test message with identifier %s" % suffix) else: watch[watch_pref] = self._create_watcher(patterns, watch_pref) - for node in self._cm.Env["nodes"]: - cmd = "logger -p %s.info %s %s %s" % (self._cm.Env["SyslogFacility"], prefix, node, suffix) + for node in self._cm.env["nodes"]: + cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix) (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0) if rc != 0: self._cm.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in list(watch.keys()): w = watch[k] if watch_pref == LogKind.ANY: self._cm.log("Checking for test message in %s logs" % k) w.look_for_all(silent=True) if w.unmatched: for regex in w.unmatched: self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind)) else: if watch_pref == LogKind.ANY: self._cm.log("Found test message in %s logs" % k) - self._cm.Env["LogWatcher"] = k + self._cm.env["LogWatcher"] = k return 1 return False def __call__(self): max_attempts = 3 attempt = 0 - self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) + self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) while attempt <= max_attempts and not self._test_logging(): attempt += 1 
self._restart_cluster_logging() time.sleep(60*attempt) if attempt > max_attempts: self._cm.log("ERROR: Cluster logging unrecoverable.") return False return True def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ - if self._cm.Env["LogAuditDisabled"]: + if self._cm.env["LogAuditDisabled"]: return False return True class DiskAudit(ClusterAudit): """ Audit disk usage on cluster nodes to verify that there is enough free space left on whichever mounted file system holds the logs. Warn on: less than 100 MB or 10% of free space Error on: less than 10 MB or 5% of free space """ def __init__(self, cm): """ Create a new DiskAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "DiskspaceAudit" def __call__(self): result = True # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR - self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) - for node in self._cm.Env["nodes"]: + self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) + for node in self._cm.env["nodes"]: (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1) if not dfout: self._cm.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) continue dfout = dfout[0].strip() try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = False - if not should_continue(self._cm.Env): + if not should_continue(self._cm.env): raise ValueError("Disk full on %s" % node) elif remaining_mb < 100 or used_percent > 90: self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. 
""" return True class FileAudit(ClusterAudit): """ Audit the filesystem looking for various failure conditions: * The presence of core dumps from corosync or Pacemaker daemons * Stale IPC files """ def __init__(self, cm): """ Create a new FileAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.known = [] self.name = "FileAudit" def __call__(self): result = True - self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) - for node in self._cm.Env["nodes"]: + self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) + for node in self._cm.env["nodes"]: (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) for line in lsout: line = line.strip() if line not in self.known: result = False self.known.append(line) self._cm.log("Warning: Corosync core file on %s: %s" % (node, line)) - if self._cm.ShouldBeStatus.get(node) == "down": + if self._cm.expected_status.get(node) == "down": clean = False (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) for line in lsout: result = False clean = True self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) for line in lsout: self._cm.debug("ps[%s]: %s" % (node, line)) self._cm.rsh(node, "rm -rf /dev/shm/qb-*") else: self._cm.debug("Skipping %s" % node) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ return True class AuditResource: """ A base class for storing information about a cluster resource """ def __init__(self, cm, line): """ Create a new AuditResource instance Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single resource """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None @property def unique(self): """ Is this resource unique? """ return self.flags & 0x20 @property def orphan(self): """ Is this resource an orphan? """ return self.flags & 0x01 @property def managed(self): """ Is this resource managed by the cluster? 
""" return self.flags & 0x02 class AuditConstraint: """ A base class for storing information about a cluster constraint """ def __init__(self, cm, line): """ Create a new AuditConstraint instance Arguments: cm -- A ClusterManager instance line -- One line of output from `crm_resource` describing a single constraint """ # pylint: disable=invalid-name fields = line.split() self._cm = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): """ Audit primitive resources to verify a variety of conditions, including that they are active and managed only when expected; they are active on the expected clusted node; and that they are not orphaned. """ def __init__(self, cm): """ Create a new PrimitiveAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PrimitiveAudit" self._active_nodes = [] self._constraints = [] self._inactive_nodes = [] self._resources = [] self._target = None def _audit_resource(self, resource, quorum): """ Perform the audit of a single resource """ rc = True - active = self._cm.ResourceLocation(resource.id) + active = self._cm.resource_location(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %r" % (resource.id, active)) elif resource.needs_quorum == 1: self._cm.log("Resource %s active without quorum: %r" % (resource.id, active)) rc = False elif not resource.managed: self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active)) elif not resource.unique: # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %r" % (resource.id, active)) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active)) rc = False elif resource.orphan: self.debug("Resource %s is an inactive orphan" % resource.id) elif not self._inactive_nodes: self._cm.log("WARN: Resource %s not served anywhere" % resource.id) rc = False - elif self._cm.Env["warn-inactive"]: + elif self._cm.env["warn-inactive"]: if quorum or not resource.needs_quorum: self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) else: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %r)" % (resource.id, self._inactive_nodes)) return rc def _setup(self): """ Verify cluster nodes are active, and collect resource and colocation information used for performing the audit. 
""" - for node in self._cm.Env["nodes"]: - if self._cm.ShouldBeStatus[node] == "up": + for node in self._cm.env["nodes"]: + if self._cm.expected_status[node] == "up": self._active_nodes.append(node) else: self._inactive_nodes.append(node) - for node in self._cm.Env["nodes"]: - if self._target is None and self._cm.ShouldBeStatus[node] == "up": + for node in self._cm.env["nodes"]: + if self._target is None and self._cm.expected_status[node] == "up": self._target = node if not self._target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name) return False (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): self._resources.append(AuditResource(self._cm, line)) elif re.search("^Constraint", line): self._constraints.append(AuditConstraint(self._cm, line)) else: self._cm.log("Unknown entry: %s" % line) return True def __call__(self): result = True if not self._setup(): return result - quorum = self._cm.HasQuorum(None) + quorum = self._cm.has_quorum(None) for resource in self._resources: if resource.type == "primitive" and not self._audit_resource(resource, quorum): result = False return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class GroupAudit(PrimitiveAudit): """ Audit group resources to verify that each of its child primitive resources is active on the expected cluster node. """ def __init__(self, cm): """ Create a new GroupAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "GroupAudit" def __call__(self): result = True if not self._setup(): return result for group in self._resources: if group.type != "group": continue first_match = True group_location = None for child in self._resources: if child.parent != group.id: continue - nodes = self._cm.ResourceLocation(child.id) + nodes = self._cm.resource_location(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = False if len(nodes) > 1: result = False self._cm.log("Child %s of %s is active more than once: %r" % (child.id, group.id, nodes)) elif not nodes: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: result = False self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return result class CloneAudit(PrimitiveAudit): """ Audit clone resources. NOTE: Currently, this class does not perform any actual audit functions. 
""" def __init__(self, cm): """ Create a new CloneAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "CloneAudit" def __call__(self): result = True if not self._setup(): return result for clone in self._resources: if clone.type != "clone": continue for child in self._resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return result class ColocationAudit(PrimitiveAudit): """ Audit cluster resources to verify that those that should be colocated with each other actually are. """ def __init__(self, cm): """ Create a new ColocationAudit instance Arguments: cm -- A ClusterManager instance """ PrimitiveAudit.__init__(self, cm) self.name = "ColocationAudit" def _crm_location(self, resource): """ Return a list of cluster nodes where a given resource is running """ (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): result = True if not self._setup(): return result for coloc in self._constraints: if coloc.type != "rsc_colocation": continue source = self._crm_location(coloc.rsc) target = self._crm_location(coloc.target) if not source: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: result = False self._cm.log("Colocation audit (%s): %s running on %s (not in %r)" % (coloc.id, coloc.rsc, node, target)) else: self.debug("Colocation audit (%s): %s running on %s (in %r)" % (coloc.id, coloc.rsc, node, target)) return result class ControllerStateAudit(ClusterAudit): """ Audit cluster nodes to verify that those we expect to be active are active, and those that are expected to be inactive are inactive. """ def __init__(self, cm): """ Create a new ControllerStateAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "ControllerStateAudit" def __call__(self): result = True up_are_down = 0 down_are_up = 0 unstable_list = [] - for node in self._cm.Env["nodes"]: - should_be = self._cm.ShouldBeStatus[node] - rc = self._cm.test_node_CM(node) + for node in self._cm.env["nodes"]: + should_be = self._cm.expected_status[node] + rc = self._cm.test_node_cm(node) if rc > 0: if should_be == "down": down_are_up += 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down += 1 if len(unstable_list) > 0: result = False self._cm.log("Cluster is not stable: %d (of %d): %r" % (len(unstable_list), self._cm.upcount(), unstable_list)) if up_are_down > 0: result = False self._cm.log("%d (of %d) nodes expected to be up were down." - % (up_are_down, len(self._cm.Env["nodes"]))) + % (up_are_down, len(self._cm.env["nodes"]))) if down_are_up > 0: result = False self._cm.log("%d (of %d) nodes expected to be down were up." - % (down_are_up, len(self._cm.Env["nodes"]))) + % (down_are_up, len(self._cm.env["nodes"]))) return result def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. 
# Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False class CIBAudit(ClusterAudit): """ Audit the CIB by verifying that it is identical across cluster nodes """ def __init__(self, cm): """ Create a new CIBAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "CibAudit" def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: self.debug("\tNo partitions to audit") return result for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) if self._audit_cib_contents(partition) == 0: result = False return result def _audit_cib_contents(self, hostlist): """ Perform the CIB audit on the given hosts """ passed = True node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self._store_remote_cib(node, node0) if node_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node) passed = False elif node0 is None: node0 = node node0_xml = node_xml elif node0_xml is None: self._cm.log("Could not perform audit: No configuration from %s" % node0) passed = False else: (rc, result) = self._cm.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) if rc != 0: self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = False for line in result: if not re.search("", line): passed = False self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) return passed def _store_remote_cib(self, node, target): """ Store a copy of the given node's CIB on the given target node. If no target is given, store the CIB on the given node. """ filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1) if rc != 0: self._cm.log("Could not retrieve configuration") return None self._cm.rsh("localhost", "rm -f %s" % filename) for line in lines: self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self._cm.log("Could not store configuration") return None return filename def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. 
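# (The comparison core of CIBAudit above, condensed into a sketch -- hosts is
# one partition's node list, cm a ClusterManager, and the file naming mirrors
# _store_remote_cib(), which copies each node's CIB onto the reference node:
#
#   node0 = hosts[0]
#   node0_xml = "/tmp/ctsaudit.%s.xml" % node0    # reference copy on node0
#   for node in hosts[1:]:
#       node_xml = "/tmp/ctsaudit.%s.xml" % node  # this node's CIB, on node0
#       (rc, _) = cm.rsh(node0, "crm_diff -VV -cf --new %s --original %s"
#                        % (node_xml, node0_xml), verbose=1)
#       if rc != 0:
#           cm.log("CIBs differ between %s and %s" % (node0, node))
# )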
#if self._cm["Name"] == "crm-corosync": # return True return False class PartitionAudit(ClusterAudit): """ Audit each partition in a cluster to verify a variety of conditions: * The number of partitions and the nodes in each is as expected * Each node is active when it should be active and inactive when it should be inactive * The status and epoch of each node is as expected * A partition has quorum * A partition has a DC when expected """ def __init__(self, cm): """ Create a new PartitionAudit instance Arguments: cm -- A ClusterManager instance """ ClusterAudit.__init__(self, cm) self.name = "PartitionAudit" self._node_epoch = {} self._node_state = {} self._node_quorum = {} def __call__(self): result = True ccm_partitions = self._cm.find_partitions() if not ccm_partitions: return result self._cm.cluster_stable(double_check=True) if len(ccm_partitions) != self._cm.partitions_expected: self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) result = False for partition in ccm_partitions: self._cm.log("\t %s" % partition) for partition in ccm_partitions: if self._audit_partition(partition) == 0: result = False return result def _trim_string(self, avalue): """ Remove the last character from a multi-character string """ if not avalue: return None if len(avalue) > 1: return avalue[:-1] return avalue def _trim2int(self, avalue): """ Remove the last character from a multi-character string and convert the result to an int. """ trimmed = self._trim_string(avalue) if trimmed: return int(trimmed) return None def _audit_partition(self, partition): """ Perform the audit of a single partition """ passed = True dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % partition) for node in node_list: - if self._cm.ShouldBeStatus[node] != "up": + if self._cm.expected_status[node] != "up": self._cm.log("Warn: Node %s appeared out of nowhere" % node) - self._cm.ShouldBeStatus[node] = "up" + self._cm.expected_status[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1) self._node_state[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1) self._node_epoch[node] = out[0].strip() (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1) self._node_quorum[node] = out[0].strip() self.debug("Node %s: %s - %s - %s." 
% (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node])) self._node_state[node] = self._trim_string(self._node_state[node]) self._node_epoch[node] = self._trim2int(self._node_epoch[node]) self._node_quorum[node] = self._trim_string(self._node_quorum[node]) if not self._node_epoch[node]: self._cm.log("Warn: Node %s disappeared: can't determine epoch" % node) - self._cm.ShouldBeStatus[node] = "down" + self._cm.expected_status[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch: lowest_epoch = self._node_epoch[node] if not lowest_epoch: self._cm.log("Lowest epoch not determined in %s" % partition) passed = False for node in node_list: - if self._cm.ShouldBeStatus[node] != "up": + if self._cm.expected_status[node] != "up": continue if self._cm.is_node_dc(node, self._node_state[node]): dc_found.append(node) if self._node_epoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self._node_epoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self._cm.log("DC %s is not the oldest node (%d vs. %d)" % (node, self._node_epoch[node], lowest_epoch)) passed = False if not dc_found: self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self._cm.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = False if not passed: for node in node_list: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self._cm.log("epoch %s : %s" % (self._node_epoch[node], self._node_state[node])) return passed def is_applicable(self): """ Return True if this audit is applicable in the current test configuration. """ # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self._cm["Name"] == "crm-corosync": # return True return False # pylint: disable=invalid-name def audit_list(cm): """ Return a list of instances of applicable audits that can be performed for the given ClusterManager.
""" result = [] for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit, PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit, ColocationAudit, CIBAudit]: a = auditclass(cm) if a.is_applicable(): result.append(a) return result diff --git a/python/pacemaker/_cts/cib.py b/python/pacemaker/_cts/cib.py index f272e4707c..63295ac0c2 100644 --- a/python/pacemaker/_cts/cib.py +++ b/python/pacemaker/_cts/cib.py @@ -1,424 +1,424 @@ """ CIB generator for Pacemaker's Cluster Test Suite (CTS) """ __all__ = ["ConfigFactory"] __copyright__ = "Copyright 2008-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import warnings import tempfile from pacemaker.buildoptions import BuildOptions from pacemaker._cts.cibxml import Alerts, Clone, Expression, FencingTopology, Group, Nodes, OpDefaults, Option, Resource, Rule from pacemaker._cts.network import next_ip class CIB: """ A class for generating, representing, and installing a CIB file onto cluster nodes """ def __init__(self, cm, version, factory, tmpfile=None): """ Create a new CIB instance Arguments: cm -- A ClusterManager instance version -- The schema syntax version factory -- A ConfigFactory instance tmpfile -- Where to store the CIB, or None to use a new tempfile """ # pylint: disable=invalid-name self._cib = None self._cm = cm self._counter = 1 self._factory = factory self._num_nodes = 0 self.version = version if not tmpfile: warnings.filterwarnings("ignore") # pylint: disable=consider-using-with f = tempfile.NamedTemporaryFile(delete=True) f.close() tmpfile = f.name warnings.resetwarnings() self._factory.tmpfile = tmpfile def _show(self): """ Query a cluster node for its generated CIB; log and return the result """ output = "" (_, result) = self._factory.rsh(self._factory.target, "HOME=/root CIB_file=%s cibadmin -Ql" % self._factory.tmpfile, verbose=1) for line in result: output += line self._factory.debug("Generated Config: %s" % line) return output def new_ip(self, name=None): """ Generate an IP resource for the next available IP address, optionally specifying the resource's name. """ - if self._cm.Env["IPagent"] == "IPaddr2": - ip = next_ip(self._cm.Env["IPBase"]) + if self._cm.env["IPagent"] == "IPaddr2": + ip = next_ip(self._cm.env["IPBase"]) if not name: if ":" in ip: (_, _, suffix) = ip.rpartition(":") name = "r%s" % suffix else: name = "r%s" % ip - r = Resource(self._factory, name, self._cm.Env["IPagent"], "ocf") + r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf") r["ip"] = ip if ":" in ip: r["cidr_netmask"] = "64" r["nic"] = "eth0" else: r["cidr_netmask"] = "32" else: if not name: - name = "r%s%d" % (self._cm.Env["IPagent"], self._counter) + name = "r%s%d" % (self._cm.env["IPagent"], self._counter) self._counter += 1 - r = Resource(self._factory, name, self._cm.Env["IPagent"], "ocf") + r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf") r.add_op("monitor", "5s") return r def get_node_id(self, node_name): """ Check the cluster configuration for the node ID for the given node_name """ # We can't account for every possible configuration, # so we only return a node ID if: # * The node is specified in /etc/corosync/corosync.conf # with "ring0_addr:" equal to node_name and "nodeid:" # explicitly specified. # In all other cases, we return 0. 
node_id = 0 # awkward command: use } as record separator # so each corosync.conf "object" is one record; # match the "node {" record that has "ring0_addr: node_name"; # then print the substring of that record after "nodeid:" (rc, output) = self._factory.rsh(self._factory.target, r"""awk -v RS="}" """ r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/""" r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s""" % (node_name, BuildOptions.COROSYNC_CONFIG_FILE), verbose=1) if rc == 0 and len(output) == 1: try: node_id = int(output[0]) except ValueError: node_id = 0 return node_id def install(self, target): """ Generate a CIB file and install it to the given cluster node """ old = self._factory.tmpfile # Force a rebuild self._cib = None self._factory.tmpfile = "%s/cib.xml" % BuildOptions.CIB_DIR self.contents(target) self._factory.rsh(self._factory.target, "chown %s %s" % (BuildOptions.DAEMON_USER, self._factory.tmpfile)) self._factory.tmpfile = old def contents(self, target): """ Generate a complete CIB file """ # fencing resource if self._cib: return self._cib if target: self._factory.target = target self._factory.rsh(self._factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self._factory.tmpfile)) - self._num_nodes = len(self._cm.Env["nodes"]) + self._num_nodes = len(self._cm.env["nodes"]) no_quorum = "stop" if self._num_nodes < 3: no_quorum = "ignore" self._factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self._num_nodes) # We don't need a nodes section unless we add attributes stn = None # Fencing resource # Define first so that the shell doesn't reject every update - if self._cm.Env["DoFencing"]: + if self._cm.env["DoFencing"]: # Define the "real" fencing device - st = Resource(self._factory, "Fencing", self._cm.Env["stonith-type"], "stonith") + st = Resource(self._factory, "Fencing", self._cm.env["stonith-type"], "stonith") # Set a threshold for unreliable stonith devices such as the vmware one st.add_meta("migration-threshold", "5") st.add_op("monitor", "120s", timeout="120s") st.add_op("stop", "0", timeout="60s") st.add_op("start", "0", timeout="60s") # For remote node tests, a cluster node is stopped and brought back up # as a remote node with the name "remote-OLDNAME". To allow fencing # devices to fence these nodes, create a list of all possible node names. 
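# For example (illustrative only), with nodes [ "node1", "node2" ] the # comprehension below yields [ "node1", "remote-node1", "node2", "remote-node2" ].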
- all_node_names = [ prefix+n for n in self._cm.Env["nodes"] for prefix in ('', 'remote-') ] + all_node_names = [ prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-') ] # Add all parameters specified by user - entries = self._cm.Env["stonith-params"].split(',') + entries = self._cm.env["stonith-params"].split(',') for entry in entries: try: (name, value) = entry.split('=', 1) except ValueError: print("Warning: skipping invalid fencing parameter: %s" % entry) continue # Allow user to specify "all" as the node list, and expand it here if name in [ "hostlist", "pcmk_host_list" ] and value == "all": value = ' '.join(all_node_names) st[name] = value st.commit() # Test advanced fencing logic stf_nodes = [] stt_nodes = [] attr_nodes = {} # Create the levels stl = FencingTopology(self._factory) - for node in self._cm.Env["nodes"]: + for node in self._cm.env["nodes"]: # Remote node tests will rename the node remote_node = "remote-%s" % node # Randomly assign node to a fencing method - ftype = self._cm.Env.random_gen.choice(["levels-and", "levels-or ", "broadcast "]) + ftype = self._cm.env.random_gen.choice(["levels-and", "levels-or ", "broadcast "]) # For levels-and, randomly choose targeting by node name or attribute by = "" if ftype == "levels-and": node_id = self.get_node_id(node) - if node_id == 0 or self._cm.Env.random_gen.choice([True, False]): + if node_id == 0 or self._cm.env.random_gen.choice([True, False]): by = " (by name)" else: attr_nodes[node] = node_id by = " (by attribute)" self._cm.log(" - Using %s fencing for node: %s%s" % (ftype, node, by)) if ftype == "levels-and": # If targeting by name, add a topology level for this node if node not in attr_nodes: stl.level(1, node, "FencingPass,Fencing") # Always target remote nodes by name, otherwise we would need to add # an attribute to the remote node only during remote tests (we don't # want nonexistent remote nodes showing up in the non-remote tests). # That complexity is not worth the effort. stl.level(1, remote_node, "FencingPass,Fencing") # Add the node (and its remote equivalent) to the list of levels-and nodes. stt_nodes.extend([node, remote_node]) elif ftype == "levels-or ": for n in [ node, remote_node ]: stl.level(1, n, "FencingFail") stl.level(2, n, "Fencing") stf_nodes.extend([node, remote_node]) # If any levels-and nodes were targeted by attribute, # create the attributes and a level for the attribute. 
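# (That level matches any node whose "cts-fencing" node attribute is set # to "levels-and", instead of matching on a particular node name.)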
if attr_nodes: stn = Nodes(self._factory) for (node_name, node_id) in attr_nodes.items(): stn.add_node(node_name, node_id, { "cts-fencing" : "levels-and" }) stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and") # Create a Dummy agent that always passes for levels-and if stt_nodes: stt = Resource(self._factory, "FencingPass", "fence_dummy", "stonith") stt["pcmk_host_list"] = " ".join(stt_nodes) # Wait this many seconds before doing anything, handy for letting disks get flushed too stt["random_sleep_range"] = "30" stt["mode"] = "pass" stt.commit() # Create a Dummy agent that always fails for levels-or if stf_nodes: stf = Resource(self._factory, "FencingFail", "fence_dummy", "stonith") stf["pcmk_host_list"] = " ".join(stf_nodes) # Wait this many seconds before doing anything, handy for letting disks get flushed too stf["random_sleep_range"] = "30" stf["mode"] = "fail" stf.commit() # Now commit the levels themselves stl.commit() o = Option(self._factory) - o["stonith-enabled"] = self._cm.Env["DoFencing"] + o["stonith-enabled"] = self._cm.env["DoFencing"] o["start-failure-is-fatal"] = "false" o["pe-input-series-max"] = "5000" o["shutdown-escalation"] = "5min" o["batch-limit"] = "10" o["dc-deadtime"] = "5s" o["no-quorum-policy"] = no_quorum o.commit() o = OpDefaults(self._factory) o["timeout"] = "90s" o.commit() # Commit the nodes section if we defined one if stn is not None: stn.commit() # Add an alerts section if possible - if self._factory.rsh.exists_on_all(self._cm.Env["notification-agent"], self._cm.Env["nodes"]): + if self._factory.rsh.exists_on_all(self._cm.env["notification-agent"], self._cm.env["nodes"]): alerts = Alerts(self._factory) - alerts.add_alert(self._cm.Env["notification-agent"], - self._cm.Env["notification-recipient"]) + alerts.add_alert(self._cm.env["notification-agent"], + self._cm.env["notification-recipient"]) alerts.commit() # Add resources? 
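# (When CIBResource is set, the add_resources method below populates the CIB # with the standard CTS test resources: an IP resource per node, a "migrator" # Dummy resource, a cloned ping resource, a promotable Stateful clone, a # resource group, and an LSB dummy resource.)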
- if self._cm.Env["CIBResource"]: + if self._cm.env["CIBResource"]: self.add_resources() # generate cib self._cib = self._show() if self._factory.tmpfile != "%s/cib.xml" % BuildOptions.CIB_DIR: self._factory.rsh(self._factory.target, "rm -f %s" % self._factory.tmpfile) return self._cib def add_resources(self): """ Add various resources and their constraints to the CIB """ # Per-node resources - for node in self._cm.Env["nodes"]: + for node in self._cm.env["nodes"]: name = "rsc_%s" % node r = self.new_ip(name) r.prefer(node, "100") r.commit() # Migrator # Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach m = Resource(self._factory, "migrator","Dummy", "ocf", "pacemaker") m["passwd"] = "whatever" m.add_meta("resource-stickiness","1") m.add_meta("allow-migrate", "1") m.add_op("monitor", "P10S") m.commit() # Ping the test exerciser p = Resource(self._factory, "ping-1","ping", "ocf", "pacemaker") p.add_op("monitor", "60s") - p["host_list"] = self._cm.Env["cts-exerciser"] + p["host_list"] = self._cm.env["cts-exerciser"] p["name"] = "connected" p["debug"] = "true" c = Clone(self._factory, "Connectivity", p) c["globally-unique"] = "false" c.commit() # promotable clone resource s = Resource(self._factory, "stateful-1", "Stateful", "ocf", "pacemaker") s.add_op("monitor", "15s", timeout="60s") s.add_op("monitor", "16s", timeout="60s", role="Promoted") ms = Clone(self._factory, "promotable-1", s) ms["promotable"] = "true" ms["clone-max"] = self._num_nodes ms["clone-node-max"] = 1 ms["promoted-max"] = 1 ms["promoted-node-max"] = 1 # Require connectivity to run the promotable clone r = Rule(self._factory, "connected", "-INFINITY", op="or") r.add_child(Expression(self._factory, "m1-connected-1", "connected", "lt", "1")) r.add_child(Expression(self._factory, "m1-connected-2", "connected", "not_defined", None)) ms.prefer("connected", rule=r) ms.commit() # Group Resource g = Group(self._factory, "group-1") g.add_child(self.new_ip()) - if self._cm.Env["have_systemd"]: + if self._cm.env["have_systemd"]: sysd = Resource(self._factory, "petulant", "pacemaker-cts-dummyd@10", "service") sysd.add_op("monitor", "P10S") g.add_child(sysd) else: g.add_child(self.new_ip()) g.add_child(self.new_ip()) # Make group depend on the promotable clone g.after("promotable-1", first="promote", then="start") g.colocate("promotable-1", "INFINITY", withrole="Promoted") g.commit() # LSB resource lsb = Resource(self._factory, "lsb-dummy", "LSBDummy", "lsb") lsb.add_op("monitor", "5s") # LSB with group lsb.after("group-1") lsb.colocate("group-1") lsb.commit() class ConfigFactory: """ Singleton to generate a CIB file for the environment's schema version """ def __init__(self, cm): """ Create a new ConfigFactory instance Arguments: cm -- A ClusterManager instance """ # pylint: disable=invalid-name self._cm = cm self.rsh = self._cm.rsh - if not self._cm.Env["ListTests"]: - self.target = self._cm.Env["nodes"][0] + if not self._cm.env["ListTests"]: + self.target = self._cm.env["nodes"][0] self.tmpfile = None def log(self, args): """ Log a message """ self._cm.log("cib: %s" % args) def debug(self, args): """ Log a debug message """ self._cm.debug("cib: %s" % args) def create_config(self, name="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION): """ Return a CIB object for the given schema version """ return CIB(self._cm, name, self) diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py new file mode 100644 index 0000000000..9f4f39fa40 --- 
/dev/null +++ b/python/pacemaker/_cts/clustermanager.py @@ -0,0 +1,904 @@ +""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS) """ + +__all__ = ["ClusterManager"] +__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors. +Certain portions by Huang Zhen are copyright 2004 +International Business Machines. The version control history for this file +may have further details.""" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import os +import re +import time + +from collections import UserDict + +from pacemaker.buildoptions import BuildOptions +from pacemaker._cts.CTS import NodeStatus +from pacemaker._cts.audits import AuditResource +from pacemaker._cts.cib import ConfigFactory +from pacemaker._cts.environment import EnvFactory +from pacemaker._cts.logging import LogFactory +from pacemaker._cts.patterns import PatternSelector +from pacemaker._cts.remote import RemoteFactory +from pacemaker._cts.watcher import LogWatcher + +# pylint doesn't understand that self._rsh is callable (it stores the +# singleton instance of RemoteExec, as returned by the getInstance method +# of RemoteFactory). It's possible we could fix this with type annotations, +# but those were introduced with python 3.5 and we only support python 3.4. +# I think we could also fix this by getting rid of the getInstance methods, +# but that's a project for another day. For now, just disable the warning. +# pylint: disable=not-callable + +# ClusterManager has a lot of methods. +# pylint: disable=too-many-public-methods + +class ClusterManager(UserDict): + """ An abstract base class for managing the cluster. This class implements + high-level operations on the cluster and/or its cluster managers. + Actual cluster-specific management classes should be subclassed from this + one. + + Among other things, this class tracks the state every node is expected to + be in. + """ + + def _final_conditions(self): + """ Check all keys to make sure they have a non-None value """ + + for (key, val) in self._data.items(): + if val is None: + raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key) + + def __init__(self): + """ Create a new ClusterManager instance. This class can be treated + kind of like a dictionary due to the presence of certain dict + functions like __getitem__ and __setitem__. This is because it + contains a lot of name/value pairs. However, it is not actually + a dictionary so do not rely on standard dictionary behavior. + """ + + # Eventually, ClusterManager should not be a UserDict subclass. Until + # that point...
+ # pylint: disable=super-init-not-called + self.__instance_errors_to_ignore = [] + + self._cib_installed = False + self._data = {} + self._logger = LogFactory() + + self.env = EnvFactory().getInstance() + self.expected_status = {} + self.name = self.env["Name"] + # pylint: disable=invalid-name + self.ns = NodeStatus(self.env) + self.our_node = os.uname()[1].lower() + self.partitions_expected = 1 + self.rsh = RemoteFactory().getInstance() + self.templates = PatternSelector(self.env["Name"]) + + self._final_conditions() + + self._cib_factory = ConfigFactory(self) + self._cib = self._cib_factory.create_config(self.env["Schema"]) + self._cib_sync = {} + + def __getitem__(self, key): + if key == "Name": + return self.name + + print("FIXME: Getting %s from %r" % (key, self)) + if key in self._data: + return self._data[key] + + return self.templates.get_patterns(key) + + def __setitem__(self, key, value): + print("FIXME: Setting %s=%s on %r" % (key, value, self)) + self._data[key] = value + + def clear_instance_errors_to_ignore(self): + """ Reset instance-specific errors to ignore on each iteration """ + + self.__instance_errors_to_ignore = [] + + @property + def instance_errors_to_ignore(self): + """ Return a list of known errors that should be ignored for a specific + test instance + """ + + return self.__instance_errors_to_ignore + + @property + def errors_to_ignore(self): + """ Return a list of known error messages that should be ignored """ + + return self.templates.get_patterns("BadNewsIgnore") + + def log(self, args): + """ Log a message """ + + self._logger.log(args) + + def debug(self, args): + """ Log a debug message """ + + self._logger.debug(args) + + def upcount(self): + """ How many nodes are up? """ + + count = 0 + + for node in self.env["nodes"]: + if self.expected_status[node] == "up": + count += 1 + + return count + + def install_support(self, command="install"): + """ Install or uninstall the CTS support files - various init scripts and data, + daemons, fencing agents, etc. 
+ """ + + for node in self.env["nodes"]: + self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command)) + + def prepare_fencing_watcher(self): + """ Return a LogWatcher object that watches for fencing log messages """ + + # If we don't have quorum now but get it as a result of starting this node, + # then a bunch of nodes might get fenced + if self.has_quorum(None): + self.debug("Have quorum") + return None + + if not self.templates["Pat:Fencing_start"]: + print("No start pattern") + return None + + if not self.templates["Pat:Fencing_ok"]: + print("No ok pattern") + return None + + stonith = None + stonith_pats = [] + for peer in self.env["nodes"]: + if self.expected_status[peer] == "up": + continue + + stonith_pats.extend([ self.templates["Pat:Fencing_ok"] % peer, + self.templates["Pat:Fencing_start"] % peer ]) + + stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"], + self.env["LogWatcher"], "StartupFencing", 0) + stonith.set_watch() + return stonith + + def fencing_cleanup(self, node, stonith): + """ Wait for a previously fenced node to return to the cluster """ + + peer_list = [] + peer_state = {} + + self.debug("Looking for nodes that were fenced as a result of %s starting" % node) + + # If we just started a node, we may now have quorum (and permission to fence) + if not stonith: + self.debug("Nothing to do") + return peer_list + + q = self.has_quorum(None) + if not q and len(self.env["nodes"]) > 2: + # We didn't gain quorum - we shouldn't have shot anyone + self.debug("Quorum: %s Len: %d" % (q, len(self.env["nodes"]))) + return peer_list + + for n in self.env["nodes"]: + peer_state[n] = "unknown" + + # Now see if any states need to be updated + self.debug("looking for: %r" % stonith.regexes) + shot = stonith.look(0) + + while shot: + self.debug("Found: %r" % shot) + del stonith.regexes[stonith.whichmatch] + peer = None + + # Extract node name + for n in self.env["nodes"]: + if re.search(self.templates["Pat:Fencing_ok"] % n, shot): + peer = n + peer_state[peer] = "complete" + self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer) + + elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): + # TODO: Correctly detect multiple fencing operations for the same host + peer = n + peer_state[peer] = "in-progress" + self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer) + + if not peer: + self._logger.log("ERROR: Unknown stonith match: %r" % shot) + + elif not peer in peer_list: + self.debug("Found peer: %s" % peer) + peer_list.append(peer) + + # Get the next one + shot = stonith.look(60) + + for peer in peer_list: + self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) + if self.env["at-boot"]: + self.expected_status[peer] = "up" + else: + self.expected_status[peer] = "down" + + if peer_state[peer] == "in-progress": + # Wait for any in-progress operations to complete + shot = stonith.look(60) + + while stonith.regexes and shot: + self.debug("Found: %r" % shot) + del stonith.regexes[stonith.whichmatch] + shot = stonith.look(60) + + # Now make sure the node is alive too + self.ns.wait_for_node(peer, self.env["DeadTime"]) + + # Poll until it comes up + if self.env["at-boot"]: + if not self.stat_cm(peer): + time.sleep(self.env["StartTime"]) + + if not self.stat_cm(peer): + self._logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) + return None + + return peer_list + + def start_cm(self, node, verbose=False): + """
Start up the cluster manager on a given node """ + + if verbose: + self._logger.log("Starting %s on node %s" % (self.templates["Name"], node)) + else: + self.debug("Starting %s on node %s" % (self.templates["Name"], node)) + + if not node in self.expected_status: + self.expected_status[node] = "down" + + if self.expected_status[node] != "down": + return True + + # Technically we should always be able to notice ourselves starting + patterns = [ self.templates["Pat:Local_started"] % node ] + + if self.upcount() == 0: + patterns.append(self.templates["Pat:DC_started"] % node) + else: + patterns.append(self.templates["Pat:NonDC_started"] % node) + + watch = LogWatcher(self.env["LogFileName"], patterns, self.env["nodes"], self.env["LogWatcher"], + "StartaCM", self.env["StartTime"] + 10) + + self.install_config(node) + + self.expected_status[node] = "any" + + if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): + self._logger.log ("%s was already started" % node) + return True + + stonith = self.prepare_fencing_watcher() + watch.set_watch() + + (rc, _) = self.rsh(node, self.templates["StartCmd"]) + if rc != 0: + self._logger.log ("Warn: Start command failed on node %s" % node) + self.fencing_cleanup(node, stonith) + return False + + self.expected_status[node] = "up" + watch_result = watch.look_for_all() + + if watch.unmatched: + for regex in watch.unmatched: + self._logger.log ("Warn: Startup pattern not found: %s" % regex) + + if watch_result and self.cluster_stable(self.env["DeadTime"]): + self.fencing_cleanup(node, stonith) + return True + + if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): + self.fencing_cleanup(node, stonith) + return True + + self._logger.log ("Warn: Start failed for node %s" % node) + return False + + def start_cm_async(self, node, verbose=False): + """ Start up the cluster manager on a given node without blocking """ + + if verbose: + self._logger.log("Starting %s on node %s" % (self["Name"], node)) + else: + self.debug("Starting %s on node %s" % (self["Name"], node)) + + self.install_config(node) + self.rsh(node, self.templates["StartCmd"], synchronous=False) + self.expected_status[node] = "up" + + def stop_cm(self, node, verbose=False, force=False): + """ Stop the cluster manager on a given node """ + + if verbose: + self._logger.log("Stopping %s on node %s" % (self["Name"], node)) + else: + self.debug("Stopping %s on node %s" % (self["Name"], node)) + + if self.expected_status[node] != "up" and not force: + return True + + (rc, _) = self.rsh(node, self.templates["StopCmd"]) + if rc == 0: + # Make sure we can continue even if corosync leaks + self.expected_status[node] = "down" + self.cluster_stable(self.env["DeadTime"]) + return True + + self._logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) + return False + + def stop_cm_async(self, node): + """ Stop the cluster manager on a given node without blocking """ + + self.debug("Stopping %s on node %s" % (self["Name"], node)) + + self.rsh(node, self.templates["StopCmd"], synchronous=False) + self.expected_status[node] = "down" + + def startall(self, nodelist=None, verbose=False, quick=False): + """ Start the cluster manager on every node in the cluster, or on every + node in nodelist if not None + """ + + if not nodelist: + nodelist = self.env["nodes"] + + for node in nodelist: + if self.expected_status[node] == "down": + self.ns.wait_for_all_nodes(nodelist, 300) + + if not quick: + # This is used for "basic sanity checks", so only start one node ... 
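+ # (Callers that want a genuine simultaneous start of every node, such + # as the BootCluster scenario component, pass quick=True instead.)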
+ return self.start_cm(nodelist[0], verbose=verbose) + + # Approximation of SimulStartList for --boot + watchpats = [ self.templates["Pat:DC_IDLE"] ] + for node in nodelist: + watchpats.extend([ self.templates["Pat:InfraUp"] % node, + self.templates["Pat:PacemakerUp"] % node, + self.templates["Pat:Local_started"] % node, + self.templates["Pat:They_up"] % (nodelist[0], node) ]) + + # Start all the nodes - at about the same time... + watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"], + self.env["LogWatcher"], "fast-start", self.env["DeadTime"] + 10) + watch.set_watch() + + if not self.start_cm(nodelist[0], verbose=verbose): + return False + + for node in nodelist: + self.start_cm_async(node, verbose=verbose) + + watch.look_for_all() + if watch.unmatched: + for regex in watch.unmatched: + self._logger.log ("Warn: Startup pattern not found: %s" % regex) + + if not self.cluster_stable(): + self._logger.log("Cluster did not stabilize") + return False + + return True + + def stopall(self, nodelist=None, verbose=False, force=False): + """ Stop the cluster manager on every node in the cluster, or on every + node in nodelist if not None + """ + + ret = True + + if not nodelist: + nodelist = self.env["nodes"] + + for node in self.env["nodes"]: + if self.expected_status[node] == "up" or force: + if not self.stop_cm(node, verbose=verbose, force=force): + ret = False + + return ret + + def statall(self, nodelist=None): + """ Return the status of the cluster manager on every node in the cluster, + or on every node in nodelist if not None + """ + + result = {} + + if not nodelist: + nodelist = self.env["nodes"] + + for node in nodelist: + if self.stat_cm(node): + result[node] = "up" + else: + result[node] = "down" + + return result + + def isolate_node(self, target, nodes=None): + """ Break communication between the target node and all other nodes in the + cluster, or nodes if not None + """ + + if not nodes: + nodes = self.env["nodes"] + + for node in nodes: + if node == target: + continue + + (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node) + if rc != 0: + self._logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) + return False + + self.debug("Communication cut between %s and %s" % (target, node)) + + return True + + def unisolate_node(self, target, nodes=None): + """ Re-establish communication between the target node and all other nodes + in the cluster, or nodes if not None + """ + + if not nodes: + nodes = self.env["nodes"] + + for node in nodes: + if node == target: + continue + + # Limit the amount of time we have asynchronous connectivity for + # Restore both sides as simultaneously as possible + self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False) + self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False) + self.debug("Communication restored between %s and %s" % (target, node)) + + def oprofile_start(self, node=None): + """ Start profiling on the given node, or all nodes in the cluster """ + + if not node: + for n in self.env["oprofile"]: + self.oprofile_start(n) + + elif node in self.env["oprofile"]: + self.debug("Enabling oprofile on %s" % node) + self.rsh(node, "opcontrol --init") + self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") + self.rsh(node, "opcontrol --start") + self.rsh(node, "opcontrol --reset") + + def oprofile_save(self, test, node=None): + """ Save profiling data and restart profiling on the given node, or all + nodes in the 
cluster if None + """ + + if not node: + for n in self.env["oprofile"]: + self.oprofile_save(test, n) + + elif node in self.env["oprofile"]: + self.rsh(node, "opcontrol --dump") + self.rsh(node, "opcontrol --save=cts.%d" % test) + # Read back with: opreport -l session:cts.0 image:/c* + self.oprofile_stop(node) + self.oprofile_start(node) + + def oprofile_stop(self, node=None): + """ Stop profiling on the given node, or all nodes in the cluster. This + does not save profiling data, so call oprofile_save first if needed. + """ + + if not node: + for n in self.env["oprofile"]: + self.oprofile_stop(n) + + elif node in self.env["oprofile"]: + self.debug("Stopping oprofile on %s" % node) + self.rsh(node, "opcontrol --reset") + self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") + + def install_config(self, node): + """ Remove and re-install the CIB on the first node in the cluster """ + + if not self.ns.wait_for_node(node): + self.log("Node %s is not up." % node) + return + + if node in self._cib_sync or not self.env["ClobberCIB"]: + return + + self._cib_sync[node] = True + self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR) + + # Only install the CIB on the first node, all the other ones will pick it up from there + if self._cib_installed: + return + + self._cib_installed = True + if self.env["CIBfilename"] is None: + self.log("Installing Generated CIB on node %s" % node) + self._cib.install(node) + + else: + self.log("Installing CIB (%s) on node %s" % (self.env["CIBfilename"], node)) + + rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) + + if rc != 0: + raise ValueError("Cannot scp file to %s %d" % (node, rc)) + + self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR)) + + def prepare(self): + """ Finish initialization by clearing out the expected status and recording + the current status of every node in the cluster + """ + + self.partitions_expected = 1 + for node in self.env["nodes"]: + self.expected_status[node] = "" + + if self.env["experimental-tests"]: + self.unisolate_node(node) + + self.stat_cm(node) + + def test_node_cm(self, node): + """ Check the status of a given node.
Returns 0 if the node is + down, 1 if the node is up but unstable, and 2 if the node is + up and stable + """ + + watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)", + self.templates["Pat:NonDC_started"] % node, + self.templates["Pat:DC_started"] % node ] + + idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node], + self.env["LogWatcher"], "ClusterIdle") + idle_watch.set_watch() + + (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) + + if not out: + out = "" + else: + out = out[0].strip() + + self.debug("Node %s status: '%s'" % (node, out)) + + if out.find('ok') < 0: + if self.expected_status[node] == "up": + self.log("Node status for %s is %s but we think it should be %s" + % (node, "down", self.expected_status[node])) + + self.expected_status[node] = "down" + return 0 + + if self.expected_status[node] == "down": + self.log("Node status for %s is %s but we think it should be %s: %s" + % (node, "up", self.expected_status[node], out)) + + self.expected_status[node] = "up" + + # check the output first - because syslog-ng loses messages + if out.find('S_NOT_DC') != -1: + # Up and stable + return 2 + + if out.find('S_IDLE') != -1: + # Up and stable + return 2 + + # fall back to syslog-ng and wait + if not idle_watch.look(): + # just up + self.debug("Warn: Node %s is unstable: %s" % (node, out)) + return 1 + + # Up and stable + return 2 + + def stat_cm(self, node): + """ Report the status of the cluster manager on a given node """ + + return self.test_node_cm(node) > 0 + + # Being up and being stable is not the same question... + def node_stable(self, node): + """ Return whether or not the given node is stable """ + + if self.test_node_cm(node) == 2: + return True + + self.log("Warn: Node %s not stable" % node) + return False + + def partition_stable(self, nodes, timeout=None): + """ Return whether or not all nodes in the given partition are stable """ + + watchpats = [ "Current ping state: S_IDLE", + self.templates["Pat:DC_IDLE"] ] + + self.debug("Waiting for cluster stability...") + + if timeout is None: + timeout = self.env["DeadTime"] + + if len(nodes) < 3: + self.debug("Cluster is inactive") + return True + + idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(), + self.env["LogWatcher"], "ClusterStable", timeout) + idle_watch.set_watch() + + for node in nodes.split(): + # have each node dump its current state + self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) + + ret = idle_watch.look() + + while ret: + self.debug(ret) + + for node in nodes.split(): + if re.search(node, ret): + return True + + ret = idle_watch.look() + + self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout)) + return False + + def cluster_stable(self, timeout=None, double_check=False): + """ Return whether or not all nodes in the cluster are stable """ + + partitions = self.find_partitions() + + for partition in partitions: + if not self.partition_stable(partition, timeout): + return False + + if not double_check: + return True + + # Make sure we are really stable and that all resources, + # including those that depend on transient node attributes, + # are started if they were going to be + time.sleep(5) + for partition in partitions: + if not self.partition_stable(partition, timeout): + return False + + return True + + def is_node_dc(self, node, status_line=None): + """ Return whether or not the given node is the cluster DC by checking + the given status_line, or by querying the cluster if None + """ + + if not status_line: + (_, out) 
= self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) + + if out: + status_line = out[0].strip() + + if not status_line: + return False + + if status_line.find('S_IDLE') != -1: + return True + + if status_line.find('S_INTEGRATION') != -1: + return True + + if status_line.find('S_FINALIZE_JOIN') != -1: + return True + + if status_line.find('S_POLICY_ENGINE') != -1: + return True + + if status_line.find('S_TRANSITION_ENGINE') != -1: + return True + + return False + + def active_resources(self, node): + """ Return a list of primitive resources active on the given node """ + + (_, output) = self.rsh(node, "crm_resource -c", verbose=1) + resources = [] + for line in output: + if not re.search("^Resource", line): + continue + + tmp = AuditResource(self, line) + if tmp.type == "primitive" and tmp.host == node: + resources.append(tmp.id) + + return resources + + def resource_location(self, rid): + """ Return a list of nodes on which the given resource is running """ + + resource_nodes = [] + for node in self.env["nodes"]: + if self.expected_status[node] != "up": + continue + + cmd = self.templates["RscRunning"] % rid + (rc, lines) = self.rsh(node, cmd) + + if rc == 127: + self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd) + for line in lines: + self.log("Output: %s " % line) + + elif rc == 0: + resource_nodes.append(node) + + return resource_nodes + + def find_partitions(self): + """ Return a list of all partitions in the cluster. Each element of the + list is a space-separated string of the active nodes in that partition. + """ + + ccm_partitions = [] + + for node in self.env["nodes"]: + if self.expected_status[node] != "up": + self.debug("Node %s is down... skipping" % node) + continue + + (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) + + if not out: + self.log("no partition details for %s" % node) + continue + + partition = out[0].strip() + + if len(partition) <= 2: + self.log("bad partition details for %s" % node) + continue + + nodes = partition.split() + nodes.sort() + partition = ' '.join(nodes) + + found = 0 + for a_partition in ccm_partitions: + if partition == a_partition: + found = 1 + + if found == 0: + self.debug("Adding partition from %s: %s" % (node, partition)) + ccm_partitions.append(partition) + else: + self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) + + self.debug("Found partitions: %r" % ccm_partitions) + return ccm_partitions + + def has_quorum(self, node_list): + """ Return whether or not the cluster has quorum """ + + # If we are auditing a partition, then one side will + # have quorum and the other not. + # So the caller needs to tell us which we are checking + # If no value for node_list is specified... assume all nodes + if not node_list: + node_list = self.env["nodes"] + + for node in node_list: + if self.expected_status[node] != "up": + continue + + (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) + quorum = quorum[0].strip() + + if quorum.find("1") != -1: + return True + + if quorum.find("0") != -1: + return False + + self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum)) + + return False + + @property + def components(self): + """ A list of the Process instances that make up the cluster's + components. This must be provided by all subclasses.
+ """ + + raise NotImplementedError + + def in_standby_mode(self, node): + """ Return whether or not the node is in Standby """ + + (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) + + if not out: + return False + + out = out[0].strip() + self.debug("Standby result: %s" % out) + return out == "on" + + def set_standby_mode(self, node, status): + """ Set node to Standby if status is True, or Active if status is False. + Return whether the node is now in the requested status. + """ + + current_status = self.in_standby_mode(node) + + if current_status == status: + return True + + if status: + cmd = self.templates["StandbyCmd"] % (node, "on") + else: + cmd = self.templates["StandbyCmd"] % (node, "off") + + (rc, _) = self.rsh(node, cmd) + return rc == 0 + + def add_dummy_rsc(self, node, rid): + """ Add a dummy resource with the given ID to the given node """ + + rsc_xml = """ '<resources> + <primitive class="ocf" id="%s" provider="pacemaker" type="Dummy"> + <operations> + <op id="%s-interval-10s" interval="10s" name="monitor"/> + </operations> + </primitive> + </resources>'""" % (rid, rid) + constraint_xml = """ '<constraints> + <rsc_location id="location-%s-%s" node="%s" rsc="%s" score="INFINITY"/> + </constraints>' + """ % (rid, node, node, rid) + + self.rsh(node, self.templates['CibAddXml'] % rsc_xml) + self.rsh(node, self.templates['CibAddXml'] % constraint_xml) + + def remove_dummy_rsc(self, node, rid): + """ Remove the previously added dummy resource given by rid on the + given node + """ + + constraint = "\"//rsc_location[@rsc='%s']\"" % rid + rsc = "\"//primitive[@id='%s']\"" % rid + + self.rsh(node, self.templates['CibDelXpath'] % constraint) + self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py index b3eef1fcf0..6c3de3ab40 100644 --- a/python/pacemaker/_cts/scenarios.py +++ b/python/pacemaker/_cts/scenarios.py @@ -1,408 +1,408 @@ """ Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """ __all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence" ] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time from pacemaker._cts.audits import ClusterAudit from pacemaker._cts.input import should_continue from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.watcher import LogWatcher class ScenarioComponent: """ The base class for all scenario components. A scenario component is one single step in a scenario. Each component is basically just a setup and teardown method. """ def __init__(self, cm, env): """ Create a new ScenarioComponent instance Arguments: cm -- A ClusterManager instance env -- An Environment instance """ # pylint: disable=invalid-name self._cm = cm self._env = env def is_applicable(self): """ Return True if this component is applicable in the given Environment. This method must be provided by all subclasses. """ raise NotImplementedError def setup(self): """ Set up the component, returning True on success. This method must be provided by all subclasses. """ raise NotImplementedError def teardown(self): """ Tear down the given component. This method must be provided by all subclasses. """ raise NotImplementedError class Scenario: """ The base class for scenarios. A scenario is an ordered list of ScenarioComponent objects. A scenario proceeds by setting up all its components in sequence, running a list of tests and audits, and then tearing down its components in reverse. """ def __init__(self, cm, components, audits, tests): """ Create a new Scenario instance Arguments: cm -- A ClusterManager instance components -- A list of ScenarioComponents comprising this Scenario audits -- A list of ClusterAudits that will be performed as part of this Scenario tests -- A list of CTSTests that will be run """ # pylint: disable=invalid-name self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 } self.tests = tests self._audits = audits self._bad_news = None self._cm = cm self._components = components for comp in components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of ScenarioComponent") for audit in audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be subclass of ClusterAudit") for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") def is_applicable(self): """ Return True if all ScenarioComponents are applicable """ for comp in self._components: if not comp.is_applicable(): return False return True def setup(self): """ Set up the scenario, returning True on success. If setup fails at some point, tear down those components that did successfully set up. """ self._cm.prepare() self.audit() # Also detects remote/local log config - self._cm.ns.wait_for_all_nodes(self._cm.Env["nodes"]) + self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) self.audit() self._cm.install_support() - self._bad_news = LogWatcher(self._cm.Env["LogFileName"], + self._bad_news = LogWatcher(self._cm.env["LogFileName"], self._cm.templates.get_patterns("BadNews"), - self._cm.Env["nodes"], - self._cm.Env["LogWatcher"], + self._cm.env["nodes"], + self._cm.env["LogWatcher"], "BadNews", 0) self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit j = 0 while j < len(self._components): if not self._components[j].setup(): # OOPS! We failed. Tear partial setups down. self.audit() self._cm.log("Tearing down partial setup") self.teardown(j) return False j += 1 self.audit() return True def teardown(self, n_components=None): """ Tear down the scenario in the reverse order it was set up. If n_components is not None, only tear down that many components. """ if n_components is None: n_components = len(self._components)-1 j = n_components while j >= 0: self._components[j].teardown() j -= 1 self.audit() self._cm.install_support("uninstall") def incr(self, name): """ Increment the given stats key """ if not name in self.stats: self.stats[name] = 0 self.stats[name] += 1 def run(self, iterations): """ Run all tests in the scenario the given number of times """ - self._cm.oprofileStart() + self._cm.oprofile_start() try: self._run_loop(iterations) - self._cm.oprofileStop() + self._cm.oprofile_stop() except: - self._cm.oprofileStop() + self._cm.oprofile_stop() raise def _run_loop(self, iterations): """ Do the hard part of the run method - actually run all the tests the given number of times. """ raise NotImplementedError def run_test(self, test, testcount): """ Run the given test. testcount is the number of tests (including this one) that have been run across all iterations.
""" - nodechoice = self._cm.Env.random_node() + nodechoice = self._cm.env.random_node() ret = True did_run = False - self._cm.instance_errorstoignore_clear() + self._cm.clear_instance_errors_to_ignore() choice = "(%s)" % nodechoice self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount)) starttime = test.set_timer() if not test.setup(nodechoice): self._cm.log("Setup failed") ret = False elif not test.can_run_now(nodechoice): self._cm.log("Skipped") test.skipped() else: did_run = True ret = test(nodechoice) if not test.teardown(nodechoice): self._cm.log("Teardown failed") - if not should_continue(self._cm.Env): + if not should_continue(self._cm.env): raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice)) ret = False stoptime = time.time() - self._cm.oprofileSave(testcount) + self._cm.oprofile_save(testcount) elapsed_time = stoptime - starttime test_time = stoptime - test.get_timer() if "min_time" not in test.stats: test.stats["elapsed_time"] = elapsed_time test.stats["min_time"] = test_time test.stats["max_time"] = test_time else: test.stats["elapsed_time"] += elapsed_time if test_time < test.stats["min_time"]: test.stats["min_time"] = test_time if test_time > test.stats["max_time"]: test.stats["max_time"] = test_time if ret: self.incr("success") test.log_timer() else: self.incr("failure") self._cm.statall() did_run = True # Force the test count to be incremented anyway so test extraction works self.audit(test.errors_to_ignore) return did_run def summarize(self): """ Output scenario results """ self._cm.log("****************") self._cm.log("Overall Results:%r" % self.stats) self._cm.log("****************") stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0 } self._cm.log("Test Summary") for test in self.tests: for key in stat_filter: stat_filter[key] = test.stats[key] name = "Test %s:" % test.name self._cm.log("{:<25} {!r}".format(name, stat_filter)) self._cm.debug("Detailed Results") for test in self.tests: name = "Test %s:" % test.name self._cm.debug("{:<25} {!r}".format(name, stat_filter)) self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def audit(self, local_ignore=None): """ Perform all scenario audits and log results. If there are too many failures, prompt the user to confirm that the scenario should continue running. """ errcount = 0 ignorelist = ["CTS:"] if local_ignore: ignorelist.extend(local_ignore) - ignorelist.extend(self._cm.errorstoignore()) - ignorelist.extend(self._cm.instance_errorstoignore()) + ignorelist.extend(self._cm.errors_to_ignore) + ignorelist.extend(self._cm.instance_errors_to_ignore) # This makes sure everything is stabilized before starting... failed = 0 for audit in self._audits: if not audit(): self._cm.log("Audit %s FAILED." % audit.name) failed += 1 else: self._cm.debug("Audit %s passed." 
% audit.name) while errcount < 1000: match = None if self._bad_news: match = self._bad_news.look(0) if match: add_err = True for ignore in ignorelist: if add_err and re.search(ignore, match): add_err = False if add_err: self._cm.log("BadNews: %s" % match) self.incr("BadNews") errcount += 1 else: break else: print("Big problems") - if not should_continue(self._cm.Env): + if not should_continue(self._cm.env): self._cm.log("Shutting down.") self.summarize() self.teardown() raise ValueError("Looks like we hit a BadNews jackpot!") if self._bad_news: self._bad_news.end() return failed class AllOnce(Scenario): """ Every Test Once """ def _run_loop(self, iterations): testcount = 1 for test in self.tests: self.run_test(test, testcount) testcount += 1 class RandomTests(Scenario): """ Random Test Execution """ def _run_loop(self, iterations): testcount = 1 while testcount <= iterations: - test = self._cm.Env.random_gen.choice(self.tests) + test = self._cm.env.random_gen.choice(self.tests) self.run_test(test, testcount) testcount += 1 class Sequence(Scenario): """ Named Tests in Sequence """ def _run_loop(self, iterations): testcount = 1 while testcount <= iterations: for test in self.tests: self.run_test(test, testcount) testcount += 1 class Boot(Scenario): """ Start the Cluster """ def _run_loop(self, iterations): return class BootCluster(ScenarioComponent): """ The BootCluster component simply starts the cluster manager on all nodes, first waiting for each node to come up, since a node might have been rebooted or crashed beforehand. """ def is_applicable(self): """ BootCluster is always applicable """ return True def setup(self): """ Set up the component, returning True on success """ self._cm.prepare() # Clear out the cobwebs ;-) self._cm.stopall(verbose=True, force=True) # Now start the Cluster Manager on all the nodes. self._cm.log("Starting Cluster Manager on all nodes.") return self._cm.startall(verbose=True, quick=True) def teardown(self): """ Tear down the component """ self._cm.log("Stopping Cluster Manager on all nodes") self._cm.stopall(verbose=True, force=False) class LeaveBooted(BootCluster): """ The LeaveBooted component leaves all nodes up when the scenario is complete. """ def teardown(self): """ Tear down the component """ self._cm.log("Leaving Cluster running on all nodes") diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py index 329ba2fa38..fba883dc7a 100644 --- a/python/pacemaker/_cts/tests/componentfail.py +++ b/python/pacemaker/_cts/tests/componentfail.py @@ -1,159 +1,159 @@ """ Kill a pacemaker daemon and test how the cluster recovers """ __all__ = ["ComponentFail"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re from pacemaker._cts.audits import AuditResource from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable.
# pylint: disable=unsubscriptable-object class ComponentFail(CTSTest): """ A concrete test that kills a random pacemaker daemon and waits for the cluster to recover """ def __init__(self, cm): """ Create a new ComponentFail instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.is_unsafe = True self.name = "ComponentFail" - self._complist = cm.Components() + self._complist = cm.components self._okerrpatterns = [] self._patterns = [] self._startall = SimulStartLite(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") self._patterns = [] self._okerrpatterns = [] # start all nodes ret = self._startall(None) if not ret: return self.failure("Setup failed") if not self._cm.cluster_stable(self._env["StableTime"]): return self.failure("Setup failed - unstable") node_is_dc = self._cm.is_node_dc(node, None) # select a component to kill chosen = self._env.random_gen.choice(self._complist) - while chosen.dc_only and node_is_dc == 0: + while chosen.dc_only and not node_is_dc: chosen = self._env.random_gen.choice(self._complist) - self.debug("...component %s (dc=%d)" % (chosen.name, node_is_dc)) + self.debug("...component %s (dc=%s)" % (chosen.name, node_is_dc)) self.incr(chosen.name) if chosen.name != "corosync": self._patterns.extend([ self.templates["Pat:ChildKilled"] % (node, chosen.name), self.templates["Pat:ChildRespawn"] % (node, chosen.name) ]) self._patterns.extend(chosen.pats) if node_is_dc: self._patterns.extend(chosen.dc_pats) # @TODO this should be a flag in the Component if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]: # Ignore actions for fence devices if fencer will respawn # (their registration will be lost, and probes will fail) self._okerrpatterns = [ self.templates["Pat:Fencing_active"] ] (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self._cm, line) if r.rclass == "stonith": self._okerrpatterns.extend([ self.templates["Pat:Fencing_recover"] % r.id, self.templates["Pat:Fencing_probe"] % r.id ]) # supply a copy so self.patterns doesn't end up empty tmp_pats = self._patterns.copy() self._patterns.extend(chosen.badnews_ignore) # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status stonith_pats = [ self.templates["Pat:Fencing_ok"] % node ] stonith = self.create_watch(stonith_pats, 0) stonith.set_watch() # set the watch for stable watch = self.create_watch( tmp_pats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"]) watch.set_watch() # kill the component chosen.kill(node) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for any fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") self._cm.cluster_stable(self._env["StartTime"]) self.debug("Checking if %s was shot" % node) shot = stonith.look(60) if shot: self.debug("Found: %r" % shot) self._okerrpatterns.append(self.templates["Pat:Fencing_start"] % node) if not self._env["at-boot"]: - self._cm.ShouldBeStatus[node] = "down" + self._cm.expected_status[node] = "down" # If fencing occurred, chances are many (if not all) the expected logs # will not be sent - or will be lost when the node reboots return self.success() # check for logs indicating a graceful recovery matched = watch.look_for_all(allow_multiple_matches=True) if watch.unmatched: self._logger.log("Patterns not found: 
%r" % watch.unmatched) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["StartTime"]) if not matched: return self.failure("Didn't find all expected %s patterns" % chosen.name) if not is_stable: return self.failure("Cluster did not become stable after killing %s" % chosen.name) return self.success() @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ # Note that okerrpatterns refers to the last time we ran this test # The good news is that this works fine for us... self._okerrpatterns.extend(self._patterns) return self._okerrpatterns diff --git a/python/pacemaker/_cts/tests/fliptest.py b/python/pacemaker/_cts/tests/fliptest.py index d3f1571419..5e779367e4 100644 --- a/python/pacemaker/_cts/tests/fliptest.py +++ b/python/pacemaker/_cts/tests/fliptest.py @@ -1,61 +1,61 @@ """ Stop running nodes, and start stopped nodes """ __all__ = ["FlipTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import time from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.tests.stoptest import StopTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class FlipTest(CTSTest): """ A concrete test that stops running nodes and starts stopped nodes """ def __init__(self, cm): """ Create a new FlipTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Flip" self._start = StartTest(cm) self._stop = StopTest(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self.incr("stopped") ret = self._stop(node) kind = "up->down" # Give the cluster time to recognize it's gone... time.sleep(self._env["StableTime"]) - elif self._cm.ShouldBeStatus[node] == "down": + elif self._cm.expected_status[node] == "down": self.incr("started") ret = self._start(node) kind = "down->up" else: return self.skipped() self.incr(kind) if ret: return self.success() return self.failure("%s failure" % kind) diff --git a/python/pacemaker/_cts/tests/maintenancemode.py b/python/pacemaker/_cts/tests/maintenancemode.py index f9ed8678bd..d8adcb1a41 100644 --- a/python/pacemaker/_cts/tests/maintenancemode.py +++ b/python/pacemaker/_cts/tests/maintenancemode.py @@ -1,228 +1,228 @@ """ Toggle nodes in and out of maintenance mode """ __all__ = ["MaintenanceMode"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re from pacemaker._cts.audits import AuditResource from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. 
This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable class MaintenanceMode(CTSTest): """ A concrete test that toggles nodes in and out of maintenance mode """ def __init__(self, cm): """ Create a new MaintenanceMode instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "MaintenanceMode" self._action = "asyncmon" self._rid = "maintenanceDummy" self._start = StartTest(cm) self._startall = SimulStartLite(cm) def _toggle_maintenance_mode(self, node, enabled): """ Toggle maintenance mode on the given node """ pats = [ self.templates["Pat:DC_IDLE"] ] if enabled: action = "On" else: action = "Off" # fail the resource right after turning Maintenance mode on # verify it is not recovered until maintenance mode is turned off if enabled: pats.append(self.templates["Pat:RscOpFail"] % (self._action, self._rid)) else: pats.extend([ self.templates["Pat:RscOpOK"] % ("stop", self._rid), self.templates["Pat:RscOpOK"] % ("start", self._rid) ]) watch = self.create_watch(pats, 60) watch.set_watch() self.debug("Turning maintenance mode %s" % action) self._rsh(node, self.templates["MaintenanceMode%s" % action]) if enabled: self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node)) with Timer(self._logger, self.name, "recover%s" % action): watch.look_for_all() if watch.unmatched: self.debug("Failed to find patterns when turning maintenance mode %s" % action) return repr(watch.unmatched) return "" def _insert_maintenance_dummy(self, node): """ Create a dummy resource on the given node """ pats = [ ("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self._rid)) ] watch = self.create_watch(pats, 60) watch.set_watch() - self._cm.AddDummyRsc(node, self._rid) + self._cm.add_dummy_rsc(node, self._rid) with Timer(self._logger, self.name, "addDummy"): watch.look_for_all() if watch.unmatched: self.debug("Failed to find patterns when adding maintenance dummy resource") return repr(watch.unmatched) return "" def _remove_maintenance_dummy(self, node): """ Remove the previously created dummy resource on the given node """ pats = [ self.templates["Pat:RscOpOK"] % ("stop", self._rid) ] watch = self.create_watch(pats, 60) watch.set_watch() - self._cm.RemoveDummyRsc(node, self._rid) + self._cm.remove_dummy_rsc(node, self._rid) with Timer(self._logger, self.name, "removeDummy"): watch.look_for_all() if watch.unmatched: self.debug("Failed to find patterns when removing maintenance dummy resource") return repr(watch.unmatched) return "" def _managed_rscs(self, node): """ Return a list of all resources managed by the cluster """ rscs = [] (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): tmp = AuditResource(self._cm, line) if tmp.managed: rscs.append(tmp.id) return rscs def _verify_resources(self, node, rscs, managed): """ Verify that all resources in rscList are managed if they are expected to be, or unmanaged if they are expected to be. 
""" managed_rscs = rscs managed_str = "managed" if not managed: managed_str = "unmanaged" (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): tmp = AuditResource(self._cm, line) if managed and not tmp.managed: continue if not managed and tmp.managed: continue if managed_rscs.count(tmp.id): managed_rscs.remove(tmp.id) if not managed_rscs: self.debug("Found all %s resources on %s" % (managed_str, node)) return True self._logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managed_rscs)) return False def __call__(self, node): """ Perform this test """ self.incr("calls") verify_managed = False verify_unmanaged = False fail_pat = "" if not self._startall(None): return self.failure("Setup failed") # get a list of all the managed resources. We use this list # after enabling maintenance mode to verify all managed resources # become un-managed. After maintenance mode is turned off, we use # this list to verify all the resources become managed again. managed_rscs = self._managed_rscs(node) if not managed_rscs: self._logger.log("No managed resources on %s" % node) return self.skipped() # insert a fake resource we can fail during maintenance mode # so we can verify recovery does not take place until after maintenance # mode is disabled. fail_pat += self._insert_maintenance_dummy(node) # toggle maintenance mode ON, then fail dummy resource. fail_pat += self._toggle_maintenance_mode(node, True) # verify all the resources are now unmanaged if self._verify_resources(node, managed_rscs, False): verify_unmanaged = True # Toggle maintenance mode OFF, verify dummy is recovered. fail_pat += self._toggle_maintenance_mode(node, False) # verify all the resources are now managed again if self._verify_resources(node, managed_rscs, True): verify_managed = True # Remove our maintenance dummy resource. fail_pat += self._remove_maintenance_dummy(node) self._cm.cluster_stable() if fail_pat != "": return self.failure("Unmatched patterns: %s" % fail_pat) if not verify_unmanaged: return self.failure("Failed to verify resources became unmanaged during maintenance mode") if not verify_managed: return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode") return self.success() @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ return [ r"Updating failcount for %s" % self._rid, r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self._rid, r"Unknown operation: fail", self.templates["Pat:RscOpOK"] % (self._action, self._rid), r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, 0) ] diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py index b342415516..a0a65b2f75 100644 --- a/python/pacemaker/_cts/tests/nearquorumpointtest.py +++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py @@ -1,125 +1,125 @@ """ Randomly start and stop nodes to bring the cluster close to the quorum point """ __all__ = ["NearQuorumPointTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. 
This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class NearQuorumPointTest(CTSTest): """ A concrete test that randomly starts and stops nodes to bring the cluster close to the quorum point """ def __init__(self, cm): """ Create a new NearQuorumPointTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "NearQuorumPoint" def __call__(self, dummy): """ Perform this test """ self.incr("calls") startset = [] stopset = [] - stonith = self._cm.prepare_fencing_watcher("NearQuorumPoint") + stonith = self._cm.prepare_fencing_watcher() # decide what to do with each node for node in self._env["nodes"]: action = self._env.random_gen.choice(["start", "stop"]) if action == "start": startset.append(node) elif action == "stop": stopset.append(node) self.debug("start nodes:%r" % startset) self.debug("stop nodes:%r" % stopset) # add search patterns watchpats = [] for node in stopset: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": watchpats.append(self.templates["Pat:We_stopped"] % node) for node in startset: - if self._cm.ShouldBeStatus[node] == "down": + if self._cm.expected_status[node] == "down": watchpats.append(self.templates["Pat:Local_started"] % node) else: for stopping in stopset: - if self._cm.ShouldBeStatus[stopping] == "up": - watchpats.append(self.templates["Pat:They_stopped"] % (node, self._cm.key_for_node(stopping))) + if self._cm.expected_status[stopping] == "up": + watchpats.append(self.templates["Pat:They_stopped"] % (node, stopping)) if not watchpats: return self.skipped() if startset: watchpats.append(self.templates["Pat:DC_IDLE"]) watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) watch.set_watch() # begin actions for node in stopset: - if self._cm.ShouldBeStatus[node] == "up": - self._cm.StopaCMnoBlock(node) + if self._cm.expected_status[node] == "up": + self._cm.stop_cm_async(node) for node in startset: - if self._cm.ShouldBeStatus[node] == "down": - self._cm.StartaCMnoBlock(node) + if self._cm.expected_status[node] == "down": + self._cm.start_cm_async(node) # get the result if watch.look_for_all(): self._cm.cluster_stable() self._cm.fencing_cleanup("NearQuorumPoint", stonith) return self.success() self._logger.log("Warn: Patterns not found: %r" % watch.unmatched) # get the "bad" nodes upnodes = [] for node in stopset: - if self._cm.StataCM(node) == 1: + if self._cm.stat_cm(node): upnodes.append(node) downnodes = [] for node in startset: - if self._cm.StataCM(node) == 0: + if not self._cm.stat_cm(node): downnodes.append(node) self._cm.fencing_cleanup("NearQuorumPoint", stonith) if not upnodes and not downnodes: self._cm.cluster_stable() # Make sure they're completely down with no residual processes for node in stopset: self._rsh(node, self.templates["StopCmd"]) return self.success() if upnodes: self._logger.log("Warn: Unstoppable nodes: %r" % upnodes) if downnodes: self._logger.log("Warn: Unstartable nodes: %r" % downnodes) return self.failure() diff --git a/python/pacemaker/_cts/tests/partialstart.py b/python/pacemaker/_cts/tests/partialstart.py index a5104ba86c..745baf2394 100644 --- a/python/pacemaker/_cts/tests/partialstart.py +++ b/python/pacemaker/_cts/tests/partialstart.py @@ -1,71 +1,71 @@ """ Start a node and then tell it to
stop before it is fully running """ __all__ = ["PartialStart"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.simulstoplite import SimulStopLite from pacemaker._cts.tests.stoptest import StopTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class PartialStart(CTSTest): """ A concrete test that interrupts a node before it's finished starting up """ def __init__(self, cm): """ Create a new PartialStart instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "PartialStart" self._startall = SimulStartLite(cm) self._stop = StopTest(cm) self._stopall = SimulStopLite(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") ret = self._stopall(None) if not ret: return self.failure("Setup failed") watchpats = [ "pacemaker-controld.*Connecting to .* cluster infrastructure" ] watch = self.create_watch(watchpats, self._env["DeadTime"] + 10) watch.set_watch() - self._cm.StartaCMnoBlock(node) + self._cm.start_cm_async(node) ret = watch.look_for_all() if not ret: self._logger.log("Patterns not found: %r" % watch.unmatched) return self.failure("Setup of %s failed" % node) ret = self._stop(node) if not ret: return self.failure("%s did not stop in time" % node) return self.success() @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ # We might do some fencing in the 2-node case if we make it up far enough return [ r"Executing reboot fencing operation", r"Requesting fencing \([^)]+\) targeting node " ] diff --git a/python/pacemaker/_cts/tests/resourcerecover.py b/python/pacemaker/_cts/tests/resourcerecover.py index f04abda463..a2ecfa8e31 100644 --- a/python/pacemaker/_cts/tests/resourcerecover.py +++ b/python/pacemaker/_cts/tests/resourcerecover.py @@ -1,171 +1,171 @@ """ Fail a random resource and verify its fail count increases """ __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.audits import AuditResource from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. 
# pylint: disable=not-callable class ResourceRecover(CTSTest): """ A concrete test that fails a random resource """ def __init__(self, cm): """ Create a new ResourceRecover instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "ResourceRecover" self._action = "asyncmon" self._interval = 0 self._rid = None self._rid_alt = None self._start = StartTest(cm) self._startall = SimulStartLite(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") if not self._startall(None): return self.failure("Setup failed") # List all resources active on the node (skip test if none) resourcelist = self._cm.active_resources(node) if not resourcelist: self._logger.log("No active resources on %s" % node) return self.skipped() # Choose one resource at random rsc = self._choose_resource(node, resourcelist) if rsc is None: return self.failure("Could not get details of resource '%s'" % self._rid) if rsc.id == rsc.clone_id: self.debug("Failing %s" % rsc.id) else: self.debug("Failing %s (also known as %s)" % (rsc.id, rsc.clone_id)) # Log patterns to watch for (failure, plus restart if managed) pats = [ self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id) ] if rsc.managed: pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._rid)) if rsc.unique: pats.append(self.templates["Pat:RscOpOK"] % ("start", self._rid)) else: # Anonymous clones may get restarted with a different clone number pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*")) # Fail resource. (Ideally, we'd fail it twice, to ensure the fail count # is incrementing properly, but it might restart on a different node. # We'd have to temporarily ban it from all other nodes and ensure the # migration-threshold hasn't been reached.) 
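# Editor's note: a minimal sketch of the "fail it twice" idea mentioned in the
# comment above, kept commented out because it is illustrative only and not
# part of this patch. The crm_resource --ban/--clear options are real, but the
# surrounding logic (and capturing orig_failcount here rather than inside
# _fail_resource()) is an assumption:
#
#   for other in self._env["nodes"]:
#       if other != node:
#           # hypothetical: keep the resource from recovering elsewhere
#           self._rsh(node, "crm_resource --ban -r %s -N %s" % (self._rid, other))
#   for _ in range(2):
#       self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
#       self._cm.cluster_stable()
#   # the fail count should now be orig_failcount + 2; then drop the bans again
#   self._rsh(node, "crm_resource --clear -r %s" % self._rid)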
if self._fail_resource(rsc, node, pats) is None: # self.failure() already called return None return self.success() def _choose_resource(self, node, resourcelist): """ Choose a random resource to target """ self._rid = self._env.random_gen.choice(resourcelist) self._rid_alt = self._rid (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if line.startswith("Resource: "): rsc = AuditResource(self._cm, line) if rsc.id == self._rid: # Handle anonymous clones that get renamed self._rid = rsc.clone_id return rsc return None def _get_failcount(self, node): """ Check the fail count of targeted resource on given node """ cmd = "crm_failcount --quiet --query --resource %s --operation %s --interval %d --node %s" (rc, lines) = self._rsh(node, cmd % (self._rid, self._action, self._interval, node), verbose=1) if rc != 0 or len(lines) != 1: lines = [l.strip() for l in lines] self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(lines))) return -1 try: failcount = int(lines[0]) except (IndexError, ValueError): self._logger.log("crm_failcount output on %s unparseable: %s" % (node, " ".join(lines))) return -1 return failcount def _fail_resource(self, rsc, node, pats): """ Fail the targeted resource, and verify as expected """ orig_failcount = self._get_failcount(node) watch = self.create_watch(pats, 60) watch.set_watch() self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node)) with Timer(self._logger, self.name, "recover"): watch.look_for_all() self._cm.cluster_stable() - recovered = self._cm.ResourceLocation(self._rid) + recovered = self._cm.resource_location(self._rid) if watch.unmatched: return self.failure("Patterns not found: %r" % watch.unmatched) if rsc.unique and len(recovered) > 1: return self.failure("%s is now active on more than one node: %r" % (self._rid, recovered)) if recovered: self.debug("%s is running on: %r" % (self._rid, recovered)) elif rsc.managed: return self.failure("%s was not recovered and is inactive" % self._rid) new_failcount = self._get_failcount(node) if new_failcount != orig_failcount + 1: return self.failure("%s fail count is %d not %d" % (self._rid, new_failcount, orig_failcount + 1)) return 0 # Anything but None is success @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ return [ r"Updating failcount for %s" % self._rid, r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt), r"Unknown operation: fail", self.templates["Pat:RscOpOK"] % (self._action, self._rid), r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval) ] diff --git a/python/pacemaker/_cts/tests/restarttest.py b/python/pacemaker/_cts/tests/restarttest.py index b2d8e8d6c9..3b628ce6f4 100644 --- a/python/pacemaker/_cts/tests/restarttest.py +++ b/python/pacemaker/_cts/tests/restarttest.py @@ -1,49 +1,49 @@ """ Stop and restart a node """ __all__ = ["RestartTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.tests.stoptest import StopTest class RestartTest(CTSTest): """ A concrete test that stops and restarts a node """ def __init__(self, cm): """ Create a new RestartTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True 
self.name = "Restart" self._start = StartTest(cm) self._stop = StopTest(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") self.incr("node:%s" % node) - if self._cm.StataCM(node): + if self._cm.stat_cm(node): self.incr("WasStopped") if not self._start(node): return self.failure("start (setup) failure: %s" % node) self.set_timer() if not self._stop(node): return self.failure("stop failure: %s" % node) if not self._start(node): return self.failure("start failure: %s" % node) return self.success() diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py index 66f4d5eaf1..16cf67472c 100644 --- a/python/pacemaker/_cts/tests/simulstartlite.py +++ b/python/pacemaker/_cts/tests/simulstartlite.py @@ -1,131 +1,131 @@ """ Simultaneously start stopped nodes """ __all__ = ["SimulStartLite"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStartLite(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class starts any stopped nodes more or less simultaneously. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStartLite instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self,cm) self.name = "SimulStartLite" def __call__(self, dummy): """ Start all stopped nodes more or less simultaneously, returning whether this succeeded or not. """ self.incr("calls") self.debug("Setup: %s" % self.name) # We ignore the "node" parameter... node_list = [] for node in self._env["nodes"]: - if self._cm.ShouldBeStatus[node] == "down": + if self._cm.expected_status[node] == "down": self.incr("WasStopped") node_list.append(node) self.set_timer() while len(node_list) > 0: # Repeat until all nodes come up uppat = self.templates["Pat:NonDC_started"] if self._cm.upcount() == 0: uppat = self.templates["Pat:Local_started"] watchpats = [ self.templates["Pat:DC_IDLE"] ] for node in node_list: watchpats.extend([uppat % node, self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node]) # Start all the nodes - at about the same time... 
watch = self.create_watch(watchpats, self._env["DeadTime"]+10) watch.set_watch() - stonith = self._cm.prepare_fencing_watcher(self.name) + stonith = self._cm.prepare_fencing_watcher() for node in node_list: - self._cm.StartaCMnoBlock(node) + self._cm.start_cm_async(node) watch.look_for_all() node_list = self._cm.fencing_cleanup(self.name, stonith) if node_list is None: return self.failure("Cluster did not stabilize") # Remove node_list messages from watch.unmatched for node in node_list: self._logger.debug("Dealing with stonith operations for %s" % node) if watch.unmatched: try: watch.unmatched.remove(uppat % node) except ValueError: self.debug("Already matched: %s" % (uppat % node)) try: watch.unmatched.remove(self.templates["Pat:InfraUp"] % node) except ValueError: self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node)) try: watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node) except ValueError: self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node)) if watch.unmatched: for regex in watch.unmatched: self._logger.log("Warn: Startup pattern not found: %s" % regex) if not self._cm.cluster_stable(): return self.failure("Cluster did not stabilize") did_fail = False unstable = [] for node in self._env["nodes"]: - if self._cm.StataCM(node) == 0: + if not self._cm.stat_cm(node): did_fail = True unstable.append(node) if did_fail: return self.failure("Unstarted nodes exist: %s" % unstable) unstable = [] for node in self._env["nodes"]: if not self._cm.node_stable(node): did_fail = True unstable.append(node) if did_fail: return self.failure("Unstable cluster nodes exist: %s" % unstable) return self.success() def is_applicable(self): """ SimulStartLite is a setup test and never applicable """ return False diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py index a5b965e798..9dbc1f2d2d 100644 --- a/python/pacemaker/_cts/tests/simulstoplite.py +++ b/python/pacemaker/_cts/tests/simulstoplite.py @@ -1,91 +1,91 @@ """ Simultaneously stop running nodes """ __all__ = ["SimulStopLite"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class SimulStopLite(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class stops any running nodes more or less simultaneously. It can be used either to set up a test or to clean up after one. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new SimulStopLite instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "SimulStopLite" def __call__(self, dummy): """ Stop all running nodes more or less simultaneously, returning whether this succeeded or not. """ self.incr("calls") self.debug("Setup: %s" % self.name) # We ignore the "node" parameter...
watchpats = [] for node in self._env["nodes"]: - if self._cm.ShouldBeStatus[node] == "up": + if self._cm.expected_status[node] == "up": self.incr("WasStarted") watchpats.append(self.templates["Pat:We_stopped"] % node) if len(watchpats) == 0: return self.success() # Stop all the nodes - at about the same time... watch = self.create_watch(watchpats, self._env["DeadTime"]+10) watch.set_watch() self.set_timer() for node in self._env["nodes"]: - if self._cm.ShouldBeStatus[node] == "up": - self._cm.StopaCMnoBlock(node) + if self._cm.expected_status[node] == "up": + self._cm.stop_cm_async(node) if watch.look_for_all(): # Make sure they're completely down with no residual processes for node in self._env["nodes"]: self._rsh(node, self.templates["StopCmd"]) return self.success() did_fail = False up_nodes = [] for node in self._env["nodes"]: - if self._cm.StataCM(node) == 1: + if self._cm.stat_cm(node): did_fail = True up_nodes.append(node) if did_fail: return self.failure("Active nodes exist: %s" % up_nodes) self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched) return self.failure("Missing log message: %s" % watch.unmatched) def is_applicable(self): """ SimulStopLite is a setup test and never applicable """ return False diff --git a/python/pacemaker/_cts/tests/standbytest.py b/python/pacemaker/_cts/tests/standbytest.py index 76731f3f99..64046b9e22 100644 --- a/python/pacemaker/_cts/tests/standbytest.py +++ b/python/pacemaker/_cts/tests/standbytest.py @@ -1,111 +1,108 @@ """ Put a node into standby mode and check that resources migrate """ __all__ = ["StandbyTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.tests.starttest import StartTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StandbyTest(CTSTest): """ A concrete test that puts a node into standby and checks that resources migrate away from the node """ def __init__(self, cm): """ Create a new StandbyTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Standby" self._start = StartTest(cm) self._startall = SimulStartLite(cm) # make sure the node is active # set the node to standby mode # check resources; no resources should be running on the node # set the node to active mode # check resources; resources should have been migrated back (SHOULD THEY?)
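# Editor's note: the hunks below replace the old string-based standby API with
# a boolean one; the mapping, for reference (the new method bodies live in the
# renamed clustermanager module, outside this diff):
#
#   self._cm.StandbyStatus(node) == "on"  ->  self._cm.in_standby_mode(node)
#   self._cm.SetStandbyMode(node, "on")   ->  self._cm.set_standby_mode(node, True)
#   self._cm.SetStandbyMode(node, "off")  ->  self._cm.set_standby_mode(node, False)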
def __call__(self, node): """ Perform this test """ self.incr("calls") ret = self._startall(None) if not ret: return self.failure("Start all nodes failed") self.debug("Make sure node %s is active" % node) - if self._cm.StandbyStatus(node) != "off": - if not self._cm.SetStandbyMode(node, "off"): + if self._cm.in_standby_mode(node): + if not self._cm.set_standby_mode(node, False): return self.failure("can't set node %s to active mode" % node) self._cm.cluster_stable() - status = self._cm.StandbyStatus(node) - if status != "off": - return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) + if self._cm.in_standby_mode(node): + return self.failure("standby status of %s is [on] but we expect [off]" % node) watchpats = [ r"State transition .* -> S_POLICY_ENGINE" ] watch = self.create_watch(watchpats, self._env["DeadTime"]+10) watch.set_watch() self.debug("Setting node %s to standby mode" % node) - if not self._cm.SetStandbyMode(node, "on"): + if not self._cm.set_standby_mode(node, True): return self.failure("can't set node %s to standby mode" % node) self.set_timer("on") ret = watch.look_for_all() if not ret: self._logger.log("Patterns not found: %r" % watch.unmatched) - self._cm.SetStandbyMode(node, "off") + self._cm.set_standby_mode(node, False) return self.failure("cluster didn't react to standby change on %s" % node) self._cm.cluster_stable() - status = self._cm.StandbyStatus(node) - if status != "on": - return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status)) + if not self._cm.in_standby_mode(node): + return self.failure("standby status of %s is [off] but we expect [on]" % node) self.log_timer("on") self.debug("Checking resources") rscs_on_node = self._cm.active_resources(node) if rscs_on_node: rc = self.failure("%s set to standby, %r is still running on it" % (node, rscs_on_node)) self.debug("Setting node %s to active mode" % node) - self._cm.SetStandbyMode(node, "off") + self._cm.set_standby_mode(node, False) return rc self.debug("Setting node %s to active mode" % node) - if not self._cm.SetStandbyMode(node, "off"): + if not self._cm.set_standby_mode(node, False): return self.failure("can't set node %s to active mode" % node) self.set_timer("off") self._cm.cluster_stable() - status = self._cm.StandbyStatus(node) - if status != "off": - return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) + if self._cm.in_standby_mode(node): + return self.failure("standby status of %s is [on] but we expect [off]" % node) self.log_timer("off") return self.success() diff --git a/python/pacemaker/_cts/tests/starttest.py b/python/pacemaker/_cts/tests/starttest.py index aba2899fd2..53a347a392 100644 --- a/python/pacemaker/_cts/tests/starttest.py +++ b/python/pacemaker/_cts/tests/starttest.py @@ -1,54 +1,54 @@ """ Start the cluster manager on a given node """ __all__ = ["StartTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. 
# pylint: disable=unsubscriptable-object class StartTest(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class starts the cluster manager on a given node. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new StartTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self,cm) self.name = "Start" def __call__(self, node): """ Start the given node, returning whether this succeeded or not """ self.incr("calls") if self._cm.upcount() == 0: self.incr("us") else: self.incr("them") - if self._cm.ShouldBeStatus[node] != "down": + if self._cm.expected_status[node] != "down": return self.skipped() - if self._cm.StartaCM(node): + if self._cm.start_cm(node): return self.success() return self.failure("Startup %s on node %s failed" % (self._env["Name"], node)) diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py index da3848ad0c..e2fa341227 100644 --- a/python/pacemaker/_cts/tests/stonithdtest.py +++ b/python/pacemaker/_cts/tests/stonithdtest.py @@ -1,139 +1,139 @@ """ Fence a running node and wait for it to restart """ __all__ = ["StonithdTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker.exitstatus import ExitStatus from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. 
# pylint: disable=unsubscriptable-object class StonithdTest(CTSTest): """ A concrete test that fences a running node and waits for it to restart """ def __init__(self, cm): """ Create a new StonithdTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Stonithd" self._startall = SimulStartLite(cm) def __call__(self, node): """ Perform this test """ self.incr("calls") if len(self._env["nodes"]) < 2: return self.skipped() ret = self._startall(None) if not ret: return self.failure("Setup failed") watchpats = [ self.templates["Pat:Fencing_ok"] % node, self.templates["Pat:NodeFenced"] % node ] if not self._env["at-boot"]: self.debug("Expecting %s to stay down" % node) - self._cm.ShouldBeStatus[node] = "down" + self._cm.expected_status[node] = "down" else: self.debug("Expecting %s to come up again %d" % (node, self._env["at-boot"])) watchpats.extend([ "%s.* S_STARTING -> S_PENDING" % node, "%s.* S_PENDING -> S_NOT_DC" % node ]) watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"]) watch.set_watch() origin = self._env.random_gen.choice(self._env["nodes"]) (rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node) if rc == ExitStatus.TIMEOUT: # Look for the patterns, usually this means the required # device was running on the node to be fenced - or that # the required devices were in the process of being loaded # and/or moved # # Effectively the node committed suicide so there will be # no confirmation, but pacemaker should be watching and # fence the node again self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node)) elif origin != node and rc != 0: self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc)) elif origin == node and rc != 255: # 255 == broken pipe, ie. the node was fenced as expected self._logger.log("Locally originated fencing returned %d" % rc) with Timer(self._logger, self.name, "fence"): matched = watch.look_for_all() self.set_timer("reform") if watch.unmatched: self._logger.log("Patterns not found: %r" % watch.unmatched) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["StartTime"]) if not matched: return self.failure("Didn't find all expected patterns") if not is_stable: return self.failure("Cluster did not become stable") self.log_timer("reform") return self.success() @property def errors_to_ignore(self): """ Return list of errors which should be ignored """ return [ self.templates["Pat:Fencing_start"] % ".*", self.templates["Pat:Fencing_ok"] % ".*", self.templates["Pat:Fencing_active"], r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ] def is_applicable(self): """ Return True if this test is applicable in the current test configuration. """ if not CTSTest.is_applicable(self): return False # pylint gets confused because of EnvFactory here. 
# pylint: disable=unsupported-membership-test if "DoFencing" in self._env: return self._env["DoFencing"] return True diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py index a068b4d828..1caa7cd56c 100644 --- a/python/pacemaker/_cts/tests/stoptest.py +++ b/python/pacemaker/_cts/tests/stoptest.py @@ -1,97 +1,97 @@ """ Stop the cluster manager on a given node """ __all__ = ["StopTest"] __copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker._cts.tests.ctstest import CTSTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StopTest(CTSTest): """ A pseudo-test that is only used to set up conditions before running some other test. This class stops the cluster manager on a given node. Other test classes should not use this one as a superclass. """ def __init__(self, cm): """ Create a new StopTest instance Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Stop" def __call__(self, node): """ Stop the given node, returning whether this succeeded or not """ self.incr("calls") - if self._cm.ShouldBeStatus[node] != "up": + if self._cm.expected_status[node] != "up": return self.skipped() # Technically we should always be able to notice ourselves stopping patterns = [ self.templates["Pat:We_stopped"] % node ] # Any active node needs to notice this one left # (note that this won't work if we have multiple partitions) for other in self._env["nodes"]: - if self._cm.ShouldBeStatus[other] == "up" and other != node: - patterns.append(self.templates["Pat:They_stopped"] %(other, self._cm.key_for_node(node))) + if self._cm.expected_status[other] == "up" and other != node: + patterns.append(self.templates["Pat:They_stopped"] %(other, node)) watch = self.create_watch(patterns, self._env["DeadTime"]) watch.set_watch() - if node == self._cm.OurNode: + if node == self._cm.our_node: self.incr("us") else: if self._cm.upcount() <= 1: self.incr("all") else: self.incr("them") - self._cm.StopaCM(node) + self._cm.stop_cm(node) watch.look_for_all() failreason = None unmatched_str = "||" if watch.unmatched: (_, output) = self._rsh(node, "/bin/ps axf", verbose=1) for line in output: self.debug(line) (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1) for line in output: self.debug(line) for regex in watch.unmatched: self._logger.log ("ERROR: Shutdown pattern not found: %s" % regex) unmatched_str += "%s||" % regex failreason = "Missing shutdown pattern" self._cm.cluster_stable(self._env["DeadTime"]) if not watch.unmatched or self._cm.upcount() == 0: return self.success() if len(watch.unmatched) >= self._cm.upcount(): return self.failure("no match against (%s)" % unmatched_str) if failreason is None: return self.success() return self.failure(failreason)
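Taken together, these hunks rename the ClusterManager surface from CamelCase to snake_case and make it more Pythonic: Components() becomes a components property, StataCM()'s 0/1 result becomes a stat_cm() boolean, StartaCMnoBlock()/StopaCMnoBlock() become start_cm_async()/stop_cm_async(), the ShouldBeStatus map becomes expected_status, and the key_for_node() indirection is dropped in favor of plain node names. For out-of-tree scripts that still call the old names, a thin adapter is one way to bridge the transition. A minimal sketch, assuming only the renames visible in this diff (the adapter itself is hypothetical and not part of the patch):

    from pacemaker._cts.clustermanager import ClusterManager

    class LegacyClusterManager(ClusterManager):
        """ Hypothetical adapter mapping the old CTS names onto the new API """

        def Components(self):
            # old method call -> new read-only property
            return self.components

        def StataCM(self, node):
            # old callers compared the result against 0/1
            return 1 if self.stat_cm(node) else 0

        def StartaCMnoBlock(self, node):
            return self.start_cm_async(node)

        def StopaCMnoBlock(self, node):
            return self.stop_cm_async(node)

        def StandbyStatus(self, node):
            # the old API spoke "on"/"off"; the new one is boolean
            return "on" if self.in_standby_mode(node) else "off"

        def SetStandbyMode(self, node, status):
            return self.set_standby_mode(node, status == "on")

        @property
        def ShouldBeStatus(self):
            # expected_status is the renamed per-node state map
            return self.expected_status

CTS itself needs no such adapter, since every caller is updated within this same series; the sketch only shows how the two APIs line up.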