diff --git a/INSTALL.md b/INSTALL.md index bfb5acdb7c..84b6c0114f 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -1,82 +1,81 @@ # How to Install Pacemaker ## Build Dependencies | Version | Fedora-based | Suse-based | Debian-based | |:---------------:|:------------------:|:------------------:|:--------------:| | 1.13 or later | automake | automake | automake | | 2.64 or later | autoconf | autoconf | autoconf | | | libtool | libtool | libtool | | | libtool-ltdl-devel | | libltdl-dev | | | libuuid-devel | libuuid-devel | uuid-dev | | 0.28 or later | pkgconfig | pkgconfig | pkg-config | | 2.42.0 or later | glib2-devel | glib2-devel | libglib2.0-dev | | 2.9.2 or later | libxml2-devel | libxml2-devel | libxml2-dev | | | libxslt-devel | libxslt-devel | libxslt-dev | | | bzip2-devel | libbz2-devel | libbz2-dev | | 1.0.1 or later | libqb-devel | libqb-devel | libqb-dev | | 3.6 or later | python3 | python3 | python3 | | 0.18 or later | gettext-devel | gettext-tools | gettext | | 0.18 or later | | | autopoint | | 3.4.6 or later | gnutls-devel | libgnutls-devel | libgnutls-dev | Also: * make must be GNU (or compatible) (setting MAKE=gmake might also work but is untested) * GNU (or compatible) getopt must be somewhere on the PATH ### Cluster Stack Dependencies *Only corosync is currently supported* | Version | Fedora-based | Suse-based | Debian-based | |:---------------:|:------------------:|:------------------:|:--------------:| | 2.0.0 or later | corosynclib | libcorosync | corosync | | 2.0.0 or later | corosynclib-devel | libcorosync-devel | | | | | | libcfg-dev | | | | | libcpg-dev | | | | | libcmap-dev | | | | | libquorum-dev | ### Optional Build Dependencies | Feature Enabled | Version | Fedora-based | Suse-based | Debian-based | |:-----------------------------------------------:|:--------------:|:-----------------------:|:-----------------------:|:-----------------------:| | encrypted remote CIB admin | | pam-devel | pam-devel | libpam0g-dev | | interactive crm_mon | | ncurses-devel | ncurses-devel | ncurses-dev | | systemd support | | systemd-devel | systemd-devel | libsystemd-dev | | systemd resource support | 1.5.12 or later| dbus-devel | dbus-devel | libdbus-1-dev | | Linux-HA style fencing agents | | cluster-glue-libs-devel | libglue-devel | cluster-glue-dev | | documentation | | help2man | help2man | help2man | | documentation | | docbook-style-xsl | docbook-xsl-stylesheets | docbook-xsl | | documentation | | python3-sphinx | python3-sphinx | python3-sphinx | | documentation (PDF) | | latexmk texlive texlive-capt-of texlive-collection-xetex texlive-fncychap texlive-framed texlive-multirow texlive-needspace texlive-tabulary texlive-titlesec texlive-threeparttable texlive-upquote texlive-wrapfig texlive-xetex | texlive texlive-latex | texlive texlive-latex-extra | | annotated source code as HTML via "make global" | | global | global | global | | RPM packages via "make rpm" | 4.14 or later | rpm | rpm | (n/a) | | unit tests | 1.1.0 or later | libcmocka-devel | libcmocka-devel | libcmocka-dev | ## Optional Testing Dependencies * procps and psmisc (if running cts-exec, cts-fencing, or CTS lab) * valgrind (if running valgrind tests in cts-cli, cts-scheduler, or CTS lab) * python3-dateutil and python3-systemd (if running CTS lab on cluster nodes running systemd) * nmap (if not specifying an IP address base) -* oprofile (if running CTS lab profiling tests) * dlm (to log DLM debugging info after CTS lab tests) * xmllint (to validate tool output in cts-cli) ## Simple Install $ make && sudo make install If GNU make is not your default make, use "gmake" instead. ## Detailed Install First, browse the build options that are available: $ ./autogen.sh $ ./configure --help Re-run ./configure with any options you want, then proceed with the simple method. diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py index b7589c5269..ab0a21e93d 100644 --- a/python/pacemaker/_cts/clustermanager.py +++ b/python/pacemaker/_cts/clustermanager.py @@ -1,858 +1,817 @@ """ClusterManager class for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["ClusterManager"] __copyright__ = """Copyright 2000-2025 the Pacemaker project contributors. Certain portions by Huang Zhen are copyright 2004 International Business Machines. The version control history for this file may have further details.""" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import time from collections import UserDict from pacemaker.buildoptions import BuildOptions from pacemaker.exitstatus import ExitStatus from pacemaker._cts.CTS import NodeStatus from pacemaker._cts.audits import AuditResource from pacemaker._cts.cib import ConfigFactory from pacemaker._cts.environment import EnvFactory from pacemaker._cts.logging import LogFactory from pacemaker._cts.patterns import PatternSelector from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogWatcher # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). # @TODO See if type annotations fix this. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable # ClusterManager has a lot of methods. # pylint: disable=too-many-public-methods class ClusterManager(UserDict): """ An abstract base class for managing the cluster. This class implements high-level operations on the cluster and/or its cluster managers. Actual cluster-specific management classes should be subclassed from this one. Among other things, this class tracks the state every node is expected to be in. """ def __init__(self): """Create a new ClusterManager instance.""" # Eventually, ClusterManager should not be a UserDict subclass. Until # that point... # pylint: disable=super-init-not-called self.__instance_errors_to_ignore = [] self._cib_installed = False self._logger = LogFactory() self.env = EnvFactory().getInstance() self.expected_status = {} self.name = self.env["Name"] # pylint: disable=invalid-name self.ns = NodeStatus(self.env) self.our_node = os.uname()[1].lower() self.partitions_expected = 1 self.rsh = RemoteFactory().getInstance() self.templates = PatternSelector(self.name) self._cib_factory = ConfigFactory(self) self._cib = self._cib_factory.create_config(self.env["Schema"]) self._cib_sync = {} def clear_instance_errors_to_ignore(self): """Reset instance-specific errors to ignore on each iteration.""" self.__instance_errors_to_ignore = [] @property def instance_errors_to_ignore(self): """Return a list of known errors that should be ignored for a specific test instance.""" return self.__instance_errors_to_ignore @property def errors_to_ignore(self): """Return a list of known error messages that should be ignored.""" return self.templates.get_patterns("BadNewsIgnore") def log(self, args): """Log a message.""" self._logger.log(args) def debug(self, args): """Log a debug message.""" self._logger.debug(args) def upcount(self): """Return how many nodes are up.""" count = 0 for node in self.env["nodes"]: if self.expected_status[node] == "up": count += 1 return count def install_support(self, command="install"): """ Install or uninstall the CTS support files. This includes various init scripts and data, daemons, fencing agents, etc. """ for node in self.env["nodes"]: self.rsh(node, f"{BuildOptions.DAEMON_DIR}/cts-support {command}") def prepare_fencing_watcher(self): """Return a LogWatcher object that watches for fencing log messages.""" # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced if self.has_quorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonith_pats = [] for peer in self.env["nodes"]: if self.expected_status[peer] == "up": continue stonith_pats.extend([ self.templates["Pat:Fencing_ok"] % peer, self.templates["Pat:Fencing_start"] % peer, ]) stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"], self.env["log_kind"], "StartupFencing", 0) stonith.set_watch() return stonith def fencing_cleanup(self, node, stonith): """Wait for a previously fenced node to return to the cluster.""" peer_list = [] peer_state = {} self.debug(f"Looking for nodes that were fenced as a result of {node} starting") # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.has_quorum(None) if not q and len(self.env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug(f"Quorum: {q} Len: {len(self.env['nodes'])}") return peer_list for n in self.env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug(f"looking for: {stonith.regexes!r}") shot = stonith.look(0) while shot: self.debug(f"Found: {shot!r}") del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self._logger.log(f"ERROR: Unknown stonith match: {shot!r}") elif peer not in peer_list: self.debug(f"Found peer: {peer}") peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(f" Peer {peer} was fenced as a result of {node} starting: {peer_state[peer]}") if self.env["at-boot"]: self.expected_status[peer] = "up" else: self.expected_status[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while stonith.regexes and shot: self.debug(f"Found: {shot!r}") del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too self.ns.wait_for_node(peer, self.env["DeadTime"]) # Poll until it comes up if self.env["at-boot"]: if not self.stat_cm(peer): time.sleep(self.env["StartTime"]) if not self.stat_cm(peer): self._logger.log(f"ERROR: Peer {peer} failed to restart after being fenced") return None return peer_list def _install_config(self, node): """Remove and re-install the CIB on the first node in the cluster.""" if not self.ns.wait_for_node(node): self.log(f"Node {node} is not up.") return if node in self._cib_sync or not self.env["ClobberCIB"]: return self._cib_sync[node] = True self.rsh(node, f"rm -f {BuildOptions.CIB_DIR}/cib*") # Only install the CIB on the first node, all the other ones will pick it up from there if self._cib_installed: return self._cib_installed = True if self.env["CIBfilename"]: self.log(f"Installing CIB ({self.env['CIBfilename']}) on node {node}") rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) if rc != 0: raise ValueError(f"Can not scp file to {node} {rc}") else: self.log(f"Installing Generated CIB on node {node}") self._cib.install(node) self.rsh(node, f"chown {BuildOptions.DAEMON_USER} {BuildOptions.CIB_DIR}/cib.xml") def start_cm(self, node, verbose=False): """Start up the cluster manager on a given node.""" log_fn = self._logger.log if verbose else self.debug log_fn(f"Starting {self.name} on node {node}") if node not in self.expected_status: self.expected_status[node] = "down" if self.expected_status[node] != "down": return True # Technically we should always be able to notice ourselves starting patterns = [ self.templates["Pat:Local_started"] % node, ] if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) watch = LogWatcher(self.env["LogFileName"], patterns, self.env["nodes"], self.env["log_kind"], "StartaCM", self.env["StartTime"] + 10) self._install_config(node) self.expected_status[node] = "any" if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): self._logger.log(f"{node} was already started") return True stonith = self.prepare_fencing_watcher() watch.set_watch() (rc, _) = self.rsh(node, self.templates["StartCmd"]) if rc != 0: self._logger.log(f"Warn: Start command failed on node {node}") self.fencing_cleanup(node, stonith) return False self.expected_status[node] = "up" watch_result = watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self._logger.log(f"Warn: Startup pattern not found: {regex}") if watch_result and self.cluster_stable(self.env["DeadTime"]): self.fencing_cleanup(node, stonith) return True if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]): self.fencing_cleanup(node, stonith) return True self._logger.log(f"Warn: Start failed for node {node}") return False def start_cm_async(self, node, verbose=False): """Start up the cluster manager on a given node without blocking.""" log_fn = self._logger.log if verbose else self.debug log_fn(f"Starting {self.name} on node {node}") self._install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=False) self.expected_status[node] = "up" def stop_cm(self, node, verbose=False, force=False): """Stop the cluster manager on a given node.""" log_fn = self._logger.log if verbose else self.debug log_fn(f"Stopping {self.name} on node {node}") if self.expected_status[node] != "up" and not force: return True (rc, _) = self.rsh(node, self.templates["StopCmd"]) if rc == 0: # Make sure we can continue even if corosync leaks self.expected_status[node] = "down" self.cluster_stable(self.env["DeadTime"]) return True self._logger.log(f"ERROR: Could not stop {self.name} on node {node}") return False def stop_cm_async(self, node): """Stop the cluster manager on a given node without blocking.""" self.debug(f"Stopping {self.name} on node {node}") self.rsh(node, self.templates["StopCmd"], synchronous=False) self.expected_status[node] = "down" def startall(self, nodelist=None, verbose=False, quick=False): """Start the cluster manager on every node in the cluster, or on every node in nodelist.""" if not nodelist: nodelist = self.env["nodes"] for node in nodelist: if self.expected_status[node] == "down": self.ns.wait_for_all_nodes(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... return self.start_cm(nodelist[0], verbose=verbose) # Approximation of SimulStartList for --boot watchpats = [ self.templates["Pat:DC_IDLE"], ] for node in nodelist: watchpats.extend([ self.templates["Pat:InfraUp"] % node, self.templates["Pat:PacemakerUp"] % node, self.templates["Pat:Local_started"] % node, self.templates["Pat:They_up"] % (nodelist[0], node), ]) # Start all the nodes - at about the same time... watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"], self.env["log_kind"], "fast-start", self.env["DeadTime"] + 10) watch.set_watch() if not self.start_cm(nodelist[0], verbose=verbose): return False for node in nodelist: self.start_cm_async(node, verbose=verbose) watch.look_for_all() if watch.unmatched: for regex in watch.unmatched: self._logger.log(f"Warn: Startup pattern not found: {regex}") if not self.cluster_stable(): self._logger.log("Cluster did not stabilize") return False return True def stopall(self, nodelist=None, verbose=False, force=False): """Stop the cluster manager on every node in the cluster, or on every node in nodelist.""" ret = True if not nodelist: nodelist = self.env["nodes"] for node in self.env["nodes"]: if self.expected_status[node] == "up" or force: if not self.stop_cm(node, verbose=verbose, force=force): ret = False return ret def statall(self, nodelist=None): """Return the status of the cluster manager on every node in the cluster, or on every node in nodelist.""" result = {} if not nodelist: nodelist = self.env["nodes"] for node in nodelist: if self.stat_cm(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): """Break communication between the target node and all other nodes in the cluster, or nodes.""" if not nodes: nodes = self.env["nodes"] for node in nodes: if node == target: continue (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node) if rc != 0: self._logger.log(f"Could not break the communication between {target} and {node}: {rc}") return False self.debug(f"Communication cut between {target} and {node}") return True def unisolate_node(self, target, nodes=None): """Re-establish communication between the target node and all other nodes in the cluster, or nodes.""" if not nodes: nodes = self.env["nodes"] for node in nodes: if node == target: continue # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False) self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False) self.debug(f"Communication restored between {target} and {node}") - def oprofile_start(self, node=None): - """Start profiling on the given node, or all nodes in the cluster.""" - if not node: - for n in self.env["oprofile"]: - self.oprofile_start(n) - - elif node in self.env["oprofile"]: - self.debug(f"Enabling oprofile on {node}") - self.rsh(node, "opcontrol --init") - self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") - self.rsh(node, "opcontrol --start") - self.rsh(node, "opcontrol --reset") - - def oprofile_save(self, test, node=None): - """Save profiling data and restart profiling on the given node, or all nodes in the cluster.""" - if not node: - for n in self.env["oprofile"]: - self.oprofile_save(test, n) - - elif node in self.env["oprofile"]: - self.rsh(node, "opcontrol --dump") - self.rsh(node, f"opcontrol --save=cts.{test}") - # Read back with: opreport -l session:cts.0 image:/c* - self.oprofile_stop(node) - self.oprofile_start(node) - - def oprofile_stop(self, node=None): - """ - Start profiling on the given node, or all nodes in the cluster. - - This does not save profiling data, so call oprofile_save first if needed. - """ - if not node: - for n in self.env["oprofile"]: - self.oprofile_stop(n) - - elif node in self.env["oprofile"]: - self.debug(f"Stopping oprofile on {node}") - self.rsh(node, "opcontrol --reset") - self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") - def prepare(self): """ Finish initialization. Clear out the expected status and record the current status of every node in the cluster. """ self.partitions_expected = 1 for node in self.env["nodes"]: self.expected_status[node] = "" # This used to be conditional on whether SplitBrainTest was # allowed to run. SplitBrainTest is supposed to unisolate # the nodes, so this shouldn't be necessary here. However, # SplitBrainTest was flagged as "experimental" from 2009 to # 2025 and wasn't allowed to run by default. So uncomment # this if problems emerge. # # @COMPAT Delete this comment if no problems emerge after a # while. # # self.unisolate_node(node) self.stat_cm(node) def test_node_cm(self, node): """ Check the status of a given node. Returns 0 if the node is down, 1 if the node is up but unstable, and 2 if the node is up and stable. """ watchpats = [ "Current ping state: (S_IDLE|S_NOT_DC)", self.templates["Pat:NonDC_started"] % node, self.templates["Pat:DC_started"] % node, ] idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node], self.env["log_kind"], "ClusterIdle") idle_watch.set_watch() (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if not out: out = "" else: out = out[0].strip() self.debug(f"Node {node} status: '{out}'") if out.find('ok') < 0: if self.expected_status[node] == "up": self.log(f"Node status for {node} is down but we think it should be {self.expected_status[node]}") self.expected_status[node] = "down" return 0 if self.expected_status[node] == "down": self.log(f"Node status for {node} is up but we think it should be {self.expected_status[node]}: {out}") self.expected_status[node] = "up" # check the output first - because syslog-ng loses messages if out.find('S_NOT_DC') != -1: # Up and stable return 2 if out.find('S_IDLE') != -1: # Up and stable return 2 # fall back to syslog-ng and wait if not idle_watch.look(): # just up self.debug(f"Warn: Node {node} is unstable: {out}") return 1 # Up and stable return 2 def stat_cm(self, node): """Report the status of the cluster manager on a given node.""" return self.test_node_cm(node) > 0 # Being up and being stable is not the same question... def node_stable(self, node): """Return whether or not the given node is stable.""" if self.test_node_cm(node) == 2: return True self.log(f"Warn: Node {node} not stable") return False def _partition_stable(self, nodes, timeout=None): """Return whether or not all nodes in the given partition are stable.""" watchpats = [ "Current ping state: S_IDLE", self.templates["Pat:DC_IDLE"], ] self.debug("Waiting for cluster stability...") if timeout is None: timeout = self.env["DeadTime"] if len(nodes) < 3: self.debug("Cluster is inactive") return True idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(), self.env["log_kind"], "ClusterStable", timeout) idle_watch.set_watch() for node in nodes.split(): # have each node dump its current state self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) ret = idle_watch.look() while ret: self.debug(ret) for node in nodes.split(): if re.search(node, ret): return True ret = idle_watch.look() self.debug(f"Warn: Partition {nodes!r} not IDLE after {timeout}s") return False def cluster_stable(self, timeout=None, double_check=False): """Return whether or not all nodes in the cluster are stable.""" partitions = self.find_partitions() for partition in partitions: if not self._partition_stable(partition, timeout): return False if not double_check: return True # Make sure we are really stable and that all resources, # including those that depend on transient node attributes, # are started if they were going to be time.sleep(5) for partition in partitions: if not self._partition_stable(partition, timeout): return False return True def is_node_dc(self, node, status_line=None): """ Return whether or not the given node is the cluster DC. Check the given status_line, or query the cluster if None. """ if not status_line: (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) if out: status_line = out[0].strip() if not status_line: return False if status_line.find('S_IDLE') != -1: return True if status_line.find('S_INTEGRATION') != -1: return True if status_line.find('S_FINALIZE_JOIN') != -1: return True if status_line.find('S_POLICY_ENGINE') != -1: return True if status_line.find('S_TRANSITION_ENGINE') != -1: return True return False def active_resources(self, node): """Return a list of primitive resources active on the given node.""" (_, output) = self.rsh(node, "crm_resource -c", verbose=1) resources = [] for line in output: if not re.search("^Resource", line): continue tmp = AuditResource(self, line) if tmp.type == "primitive" and tmp.host == node: resources.append(tmp.id) return resources def resource_location(self, rid): """Return a list of nodes on which the given resource is running.""" resource_nodes = [] for node in self.env["nodes"]: if self.expected_status[node] != "up": continue cmd = self.templates["RscRunning"] % rid (rc, lines) = self.rsh(node, cmd) if rc == 127: self.log(f"Command '{cmd}' failed. Binary or pacemaker-cts package not installed?") for line in lines: self.log(f"Output: {line} ") elif rc == 0: resource_nodes.append(node) return resource_nodes def find_partitions(self): """ Return a list of all partitions in the cluster. Each element of the list is itself a list of all active nodes in that partition. """ ccm_partitions = [] for node in self.env["nodes"]: if self.expected_status[node] != "up": self.debug(f"Node {node} is down... skipping") continue (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) if not out: self.log(f"no partition details for {node}") continue partition = out[0].strip() if len(partition) <= 2: self.log(f"bad partition details for {node}") continue nodes = partition.split() nodes.sort() partition = ' '.join(nodes) found = 0 for a_partition in ccm_partitions: if partition == a_partition: found = 1 if found == 0: self.debug(f"Adding partition from {node}: {partition}") ccm_partitions.append(partition) else: self.debug(f"Partition '{partition}' from {node} is consistent with existing entries") self.debug(f"Found partitions: {ccm_partitions!r}") return ccm_partitions def has_quorum(self, node_list): """Return whether or not the cluster has quorum.""" # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes if not node_list: node_list = self.env["nodes"] for node in node_list: if self.expected_status[node] != "up": continue (rc, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) if rc != ExitStatus.OK: self.debug(f"WARN: Quorum check on {node} returned error ({rc})") continue quorum = quorum[0].strip() if quorum.find("1") != -1: return True if quorum.find("0") != -1: return False self.debug(f"WARN: Unexpected quorum test result from {node}:{quorum}") return False @property def components(self): """ Return a list of all patterns that should be ignored for the cluster's components. This must be provided by all subclasses. """ raise NotImplementedError def in_standby_mode(self, node): """Return whether or not the node is in Standby.""" (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) if not out: return False out = out[0].strip() self.debug(f"Standby result: {out}") return out == "on" def set_standby_mode(self, node, status): """ Set node to Standby if status is True, or Active if status is False. Return whether the node is now in the requested status. """ current_status = self.in_standby_mode(node) if current_status == status: return True if status: cmd = self.templates["StandbyCmd"] % (node, "on") else: cmd = self.templates["StandbyCmd"] % (node, "off") (rc, _) = self.rsh(node, cmd) return rc == 0 def add_dummy_rsc(self, node, rid): """Add a dummy resource with the given ID to the given node.""" rsc_xml = f""" ' '""" constraint_xml = f""" ' '""" self.rsh(node, self.templates['CibAddXml'] % rsc_xml) self.rsh(node, self.templates['CibAddXml'] % constraint_xml) def remove_dummy_rsc(self, node, rid): """Remove the previously added dummy resource given by rid on the given node.""" constraint = f"\"//rsc_location[@rsc='{rid}']\"" rsc = f"\"//primitive[@id='{rid}']\"" self.rsh(node, self.templates['CibDelXpath'] % constraint) self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index 5c69f63168..fa25bddc90 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,492 +1,487 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory", "set_cts_path"] __copyright__ = "Copyright 2014-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse from contextlib import suppress from glob import glob import os import random import shlex import socket import sys from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). # @TODO See if type annotations fix this. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["DeadTime"] = 300 self["StartTime"] = 300 self["StableTime"] = 30 self["tests"] = [] self["DoFencing"] = True self["CIBResource"] = False self["log_kind"] = None self["scenario"] = "random" self["syslog_facility"] = "daemon" # Hard-coded since there is only one supported cluster manager/stack self["Name"] = "crm-corosync" self["Stack"] = "corosync 2+" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def dump(self): """Print the current environment.""" for key in sorted(self.data.keys()): self._logger.debug(f"{f'Environment[{key}]':35}: {str(self[key])}") def __contains__(self, key): """Return True if the given key exists in the environment.""" return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "nodes": self.data["nodes"] = [] for node in value: node = node.strip() # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: # @TODO This only handles IPv4, use getaddrinfo() instead # (here and in _discover()) socket.gethostbyname_ex(node) self.data["nodes"].append(node) except socket.herror: self._logger.log(f"{node} not found in DNS... aborting") raise else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def _detect_systemd(self, node): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(node, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self, node): """Detect the syslog variant in use on the target node (if any).""" if "syslogd" in self.data: return if self["have_systemd"]: # Systemd (_, lines) = self._rsh(node, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) else: # SYS-V (_, lines) = self._rsh(node, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) with suppress(IndexError): self["syslogd"] = lines[0].strip() def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl disable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} off") return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl enable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} on") return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, f"systemctl is-enabled {service} | grep enabled") return rc == 0 # SYS-V (rc, _) = self._rsh(node, f"chkconfig --list | grep -e {service}.*on") return rc == 0 def _detect_at_boot(self, node): """Detect if the cluster starts at boot.""" self["at-boot"] = any(self.service_is_enabled(node, service) for service in ("pacemaker", "corosync")) def _detect_ip_offset(self, node): """Detect the offset for IPaddr resources.""" if self["CIBResource"] and "IPBase" not in self.data: (_, lines) = self._rsh(node, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(node, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") return last_part = self["IPBase"].split('.')[3] if int(last_part) >= 240: self._logger.log(f"Could not determine an offset for IPaddr resources. Upper bound is too high: {self['IPBase']} {last_part}") self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser node = self["nodes"][0] self._detect_systemd(node) self._detect_syslog(node) self._detect_at_boot(node) self._detect_ip_offset(node) def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser(epilog=f"{sys.argv[0]} -g virt1 -r --stonith ssh --schema pacemaker-2.0 500") grp1 = parser.add_argument_group("Common options") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", default="", metavar="NODES", help="List of cluster nodes separated by whitespace") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named tests, separated by whitespace") grp3.add_argument("--fencing", "--stonith", choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"], default="1", help="What fencing agent to use") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") - grp4.add_argument("--oprofile", - default="", - metavar="NODES", - help="List of cluster nodes to run oprofile on") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--schema", metavar="SCHEMA", default=f"pacemaker-{BuildOptions.CIB_SCHEMA_VERSION}", help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--stonith-args", metavar="ARGS", default="hostlist=all,livedangerously=yes", help="") grp4.add_argument("--stonith-type", metavar="TYPE", default="external/ssh", help="") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") grp4.add_argument("--warn-inactive", action="store_true", help="Warn if a resource is assigned to an inactive node") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. Most get a default from the add_argument # calls, they only do one thing, and they do not have any side effects. self["CIBfilename"] = args.cib_filename if args.cib_filename else None self["ClobberCIB"] = args.clobber_cib self["ListTests"] = args.list_tests self["Schema"] = args.schema self["TruncateLog"] = args.truncate self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["iterations"] = args.iterations self["nodes"] = shlex.split(args.nodes) self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient - self["oprofile"] = shlex.split(args.oprofile) self["stonith-params"] = args.stonith_args self["stonith-type"] = args.stonith_type self["unsafe-tests"] = not args.no_unsafe_tests self["warn-inactive"] = args.warn_inactive # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.choose: self["scenario"] = "sequence" self["tests"].extend(shlex.split(args.choose)) self["iterations"] = len(self["tests"]) if args.fencing in ["0", "no"]: self["DoFencing"] = False elif args.fencing in ["rhcs", "virt", "xvm"]: self["stonith-type"] = "fence_xvm" elif args.fencing == "scsi": self["stonith-type"] = "fence_scsi" elif args.fencing in ["lha", "ssh"]: self["stonith-params"] = "hostlist=all,livedangerously=yes" self["stonith-type"] = "external/ssh" elif args.fencing == "openstack": self["stonith-type"] = "fence_openstack" print("Obtaining OpenStack credentials from the current environment") region = os.environ['OS_REGION_NAME'] tenant = os.environ['OS_TENANT_NAME'] auth = os.environ['OS_AUTH_URL'] user = os.environ['OS_USERNAME'] password = os.environ['OS_PASSWORD'] self["stonith-params"] = f"region={region},tenant={tenant},auth={auth},user={user},password={password}" elif args.fencing == "rhevm": self["stonith-type"] = "fence_rhevm" print("Obtaining RHEV-M credentials from the current environment") user = os.environ['RHEVM_USERNAME'] password = os.environ['RHEVM_PASSWORD'] server = os.environ['RHEVM_SERVER'] port = os.environ['RHEVM_PORT'] self["stonith-params"] = f"login={user},passwd={password},ipaddr={server},ipport={port},ssl=1,shell_timeout=10" if args.ip: self["CIBResource"] = True self["ClobberCIB"] = True self["IPBase"] = args.ip if args.logfile == "journal": self["LogAuditDisabled"] = True self["log_kind"] = LogKind.JOURNAL elif args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["log_kind"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) if args.populate_resources: self["CIBResource"] = True self["ClobberCIB"] = True self.random_gen.seed(args.seed) class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance def set_cts_path(extra=None): """Set the PATH environment variable appropriately for the tests.""" new_path = os.environ['PATH'] # Add any search paths given on the command line if extra is not None: for p in extra: new_path = f"{p}:{new_path}" cwd = os.getcwd() if os.path.exists(f"{cwd}/cts/cts-attrd.in"): # pylint: disable=protected-access print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR}") for d in glob(f"{BuildOptions._BUILD_DIR}/daemons/*/"): new_path = f"{d}:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}" print(f"Using local schemas from: {cwd}/xml") os.environ["PCMK_schema_directory"] = f"{cwd}/xml" else: print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {cwd})") new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}" os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR print(f'Using PATH="{new_path}"') os.environ['PATH'] = new_path diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py index 07f2b38805..767ffa13f1 100644 --- a/python/pacemaker/_cts/scenarios.py +++ b/python/pacemaker/_cts/scenarios.py @@ -1,421 +1,409 @@ """Test scenario classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = [ "AllOnce", "Boot", "BootCluster", "LeaveBooted", "RandomTests", "Sequence", ] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import time from pacemaker._cts.audits import ClusterAudit from pacemaker._cts.input import should_continue from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.watcher import LogWatcher class ScenarioComponent: """ The base class for all scenario components. A scenario component is one single step in a scenario. Each component is basically just a setup and teardown method. """ def __init__(self, cm, env): """ Create a new ScenarioComponent instance. Arguments: cm -- A ClusterManager instance env -- An Environment instance """ # pylint: disable=invalid-name self._cm = cm self._env = env def is_applicable(self): """ Return True if this component is applicable in the given Environment. This method must be provided by all subclasses. """ raise NotImplementedError def setup(self): """ Set up the component, returning True on success. This method must be provided by all subclasses. """ raise NotImplementedError def teardown(self): """ Tear down the given component. This method must be provided by all subclasses. """ raise NotImplementedError class Scenario: """ The base class for scenarios. A scenario is an ordered list of ScenarioComponent objects. A scenario proceeds by setting up all its components in sequence, running a list of tests and audits, and then tearing down its components in reverse. """ def __init__(self, cm, components, audits, tests): """ Create a new Scenario instance. Arguments: cm -- A ClusterManager instance components -- A list of ScenarioComponents comprising this Scenario audits -- A list of ClusterAudits that will be performed as part of this Scenario tests -- A list of CTSTests that will be run """ # pylint: disable=invalid-name self.stats = { "success": 0, "failure": 0, "BadNews": 0, "skipped": 0 } self.tests = tests self._audits = audits self._bad_news = None self._cm = cm self._components = components for comp in components: if not issubclass(comp.__class__, ScenarioComponent): raise ValueError("Init value must be subclass of ScenarioComponent") for audit in audits: if not issubclass(audit.__class__, ClusterAudit): raise ValueError("Init value must be subclass of ClusterAudit") for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") def is_applicable(self): """Return True if all ScenarioComponents are applicable.""" for comp in self._components: if not comp.is_applicable(): return False return True def setup(self): """ Set up the scenario, returning True on success. If setup fails at some point, tear down those components that did successfully set up. """ self._cm.prepare() self.audit() # Also detects remote/local log config self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"]) self.audit() self._cm.install_support() self._bad_news = LogWatcher(self._cm.env["LogFileName"], self._cm.templates.get_patterns("BadNews"), self._cm.env["nodes"], self._cm.env["log_kind"], "BadNews", 0) self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit j = 0 while j < len(self._components): if not self._components[j].setup(): # OOPS! We failed. Tear partial setups down. self.audit() self._cm.log("Tearing down partial setup") self.teardown(j) return False j += 1 self.audit() return True def teardown(self, n_components=None): """ Tear down the scenario in the reverse order it was set up. If n_components is not None, only tear down that many components. """ if not n_components: n_components = len(self._components) - 1 j = n_components while j >= 0: self._components[j].teardown() j -= 1 self.audit() self._cm.install_support("uninstall") def incr(self, name): """Increment the given stats key.""" if name not in self.stats: self.stats[name] = 0 self.stats[name] += 1 def run(self, iterations): - """Run all tests in the scenario the given number of times.""" - self._cm.oprofile_start() - - try: - self._run_loop(iterations) - self._cm.oprofile_stop() - except: # noqa: E722 - self._cm.oprofile_stop() - raise - - def _run_loop(self, iterations): """Run all the tests the given number of times.""" raise NotImplementedError def run_test(self, test, testcount): """ Run the given test. testcount is the number of tests (including this one) that have been run across all iterations. """ nodechoice = self._cm.env.random_node() ret = True did_run = False self._cm.clear_instance_errors_to_ignore() self._cm.log(f"Running test {test.name:<22} {f'({nodechoice})':<15} [{testcount:>3}]") starttime = test.set_timer() if not test.setup(nodechoice): self._cm.log("Setup failed") ret = False else: did_run = True ret = test(nodechoice) if not test.teardown(nodechoice): self._cm.log("Teardown failed") if not should_continue(self._cm.env): raise ValueError(f"Teardown of {test.name} on {nodechoice} failed") ret = False stoptime = time.time() - self._cm.oprofile_save(testcount) elapsed_time = stoptime - starttime test_time = stoptime - test.get_timer() if "min_time" not in test.stats: test.stats["elapsed_time"] = elapsed_time test.stats["min_time"] = test_time test.stats["max_time"] = test_time else: test.stats["elapsed_time"] += elapsed_time if test_time < test.stats["min_time"]: test.stats["min_time"] = test_time if test_time > test.stats["max_time"]: test.stats["max_time"] = test_time if ret: self.incr("success") test.log_timer() else: self.incr("failure") self._cm.statall() did_run = True # Force the test count to be incremented anyway so test extraction works self.audit(test.errors_to_ignore) return did_run def summarize(self): """Output scenario results.""" self._cm.log("****************") self._cm.log("Overall Results:%r" % self.stats) self._cm.log("****************") stat_filter = { "calls": 0, "failure": 0, "skipped": 0, "auditfail": 0, } self._cm.log("Test Summary") for test in self.tests: for key in stat_filter: stat_filter[key] = test.stats[key] self._cm.log(f"{f'Test {test.name}':<25} {stat_filter!r}") self._cm.debug("Detailed Results") for test in self.tests: self._cm.debug(f"{f'Test {test.name}: ':<25} {test.stats!r}") self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") def audit(self, local_ignore=None): """ Perform all scenario audits and log results. If there are too many failures, prompt the user to confirm that the scenario should continue running. """ errcount = 0 ignorelist = ["CTS:"] if local_ignore: ignorelist.extend(local_ignore) ignorelist.extend(self._cm.errors_to_ignore) ignorelist.extend(self._cm.instance_errors_to_ignore) # This makes sure everything is stabilized before starting... failed = 0 for audit in self._audits: if not audit(): self._cm.log(f"Audit {audit.name} FAILED.") failed += 1 else: self._cm.debug(f"Audit {audit.name} passed.") while errcount < 1000: match = None if self._bad_news: match = self._bad_news.look(0) if match: add_err = True for ignore in ignorelist: if add_err and re.search(ignore, match): add_err = False if add_err: self._cm.log(f"BadNews: {match}") self.incr("BadNews") errcount += 1 else: break else: print("Big problems") if not should_continue(self._cm.env): self._cm.log("Shutting down.") self.summarize() self.teardown() raise ValueError("Looks like we hit a BadNews jackpot!") if self._bad_news: self._bad_news.end() return failed class AllOnce(Scenario): """Every Test Once.""" - def _run_loop(self, iterations): + def run(self, iterations): testcount = 1 for test in self.tests: self.run_test(test, testcount) testcount += 1 class RandomTests(Scenario): """Random Test Execution.""" - def _run_loop(self, iterations): + def run(self, iterations): testcount = 1 while testcount <= iterations: test = self._cm.env.random_gen.choice(self.tests) self.run_test(test, testcount) testcount += 1 class Sequence(Scenario): """Named Tests in Sequence.""" - def _run_loop(self, iterations): + def run(self, iterations): testcount = 1 while testcount <= iterations: for test in self.tests: self.run_test(test, testcount) testcount += 1 class Boot(Scenario): """Start the Cluster.""" - def _run_loop(self, iterations): + def run(self, iterations): return class BootCluster(ScenarioComponent): """ Start the cluster manager on all nodes. Wait for each to come up before starting in order to account for the possibility that a given node might have been rebooted or crashed beforehand. """ def is_applicable(self): """Return whether this scenario is applicable.""" return True def setup(self): """Set up the component, returning True on success.""" self._cm.prepare() # Clear out the cobwebs ;-) self._cm.stopall(verbose=True, force=True) # Now start the Cluster Manager on all the nodes. self._cm.log("Starting Cluster Manager on all nodes.") return self._cm.startall(verbose=True, quick=True) def teardown(self): """Tear down the component.""" self._cm.log("Stopping Cluster Manager on all nodes") self._cm.stopall(verbose=True, force=False) class LeaveBooted(BootCluster): """Leave all nodes up when the scenario is complete.""" def teardown(self): """Tear down the component.""" self._cm.log("Leaving Cluster running on all nodes")