diff --git a/cts/cts-attrd.in b/cts/cts-attrd.in
index 88e7964cf4..94eef777a3 100644
--- a/cts/cts-attrd.in
+++ b/cts/cts-attrd.in
@@ -1,414 +1,414 @@
#!@PYTHON@
"""Regression tests for Pacemaker's attribute daemon."""
# pylint doesn't like the module name "cts-attrd" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2023-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import subprocess
import sys
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
TEST_DIR = sys.path[0]
def update_path():
"""Set the PATH environment variable appropriately for the tests."""
new_path = os.environ['PATH']
if os.path.exists(f"{TEST_DIR}/cts-attrd.in"):
# pylint: disable=protected-access
print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR} ({TEST_DIR})")
# For pacemaker-attrd
new_path = f"{BuildOptions._BUILD_DIR}/daemons/attrd:{new_path}"
else:
print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {TEST_DIR})")
# For pacemaker-attrd
new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
print(f'Using PATH="{new_path}"')
os.environ['PATH'] = new_path
class AttributeTest(Test):
"""Executor for a single test."""
def __init__(self, name, description, **kwargs):
"""
Create a new AttributeTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
"""
Test.__init__(self, name, description, **kwargs)
self._daemon_location = "pacemaker-attrd"
self._enable_corosync = True
def _kill_daemons(self):
killall([self._daemon_location])
def _start_daemons(self):
if self.verbose:
print(f"Starting {self._daemon_location}")
cmd = [self._daemon_location, "-s", "-l", self.logpath]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
class AttributeTests(Tests):
"""Collection of all attribute regression tests."""
def __init__(self, **kwargs):
"""Create a new AttributeTests instance."""
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-attrd")
def new_test(self, name, description):
"""Create a named test."""
test = AttributeTest(name, description, verbose=self.verbose, logdir=self.logdir)
self._tests.append(test)
return test
def setup_environment(self, use_corosync):
"""Prepare the host before executing any tests."""
if use_corosync:
self._corosync.start(kill_first=True)
def cleanup_environment(self, use_corosync):
"""Clean up the host after executing desired tests."""
if use_corosync:
self._corosync.stop()
def build_basic_tests(self):
"""Add basic tests - setting, querying, updating, and deleting attributes."""
test = self.new_test("set_attr_1",
"Set and query an attribute")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="111"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="111"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
# Setting the delay on an attribute that doesn't exist fails, but the failure is
# not passed back to attrd_updater.
test = self.new_test("set_attr_2",
"Set an attribute's delay")
test.add_cmd("attrd_updater", args="--name AAA -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Processed update-delay request from client .*: Error \(Attribute AAA does not exist\)",
regex=True)
test = self.new_test("set_attr_3",
"Set and query an attribute's delay and value")
test.add_cmd("attrd_updater", args="--name AAA -B 111 -d 5 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="111"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="111"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111 \| from .* with 5s write delay",
regex=True)
test = self.new_test("set_attr_4",
"Update an attribute that does not exist with a delay")
test.add_cmd("attrd_updater", args="--name BBB -U 999 -d 10 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="999"')
test.add_cmd("attrd_updater", args="--name BBB -Q",
stdout_match='name="BBB" host="[^"]+" value="999"',
validate=False)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 999 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_1",
"Update an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 333 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="333"')
test.add_cmd("attrd_updater", args="--name BBB -Q",
stdout_match='name="BBB" host="[^"]+" value="333"',
validate=False)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("update_attr_2",
"Update an attribute using a delay other than its default")
test.add_cmd("attrd_updater", args="--name BBB -U 777 -d 10 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 888 -d 7 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 777 -> 888 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_delay_1",
"Update the delay of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test = self.new_test("update_attr_delay_2",
"Update the delay and value of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -B 333 -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("missing_attr_1",
"Query an attribute that does not exist")
test.add_cmd("attrd_updater", args="--name NOSUCH --output-as=xml",
expected_exitcode=ExitStatus.CONFIG)
test = self.new_test("delete_attr_1",
"Delete an existing attribute")
test.add_cmd("attrd_updater", args="--name CCC -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name CCC -D --output-as=xml")
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
test = self.new_test("missing_attr_2",
"Delete an attribute that does not exist")
test.add_cmd("attrd_updater", args="--name NOSUCH2 -D --output-as=xml")
test = self.new_test("attr_in_set_1",
"Set and query an attribute in a specific set")
test.add_cmd("attrd_updater", args="--name DDD -U 555 --set=foo --output-as=xml")
test.add_cmd("attrd_updater", args="--name DDD -Q --output-as=xml",
stdout_match='name="DDD" value="555"')
test.add_cmd("attrd_updater", args="--name DDD -Q",
stdout_match='name="DDD" host="[^"]+" value="555"',
validate=False)
test.add_log_pattern("Processed 1 private change for DDD (set foo)")
def build_multiple_query_tests(self):
"""Add tests that set and query an attribute across multiple nodes."""
# NOTE: These tests make use of the fact that nothing in attrd actually
# cares about whether a node exists when you set or query an attribute.
# It just keeps creating new hash tables for each node you ask it about.
test = self.new_test("multi_query_1",
"Query an attribute set across multiple nodes")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --node cluster1 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -U 222 --node cluster2 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -QA --output-as=xml",
stdout_match=r'\n.*')
test.add_cmd("attrd_updater", args="--name AAA -QA",
stdout_match='name="AAA" host="cluster1" value="111"\nname="AAA" host="cluster2" value="222"',
validate=False)
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster1 --output-as=xml",
stdout_match='')
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster1",
stdout_match='name="AAA" host="cluster1" value="111"',
validate=False)
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
stdout_match='')
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2",
stdout_match='name="AAA" host="cluster2" value="222"',
validate=False)
test.add_cmd("attrd_updater", args="--name AAA -QA --output-as=xml",
stdout_match=r'\n.*',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -QA",
stdout_match='name="AAA" host="cluster1" value="111"\nname="AAA" host="cluster2" value="222"',
validate=False, env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="cluster1" value="111"',
validate=False, env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
stdout_match='',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2",
stdout_match='name="AAA" host="cluster2" value="222"',
validate=False, env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
def build_regex_tests(self):
"""Add tests that use regexes."""
test = self.new_test("regex_update_1",
"Update attributes using a regex")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd("attrd_updater", args="--name ABB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'A.*' -U 333 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="333"')
test.add_cmd("attrd_updater", args="--name ABB -Q --output-as=xml",
stdout_match='name="ABB" value="333"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="333"',
validate=False)
test.add_cmd("attrd_updater", args="--name ABB -Q",
stdout_match='name="ABB" host="[^"]+" value="333"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: 111 -> 333",
regex=True)
test = self.new_test("regex_delete_1",
"Delete attributes using a regex")
test.add_cmd("attrd_updater", args="--name XAX -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name XBX -U 555 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'X[A|B]X' -D --output-as=xml")
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: \(unset\) -> 555",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: 555 -> \(unset\)",
regex=True)
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
def build_utilization_tests(self):
"""Add tests that involve utilization attributes."""
test = self.new_test("utilization_1",
"Set and query a utilization attribute")
test.add_cmd("attrd_updater", args="--name AAA -U ABC -z --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="ABC"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="ABC"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in utilization: \(unset\) -> ABC",
regex=True)
def build_sync_point_tests(self):
"""Add tests that involve sync points."""
test = self.new_test("local_sync_point",
"Wait for a local sync point")
test.add_cmd("attrd_updater", args="--name AAA -U 123 --wait=local --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="123"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="123"',
validate=False)
test.add_log_pattern(r"Alerting client .* for reached local sync point",
regex=True)
test = self.new_test("cluster_sync_point",
"Wait for a cluster-wide sync point")
test.add_cmd("attrd_updater", args="--name BBB -U 456 --wait=cluster --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="456"')
test.add_cmd("attrd_updater", args="--name BBB -Q",
stdout_match='name="BBB" host="[^"]+" value="456"',
validate=False)
test.add_log_pattern(r"Alerting client .* for reached cluster sync point",
regex=True)
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-attrd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
- "\t " + sys.argv[0] + " --run-only start_stop\n\n"
+ f"\t {sys.argv[0]} --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
- "\t " + sys.argv[0] + " --run-only-pattern systemd")
+ f"\t {sys.argv[0]} --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run attrd regression tests as specified by arguments."""
update_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
exit_if_proc_running("pacemaker-attrd")
# Create a temporary directory for log files (the directory and its
# contents will automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-attrd-") as logdir:
tests = AttributeTests(verbose=opts.verbose, logdir=logdir)
tests.build_basic_tests()
tests.build_multiple_query_tests()
tests.build_regex_tests()
tests.build_utilization_tests()
tests.build_sync_point_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
try:
tests.setup_environment(True)
except TimeoutError:
print("corosync did not start in time, exiting")
sys.exit(ExitStatus.TIMEOUT)
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment(True)
tests.exit()
if __name__ == "__main__":
main()
diff --git a/cts/cts-exec.in b/cts/cts-exec.in
index dcc3c37f6d..a0e7116609 100644
--- a/cts/cts-exec.in
+++ b/cts/cts-exec.in
@@ -1,930 +1,929 @@
#!@PYTHON@
"""Regression tests for Pacemaker's pacemaker-execd."""
# pylint doesn't like the module name "cts-execd" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2012-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import stat
import sys
import subprocess
import shutil
import tempfile
# Where to find test binaries
# Prefer the source tree if available
TEST_DIR = sys.path[0]
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
from pacemaker._cts.process import killall, exit_if_proc_running, stdout_from_command
from pacemaker._cts.test import Test, Tests
# File permissions for executable scripts we create
EXECMODE = stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
def update_path():
# pylint: disable=protected-access
"""Set the PATH environment variable appropriately for the tests."""
new_path = os.environ['PATH']
if os.path.exists(f"{TEST_DIR}/cts-exec.in"):
print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR} ({TEST_DIR})")
# For pacemaker-execd, cts-exec-helper, and pacemaker-remoted
new_path = f"{BuildOptions._BUILD_DIR}/daemons/execd:{new_path}"
new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" # For crm_resource
# For pacemaker-fenced
new_path = f"{BuildOptions._BUILD_DIR}/daemons/fenced:{new_path}"
# For cts-support
new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}"
else:
print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {TEST_DIR})")
# For cts-exec-helper, cts-support, pacemaker-execd, pacemaker-fenced,
# and pacemaker-remoted
new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
print(f'Using PATH="{new_path}"')
os.environ['PATH'] = new_path
class ExecTest(Test):
"""Executor for a single pacemaker-execd regression test."""
def __init__(self, name, description, **kwargs):
"""Create a new ExecTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
Keyword arguments:
tls -- Enable pacemaker-remoted.
"""
Test.__init__(self, name, description, **kwargs)
self.tls = kwargs.get("tls", False)
# If we are going to run the stonith resource tests, we will need to
# launch and track Corosync and pacemaker-fenced.
self._corosync = None
self._fencer = None
self._is_stonith_test = "stonith" in self.name
if self.tls:
self._daemon_location = "pacemaker-remoted"
else:
self._daemon_location = "pacemaker-execd"
if self._is_stonith_test:
self._corosync = Corosync(self.verbose, self.logdir, "cts-exec")
self._test_tool_location = "cts-exec-helper"
def _kill_daemons(self):
killall([
"corosync",
"pacemaker-fenced",
"lt-pacemaker-fenced",
"pacemaker-execd",
"lt-pacemaker-execd",
"cts-exec-helper",
"lt-cts-exec-helper",
"pacemaker-remoted",
])
def _start_daemons(self):
if self._corosync:
self._corosync.start(kill_first=True)
# pylint: disable=consider-using-with
self._fencer = subprocess.Popen(["pacemaker-fenced", "-s"])
cmd = [self._daemon_location, "-l", self.logpath]
if self.verbose:
cmd += ["-V"]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
def clean_environment(self):
"""Clean up the host after running a test."""
if self._daemon_process:
self._daemon_process.terminate()
self._daemon_process.wait()
if self.verbose:
print("Daemon Output Start")
with open(self.logpath, "rt", errors="replace", encoding="utf-8") as logfile:
for line in logfile:
print(line.strip())
print("Daemon Output End")
if self._corosync:
self._fencer.terminate()
self._fencer.wait()
self._corosync.stop()
self._daemon_process = None
self._fencer = None
self._corosync = None
def add_cmd(self, cmd=None, **kwargs):
"""Add a cts-exec-helper command to be executed as part of this test."""
if cmd is None:
cmd = self._test_tool_location
if cmd == self._test_tool_location:
if self.verbose:
kwargs["args"] += " -V "
if self.tls:
kwargs["args"] += " -S "
kwargs["validate"] = False
kwargs["check_rng"] = False
kwargs["check_stderr"] = False
Test.add_cmd(self, cmd, **kwargs)
def run(self):
"""Execute this test."""
if self.tls and self._is_stonith_test:
self._result_txt = f"SKIPPED - '{self.name}' - disabled when testing pacemaker_remote"
print(self._result_txt)
return
Test.run(self)
class ExecTests(Tests):
"""Collection of all pacemaker-execd regression tests."""
def __init__(self, **kwargs):
"""
Create a new ExecTests instance.
Keyword arguments:
tls -- Enable pacemaker-remoted.
"""
Tests.__init__(self, **kwargs)
self.tls = kwargs.get("tls", False)
self._action_timeout = " -t 9000 "
self._installed_files = []
self._rsc_classes = self._setup_rsc_classes()
print(f"Testing resource classes {self._rsc_classes!r}")
if "lsb" in self._rsc_classes:
service_agent = "LSBDummy"
elif "systemd" in self._rsc_classes:
service_agent = "pacemaker-cts-dummyd@3"
else:
service_agent = "unsupported"
self._common_cmds = {
- "ocf_reg_line": '-c register_rsc -r ocf_test_rsc ' + self._action_timeout + ' -C ocf -P pacemaker -T Dummy',
+ "ocf_reg_line": f'-c register_rsc -r ocf_test_rsc {self._action_timeout} -C ocf -P pacemaker -T Dummy',
"ocf_reg_event": '-l "NEW_EVENT event_type:register rsc_id:ocf_test_rsc action:none rc:ok op_status:Done"',
- "ocf_unreg_line": '-c unregister_rsc -r ocf_test_rsc ' + self._action_timeout,
+ "ocf_unreg_line": f'-c unregister_rsc -r ocf_test_rsc {self._action_timeout}',
"ocf_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:ocf_test_rsc action:none rc:ok op_status:Done"',
- "ocf_start_line": '-c exec -r ocf_test_rsc -a start ' + self._action_timeout,
+ "ocf_start_line": f'-c exec -r ocf_test_rsc -a start {self._action_timeout}',
"ocf_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:start rc:ok op_status:Done" ',
- "ocf_stop_line": '-c exec -r ocf_test_rsc -a stop ' + self._action_timeout,
+ "ocf_stop_line": f'-c exec -r ocf_test_rsc -a stop {self._action_timeout}',
"ocf_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:stop rc:ok op_status:Done" ',
- "ocf_monitor_line": '-c exec -r ocf_test_rsc -a monitor -i 2s ' + self._action_timeout,
- "ocf_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:Done" ' + self._action_timeout,
- "ocf_cancel_line": '-c cancel -r ocf_test_rsc -a monitor -i 2s ' + self._action_timeout,
+ "ocf_monitor_line": f'-c exec -r ocf_test_rsc -a monitor -i 2s {self._action_timeout}',
+ "ocf_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
+ "ocf_cancel_line": f'-c cancel -r ocf_test_rsc -a monitor -i 2s {self._action_timeout}',
"ocf_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:Cancelled" ',
- "systemd_reg_line": '-c register_rsc -r systemd_test_rsc ' + self._action_timeout + ' -C systemd -T pacemaker-cts-dummyd@3',
+ "systemd_reg_line": f'-c register_rsc -r systemd_test_rsc {self._action_timeout} -C systemd -T pacemaker-cts-dummyd@3',
"systemd_reg_event": '-l "NEW_EVENT event_type:register rsc_id:systemd_test_rsc action:none rc:ok op_status:Done"',
- "systemd_unreg_line": '-c unregister_rsc -r systemd_test_rsc ' + self._action_timeout,
+ "systemd_unreg_line": f'-c unregister_rsc -r systemd_test_rsc {self._action_timeout}',
"systemd_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:systemd_test_rsc action:none rc:ok op_status:Done"',
- "systemd_start_line": '-c exec -r systemd_test_rsc -a start ' + self._action_timeout,
+ "systemd_start_line": f'-c exec -r systemd_test_rsc -a start {self._action_timeout}',
"systemd_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:start rc:ok op_status:Done" ',
- "systemd_stop_line": '-c exec -r systemd_test_rsc -a stop ' + self._action_timeout,
+ "systemd_stop_line": f'-c exec -r systemd_test_rsc -a stop {self._action_timeout}',
"systemd_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:stop rc:ok op_status:Done" ',
- "systemd_monitor_line": '-c exec -r systemd_test_rsc -a monitor -i 2s ' + self._action_timeout,
+ "systemd_monitor_line": f'-c exec -r systemd_test_rsc -a monitor -i 2s {self._action_timeout}',
"systemd_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:monitor rc:ok op_status:Done" -t 15000 ',
- "systemd_cancel_line": '-c cancel -r systemd_test_rsc -a monitor -i 2s ' + self._action_timeout,
+ "systemd_cancel_line": f'-c cancel -r systemd_test_rsc -a monitor -i 2s {self._action_timeout}',
"systemd_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"service_reg_line": f"-c register_rsc -r service_test_rsc {self._action_timeout} -C service -T {service_agent}",
"service_reg_event": '-l "NEW_EVENT event_type:register rsc_id:service_test_rsc action:none rc:ok op_status:Done"',
- "service_unreg_line": '-c unregister_rsc -r service_test_rsc ' + self._action_timeout,
+ "service_unreg_line": f'-c unregister_rsc -r service_test_rsc {self._action_timeout}',
"service_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:service_test_rsc action:none rc:ok op_status:Done"',
- "service_start_line": '-c exec -r service_test_rsc -a start ' + self._action_timeout,
+ "service_start_line": f'-c exec -r service_test_rsc -a start {self._action_timeout}',
"service_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:start rc:ok op_status:Done" ',
- "service_stop_line": '-c exec -r service_test_rsc -a stop ' + self._action_timeout,
+ "service_stop_line": f'-c exec -r service_test_rsc -a stop {self._action_timeout}',
"service_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:stop rc:ok op_status:Done" ',
- "service_monitor_line": '-c exec -r service_test_rsc -a monitor -i 2s ' + self._action_timeout,
- "service_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:Done" ' + self._action_timeout,
- "service_cancel_line": '-c cancel -r service_test_rsc -a monitor -i 2s ' + self._action_timeout,
+ "service_monitor_line": f'-c exec -r service_test_rsc -a monitor -i 2s {self._action_timeout}',
+ "service_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
+ "service_cancel_line": f'-c cancel -r service_test_rsc -a monitor -i 2s {self._action_timeout}',
"service_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:Cancelled" ',
- "lsb_reg_line": '-c register_rsc -r lsb_test_rsc ' + self._action_timeout + ' -C lsb -T LSBDummy',
+ "lsb_reg_line": f'-c register_rsc -r lsb_test_rsc {self._action_timeout} -C lsb -T LSBDummy',
"lsb_reg_event": '-l "NEW_EVENT event_type:register rsc_id:lsb_test_rsc action:none rc:ok op_status:Done" ',
- "lsb_unreg_line": '-c unregister_rsc -r lsb_test_rsc ' + self._action_timeout,
+ "lsb_unreg_line": f'-c unregister_rsc -r lsb_test_rsc {self._action_timeout}',
"lsb_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:lsb_test_rsc action:none rc:ok op_status:Done"',
- "lsb_start_line": '-c exec -r lsb_test_rsc -a start ' + self._action_timeout,
+ "lsb_start_line": f'-c exec -r lsb_test_rsc -a start {self._action_timeout}',
"lsb_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:start rc:ok op_status:Done" ',
- "lsb_stop_line": '-c exec -r lsb_test_rsc -a stop ' + self._action_timeout,
+ "lsb_stop_line": f'-c exec -r lsb_test_rsc -a stop {self._action_timeout}',
"lsb_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:stop rc:ok op_status:Done" ',
- "lsb_monitor_line": '-c exec -r lsb_test_rsc -a status -i 2s ' + self._action_timeout,
- "lsb_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:Done" ' + self._action_timeout,
- "lsb_cancel_line": '-c cancel -r lsb_test_rsc -a status -i 2s ' + self._action_timeout,
+ "lsb_monitor_line": f'-c exec -r lsb_test_rsc -a status -i 2s {self._action_timeout}',
+ "lsb_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:Done" {self._action_timeout}',
+ "lsb_cancel_line": f'-c cancel -r lsb_test_rsc -a status -i 2s {self._action_timeout}',
"lsb_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:Cancelled" ',
- "stonith_reg_line": '-c register_rsc -r stonith_test_rsc ' + self._action_timeout + ' -C stonith -P pacemaker -T fence_dummy',
+ "stonith_reg_line": f'-c register_rsc -r stonith_test_rsc {self._action_timeout} -C stonith -P pacemaker -T fence_dummy',
"stonith_reg_event": '-l "NEW_EVENT event_type:register rsc_id:stonith_test_rsc action:none rc:ok op_status:Done" ',
- "stonith_unreg_line": '-c unregister_rsc -r stonith_test_rsc ' + self._action_timeout,
+ "stonith_unreg_line": f'-c unregister_rsc -r stonith_test_rsc {self._action_timeout}',
"stonith_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:stonith_test_rsc action:none rc:ok op_status:Done"',
- "stonith_start_line": '-c exec -r stonith_test_rsc -a start ' + self._action_timeout,
+ "stonith_start_line": f'-c exec -r stonith_test_rsc -a start {self._action_timeout}',
"stonith_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:start rc:ok op_status:Done" ',
- "stonith_stop_line": '-c exec -r stonith_test_rsc -a stop ' + self._action_timeout,
+ "stonith_stop_line": f'-c exec -r stonith_test_rsc -a stop {self._action_timeout}',
"stonith_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:stop rc:ok op_status:Done" ',
- "stonith_monitor_line": '-c exec -r stonith_test_rsc -a monitor -i 2s ' + self._action_timeout,
- "stonith_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Done" ' + self._action_timeout,
- "stonith_cancel_line": '-c cancel -r stonith_test_rsc -a monitor -i 2s ' + self._action_timeout,
+ "stonith_monitor_line": f'-c exec -r stonith_test_rsc -a monitor -i 2s {self._action_timeout}',
+ "stonith_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
+ "stonith_cancel_line": f'-c cancel -r stonith_test_rsc -a monitor -i 2s {self._action_timeout}',
"stonith_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Cancelled" ',
}
def _setup_rsc_classes(self):
    """Determine which resource classes are supported."""
    supported = stdout_from_command(["crm_resource", "--list-standards"])
    # The command output ends with an empty line; drop it.
    supported = supported[:-1]

    # Stonith resources cannot be exercised over a TLS (remote) connection.
    if self.tls:
        supported.remove("stonith")

    if "systemd" in supported:
        try:
            # The import itself is unused here, but pacemaker-cts-dummyd
            # needs it — probe for the binding now instead of letting
            # every systemd test fail later.
            # pylint: disable=import-outside-toplevel,unused-import
            import systemd.daemon
        except ImportError:
            print("Python systemd bindings not found.")
            print("The tests for systemd class are not going to be run.")
            supported.remove("systemd")

    return supported
def new_test(self, name, description):
    """Create a named test."""
    # Build the test with this runner's shared settings, remember it so
    # the suite executes it later, and hand it back for further setup.
    created = ExecTest(name, description,
                       verbose=self.verbose,
                       tls=self.tls,
                       timeout=self.timeout,
                       force_wait=self.force_wait,
                       logdir=self.logdir)
    self._tests.append(created)
    return created
def setup_environment(self):
    """
    Prepare the host before executing any tests.

    Stops pacemaker_remote if remote support is built in, cleans up any
    leftovers from a previous run, installs a TLS auth key when needed,
    installs resource agents when running from a build tree, and installs
    the CTS support files.  Everything added here is recorded in
    self._installed_files so cleanup_environment() can remove it.
    """
    if BuildOptions.REMOTE_ENABLED:
        # @TODO Use systemctl when available, and use the subprocess module
        # with an argument array instead of os.system()
        os.system("service pacemaker_remote stop")
    # Start from a clean slate in case a prior run left state behind.
    self.cleanup_environment()

    # @TODO Support the option of using specified existing certificates
    authkey = f"{BuildOptions.PACEMAKER_CONFIG_DIR}/authkey"
    if self.tls and not os.path.isfile(authkey):
        print(f"Installing {authkey} ...")
        # @TODO Use os.mkdir() instead
        os.system(f"mkdir -p {BuildOptions.PACEMAKER_CONFIG_DIR}")
        # @TODO Use the subprocess module with an argument array instead
        os.system(f"dd if=/dev/urandom of={authkey} bs=4096 count=1")
        self._installed_files.append(authkey)

    # If we're in build directory, install agents if not already installed
    # pylint: disable=protected-access
    if os.path.exists(f"{BuildOptions._BUILD_DIR}/cts/cts-exec.in"):
        if not os.path.exists(f"{BuildOptions.OCF_RA_INSTALL_DIR}/pacemaker"):
            # @TODO remember which components were created and remove them
            os.makedirs(f"{BuildOptions.OCF_RA_INSTALL_DIR}/pacemaker", 0o755)

        for agent in ["Dummy", "Stateful", "ping"]:
            agent_source = f"{BuildOptions._BUILD_DIR}/extra/resources/{agent}"
            agent_dest = f"{BuildOptions.OCF_RA_INSTALL_DIR}/pacemaker/{agent}"
            if not os.path.exists(agent_dest):
                print(f"Installing {agent_dest} ...")
                # NOTE(review): shutil and EXECMODE are assumed to be
                # provided at file level (outside this view) — confirm.
                shutil.copyfile(agent_source, agent_dest)
                os.chmod(agent_dest, EXECMODE)
                self._installed_files.append(agent_dest)

    subprocess.call(["cts-support", "install"])
def cleanup_environment(self):
    """Clean up the host after executing desired tests."""
    # Remove every file a setup pass recorded, then strip the CTS
    # support files from the host.
    for path in self._installed_files:
        print(f"Removing {path} ...")
        os.remove(path)

    subprocess.call(["cts-support", "uninstall"])
def _build_cmd_str(self, rsc, ty):
"""Construct a command string for the given resource and type."""
return f"{self._common_cmds[f'{rsc}_{ty}_line']} {self._common_cmds[f'{rsc}_{ty}_event']}"
def build_generic_tests(self):
    """Register tests that apply to all resource classes."""
    cmds = self._common_cmds

    def expect_monitor(test, standard):
        # A monitor event must arrive; failure means the recurring
        # monitor is not being rescheduled.
        test.add_cmd(args=cmds[f"{standard}_monitor_event"])

    def expect_no_monitor(test, standard):
        # After a cancel (or implied cancel), a monitor event arriving
        # means the cancel did not actually take effect.
        test.add_cmd(args=cmds[f"{standard}_monitor_event"],
                     expected_exitcode=ExitStatus.TIMEOUT)

    # register/unregister tests
    for standard in self._rsc_classes:
        test = self.new_test(f"generic_registration_{standard}",
                             f"Simple resource registration test for {standard} standard")
        test.add_cmd(args=self._build_cmd_str(standard, "reg"))
        test.add_cmd(args=self._build_cmd_str(standard, "unreg"))

    # start/stop tests
    for standard in self._rsc_classes:
        test = self.new_test(f"generic_start_stop_{standard}",
                             f"Simple start and stop test for {standard} standard")
        for step in ["reg", "start", "stop", "unreg"]:
            test.add_cmd(args=self._build_cmd_str(standard, step))

    # monitor cancel test
    for standard in self._rsc_classes:
        test = self.new_test(f"generic_monitor_cancel_{standard}",
                             f"Simple monitor cancel test for {standard} standard")
        test.add_cmd(args=self._build_cmd_str(standard, "reg"))
        test.add_cmd(args=self._build_cmd_str(standard, "start"))
        test.add_cmd(args=self._build_cmd_str(standard, "monitor"))
        expect_monitor(test, standard)
        expect_monitor(test, standard)
        test.add_cmd(args=self._build_cmd_str(standard, "cancel"))
        expect_no_monitor(test, standard)
        expect_no_monitor(test, standard)
        test.add_cmd(args=self._build_cmd_str(standard, "stop"))
        test.add_cmd(args=self._build_cmd_str(standard, "unreg"))

    # monitor duplicate test
    for standard in self._rsc_classes:
        test = self.new_test(f"generic_monitor_duplicate_{standard}",
                             f"Test creation and canceling of duplicate monitors for {standard} standard")
        test.add_cmd(args=self._build_cmd_str(standard, "reg"))
        test.add_cmd(args=self._build_cmd_str(standard, "start"))
        test.add_cmd(args=self._build_cmd_str(standard, "monitor"))
        expect_monitor(test, standard)
        expect_monitor(test, standard)
        # Add the duplicate monitors
        for _ in range(4):
            test.add_cmd(args=self._build_cmd_str(standard, "monitor"))
        # verify we still get update events
        expect_monitor(test, standard)
        # cancel the monitor; if the duplicates merged with the original,
        # we should no longer see monitor updates
        test.add_cmd(args=self._build_cmd_str(standard, "cancel"))
        expect_no_monitor(test, standard)
        expect_no_monitor(test, standard)
        test.add_cmd(args=self._build_cmd_str(standard, "stop"))
        test.add_cmd(args=self._build_cmd_str(standard, "unreg"))

    # stop implies cancel test
    for standard in self._rsc_classes:
        test = self.new_test(f"generic_stop_implies_cancel_{standard}",
                             f"Verify stopping a resource implies cancel of recurring ops for {standard} standard")
        test.add_cmd(args=self._build_cmd_str(standard, "reg"))
        test.add_cmd(args=self._build_cmd_str(standard, "start"))
        test.add_cmd(args=self._build_cmd_str(standard, "monitor"))
        expect_monitor(test, standard)
        expect_monitor(test, standard)
        test.add_cmd(args=self._build_cmd_str(standard, "stop"))
        expect_no_monitor(test, standard)
        expect_no_monitor(test, standard)
        test.add_cmd(args=self._build_cmd_str(standard, "unreg"))
def build_multi_rsc_tests(self):
"""Register complex tests that involve managing multiple resouces of different types."""
common_cmds = self._common_cmds
# do not use service and systemd at the same time, it is the same resource.
# register start monitor stop unregister resources of each type at the same time
test = self.new_test("multi_rsc_start_stop_all_including_stonith",
"Start, monitor, and stop resources of multiple types and classes")
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
for rsc in self._rsc_classes:
# If this fails, that means the monitor is not being rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
def build_negative_tests(self):
    """
    Register tests related to how pacemaker-execd handles failures.

    Covers start timeouts (ocf and stonith), fencer death while a stonith
    resource is in use, monitor failures (ocf and systemd), the
    "monitor changes only" notification flag, cancellation of the wrong
    operation, operations on an unregistered resource ID, and
    registration of agents that are missing or misconfigured.
    """
    # NOTE(review): several adjacent string literals below concatenate
    # with no separator, so correctness relies on self._action_timeout
    # carrying its own surrounding whitespace — confirm against its
    # definition (outside this view).

    # ocf start timeout test
    test = self.new_test("ocf_start_timeout", "Force start timeout to occur, verify start failure.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
    # -t must be less than self._action_timeout
    test.add_cmd(args='-c exec -r test_rsc -a start -k op_sleep -v 5 -t 1000 -w')
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:Error occurred op_status:Timed out" '
                      f'{self._action_timeout}')
    test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # stonith start timeout test
    test = self.new_test("stonith_start_timeout", "Force start timeout to occur, verify start failure.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C stonith -P pacemaker -T fence_dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done"')
    # -t must be less than self._action_timeout
    test.add_cmd(args='-c exec -r test_rsc -a start -k monitor_delay -v 30 -t 1000 -w')
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:Error occurred op_status:Timed out" '
                      f'{self._action_timeout}')
    test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # stonith component fail
    test = self.new_test("stonith_component_fail", "Kill stonith component after pacemaker-execd connects")
    test.add_cmd(args=self._build_cmd_str("stonith", "reg"))
    test.add_cmd(args=self._build_cmd_str("stonith", "start"))
    test.add_cmd(args='-c exec -r stonith_test_rsc -a monitor -i 600s '
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Done" '
                      f'{self._action_timeout}')
    # Killing the fencer mid-monitor must surface as an error event
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:Error occurred op_status:error" -t 15000',
                 kill="killall -9 -q pacemaker-fenced lt-pacemaker-fenced")
    test.add_cmd(args=self._build_cmd_str("stonith", "unreg"))

    # monitor fail for ocf resources
    test = self.new_test("monitor_fail_ocf", "Force ocf monitor to fail, verify failure is reported.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"')
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"'
                      f'{self._action_timeout}')
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"'
                      f'{self._action_timeout}')
    # Removing the Dummy state file makes the agent report "not running"
    test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
                 kill=f"rm -f {BuildOptions.LOCAL_STATE_DIR}/run/Dummy-test_rsc.state")
    test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 1s {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
    # No further monitor events may arrive after the cancel
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" '
                      f'{self._action_timeout}', expected_exitcode=ExitStatus.TIMEOUT)
    test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" '
                      f'{self._action_timeout}', expected_exitcode=ExitStatus.TIMEOUT)
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # verify notify changes only for monitor operation
    test = self.new_test("monitor_changes_only", "Verify when flag is set, only monitor changes are notified.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} -o '
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout}'
                      ' -o -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ')
    # With -o set, an unchanged monitor result must NOT be re-notified
    test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
                 expected_exitcode=ExitStatus.TIMEOUT)
    # ...but a changed result (state file removed) must be
    test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done"{self._action_timeout}',
                 kill=f"rm -f {BuildOptions.LOCAL_STATE_DIR}/run/Dummy-test_rsc.state")
    test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 1s{self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
    test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
                 expected_exitcode=ExitStatus.TIMEOUT)
    test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
                 expected_exitcode=ExitStatus.TIMEOUT)
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done"')

    # monitor fail for systemd resource
    if "systemd" in self._rsc_classes:
        test = self.new_test("monitor_fail_systemd", "Force systemd monitor to fail, verify failure is reported..")
        test.add_cmd(args=f'-c register_rsc -r test_rsc -C systemd -T pacemaker-cts-dummyd@3 {self._action_timeout}'
                          '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
        test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                          '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
        test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                          '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
        test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout}'
                          '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ')
        test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}')
        test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}')
        # Killing the dummy daemon must flip the monitor to "not running"
        test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done"{self._action_timeout}',
                     kill="pkill -9 -f pacemaker-cts-dummyd")
        test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 1s{self._action_timeout}'
                          '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
        test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
                     expected_exitcode=ExitStatus.TIMEOUT)
        test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
                     expected_exitcode=ExitStatus.TIMEOUT)
        test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                          '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # Cancel non-existent operation on a resource
    test = self.new_test("cancel_non_existent_op", "Attempt to cancel the wrong monitor operation, verify expected failure")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ')
    test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}')
    # interval is wrong, should fail
    test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 2s{self._action_timeout}'
                      ' -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ',
                 expected_exitcode=ExitStatus.ERROR)
    # action name is wrong, should fail
    test.add_cmd(args=f'-c cancel -r test_rsc -a stop -i 1s{self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # Attempt to invoke non-existent rsc id
    test = self.new_test("invoke_non_existent_rsc", "Attempt to perform operations on a non-existent rsc id.")
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:error op_status:Done" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 6s {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c cancel -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # Register and start a resource that doesn't exist, systemd
    if "systemd" in self._rsc_classes:
        test = self.new_test("start_uninstalled_systemd", "Register uninstalled systemd agent, try to start, verify expected failure")
        test.add_cmd(args=f'-c register_rsc -r test_rsc -C systemd -T this_is_fake1234 {self._action_timeout}'
                          '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
        test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                          '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
        test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                          '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # Register and start a resource that doesn't exist, ocf
    test = self.new_test("start_uninstalled_ocf", "Register uninstalled ocf agent, try to start, verify expected failure.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T this_is_fake1234 {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # Register ocf with non-existent provider
    test = self.new_test("start_ocf_bad_provider", "Register ocf agent with a non-existent provider, verify expected failure.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pancakes -T Dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')

    # Register ocf with empty provider field
    test = self.new_test("start_ocf_no_provider", "Register ocf agent with a no provider, verify expected failure.")
    test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -T Dummy {self._action_timeout}'
                      '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
                      '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Error" ',
                 expected_exitcode=ExitStatus.ERROR)
    test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
                      '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
def build_stress_tests(self):
"""Register stress tests."""
timeout = "-t 20000"
iterations = 25
test = self.new_test("ocf_stress", "Verify OCF agent handling works under load")
for i in range(iterations):
test.add_cmd(args=f'-c register_rsc -r rsc_{i} {timeout} -C ocf -P heartbeat -T Dummy -l "NEW_EVENT event_type:register rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a start {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:start rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a monitor {timeout} -i 1s '
f'-l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:monitor rc:ok op_status:Done"')
for i in range(iterations):
test.add_cmd(args=f'-c exec -r rsc_{i} -a stop {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:stop rc:ok op_status:Done"')
test.add_cmd(args=f'-c unregister_rsc -r rsc_{i} {timeout} -l "NEW_EVENT event_type:unregister rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
if "systemd" in self._rsc_classes:
test = self.new_test("systemd_stress", "Verify systemd dbus connection works under load")
for i in range(iterations):
test.add_cmd(args=f'-c register_rsc -r rsc_{i} {timeout} -C systemd -T pacemaker-cts-dummyd@3 -l "NEW_EVENT event_type:register rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a start {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:start rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a monitor {timeout} -i 1s '
f'-l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:monitor rc:ok op_status:Done"')
for i in range(iterations):
test.add_cmd(args=f'-c exec -r rsc_{i} -a stop {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:stop rc:ok op_status:Done"')
test.add_cmd(args=f'-c unregister_rsc -r rsc_{i} {timeout} -l "NEW_EVENT event_type:unregister rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
iterations = 9
timeout = "-t 30000"
# Verify recurring op in-flight collision is handled in series properly
test = self.new_test("rsc_inflight_collision", "Verify recurring ops do not collide with other operations for the same rsc.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
- '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ' + self._action_timeout)
+ f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args=f'-c exec -r test_rsc -a start {timeout} -k op_sleep -v 1 -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done"')
for i in range(iterations):
test.add_cmd(args=f'-c exec -r test_rsc -a monitor {timeout} -i 100{i}ms -k op_sleep -v 2 '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r test_rsc -a stop {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done"')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {timeout} -l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done"')
def build_custom_tests(self):
"""Register tests that target specific cases."""
# verify resource temporary folder is created and used by OCF agents
test = self.new_test("rsc_tmp_dir", "Verify creation and use of rsc temporary state directory")
test.add_cmd("ls", args=f"-al {BuildOptions.RSC_TMP_DIR}")
test.add_cmd(args='-c register_rsc -r test_rsc -P heartbeat -C ocf -T Dummy '
- '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ' + self._action_timeout)
+ f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args='-c exec -r test_rsc -a start -t 4000')
test.add_cmd("ls", args=f"-al {BuildOptions.RSC_TMP_DIR}")
test.add_cmd("ls", args=f"{BuildOptions.RSC_TMP_DIR}/Dummy-test_rsc.state")
test.add_cmd(args='-c exec -r test_rsc -a stop -t 4000')
- test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
- + '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# start delay then stop test
test = self.new_test("start_delay", "Verify start delay works as expected.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
- '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ' + self._action_timeout)
+ f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args='-c exec -r test_rsc -s 6000 -a start -w -t 6000')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" -t 2000',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" -t 6000')
- test.add_cmd(args='-c exec -r test_rsc -a stop ' + self._action_timeout
- + '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
- test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
- + '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout}'
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# start delay, but cancel before it gets a chance to start
test = self.new_test("start_delay_cancel", "Using start_delay, start a rsc, but cancel the start op before execution.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
- '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ' + self._action_timeout)
+ f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args='-c exec -r test_rsc -s 5000 -a start -w -t 4000')
- test.add_cmd(args='-c cancel -r test_rsc -a start ' + self._action_timeout
- + '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ')
+ test.add_cmd(args=f'-c cancel -r test_rsc -a start {self._action_timeout}'
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" -t 5000',
expected_exitcode=ExitStatus.TIMEOUT)
- test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
- + '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Register a bunch of resources, verify we can get info on them
test = self.new_test("verify_get_rsc_info", "Register multiple resources, verify retrieval of rsc info.")
if "systemd" in self._rsc_classes:
- test.add_cmd(args='-c register_rsc -r rsc1 -C systemd -T pacemaker-cts-dummyd@3 ' + self._action_timeout)
+ test.add_cmd(args=f'-c register_rsc -r rsc1 -C systemd -T pacemaker-cts-dummyd@3 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc1 ')
- test.add_cmd(args='-c unregister_rsc -r rsc1 ' + self._action_timeout)
+ test.add_cmd(args=f'-c unregister_rsc -r rsc1 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc1 ', expected_exitcode=ExitStatus.ERROR)
- test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker ' + self._action_timeout)
+ test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc2 ')
- test.add_cmd(args='-c unregister_rsc -r rsc2 ' + self._action_timeout)
+ test.add_cmd(args=f'-c unregister_rsc -r rsc2 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc2 ', expected_exitcode=ExitStatus.ERROR)
# Register duplicate, verify only one entry exists and can still be removed
test = self.new_test("duplicate_registration", "Register resource multiple times, verify only one entry exists and can be removed.")
- test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker ' + self._action_timeout)
+ test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker {self._action_timeout}')
test.add_cmd(args="-c get_rsc_info -r rsc2 ",
stdout_match="id:rsc2 class:ocf provider:pacemaker type:Dummy")
- test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker ' + self._action_timeout)
+ test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker {self._action_timeout}')
test.add_cmd(args="-c get_rsc_info -r rsc2 ",
stdout_match="id:rsc2 class:ocf provider:pacemaker type:Dummy")
- test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Stateful -P pacemaker ' + self._action_timeout)
+ test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Stateful -P pacemaker {self._action_timeout}')
test.add_cmd(args="-c get_rsc_info -r rsc2 ",
stdout_match="id:rsc2 class:ocf provider:pacemaker type:Stateful")
- test.add_cmd(args='-c unregister_rsc -r rsc2 ' + self._action_timeout)
+ test.add_cmd(args=f'-c unregister_rsc -r rsc2 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc2 ', expected_exitcode=ExitStatus.ERROR)
# verify the option to only send notification to the original client
test = self.new_test("notify_orig_client_only", "Verify option to only send notifications to the client originating the action.")
- test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy ' + self._action_timeout
- + '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
- test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
- + '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
- test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout + ' -n '
+ test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout}'
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout}'
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout} -n '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"')
# this will fail because the monitor notifications should only go to the original caller, which no longer exists.
- test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ' + self._action_timeout,
+ test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s -t 6000 ')
- test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
- + '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
+ test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout}'
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# get metadata
test = self.new_test("get_ocf_metadata", "Retrieve metadata for a resource")
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T Dummy",
stdout_match="resource-agent name=\"Dummy\"")
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T Stateful")
test.add_cmd(args="-c metadata -P pacemaker -T Stateful", expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T fake_agent", expected_exitcode=ExitStatus.ERROR)
# get stonith metadata
test = self.new_test("get_stonith_metadata", "Retrieve stonith metadata for a resource")
test.add_cmd(args="-c metadata -C stonith -P pacemaker -T fence_dummy",
stdout_match="resource-agent name=\"fence_dummy\"")
# get lsb metadata
if "lsb" in self._rsc_classes:
test = self.new_test("get_lsb_metadata",
"Retrieve metadata for an LSB resource")
test.add_cmd(args="-c metadata -C lsb -T LSBDummy",
stdout_match="resource-agent name='LSBDummy'")
# get metadata
if "systemd" in self._rsc_classes:
test = self.new_test("get_systemd_metadata", "Retrieve metadata for a resource")
test.add_cmd(args="-c metadata -C systemd -T pacemaker-cts-dummyd@",
stdout_match="resource-agent name=\"pacemaker-cts-dummyd@\"")
# get ocf providers
test = self.new_test("list_ocf_providers",
"Retrieve list of available resource providers, verifies pacemaker is a provider.")
test.add_cmd(args="-c list_ocf_providers ", stdout_match="pacemaker")
test.add_cmd(args="-c list_ocf_providers -T ping", stdout_match="pacemaker")
# Verify agents only exist in their lists
test = self.new_test("verify_agent_lists", "Verify the agent lists contain the right data.")
if "ocf" in self._rsc_classes:
test.add_cmd(args="-c list_agents ", stdout_match="Stateful")
test.add_cmd(args="-c list_agents -C ocf", stdout_match="Stateful",
stdout_no_match="pacemaker-cts-dummyd@|fence_dummy")
if "service" in self._rsc_classes:
test.add_cmd(args="-c list_agents -C service", stdout_match="",
stdout_no_match="Stateful|fence_dummy")
if "lsb" in self._rsc_classes:
test.add_cmd(args="-c list_agents", stdout_match="LSBDummy")
test.add_cmd(args="-c list_agents -C lsb", stdout_match="LSBDummy",
stdout_no_match="pacemaker-cts-dummyd@|Stateful|fence_dummy")
test.add_cmd(args="-c list_agents -C service", stdout_match="LSBDummy")
if "systemd" in self._rsc_classes:
test.add_cmd(args="-c list_agents ", stdout_match="pacemaker-cts-dummyd@") # systemd
test.add_cmd(args="-c list_agents -C systemd", stdout_match="", stdout_no_match="Stateful") # should not exist
test.add_cmd(args="-c list_agents -C systemd", stdout_match="pacemaker-cts-dummyd@")
test.add_cmd(args="-c list_agents -C systemd", stdout_match="", stdout_no_match="fence_dummy") # should not exist
if "stonith" in self._rsc_classes:
test.add_cmd(args="-c list_agents -C stonith", stdout_match="fence_dummy") # stonith
test.add_cmd(args="-c list_agents -C stonith", stdout_match="", # should not exist
stdout_no_match="pacemaker-cts-dummyd@")
test.add_cmd(args="-c list_agents -C stonith", stdout_match="", stdout_no_match="Stateful") # should not exist
test.add_cmd(args="-c list_agents ", stdout_match="fence_dummy")
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-execd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
- "\t " + sys.argv[0] + " --run-only start_stop\n\n"
+ f"\t {sys.argv[0]} --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
- "\t " + sys.argv[0] + " --run-only-pattern systemd")
+ f"\t {sys.argv[0]} --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-t", "--timeout", type=float, default=2,
help="Up to how many seconds each test case waits for the daemon to "
"be initialized. Defaults to 2. The value 0 means no limit.")
parser.add_argument("-w", "--force-wait", action="store_true",
help="Each test case waits the default/specified --timeout for the "
"daemon without tracking the log")
if BuildOptions.REMOTE_ENABLED:
parser.add_argument("-R", "--pacemaker-remote", action="store_true",
help="Test pacemaker-remoted binary instead of pacemaker-execd")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run pacemaker-execd regression tests as specified by arguments."""
update_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
if opts.pacemaker_remote:
exit_if_proc_running("pacemaker-remoted")
else:
exit_if_proc_running("corosync")
exit_if_proc_running("pacemaker-execd")
exit_if_proc_running("pacemaker-fenced")
# Create a temporary directory for log files (the directory will
# automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-exec-") as logdir:
tests = ExecTests(verbose=opts.verbose, tls=opts.pacemaker_remote,
timeout=opts.timeout, force_wait=opts.force_wait,
logdir=logdir)
tests.build_generic_tests()
tests.build_multi_rsc_tests()
tests.build_negative_tests()
tests.build_custom_tests()
tests.build_stress_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
tests.setup_environment()
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment()
tests.exit()
if __name__ == "__main__":
main()
diff --git a/cts/cts-fencing.in b/cts/cts-fencing.in
index 058d757b87..1daa06f0d7 100644
--- a/cts/cts-fencing.in
+++ b/cts/cts-fencing.in
@@ -1,953 +1,953 @@
#!@PYTHON@
""" Regression tests for Pacemaker's fencer
"""
__copyright__ = "Copyright 2012-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import sys
import subprocess
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync, localname
from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
TEST_DIR = sys.path[0]
def update_path():
""" Set the PATH environment variable appropriately for the tests """
new_path = os.environ['PATH']
if os.path.exists(f"{TEST_DIR}/cts-fencing.in"):
print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR} ({TEST_DIR})")
# For pacemaker-fenced and cts-fence-helper
new_path = f"{BuildOptions._BUILD_DIR}/daemons/fenced:{new_path}"
new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" # For stonith_admin
new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}" # For cts-support
else:
print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {TEST_DIR})")
# For pacemaker-fenced, cts-fence-helper, and cts-support
new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
print(f'Using PATH="{new_path}"')
os.environ['PATH'] = new_path
class FenceTest(Test):
""" Executor for a single test """
def __init__(self, name, description, **kwargs):
Test.__init__(self, name, description, **kwargs)
self._daemon_location = "pacemaker-fenced"
def _kill_daemons(self):
killall(["pacemakerd", "pacemaker-fenced"])
def _start_daemons(self):
cmd = ["pacemaker-fenced", "--stand-alone", "--logfile", self.logpath]
if self.verbose:
cmd += ["-V"]
s = " ".join(cmd)
print(f"Starting {s}")
self._daemon_process = subprocess.Popen(cmd)
class FenceTests(Tests):
""" Collection of all fencing regression tests """
def __init__(self, **kwargs):
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-fencing")
def new_test(self, name, description):
""" Create a named test """
test = FenceTest(name, description, verbose=self.verbose,
timeout=self.timeout, force_wait=self.force_wait,
logdir=self.logdir)
self._tests.append(test)
return test
def build_api_sanity_tests(self):
""" Register tests to verify basic API usage """
verbose_arg = ""
if self.verbose:
verbose_arg = "-V"
test = self.new_test("low_level_api_test", "Sanity-test client API")
test.add_cmd("cts-fence-helper", args=f"-t {verbose_arg}", validate=False)
test = self.new_test("low_level_api_mainloop_test",
"Sanity-test client API using mainloop")
test.add_cmd("cts-fence-helper", args=f"-m {verbose_arg}", validate=False)
def build_custom_timeout_tests(self):
""" Register tests to verify custom timeout usage """
# custom timeout without topology
test = self.new_test("custom_timeout_1",
"Verify per device timeouts work as expected without using topology")
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# timeout is 5+1+4 = 10, which the fencer scales by 1.2 to get 12s
test.add_log_pattern("Total timeout set to 12s")
# custom timeout _WITH_ topology
test = self.new_test("custom_timeout_2",
"Verify per device timeouts work as expected _WITH_ topology")
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1000ms')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4000s')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# timeout is 5+1+4000 = 4006, which the fencer scales by 1.2 to get 4807s
test.add_log_pattern("Total timeout set to 4807s")
def build_fence_merge_tests(self):
""" Register tests to verify when fence operations should be merged """
### Simple test that overlapping fencing operations get merged
test = self.new_test("custom_merge_single",
"Verify overlapping identical fencing operations are merged, no fencing levels used")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### one merger will happen
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
### Test that multiple mergers occur
test = self.new_test("custom_merge_multiple",
"Verify multiple overlapping identical fencing operations are merged")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o delay=2 -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### 4 mergers should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
### Test that multiple mergers occur with topologies used
test = self.new_test("custom_merge_with_topology",
"Verify multiple overlapping identical fencing operations are merged with fencing levels")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### 4 mergers should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
def build_fence_no_merge_tests(self):
""" Register tests to verify when fence operations should not be merged """
test = self.new_test("custom_no_merge",
"Verify differing fencing operations are not merged")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client",
negative=True)
def build_standalone_tests(self):
    """ Register a grab bag of tests """
    # test what happens when all devices timeout
    test = self.new_test("fence_multi_device_failure",
                         "Verify that all devices timeout, a fencing failure is returned")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 2", expected_exitcode=ExitStatus.TIMEOUT)
    test.add_log_pattern("Total timeout set to 7s")
    test.add_log_pattern("targeting node3 using false1 returned ")
    test.add_log_pattern("targeting node3 using false2 returned ")
    test.add_log_pattern("targeting node3 using false3 returned ")

    # test what happens when multiple devices can fence a node, but the first device fails
    test = self.new_test("fence_device_failure_rollover",
                         "Verify that when one fence device fails for a node, the others are tried")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
    test.add_log_pattern("Total timeout set to 18s")

    # test what happens when we try to use a missing fence-agent
    test = self.new_test("fence_missing_agent",
                         "Verify proper error-handling when using a non-existent fence-agent")
    test.add_cmd("stonith_admin",
                 args="--output-as=xml -R true1 -a fence_missing -o mode=pass -o pcmk_host_list=node3")
    test.add_cmd("stonith_admin",
                 args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node2")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5", expected_exitcode=ExitStatus.NOSUCH)
    test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 5")

    # simple topology test for one device
    test = self.new_test("topology_simple",
                         "Verify all fencing devices at a level are used")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
    test.add_log_pattern("Total timeout set to 6s")
    test.add_log_pattern("targeting node3 using true returned 0")

    # add topology, delete topology, verify fencing still works
    test = self.new_test("topology_add_remove",
                         "Verify fencing occurs after all topology levels are removed")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
    test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 1")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
    test.add_log_pattern("Total timeout set to 6s")
    test.add_log_pattern("targeting node3 using true returned 0")

    # test what happens when the first fencing level has multiple devices
    test = self.new_test("topology_device_fails",
                         "Verify if one device in a level fails, the other is tried")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
    test.add_log_pattern("Total timeout set to 48s")
    test.add_log_pattern("targeting node3 using false returned 1")
    test.add_log_pattern("targeting node3 using true returned 0")

    # test what happens when the first fencing level fails
    test = self.new_test("topology_multi_level_fails",
                         "Verify if one level fails, the next level is tried")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 3")
    test.add_log_pattern("Total timeout set to 21s")
    test.add_log_pattern("targeting node3 using false1 returned 1")
    test.add_log_pattern("targeting node3 using false2 returned 1")
    test.add_log_pattern("targeting node3 using true3 returned 0")
    test.add_log_pattern("targeting node3 using true4 returned 0")

    # test what happens when the first fencing level had devices that no one has registered
    test = self.new_test("topology_missing_devices",
                         "Verify topology can continue with missing devices")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    # false1 and true1 are deliberately never registered
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")

    # Test what happens if multiple fencing levels are defined, and then the first one is removed
    test = self.new_test("topology_level_removal",
                         "Verify level removal works")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
    # Now remove level 2, verify none of the devices in level two are hit
    test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 2")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
    test.add_log_pattern("Total timeout set to 96s")
    test.add_log_pattern("targeting node3 using false1 returned 1")
    test.add_log_pattern("targeting node3 using false2 returned ",
                         negative=True)
    test.add_log_pattern("targeting node3 using true3 returned 0")
    test.add_log_pattern("targeting node3 using true4 returned 0")

    # Test targeting a topology level by node name pattern
    test = self.new_test("topology_level_pattern",
                         "Verify targeting topology by node name pattern works")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r '@node.*' -i 1 -v true")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
    test.add_log_pattern("targeting node3 using true returned 0")

    # test allowing commas and semicolons as delimiters in pcmk_host_list
    test = self.new_test("host_list_delimiters",
                         "Verify commas and semicolons can be used as pcmk_host_list delimiters")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1,node2,node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=pcmk1;pcmk2;pcmk3"')
    # Note: args previously began with a stray "stonith_admin " prefix,
    # which produced an invalid "stonith_admin stonith_admin ..." command line
    test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 5")
    test.add_cmd("stonith_admin", args="--output-as=xml -F pcmk3 -t 5")
    test.add_log_pattern("targeting node2 using true1 returned 0")
    test.add_log_pattern("targeting pcmk3 using true2 returned 0")

    # test the stonith builds the correct list of devices that can fence a node
    test = self.new_test("list_devices",
                         "Verify list of devices that can fence a node is correct")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -l node1 -V",
                 stdout_match="true2", stdout_no_match="true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -l node1 -V",
                 stdout_match="true3", stdout_no_match="true1")

    # simple test of device monitor
    test = self.new_test("monitor", "Verify device is reachable")
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
    test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -Q false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true2", expected_exitcode=ExitStatus.NOSUCH)

    # Verify monitor occurs for duration of timeout period on failure
    test = self.new_test("monitor_timeout",
                         "Verify monitor uses duration of timeout period given")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true1 -t 5", expected_exitcode=ExitStatus.ERROR)
    test.add_log_pattern("Attempt 2 to execute")

    # Verify monitor occurs for duration of timeout period on failure, but stops at max retries
    test = self.new_test("monitor_timeout_max_retries",
                         "Verify monitor retries until max retry value or timeout is hit")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true1 -t 15", expected_exitcode=ExitStatus.ERROR)
    test.add_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times")

    # simple register test
    test = self.new_test("register",
                         "Verify devices can be registered and un-registered")
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true1", expected_exitcode=ExitStatus.NOSUCH)

    # simple reboot test
    test = self.new_test("reboot", "Verify devices can be rebooted")
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
    test.add_cmd("stonith_admin", args="--output-as=xml -B node3 -t 5")
    test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -Q true1", expected_exitcode=ExitStatus.NOSUCH)

    # test fencing history
    test = self.new_test("fence_history",
                         "Verify last fencing operation is returned")
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5 -V")
    test.add_cmd("stonith_admin", args="--output-as=xml -H node3",
                 stdout_match='action="off" target="node3" .* status="success"')

    # simple test of dynamic list query
    test = self.new_test("dynamic_list_query",
                         "Verify dynamic list of fencing devices can be retrieved")
    test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
    test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
    test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
    test.add_cmd("stonith_admin", args="--output-as=xml -l fake_port_1",
                 stdout_match='count="3"')

    # fence using dynamic list query
    test = self.new_test("fence_dynamic_list_query",
                         "Verify dynamic list of fencing devices can be retrieved")
    test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
    test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
    test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
    test.add_cmd("stonith_admin", args="--output-as=xml -F fake_port_1 -t 5 -V")

    # simple test of query using status action
    test = self.new_test("status_query",
                         "Verify dynamic list of fencing devices can be retrieved")
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
    test.add_cmd("stonith_admin", args="--output-as=xml -l fake_port_1",
                 stdout_match='count="3"')

    # test what happens when no reboot action is advertised
    test = self.new_test("no_reboot_support",
                         "Verify reboot action defaults to off when no reboot action is advertised by agent")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy_no_reboot -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
    test.add_log_pattern("does not support reboot")
    test.add_log_pattern("using true1 returned 0")

    # make sure reboot is used when reboot action is advertised
    test = self.new_test("with_reboot_support",
                         "Verify reboot action can be used when metadata advertises it")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
    test.add_log_pattern("does not advertise support for 'reboot', performing 'off'",
                         negative=True)
    test.add_log_pattern("using true1 returned 0")

    # make sure all fencing delays are applied correctly and taken into account by fencing timeouts with topology
    test = self.new_test("topology_delays",
                         "Verify all fencing delays are applied correctly and taken into account by fencing timeouts with topology")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
    # Resulting "random" delay will always be 1 since (rand() % (delay_max - delay_base)) is always 0 here
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1 -o pcmk_delay_max=2')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true3")
    test.add_cmd("stonith_admin", args="--output-as=xml -F node3 --delay 1")
    # Total fencing timeout takes all fencing delays into account
    test.add_log_pattern("Total timeout set to 582s")
    # Fencing timeout for the first device takes the requested fencing delay
    # and pcmk_delay_base into account
    test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true1 .*146s.*",
                         regex=True)
    # Requested fencing delay is applied only for the first device in the
    # first level, with the static delay from pcmk_delay_base added
    test.add_log_pattern("Delaying 'off' action targeting node3 using true1 for 2s | timeout=120s requested_delay=1s base=1s max=1s")
    # Fencing timeout no longer takes the requested fencing delay into account for further devices
    test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using false1 .*145s.*",
                         regex=True)
    # Requested fencing delay is no longer applied for further devices
    test.add_log_pattern("Delaying 'off' action targeting node3 using false1 for 1s | timeout=120s requested_delay=0s base=1s max=1s")
    # Fencing timeout takes pcmk_delay_max into account
    test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true2 .*146s.*",
                         regex=True)
    test.add_log_pattern("Delaying 'off' action targeting node3 using true2 for 1s | timeout=120s requested_delay=0s base=1s max=2s")
    test.add_log_pattern("Delaying 'off' action targeting node3 using true3",
                         negative=True)
def build_nodeid_tests(self):
    """ Register tests that use a corosync node id """
    our_uname = localname()

    ### verify nodeid is supplied when nodeid is in the metadata parameters
    test = self.new_test("supply_nodeid",
                         "Verify nodeid is given when fence agent has nodeid as parameter")
    test.add_cmd("stonith_admin",
                 args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname}"')
    test.add_cmd("stonith_admin", args=f"--output-as=xml -F {our_uname} -t 3")
    test.add_log_pattern(f"as nodeid with fence action 'off' targeting {our_uname}")

    ### verify nodeid is _NOT_ supplied when nodeid is not in the metadata parameters
    test = self.new_test("do_not_supply_nodeid",
                         "Verify nodeid is _NOT_ given when fence agent does not have nodeid as parameter")

    # use a host name that won't be in corosync.conf
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy_no_nodeid '
                      f'-o mode=pass -o pcmk_host_list="regr-test {our_uname}"')
    test.add_cmd("stonith_admin", args="--output-as=xml -F regr-test -t 3")
    test.add_log_pattern("as nodeid with fence action 'off' targeting regr-test",
                         negative=True)
    test.add_cmd("stonith_admin", args=f"--output-as=xml -F {our_uname} -t 3")
    # This must be an f-string: without the "f" prefix the literal text
    # "{our_uname}" was searched for, so the negative check passed vacuously
    test.add_log_pattern(f"as nodeid with fence action 'off' targeting {our_uname}",
                         negative=True)
def build_unfence_tests(self):
    """ Register tests that verify unfencing

    These tests target the local node name returned by localname(), and use
    the fence_dummy_auto_unfence agent (which advertises automatic unfencing).
    """
    our_uname = localname()

    ### verify unfencing using automatic unfencing
    test = self.new_test("unfence_required_1",
                         "Verify require unfencing on all devices when automatic=true in agent's metadata")
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname}"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname}"')
    test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
    # both devices should be executed
    test.add_log_pattern("using true1 returned 0")
    test.add_log_pattern("using true2 returned 0")

    ### verify unfencing using automatic unfencing fails if any of the required agents fail
    test = self.new_test("unfence_required_2",
                         "Verify require unfencing on all devices when automatic=true in agent's metadata")
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname}"')
    # true2 is configured to fail, so the overall unfence must fail
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=fail -o "pcmk_host_list={our_uname}"')
    test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 6", expected_exitcode=ExitStatus.ERROR)

    ### verify unfencing using automatic devices with topology
    test = self.new_test("unfence_required_3",
                         "Verify require unfencing on all devices even when at different topology levels")
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v true1")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true2")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
    # both levels' devices must run even though level 1 alone would suffice for fencing
    test.add_log_pattern("using true1 returned 0")
    test.add_log_pattern("using true2 returned 0")

    ### verify unfencing using automatic devices with topology
    test = self.new_test("unfence_required_4",
                         "Verify all required devices are executed even with topology levels fail")
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true3 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R true4 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
    # false1-false4 use the plain fence_dummy agent and are configured to fail
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
    test.add_cmd('stonith_admin',
                 args=f'--output-as=xml -R false4 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
    # mix passing and failing devices across four topology levels
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v true1")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v false1")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v false2")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true2")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v false3")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true3")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 3 -v false4")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 4 -v true4")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
    # every auto-unfence device must have been executed despite level failures
    test.add_log_pattern("using true1 returned 0")
    test.add_log_pattern("using true2 returned 0")
    test.add_log_pattern("using true3 returned 0")
    test.add_log_pattern("using true4 returned 0")
def build_unfence_on_target_tests(self):
    """ Register tests that verify unfencing that runs on the target """
    local_node = localname()

    ### verify unfencing using on_target device
    test = self.new_test("unfence_on_target_1",
                         "Verify unfencing with on_target = true")
    test.add_cmd("stonith_admin",
                 args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={local_node}"')
    test.add_cmd("stonith_admin", args=f"--output-as=xml -U {local_node} -t 3")
    test.add_log_pattern("(on) to be executed on target")

    ### verify failure of unfencing using on_target device
    test = self.new_test("unfence_on_target_2",
                         "Verify failure unfencing with on_target = true")
    test.add_cmd("stonith_admin",
                 args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={local_node} node_fake_1234"')
    test.add_cmd("stonith_admin", args="--output-as=xml -U node_fake_1234 -t 3", expected_exitcode=ExitStatus.NOSUCH)
    test.add_log_pattern("(on) to be executed on target")

    ### verify unfencing using on_target device with topology
    test = self.new_test("unfence_on_target_3",
                         "Verify unfencing with on_target = true using topology")
    for device in ("true1", "true2"):
        test.add_cmd("stonith_admin",
                     args=f'--output-as=xml -R {device} -a fence_dummy -o mode=pass -o "pcmk_host_list={local_node} node3"')
    for level, device in enumerate(("true1", "true2"), start=1):
        test.add_cmd("stonith_admin", args=f"--output-as=xml -r {local_node} -i {level} -v {device}")
    test.add_cmd("stonith_admin", args=f"--output-as=xml -U {local_node} -t 3")
    test.add_log_pattern("(on) to be executed on target")

    ### verify unfencing using on_target device with topology fails when target node doesn't exist
    test = self.new_test("unfence_on_target_4",
                         "Verify unfencing failure with on_target = true using topology")
    for device in ("true1", "true2"):
        test.add_cmd("stonith_admin",
                     args=f'--output-as=xml -R {device} -a fence_dummy -o mode=pass -o "pcmk_host_list={local_node} node_fake"')
    for level, device in enumerate(("true1", "true2"), start=1):
        test.add_cmd("stonith_admin", args=f"--output-as=xml -r node_fake -i {level} -v {device}")
    test.add_cmd("stonith_admin", args="--output-as=xml -U node_fake -t 3", expected_exitcode=ExitStatus.NOSUCH)
    test.add_log_pattern("(on) to be executed on target")
def build_remap_tests(self):
    """ Register tests that verify remapping of reboots to off-on """
    test = self.new_test("remap_simple",
                         "Verify sequential topology reboot is remapped to all-off-then-all-on")
    # pcmk_off_timeout differs from pcmk_reboot_timeout so we can verify
    # which timeout the remapped operation actually uses
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
                      '-o pcmk_off_timeout=1 -o pcmk_reboot_timeout=10')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
                      '-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
    test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
    # timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
    test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
    test.add_log_pattern("perform 'off' action targeting node_fake using true1")
    test.add_log_pattern("perform 'off' action targeting node_fake using true2")
    test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
    # fence_dummy sets "on" as an on_target action
    test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
    test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
    test.add_log_pattern("Undoing remap of reboot targeting node_fake")

    test = self.new_test("remap_simple_off",
                         "Verify sequential topology reboot skips 'on' if "
                         "pcmk_reboot_action=off or agent doesn't support "
                         "'on'")
    # true1 explicitly requests "off"-only reboots via pcmk_reboot_action
    test.add_cmd("stonith_admin",
                 args="--output-as=xml -R true1 -a fence_dummy -o mode=pass "
                      "-o pcmk_host_list=node_fake -o pcmk_off_timeout=1 "
                      "-o pcmk_reboot_timeout=10 -o pcmk_reboot_action=off")
    # true2 uses an agent variant whose metadata lacks an "on" action
    test.add_cmd("stonith_admin",
                 args="--output-as=xml -R true2 -a fence_dummy_no_on "
                      "-o mode=pass -o pcmk_host_list=node_fake "
                      "-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20")
    test.add_cmd("stonith_admin",
                 args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
    test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
    # timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
    test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
    test.add_log_pattern("perform 'off' action targeting node_fake using true1")
    test.add_log_pattern("perform 'off' action targeting node_fake using true2")
    test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
    # "on" should be skipped
    test.add_log_pattern("Not turning node_fake back on using "
                         "true1 because the device is configured "
                         "to stay off")
    test.add_log_pattern("Not turning node_fake back on using true2"
                         " because the agent doesn't support 'on'")
    test.add_log_pattern("Undoing remap of reboot targeting node_fake")

    test = self.new_test("remap_automatic",
                         "Verify remapped topology reboot skips automatic 'on'")
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true1 -a fence_dummy_auto_unfence '
                      '-o mode=pass -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin",
                 args='--output-as=xml -R true2 -a fence_dummy_auto_unfence '
                      '-o "mode=pass" -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
    test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
    test.add_log_pattern("perform 'off' action targeting node_fake using true1")
    test.add_log_pattern("perform 'off' action targeting node_fake using true2")
    test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
    test.add_log_pattern("Undoing remap of reboot targeting node_fake")
    # automatic unfencing means no explicit "on" should ever be attempted
    test.add_log_pattern("perform 'on' action targeting node_fake using",
                         negative=True)
    test.add_log_pattern("'on' failure",
                         negative=True)

    test = self.new_test("remap_complex_1",
                         "Verify remapped topology reboot in second level works if non-remapped first level fails")
    test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
    # single-device level 1 is not remapped; it fails and we fall to level 2
    test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
    test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
    test.add_log_pattern("perform 'off' action targeting node_fake using true1")
    test.add_log_pattern("perform 'off' action targeting node_fake using true2")
    test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
    test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
    test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
    test.add_log_pattern("Undoing remap of reboot targeting node_fake")

    test = self.new_test("remap_complex_2",
                         "Verify remapped topology reboot failure in second level proceeds to third level")
    test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v false2 -v true3")
    test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 3 -v true2")
    test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
    test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
    test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
    test.add_log_pattern("perform 'off' action targeting node_fake using true1")
    test.add_log_pattern("perform 'off' action targeting node_fake using false2")
    test.add_log_pattern("Attempted to execute agent fence_dummy (off) the maximum number of times")
    test.add_log_pattern("Undoing remap of reboot targeting node_fake")
    # level 3 has a single device, so it runs as a plain (non-remapped) reboot
    test.add_log_pattern("perform 'reboot' action targeting node_fake using true2")
    # true3 (after the level-2 failure) must never be used
    test.add_log_pattern("node_fake with true3",
                         negative=True)
def build_query_tests(self):
""" run stonith_admin --metadata for the fence_dummy agent and check command output """
test = self.new_test("get_metadata",
"Run stonith_admin --metadata for the fence_dummy agent")
test.add_cmd("stonith_admin", args="--output-as=xml -a fence_dummy --metadata",
stdout_match='