diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py index aaa28ce1a5..7dd59efd8e 100644 --- a/python/pacemaker/_cts/environment.py +++ b/python/pacemaker/_cts/environment.py @@ -1,440 +1,440 @@ """Test environment classes for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["EnvFactory", "set_cts_path"] __copyright__ = "Copyright 2014-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import argparse from contextlib import suppress from glob import glob import os import random import shlex import socket import sys from pacemaker.buildoptions import BuildOptions from pacemaker._cts.logging import LogFactory from pacemaker._cts.remote import RemoteFactory from pacemaker._cts.watcher import LogKind class Environment: """ A class for managing the CTS environment. This consists largely of processing and storing command line parameters. """ # pylint doesn't understand that self._rsh is callable (it stores the # singleton instance of RemoteExec, as returned by the getInstance method # of RemoteFactory). # @TODO See if type annotations fix this. # I think we could also fix this by getting rid of the getInstance methods, # but that's a project for another day. For now, just disable the warning. # pylint: disable=not-callable def __init__(self, args): """ Create a new Environment instance. This class can be treated kind of like a dictionary due to the presence of typical dict functions like __contains__, __getitem__, and __setitem__. However, it is not a dictionary so do not rely on standard dictionary behavior. Arguments: args -- A list of command line parameters, minus the program name. If None, sys.argv will be used. """ self.data = {} # Set some defaults before processing command line arguments. These are # either not set by any command line parameter, or they need a default # that can't be set in add_argument. self["dead_time"] = 300 - self["StableTime"] = 30 - self["tests"] = [] self["log_kind"] = None self["scenario"] = "random" + self["stable_time"] = 30 self["start_time"] = 300 self["syslog_facility"] = "daemon" + self["tests"] = [] # Hard-coded since there is only one supported cluster manager/stack self["Name"] = "crm-corosync" self["Stack"] = "corosync 2+" self.random_gen = random.Random() self._logger = LogFactory() self._rsh = RemoteFactory().getInstance() self._parse_args(args) if not self["ListTests"]: self._validate() self._discover() def dump(self): """Print the current environment.""" for key in sorted(self.data.keys()): self._logger.debug(f"{f'Environment[{key}]':35}: {str(self[key])}") def __contains__(self, key): """Return True if the given key exists in the environment.""" return key in self.data def __getitem__(self, key): """Return the given environment key, or None if it does not exist.""" return self.data.get(key) def __setitem__(self, key, value): """Set the given environment key to the given value, overriding any previous value.""" if key == "nodes": self.data["nodes"] = [] for node in value: node = node.strip() # I don't think I need the IP address, etc. but this validates # the node name against /etc/hosts and/or DNS, so it's a # GoodThing(tm). try: # @TODO This only handles IPv4, use getaddrinfo() instead # (here and in _discover()) socket.gethostbyname_ex(node) self.data["nodes"].append(node) except socket.herror: self._logger.log(f"{node} not found in DNS... aborting") raise else: self.data[key] = value def random_node(self): """Choose a random node from the cluster.""" return self.random_gen.choice(self["nodes"]) def _detect_systemd(self, node): """Detect whether systemd is in use on the target node.""" if "have_systemd" not in self.data: (rc, _) = self._rsh(node, "systemctl list-units", verbose=0) self["have_systemd"] = rc == 0 def _detect_syslog(self, node): """Detect the syslog variant in use on the target node (if any).""" if "syslogd" in self.data: return if self["have_systemd"]: # Systemd (_, lines) = self._rsh(node, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1) else: # SYS-V (_, lines) = self._rsh(node, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1) with suppress(IndexError): self["syslogd"] = lines[0].strip() def disable_service(self, node, service): """Disable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl disable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} off") return rc def enable_service(self, node, service): """Enable the given service on the given node.""" if self["have_systemd"]: # Systemd (rc, _) = self._rsh(node, f"systemctl enable {service}") return rc # SYS-V (rc, _) = self._rsh(node, f"chkconfig {service} on") return rc def service_is_enabled(self, node, service): """Return True if the given service is enabled on the given node.""" if self["have_systemd"]: # Systemd # With "systemctl is-enabled", we should check if the service is # explicitly "enabled" instead of the return code. For example it returns # 0 if the service is "static" or "indirect", but they don't really count # as "enabled". (rc, _) = self._rsh(node, f"systemctl is-enabled {service} | grep enabled") return rc == 0 # SYS-V (rc, _) = self._rsh(node, f"chkconfig --list | grep -e {service}.*on") return rc == 0 def _detect_at_boot(self, node): """Detect if the cluster starts at boot.""" self["at-boot"] = any(self.service_is_enabled(node, service) for service in ("pacemaker", "corosync")) def _detect_ip_offset(self, node): """Detect the offset for IPaddr resources.""" if self["create_resources"] and "IPBase" not in self.data: (_, lines) = self._rsh(node, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0) network = lines[0].strip() (_, lines) = self._rsh(node, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0) try: self["IPBase"] = lines[0].strip() except (IndexError, TypeError): self["IPBase"] = None if not self["IPBase"]: self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.") self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") return last_part = self["IPBase"].split('.')[3] if int(last_part) >= 240: self._logger.log(f"Could not determine an offset for IPaddr resources. Upper bound is too high: {self['IPBase']} {last_part}") self["IPBase"] = " fe80::1234:56:7890:1000" self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""") def _validate(self): """Check that we were given all required command line parameters.""" if not self["nodes"]: raise ValueError("No nodes specified!") def _discover(self): """Probe cluster nodes to figure out how to log and manage services.""" exerciser = socket.gethostname() # Use the IP where possible to avoid name lookup failures for ip in socket.gethostbyname_ex(exerciser)[2]: if ip != "127.0.0.1": exerciser = ip break self["cts-exerciser"] = exerciser node = self["nodes"][0] self._detect_systemd(node) self._detect_syslog(node) self._detect_at_boot(node) self._detect_ip_offset(node) def _parse_args(self, argv): """ Parse and validate command line parameters. Set the appropriate values in the environment dictionary. If argv is None, use sys.argv instead. """ if not argv: argv = sys.argv[1:] parser = argparse.ArgumentParser() grp1 = parser.add_argument_group("Common options") grp1.add_argument("--benchmark", action="store_true", help="Add timing information") grp1.add_argument("--list", "--list-tests", action="store_true", dest="list_tests", help="List the valid tests") grp1.add_argument("--nodes", default="", metavar="NODES", help="List of cluster nodes separated by whitespace") grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly") grp2.add_argument("-L", "--logfile", metavar="PATH", help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)") grp2.add_argument("--ip", "--test-ip-base", metavar="IP", help="Offset for generated IP address resources") grp3 = parser.add_argument_group("Options for release testing") grp3.add_argument("-r", "--populate-resources", action="store_true", help="Generate a sample configuration") grp3.add_argument("--choose", metavar="NAME", help="Run only the named tests, separated by whitespace") grp3.add_argument("--disable-fencing", action="store_false", dest="fencing_enabled", help="Whether to disable fencing") grp3.add_argument("--fencing-agent", metavar="AGENT", default="external/ssh", help="Agent to use for a fencing resource") grp3.add_argument("--fencing-params", metavar="PARAMS", help="Parameters for the fencing resource (as NAME=VALUE), separated by whitespace") grp3.add_argument("--once", action="store_true", help="Run all valid tests once") grp4 = parser.add_argument_group("Additional (less common) options") grp4.add_argument("-c", "--clobber-cib", action="store_true", help="Erase any existing configuration") grp4.add_argument("-y", "--yes", action="store_true", dest="always_continue", help="Continue to run whenever prompted") grp4.add_argument("--boot", action="store_true", help="") grp4.add_argument("--cib-filename", metavar="PATH", help="Install the given CIB file to the cluster") grp4.add_argument("--no-unsafe-tests", action="store_true", help="Don't run tests that are unsafe for use with ocfs2/drbd") grp4.add_argument("--notification-agent", metavar="PATH", default="/var/lib/pacemaker/notify.sh", help="Script to configure for Pacemaker alerts") grp4.add_argument("--notification-recipient", metavar="R", default="/var/lib/pacemaker/notify.log", help="Recipient to pass to alert script") grp4.add_argument("--outputfile", metavar="PATH", help="Location to write logs to") grp4.add_argument("--schema", metavar="SCHEMA", default=f"pacemaker-{BuildOptions.CIB_SCHEMA_VERSION}", help="Create a CIB conforming to the given schema") grp4.add_argument("--seed", metavar="SEED", help="Use the given string as the random number seed") grp4.add_argument("--trunc", action="store_true", dest="truncate", help="Truncate log file before starting") parser.add_argument("iterations", nargs='?', type=int, default=1, help="Number of tests to run") args = parser.parse_args(args=argv) # Set values on this object based on what happened with command line # processing. This has to be done in several blocks. # These values can always be set. Most get a default from the add_argument # calls, they only do one thing, and they do not have any side effects. self["CIBfilename"] = args.cib_filename if args.cib_filename else None self["create_resources"] = args.ip or args.populate_resources self["fencing_agent"] = args.fencing_agent self["fencing_enabled"] = args.fencing_enabled self["fencing_params"] = shlex.split(args.fencing_params) self["ListTests"] = args.list_tests self["overwrite_cib"] = any([args.clobber_cib, args.ip, args.populate_resources]) self["Schema"] = args.schema self["TruncateLog"] = args.truncate self["benchmark"] = args.benchmark self["continue"] = args.always_continue self["iterations"] = args.iterations self["nodes"] = shlex.split(args.nodes) self["notification-agent"] = args.notification_agent self["notification-recipient"] = args.notification_recipient self["unsafe-tests"] = not args.no_unsafe_tests # Everything else either can't have a default set in an add_argument # call (likely because we don't want to always have a value set for it) # or it does something fancier than just set a single value. However, # order does not matter for these as long as the user doesn't provide # conflicting arguments on the command line. So just do Everything # alphabetically. if args.boot: self["scenario"] = "boot" if args.choose: self["scenario"] = "sequence" self["tests"].extend(shlex.split(args.choose)) self["iterations"] = len(self["tests"]) if args.ip: self["IPBase"] = args.ip if args.logfile == "journal": self["LogAuditDisabled"] = True self["log_kind"] = LogKind.JOURNAL elif args.logfile: self["LogAuditDisabled"] = True self["LogFileName"] = args.logfile self["log_kind"] = LogKind.REMOTE_FILE else: # We can't set this as the default on the parser.add_argument call # for this option because then args.logfile will be set, which means # the above branch will be taken and those other values will also be # set. self["LogFileName"] = "/var/log/messages" if args.once: self["scenario"] = "all-once" if args.outputfile: self["OutputFile"] = args.outputfile LogFactory().add_file(self["OutputFile"]) self.random_gen.seed(args.seed) class EnvFactory: """A class for constructing a singleton instance of an Environment object.""" instance = None # pylint: disable=invalid-name def getInstance(self, args=None): """ Return the previously created instance of Environment. If no instance exists, create a new instance and return that. """ if not EnvFactory.instance: EnvFactory.instance = Environment(args) return EnvFactory.instance def set_cts_path(extra=None): """Set the PATH environment variable appropriately for the tests.""" new_path = os.environ['PATH'] # Add any search paths given on the command line if extra is not None: for p in extra: new_path = f"{p}:{new_path}" cwd = os.getcwd() if os.path.exists(f"{cwd}/cts/cts-attrd.in"): # pylint: disable=protected-access print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR}") for d in glob(f"{BuildOptions._BUILD_DIR}/daemons/*/"): new_path = f"{d}:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}" print(f"Using local schemas from: {cwd}/xml") os.environ["PCMK_schema_directory"] = f"{cwd}/xml" else: print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {cwd})") new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}" os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR print(f'Using PATH="{new_path}"') os.environ['PATH'] = new_path diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py index d3e4c2ff66..7135328488 100644 --- a/python/pacemaker/_cts/tests/componentfail.py +++ b/python/pacemaker/_cts/tests/componentfail.py @@ -1,161 +1,161 @@ """Kill a pacemaker daemon and test how the cluster recovers.""" __all__ = ["ComponentFail"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re from pacemaker._cts.audits import AuditResource from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object # @TODO Separate this into a separate test for each component, so the patterns # can be made specific to each component, investigating failures is a little # easier, and specific testing can be done for each component (for example, # set attributes before and after killing pacemaker-attrd and check values). class ComponentFail(CTSTest): """Kill a random pacemaker daemon and wait for the cluster to recover.""" def __init__(self, cm): """ Create a new ComponentFail instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.is_unsafe = True self.name = "ComponentFail" self._complist = cm.components self._okerrpatterns = [] self._patterns = [] self._startall = SimulStartLite(cm) def __call__(self, node): """Perform this test.""" self.incr("calls") self._patterns = [] self._okerrpatterns = [] # start all nodes ret = self._startall(None) if not ret: return self.failure("Setup failed") - if not self._cm.cluster_stable(self._env["StableTime"]): + if not self._cm.cluster_stable(self._env["stable_time"]): return self.failure("Setup failed - unstable") # select a component to kill chosen = self._env.random_gen.choice(self._complist) node_is_dc = self._cm.is_node_dc(node, None) self.debug(f"...component {chosen.name} (dc={node_is_dc})") self.incr(chosen.name) if chosen.name != "corosync": self._patterns.extend([ self._cm.templates["Pat:ChildKilled"] % (node, chosen.name), self._cm.templates["Pat:ChildRespawn"] % (node, chosen.name), ]) self._patterns.extend(chosen.pats) # @TODO this should be a flag in the Component if chosen.name in ["corosync", "pacemaker-based", "pacemaker-fenced"]: # Ignore actions for fence devices if fencer will respawn # (their registration will be lost, and probes will fail) self._okerrpatterns = [ self._cm.templates["Pat:Fencing_active"], ] (_, lines) = self._rsh(node, "crm_resource -c", verbose=1) for line in lines: if re.search("^Resource", line): r = AuditResource(self._cm, line) if r.rclass == "stonith": self._okerrpatterns.extend([ self._cm.templates["Pat:Fencing_recover"] % r.id, self._cm.templates["Pat:Fencing_probe"] % r.id, ]) # supply a copy so self.patterns doesn't end up empty tmp_pats = self._patterns.copy() self._patterns.extend(chosen.badnews_ignore) # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status stonith_pats = [ self._cm.templates["Pat:Fencing_ok"] % node ] stonith = self.create_watch(stonith_pats, 0) stonith.set_watch() # set the watch for stable watch = self.create_watch( - tmp_pats, self._env["dead_time"] + self._env["StableTime"] + self._env["start_time"]) + tmp_pats, self._env["dead_time"] + self._env["stable_time"] + self._env["start_time"]) watch.set_watch() # kill the component chosen.kill(node) self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for any fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") self._cm.cluster_stable(self._env["start_time"]) self.debug(f"Checking if {node} was shot") shot = stonith.look(60) if shot: self.debug(f"Found: {shot!r}") self._okerrpatterns.append(self._cm.templates["Pat:Fencing_start"] % node) if not self._env["at-boot"]: self._cm.expected_status[node] = "down" # If fencing occurred, chances are many (if not all) the expected logs # will not be sent - or will be lost when the node reboots return self.success() # check for logs indicating a graceful recovery matched = watch.look_for_all(allow_multiple_matches=True) if watch.unmatched: self._logger.log(f"Patterns not found: {watch.unmatched!r}") self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["start_time"]) if not matched: return self.failure(f"Didn't find all expected {chosen.name} patterns") if not is_stable: return self.failure(f"Cluster did not become stable after killing {chosen.name}") return self.success() @property def errors_to_ignore(self): """Return a list of errors which should be ignored.""" # Note that okerrpatterns refers to the last time we ran this test # The good news is that this works fine for us... self._okerrpatterns.extend(self._patterns) return self._okerrpatterns diff --git a/python/pacemaker/_cts/tests/fliptest.py b/python/pacemaker/_cts/tests/fliptest.py index 2a87265d7a..4579fea55a 100644 --- a/python/pacemaker/_cts/tests/fliptest.py +++ b/python/pacemaker/_cts/tests/fliptest.py @@ -1,59 +1,59 @@ """Stop running nodes, and start stopped nodes.""" __all__ = ["FlipTest"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import time from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.starttest import StartTest from pacemaker._cts.tests.stoptest import StopTest # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class FlipTest(CTSTest): """Stop running nodes and start stopped nodes.""" def __init__(self, cm): """ Create a new FlipTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Flip" self._start = StartTest(cm) self._stop = StopTest(cm) def __call__(self, node): """Perform this test.""" self.incr("calls") if self._cm.expected_status[node] == "up": self.incr("stopped") ret = self._stop(node) kind = "up->down" # Give the cluster time to recognize it's gone... - time.sleep(self._env["StableTime"]) + time.sleep(self._env["stable_time"]) elif self._cm.expected_status[node] == "down": self.incr("started") ret = self._start(node) kind = "down->up" else: return self.skipped() self.incr(kind) if ret: return self.success() return self.failure(f"{kind} failure") diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py index e294e63ed3..facc0133d5 100644 --- a/python/pacemaker/_cts/tests/stonithdtest.py +++ b/python/pacemaker/_cts/tests/stonithdtest.py @@ -1,134 +1,134 @@ """Fence a running node and wait for it to restart.""" __all__ = ["StonithdTest"] __copyright__ = "Copyright 2000-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" from pacemaker.exitstatus import ExitStatus from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object class StonithdTest(CTSTest): """Fence a running node and wait for it to restart.""" def __init__(self, cm): """ Create a new StonithdTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.benchmark = True self.name = "Stonithd" self._startall = SimulStartLite(cm) def __call__(self, node): """Perform this test.""" self.incr("calls") if len(self._env["nodes"]) < 2: return self.skipped() ret = self._startall(None) if not ret: return self.failure("Setup failed") watchpats = [ self._cm.templates["Pat:Fencing_ok"] % node, self._cm.templates["Pat:NodeFenced"] % node, ] if not self._env["at-boot"]: self.debug(f"Expecting {node} to stay down") self._cm.expected_status[node] = "down" else: self.debug(f"Expecting {node} to come up again {self._env['at-boot']}") watchpats.extend([ f"{node}.* S_STARTING -> S_PENDING", f"{node}.* S_PENDING -> S_NOT_DC", ]) watch = self.create_watch(watchpats, - 30 + self._env["dead_time"] + self._env["StableTime"] + self._env["start_time"]) + 30 + self._env["dead_time"] + self._env["stable_time"] + self._env["start_time"]) watch.set_watch() origin = self._env.random_gen.choice(self._env["nodes"]) (rc, _) = self._rsh(origin, f"stonith_admin --reboot {node} -VVVVVV") if rc == ExitStatus.TIMEOUT: # Look for the patterns, usually this means the required # device was running on the node to be fenced - or that # the required devices were in the process of being loaded # and/or moved # # Effectively the node committed suicide so there will be # no confirmation, but pacemaker should be watching and # fence the node again self._logger.log(f"Fencing command on {origin} to fence {node} timed out") elif origin != node and rc != 0: self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self._logger.log(f"Fencing command on {origin} failed to fence {node} (rc={rc})") elif origin == node and rc != 255: # 255 == broken pipe, ie. the node was fenced as expected self._logger.log(f"Locally originated fencing returned {rc}") with Timer(self._logger, self.name, "fence"): matched = watch.look_for_all() self.set_timer("reform") if watch.unmatched: self._logger.log(f"Patterns not found: {watch.unmatched!r}") self.debug("Waiting for the cluster to recover") self._cm.cluster_stable() self.debug("Waiting for fenced node to come back up") self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600) self.debug("Waiting for the cluster to re-stabilize with all nodes") is_stable = self._cm.cluster_stable(self._env["start_time"]) if not matched: return self.failure("Didn't find all expected patterns") if not is_stable: return self.failure("Cluster did not become stable") self.log_timer("reform") return self.success() @property def errors_to_ignore(self): """Return a list of errors which should be ignored.""" return [ self._cm.templates["Pat:Fencing_start"] % ".*", self._cm.templates["Pat:Fencing_ok"] % ".*", self._cm.templates["Pat:Fencing_active"], r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired" ] def is_applicable(self): """Return True if this test is applicable in the current test configuration.""" return self._env["fencing_enabled"] and CTSTest.is_applicable(self)