diff --git a/cts/cts-attrd.in b/cts/cts-attrd.in
index 948d843240..d716108868 100644
--- a/cts/cts-attrd.in
+++ b/cts/cts-attrd.in
@@ -1,416 +1,397 @@
#!@PYTHON@
"""Regression tests for Pacemaker's attribute daemon."""
# pylint doesn't like the module name "cts-attrd" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2023-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import subprocess
import sys
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
+from pacemaker._cts.environment import set_cts_path
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
-TEST_DIR = sys.path[0]
-
-
-def update_path():
- """Set the PATH environment variable appropriately for the tests."""
- new_path = os.environ['PATH']
- if os.path.exists(f"{TEST_DIR}/cts-attrd.in"):
- # pylint: disable=protected-access
- print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR} ({TEST_DIR})")
- # For pacemaker-attrd
- new_path = f"{BuildOptions._BUILD_DIR}/daemons/attrd:{new_path}"
-
- else:
- print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {TEST_DIR})")
- # For pacemaker-attrd
- new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
-
- print(f'Using PATH="{new_path}"')
- os.environ['PATH'] = new_path
-
class AttributeTest(Test):
"""Executor for a single test."""
def __init__(self, name, description, **kwargs):
"""
Create a new AttributeTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
"""
Test.__init__(self, name, description, **kwargs)
self._daemon_location = "pacemaker-attrd"
self._enable_corosync = True
def _kill_daemons(self):
killall([self._daemon_location])
def _start_daemons(self):
if self.verbose:
print(f"Starting {self._daemon_location}")
cmd = [self._daemon_location, "-s", "-l", self.logpath]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
class AttributeTests(Tests):
"""Collection of all attribute regression tests."""
def __init__(self, **kwargs):
"""Create a new AttributeTests instance."""
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-attrd")
def new_test(self, name, description):
"""Create a named test."""
test = AttributeTest(name, description, verbose=self.verbose, logdir=self.logdir)
self._tests.append(test)
return test
def setup_environment(self, use_corosync):
"""Prepare the host before executing any tests."""
if use_corosync:
self._corosync.start(kill_first=True)
def cleanup_environment(self, use_corosync):
"""Clean up the host after executing desired tests."""
if use_corosync:
self._corosync.stop()
def build_basic_tests(self):
"""Add basic tests - setting, querying, updating, and deleting attributes."""
test = self.new_test("set_attr_1",
"Set and query an attribute")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="111"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="111"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
# Setting the delay on an attribute that doesn't exist fails, but the failure is
# not passed back to attrd_updater.
test = self.new_test("set_attr_2",
"Set an attribute's delay")
test.add_cmd("attrd_updater", args="--name AAA -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Processed update-delay request from client .*: Error \(Attribute AAA does not exist\)",
regex=True)
test = self.new_test("set_attr_3",
"Set and query an attribute's delay and value")
test.add_cmd("attrd_updater", args="--name AAA -B 111 -d 5 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="111"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="111"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111 \| from .* with 5s write delay",
regex=True)
test = self.new_test("set_attr_4",
"Update an attribute that does not exist with a delay")
test.add_cmd("attrd_updater", args="--name BBB -U 999 -d 10 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="999"')
test.add_cmd("attrd_updater", args="--name BBB -Q",
stdout_match='name="BBB" host="[^"]+" value="999"',
validate=False)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 999 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_1",
"Update an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 333 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="333"')
test.add_cmd("attrd_updater", args="--name BBB -Q",
stdout_match='name="BBB" host="[^"]+" value="333"',
validate=False)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("update_attr_2",
"Update an attribute using a delay other than its default")
test.add_cmd("attrd_updater", args="--name BBB -U 777 -d 10 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 888 -d 7 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 777 -> 888 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_delay_1",
"Update the delay of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test = self.new_test("update_attr_delay_2",
"Update the delay and value of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -B 333 -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("missing_attr_1",
"Query an attribute that does not exist")
test.add_cmd("attrd_updater", args="--name NOSUCH --output-as=xml",
expected_exitcode=ExitStatus.CONFIG)
test = self.new_test("delete_attr_1",
"Delete an existing attribute")
test.add_cmd("attrd_updater", args="--name CCC -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name CCC -D --output-as=xml")
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
test = self.new_test("missing_attr_2",
"Delete an attribute that does not exist")
test.add_cmd("attrd_updater", args="--name NOSUCH2 -D --output-as=xml")
test = self.new_test("attr_in_set_1",
"Set and query an attribute in a specific set")
test.add_cmd("attrd_updater", args="--name DDD -U 555 --set=foo --output-as=xml")
test.add_cmd("attrd_updater", args="--name DDD -Q --output-as=xml",
stdout_match='name="DDD" value="555"')
test.add_cmd("attrd_updater", args="--name DDD -Q",
stdout_match='name="DDD" host="[^"]+" value="555"',
validate=False)
test.add_log_pattern("Processed 1 private change for DDD (set foo)")
def build_multiple_query_tests(self):
"""Add tests that set and query an attribute across multiple nodes."""
# NOTE: These tests make use of the fact that nothing in attrd actually
# cares about whether a node exists when you set or query an attribute.
# It just keeps creating new hash tables for each node you ask it about.
test = self.new_test("multi_query_1",
"Query an attribute set across multiple nodes")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --node cluster1 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -U 222 --node cluster2 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -QA --output-as=xml",
stdout_match=r'\n.*')
test.add_cmd("attrd_updater", args="--name AAA -QA",
stdout_match='name="AAA" host="cluster1" value="111"\nname="AAA" host="cluster2" value="222"',
validate=False)
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster1 --output-as=xml",
stdout_match='')
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster1",
stdout_match='name="AAA" host="cluster1" value="111"',
validate=False)
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
stdout_match='')
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2",
stdout_match='name="AAA" host="cluster2" value="222"',
validate=False)
test.add_cmd("attrd_updater", args="--name AAA -QA --output-as=xml",
stdout_match=r'\n.*',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -QA",
stdout_match='name="AAA" host="cluster1" value="111"\nname="AAA" host="cluster2" value="222"',
validate=False, env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="cluster1" value="111"',
validate=False, env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
stdout_match='',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2",
stdout_match='name="AAA" host="cluster2" value="222"',
validate=False, env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
def build_regex_tests(self):
"""Add tests that use regexes."""
test = self.new_test("regex_update_1",
"Update attributes using a regex")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd("attrd_updater", args="--name ABB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'A.*' -U 333 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="333"')
test.add_cmd("attrd_updater", args="--name ABB -Q --output-as=xml",
stdout_match='name="ABB" value="333"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="333"',
validate=False)
test.add_cmd("attrd_updater", args="--name ABB -Q",
stdout_match='name="ABB" host="[^"]+" value="333"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: 111 -> 333",
regex=True)
test = self.new_test("regex_delete_1",
"Delete attributes using a regex")
test.add_cmd("attrd_updater", args="--name XAX -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name XBX -U 555 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'X[A|B]X' -D --output-as=xml")
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: \(unset\) -> 555",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: 555 -> \(unset\)",
regex=True)
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
def build_utilization_tests(self):
"""Add tests that involve utilization attributes."""
test = self.new_test("utilization_1",
"Set and query a utilization attribute")
test.add_cmd("attrd_updater", args="--name AAA -U ABC -z --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="ABC"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="ABC"',
validate=False)
test.add_log_pattern(r"Setting AAA\[.*\] in utilization: \(unset\) -> ABC",
regex=True)
def build_sync_point_tests(self):
"""Add tests that involve sync points."""
test = self.new_test("local_sync_point",
"Wait for a local sync point")
test.add_cmd("attrd_updater", args="--name AAA -U 123 --wait=local --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="123"')
test.add_cmd("attrd_updater", args="--name AAA -Q",
stdout_match='name="AAA" host="[^"]+" value="123"',
validate=False)
test.add_log_pattern(r"Alerting client .* for reached local sync point",
regex=True)
test = self.new_test("cluster_sync_point",
"Wait for a cluster-wide sync point")
test.add_cmd("attrd_updater", args="--name BBB -U 456 --wait=cluster --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="456"')
test.add_cmd("attrd_updater", args="--name BBB -Q",
stdout_match='name="BBB" host="[^"]+" value="456"',
validate=False)
test.add_log_pattern(r"Alerting client .* for reached cluster sync point",
regex=True)
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-attrd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
f"\t {sys.argv[0]} --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
f"\t {sys.argv[0]} --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run attrd regression tests as specified by arguments."""
- update_path()
+ set_cts_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
exit_if_proc_running("pacemaker-attrd")
# Create a temporary directory for log files (the directory and its
# contents will automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-attrd-") as logdir:
tests = AttributeTests(verbose=opts.verbose, logdir=logdir)
tests.build_basic_tests()
tests.build_multiple_query_tests()
tests.build_regex_tests()
tests.build_utilization_tests()
tests.build_sync_point_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
try:
tests.setup_environment(True)
except TimeoutError:
print("corosync did not start in time, exiting")
sys.exit(ExitStatus.TIMEOUT)
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment(True)
tests.exit()
if __name__ == "__main__":
main()
# vim: set filetype=python:
diff --git a/cts/cts-cli.in b/cts/cts-cli.in
index 503991ab81..85923e48a9 100644
--- a/cts/cts-cli.in
+++ b/cts/cts-cli.in
@@ -1,3455 +1,3427 @@
#!@PYTHON@
"""Regression tests for Pacemaker's command line tools."""
# pylint doesn't like the module name "cts-cli" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
# We know this is a very long file.
# pylint: disable=too-many-lines
__copyright__ = "Copyright 2024-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
from contextlib import contextmanager
from datetime import datetime, timedelta
import fileinput
from functools import partial
from gettext import ngettext
from multiprocessing import Pool, cpu_count
import os
import pathlib
import re
from shutil import copyfile
import signal
from string import Formatter
import subprocess
import sys
from tempfile import NamedTemporaryFile, TemporaryDirectory, mkstemp
import types
# These imports allow running from a source checkout after running `make`.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
+from pacemaker._cts.environment import set_cts_path
from pacemaker._cts.errors import XmlValidationError
from pacemaker._cts.validate import validate
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
# Individual tool tests are split out, but can also be accessed as a group with "tools"
tools_tests = ["cibadmin", "crm_attribute", "crm_standby", "crm_resource",
"crm_ticket", "crmadmin", "crm_shadow", "crm_verify", "crm_simulate",
"crm_diff"]
# The default list of tests to run, in the order they should be run
default_tests = ["access_render", "daemons", "dates", "error_codes"] + tools_tests + \
["crm_mon", "acls", "validity", "upgrade", "rules", "feature_set"]
other_tests = ["agents"]
# The directory containing this program
test_home = os.path.dirname(os.path.realpath(__file__))
# Where test data is stored
cts_cli_data = f"{test_home}/cli"
# The name of the shadow CIB
SHADOW_NAME = "cts-cli"
# Arguments to pass to valgrind
VALGRIND_ARGS = ["-q", "--gen-suppressions=all", "--show-reachable=no", "--leak-check=full",
"--trace-children=no", "--time-stamp=yes", "--num-callers=20",
f"--suppressions={test_home}/valgrind-pcmk.suppressions"]
class PluralFormatter(Formatter):
"""
Special string formatting class for selecting singular vs. plural forms.
Use like so:
fmt = PluralFormatter()
print(fmt.format("{0} {0}:plural,test,tests} succeeded", n_tests))
"""
def format_field(self, value, format_spec):
"""Convert a value to a formatted representation."""
if format_spec.startswith("plural,"):
eles = format_spec.split(',')
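# A spec of "plural,word" supplies only the singular form; the plural
# defaults to the singular plus "s". A spec of "plural,singular,plural"
# supplies an explicit plural (e.g. "plural,child,children").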
if len(eles) == 2:
singular = eles[1]
plural = singular + "s"
else:
singular = eles[1]
plural = eles[2]
return ngettext(singular, plural, value)
return super().format_field(value, format_spec)
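# As a usage sketch (counts hypothetical): with the docstring's format
# string, ngettext() selects the form by count, so a value of 1 renders
# "1 test succeeded" while a value of 3 renders "3 tests succeeded".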
def cleanup_shadow_dir():
"""Remove any previously created shadow CIB directory."""
subprocess.run(["crm_shadow", "--force", "--delete", SHADOW_NAME],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=True)
def copy_existing_cib(existing):
"""
Generate a CIB by copying an existing one to a temporary location.
This is suitable for use with the cib_gen= parameter to the TestGroup class.
"""
(fp, new) = mkstemp(prefix="cts-cli.cib.xml.")
os.close(fp)
copyfile(existing, new)
return new
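# Typical use, mirroring the TestGroups later in this file:
#   cib_gen=partial(copy_existing_cib, f"{cts_cli_data}/crm_mon.xml")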
def current_cib():
"""Return the complete current CIB."""
with environ({"CIB_user": "root"}):
return subprocess.check_output(["cibadmin", "-Q"], encoding="utf-8")
def make_test_group(desc, cmd, **kwargs):
"""
Create a TestGroup that replicates the same test for multiple classes.
The given description, cmd, and kwargs will be passed as arguments to each
Test subclass. The resulting objects will then be added to a TestGroup
and returned.
The main purpose of this function is to be able to run the same test for
both text and XML formats without having to duplicate everything.
"""
tests = []
for c in [Test, ValidatingTest]:
# Insert "--output-as=" after the command name.
splitup = cmd.split()
splitup.insert(1, c.format_args)
obj = c(desc, " ".join(splitup), **kwargs)
tests.append(obj)
return TestGroup(tests)
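# As a sketch of the expansion (assuming format_args carries each class's
# "--output-as=..." value), make_test_group("Query ticket state",
# "crm_ticket -t ticketA -q") yields a TestGroup containing a text-format
# Test and an XML-format ValidatingTest, each with its format argument
# spliced in after "crm_ticket".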
def create_shadow_cib(shadow_dir, create_empty=True, validate_with=None,
valgrind=False):
"""
Create a shadow CIB file.
Keyword arguments:
create_empty -- If True, the shadow CIB will be empty. Otherwise, the
shadow CIB will be a copy of the currently active
cluster configuration.
validate_with -- If not None, the schema version to validate the CIB
against
valgrind -- If True, run the create operation under valgrind
"""
args = ["crm_shadow", "--batch", "--force"]
if create_empty:
args += ["--create-empty", SHADOW_NAME]
else:
args += ["--create", SHADOW_NAME]
if validate_with is not None:
args += ["--validate-with", validate_with]
if valgrind:
args = ["valgrind"] + VALGRIND_ARGS + args
os.environ["CIB_shadow_dir"] = shadow_dir
os.environ["CIB_shadow"] = SHADOW_NAME
subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=True)
delete_shadow_resource_defaults()
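# Minimal usage sketch (arguments beyond shadow_dir are optional):
#   create_shadow_cib(shadow_dir)                      # empty shadow CIB
#   create_shadow_cib(shadow_dir, create_empty=False)  # copy the live CIB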
def delete_shadow_resource_defaults():
"""Clear out the rsc_defaults section from a shadow CIB file."""
# A newly created empty CIB might or might not have a rsc_defaults section
# depending on whether the --with-resource-stickiness-default configure
# option was used. To ensure regression tests behave the same either way,
# delete any rsc_defaults after creating or erasing a CIB.
subprocess.run(["cibadmin", "--delete", "--xml-text", ""],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=True)
# The above command might or might not bump the CIB version, so reset it
# to ensure future changes result in the same version for comparison.
reset_shadow_cib_version()
def reset_shadow_cib_version():
"""Set various version numbers in a shadow CIB file back to 0."""
with fileinput.input(files=[shadow_path()], inplace=True) as f:
for line in f:
line = re.sub('epoch="[0-9]*"', 'epoch="1"', line)
line = re.sub('num_updates="[0-9]*"', 'num_updates="0"', line)
line = re.sub('admin_epoch="[0-9]*"', 'admin_epoch="0"', line)
print(line, end='')
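# For example, a shadow CIB header such as
#   <cib epoch="42" num_updates="7" admin_epoch="3" ...>
# is rewritten in place to
#   <cib epoch="1" num_updates="0" admin_epoch="0" ...>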
def run_cmd_list(cmds):
"""
Run one or more shell commands.
cmds can be:
* A string
* A Python function
* A list of the above
Raises subprocess.CalledProcessError on error.
"""
if cmds is None:
return
if isinstance(cmds, (str, types.FunctionType)):
cmds = [cmds]
for c in cmds:
if isinstance(c, types.FunctionType):
c()
else:
subprocess.run(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, universal_newlines=True, check=True)
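# For example, each of these is accepted (commands hypothetical):
#   run_cmd_list("crm_ticket -t ticketA -q")
#   run_cmd_list(cleanup_shadow_dir)
#   run_cmd_list(["crm_ticket -t ticketA -q", cleanup_shadow_dir])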
def sanitize_output(s):
"""
Replace content in the output expected to change between test runs.
This is stuff like version numbers, timestamps, source line numbers,
build options, system names and messages, etc.
"""
# A list of tuples of regular expressions and their replacements.
replacements = [
(r'Created new pacemaker-.* configuration', r'Created new pacemaker configuration'),
(r'Device not configured', r'No such device or address'),
(r'^Entity: line [0-9]+: ', r''),
(r'(Injecting attribute last-failure-ping#monitor_10000=)[0-9]*', r'\1'),
(r'Last change: .*', r'Last change:'),
(r'Last updated: .*', r'Last updated:'),
(r'^Migration will take effect until: .*', r'Migration will take effect until:'),
(r'(\* Possible values.*: .*)\(default: [^)]*\)', r'\1(default: )'),
(r"""-X '.*'""", r"""-X '...'"""),
(r' api-version="[^"]*"', r' api-version="X"'),
(r'\(apply_upgrade@.*\.c:[0-9]+\)', r'apply_upgrade'),
(r'\(invert_action@.*\.c:[0-9]+\)', r'invert_action'),
(r'\(pcmk__update_schema@.*\.c:[0-9]+\)', r'pcmk__update_schema'),
(r'(
"""
# Create a test CIB that has ACL roles
basic_tests = [
Test("Configure some ACLs", "cibadmin -M -o acls -p", update_cib=True,
stdin=acl_cib),
Test("Enable ACLs", "crm_attribute -n enable-acl -v true",
update_cib=True),
# Run cibadmin --show-access on the test CIB as an ACL-restricted user
Test("An instance of ACLs render (into color)",
"cibadmin --force --show-access=color -Q --user tony"),
Test("An instance of ACLs render (into namespacing)",
"cibadmin --force --show-access=namespace -Q --user tony"),
Test("An instance of ACLs render (into text)",
"cibadmin --force --show-access=text -Q --user tony"),
]
return [
ShadowTestGroup(basic_tests),
]
class DaemonsRegressionTest(RegressionTest):
"""A class for testing command line options of pacemaker daemons."""
@property
def name(self):
"""Return the name of this regression test."""
return "daemons"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
return [
Test("Get CIB manager metadata", "pacemaker-based metadata"),
Test("Get controller metadata", "pacemaker-controld metadata"),
Test("Get fencer metadata", "pacemaker-fenced metadata"),
Test("Get scheduler metadata", "pacemaker-schedulerd metadata"),
]
class DatesRegressionTest(RegressionTest):
"""A class for testing handling of ISO8601 dates."""
@property
def name(self):
"""Return the name of this regression test."""
return "dates"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
invalid_periods = [
"",
"2019-01-01 00:00:00Z", # Start with no end
"2019-01-01 00:00:00Z/", # Start with only a trailing slash
"PT2S/P1M", # Two durations
"2019-13-01 00:00:00Z/P1M", # Out-of-range month
"20191077T15/P1M", # Out-of-range day
"2019-10-01T25:00:00Z/P1M", # Out-of-range hour
"2019-10-01T24:00:01Z/P1M", # Hour 24 with anything but :00:00
"PT5H/20191001T007000Z", # Out-of-range minute
"2019-10-01 00:00:80Z/P1M", # Out-of-range second
"2019-10-01 00:00:10 +25:00/P1M", # Out-of-range offset hour
"20191001T000010 -00:61/P1M", # Out-of-range offset minute
"P1Y/2019-02-29 00:00:00Z", # Feb. 29 in non-leap-year
"2019-01-01 00:00:00Z/P", # Duration with no values
"P1Z/2019-02-20 00:00:00Z", # Invalid duration unit
"P1YM/2019-02-20 00:00:00Z", # No number for duration unit
]
# Ensure invalid period specifications are rejected
invalid_period_tests = []
for p in invalid_periods:
invalid_period_tests.append(Test(f"Invalid period - [{p}]",
f"iso8601 -p '{p}'",
expected_rc=ExitStatus.INVALID_PARAM))
year_tests = []
for y in ["06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17", "18", "40"]:
year_tests.extend([
Test(f"20{y}-W01-7",
f"iso8601 -d '20{y}-W01-7 00Z'"),
Test(f"20{y}-W01-7 - round-trip",
f"iso8601 -d '20{y}-W01-7 00Z' -W -E '20{y}-W01-7 00:00:00Z'"),
Test(f"20{y}-W01-1",
f"iso8601 -d '20{y}-W01-1 00Z'"),
Test(f"20{y}-W01-1 - round-trip",
f"iso8601 -d '20{y}-W01-1 00Z' -W -E '20{y}-W01-1 00:00:00Z'")
])
return invalid_period_tests + [
make_test_group("'2005-040/2005-043' period", "iso8601 -p '2005-040/2005-043'"),
Test("2014-01-01 00:30:00 - 1 Hour",
"iso8601 -d '2014-01-01 00:30:00Z' -D P-1H -E '2013-12-31 23:30:00Z'"),
Test("Valid date - Feb 29 in leap year",
"iso8601 -d '2020-02-29 00:00:00Z' -E '2020-02-29 00:00:00Z'"),
Test("Valid date - using 'T' and offset",
"iso8601 -d '20191201T131211 -05:00' -E '2019-12-01 18:12:11Z'"),
Test("24:00:00 equivalent to 00:00:00 of next day",
"iso8601 -d '2019-12-31 24:00:00Z' -E '2020-01-01 00:00:00Z'"),
] + year_tests + [
make_test_group("2009-W53-07",
"iso8601 -d '2009-W53-7 00:00:00Z' -W -E '2009-W53-7 00:00:00Z'"),
Test("epoch + 2 Years 5 Months 6 Minutes",
"iso8601 -d 'epoch' -D P2Y5MT6M -E '1972-06-01 00:06:00Z'"),
Test("2009-01-31 + 1 Month",
"iso8601 -d '20090131T000000Z' -D P1M -E '2009-02-28 00:00:00Z'"),
Test("2009-01-31 + 2 Months",
"iso8601 -d '2009-01-31 00:00:00Z' -D P2M -E '2009-03-31 00:00:00Z'"),
Test("2009-01-31 + 3 Months",
"iso8601 -d '2009-01-31 00:00:00Z' -D P3M -E '2009-04-30 00:00:00Z'"),
make_test_group("2009-03-31 - 1 Month",
"iso8601 -d '2009-03-31 01:00:00 +01:00' -D P-1M -E '2009-02-28 00:00:00Z'"),
make_test_group("2038-01-01 + 3 Months",
"iso8601 -d '2038-01-01 00:00:00Z' -D P3M -E '2038-04-01 00:00:00Z'"),
]
class ErrorCodeRegressionTest(RegressionTest):
"""A class for testing error code reporting."""
@property
def name(self):
"""Return the name of this regression test."""
return "error_codes"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
# Legacy return codes
#
# Don't test unknown legacy code. FreeBSD includes a colon in strerror(),
# while other distros do not.
legacy_tests = [
make_test_group("Get legacy return code", "crm_error 201"),
make_test_group("Get legacy return code (with name)", "crm_error -n 201"),
make_test_group("Get multiple legacy return codes", "crm_error 201 202"),
make_test_group("Get multiple legacy return codes (with names)",
"crm_error -n 201 202"),
# We can only rely on our custom codes, so we'll spot-check codes 201-209
Test("List legacy return codes (spot check)",
"crm_error -l | grep 20[1-9]"),
ValidatingTest("List legacy return codes (spot check)",
"crm_error -l --output-as=xml | grep -Ev '&1"),
Test("Require --force for CIB erasure", "cibadmin -E",
expected_rc=ExitStatus.UNSAFE, update_cib=True),
Test("Allow CIB erasure with --force", "cibadmin -E --force"),
# Verify the output after erasure
Test("Query CIB", "cibadmin -Q",
setup=delete_shadow_resource_defaults,
update_cib=True),
]
# Add some stuff to the empty CIB so we know that erasing it did something.
basic_tests_setup = [
"""cibadmin -C -o nodes --xml-text ''""",
"""cibadmin -C -o crm_config --xml-text ''""",
"""cibadmin -C -o resources --xml-text ''"""
]
return [
ShadowTestGroup(basic_tests, setup=basic_tests_setup),
]
class CrmAttributeRegressionTest(RegressionTest):
"""A class for testing crm_attribute."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_attribute"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
options_tests = [
make_test_group("List all available options (invalid type)",
"crm_attribute --list-options=asdf",
expected_rc=ExitStatus.USAGE),
make_test_group("List non-advanced cluster options",
"crm_attribute --list-options=cluster"),
make_test_group("List all available cluster options",
"crm_attribute --list-options=cluster --all"),
Test("Return usage error if both -p and OCF_RESOURCE_INSTANCE are empty strings",
"crm_attribute -N cluster01 -p '' -G",
expected_rc=ExitStatus.USAGE),
]
value_update_tests = [
Test("Query the value of an attribute that does not exist",
"crm_attribute -n ABCD --query --quiet",
expected_rc=ExitStatus.NOSUCH),
Test("Configure something before erasing",
"crm_attribute -n test_attr -v 5", update_cib=True),
Test("Test '++' XML attribute update syntax",
"""cibadmin -M --score --xml-text=''""",
update_cib=True),
Test("Test '+=' XML attribute update syntax",
"""cibadmin -M --score --xml-text=''""",
update_cib=True),
make_test_group("Test '++' nvpair value update syntax",
"crm_attribute -n test_attr -v 'value++' --score",
update_cib=True),
make_test_group("Test '+=' nvpair value update syntax",
"crm_attribute -n test_attr -v 'value+=2' --score",
update_cib=True),
Test("Test '++' XML attribute update syntax (--score not set)",
"""cibadmin -M --xml-text=''""",
update_cib=True),
Test("Test '+=' XML attribute update syntax (--score not set)",
"""cibadmin -M --xml-text=''""",
update_cib=True),
make_test_group("Test '++' nvpair value update syntax (--score not set)",
"crm_attribute -n test_attr -v 'value++'",
update_cib=True),
make_test_group("Test '+=' nvpair value update syntax (--score not set)",
"crm_attribute -n test_attr -v 'value+=2'",
update_cib=True),
]
query_set_tests = [
Test("Set cluster option", "crm_attribute -n cluster-delay -v 60s",
update_cib=True),
Test("Query new cluster option",
"cibadmin -Q -o crm_config | grep cib-bootstrap-options-cluster-delay"),
Test("Set no-quorum policy",
"crm_attribute -n no-quorum-policy -v ignore", update_cib=True),
Test("Delete nvpair",
"""cibadmin -D -o crm_config --xml-text ''""",
update_cib=True),
Test("Create operation should fail",
"""cibadmin -C -o crm_config --xml-text ''""",
expected_rc=ExitStatus.EXISTS, update_cib=True),
Test("Modify cluster options section",
"""cibadmin -M -o crm_config --xml-text ''""",
update_cib=True),
Test("Query updated cluster option",
"cibadmin -Q -o crm_config | grep cib-bootstrap-options-cluster-delay",
update_cib=True),
Test("Set duplicate cluster option",
"crm_attribute -n cluster-delay -v 40s -s duplicate",
update_cib=True),
Test("Setting multiply defined cluster option should fail",
"crm_attribute -n cluster-delay -v 30s",
expected_rc=ExitStatus.MULTIPLE, update_cib=True),
Test("Set cluster option with -s",
"crm_attribute -n cluster-delay -v 30s -s duplicate",
update_cib=True),
Test("Delete cluster option with -i",
"crm_attribute -n cluster-delay -D -i cib-bootstrap-options-cluster-delay",
update_cib=True),
Test("Create node1 and bring it online",
"crm_simulate --live-check --in-place --node-up=node1",
update_cib=True),
Test("Create node attribute",
"crm_attribute -n ram -v 1024M -N node1 -t nodes",
update_cib=True),
Test("Query new node attribute",
"cibadmin -Q -o nodes | grep node1-ram",
update_cib=True),
Test("Create second node attribute",
"crm_attribute -n rattr -v XYZ -N node1 -t nodes",
update_cib=True),
Test("Query node attributes by pattern",
"crm_attribute -t nodes -P 'ra.*' -N node1 --query"),
Test("Update node attributes by pattern",
"crm_attribute -t nodes -P 'rat.*' -N node1 -v 10",
update_cib=True),
Test("Delete node attributes by pattern",
"crm_attribute -t nodes -P 'rat.*' -N node1 -D",
update_cib=True),
Test("Set a transient (fail-count) node attribute",
"crm_attribute -n fail-count-foo -v 3 -N node1 -t status",
update_cib=True),
Test("Query a fail count", "crm_failcount --query -r foo -N node1",
update_cib=True),
Test("Show node attributes with crm_simulate",
"crm_simulate --live-check --show-attrs"),
Test("Set a second transient node attribute",
"crm_attribute -n fail-count-bar -v 5 -N node1 -t status",
update_cib=True),
Test("Query transient node attributes by pattern",
"crm_attribute -t status -P fail-count -N node1 --query"),
Test("Update transient node attributes by pattern",
"crm_attribute -t status -P fail-count -N node1 -v 10",
update_cib=True),
Test("Delete transient node attributes by pattern",
"crm_attribute -t status -P fail-count -N node1 -D",
update_cib=True),
Test("crm_attribute given invalid delete usage",
"crm_attribute -t nodes -N node1 -D",
expected_rc=ExitStatus.USAGE),
Test("Set a utilization node attribute",
"crm_attribute -n cpu -v 1 -N node1 -z",
update_cib=True),
Test("Query utilization node attribute",
"crm_attribute --query -n cpu -N node1 -z"),
# This update will fail because it has version numbers
Test("Replace operation should fail",
"""cibadmin -Q | sed -e 's/epoch="[^"]*"/epoch="1"/' | cibadmin -R -p""",
expected_rc=ExitStatus.OLD),
]
promotable_tests = [
make_test_group("Query a nonexistent promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -G",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Delete a nonexistent promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -D"),
make_test_group("Query after deleting a nonexistent promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -G",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Update a nonexistent promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -v 1"),
make_test_group("Query after updating a nonexistent promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -G"),
make_test_group("Update an existing promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -v 5"),
make_test_group("Query after updating an existing promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -G"),
make_test_group("Delete an existing promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -D"),
make_test_group("Query after deleting an existing promotable score attribute",
"crm_attribute -N cluster01 -p promotable-rsc -G",
expected_rc=ExitStatus.NOSUCH),
]
# Test for an issue with legacy command line parsing when the resource is
# specified in the environment (CLBZ#5509)
ocf_rsc_instance_tests = [
make_test_group("Update a promotable score attribute to -INFINITY",
"crm_attribute -N cluster01 -p -v -INFINITY",
env={"OCF_RESOURCE_INSTANCE": "promotable-rsc"}),
make_test_group("Query after updating a promotable score attribute to -INFINITY",
"crm_attribute -N cluster01 -p -G",
env={"OCF_RESOURCE_INSTANCE": "promotable-rsc"}),
Test("Try OCF_RESOURCE_INSTANCE if -p is specified with an empty string",
"crm_attribute -N cluster01 -p '' -G",
env={"OCF_RESOURCE_INSTANCE": "promotable-rsc"}),
]
return options_tests + [
ShadowTestGroup(value_update_tests),
ShadowTestGroup(query_set_tests),
TestGroup(promotable_tests + ocf_rsc_instance_tests,
env={"OCF_RESOURCE_INSTANCE": "promotable-rsc"},
cib_gen=partial(copy_existing_cib, f"{cts_cli_data}/crm_mon.xml")),
]
class CrmStandbyRegressionTest(RegressionTest):
"""A class for testing crm_standby."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_standby"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
basic_tests = [
Test("Default standby value", "crm_standby -N node1 -G"),
Test("Set standby status", "crm_standby -N node1 -v true",
update_cib=True),
Test("Query standby value", "crm_standby -N node1 -G"),
Test("Delete standby value", "crm_standby -N node1 -D",
update_cib=True),
]
return [
ShadowTestGroup(basic_tests,
setup="""cibadmin -C -o nodes --xml-text ''"""),
]
class CrmResourceRegressionTest(RegressionTest):
"""A class for testing crm_resource."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_resource"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
options_tests = [
Test("crm_resource run with extra arguments", "crm_resource foo bar",
expected_rc=ExitStatus.USAGE),
Test("List all available resource options (invalid type)",
"crm_resource --list-options=asdf",
expected_rc=ExitStatus.USAGE),
Test("List all available resource options (invalid type)",
"crm_resource --list-options=asdf --output-as=xml",
expected_rc=ExitStatus.USAGE),
make_test_group("List non-advanced primitive meta-attributes",
"crm_resource --list-options=primitive"),
make_test_group("List all available primitive meta-attributes",
"crm_resource --list-options=primitive --all"),
make_test_group("List non-advanced fencing parameters",
"crm_resource --list-options=fencing"),
make_test_group("List all available fencing parameters",
"crm_resource --list-options=fencing --all"),
]
basic_tests = [
Test("Create a resource",
"""cibadmin -C -o resources --xml-text ''""",
update_cib=True),
Test("crm_resource given both -r and resource config",
"crm_resource -r xyz --class ocf --provider pacemaker --agent Dummy",
expected_rc=ExitStatus.USAGE),
Test("crm_resource given resource config with invalid action",
"crm_resource --class ocf --provider pacemaker --agent Dummy -D",
expected_rc=ExitStatus.USAGE),
Test("Create a resource meta attribute",
"crm_resource -r dummy --meta -p is-managed -v false",
update_cib=True),
Test("Query a resource meta attribute",
"crm_resource -r dummy --meta -g is-managed",
update_cib=True),
Test("Remove a resource meta attribute",
"crm_resource -r dummy --meta -d is-managed",
update_cib=True),
ValidatingTest("Create another resource meta attribute",
"crm_resource -r dummy --meta -p target-role -v Stopped --output-as=xml"),
ValidatingTest("Show why a resource is not running",
"crm_resource -Y -r dummy --output-as=xml"),
ValidatingTest("Remove another resource meta attribute",
"crm_resource -r dummy --meta -d target-role --output-as=xml"),
ValidatingTest("Get a non-existent attribute from a resource element",
"crm_resource -r dummy --get-parameter nonexistent --element --output-as=xml"),
make_test_group("Get a non-existent attribute from a resource element",
"crm_resource -r dummy --get-parameter nonexistent --element",
update_cib=True),
Test("Get an existent attribute from a resource element",
"crm_resource -r dummy --get-parameter class --element",
update_cib=True),
ValidatingTest("Set a non-existent attribute for a resource element",
"crm_resource -r dummy --set-parameter=description -v test_description --element --output-as=xml",
update_cib=True),
ValidatingTest("Set an existent attribute for a resource element",
"crm_resource -r dummy --set-parameter=description -v test_description --element --output-as=xml",
update_cib=True),
ValidatingTest("Delete an existent attribute for a resource element",
"crm_resource -r dummy -d description --element --output-as=xml",
update_cib=True),
ValidatingTest("Delete a non-existent attribute for a resource element",
"crm_resource -r dummy -d description --element --output-as=xml",
update_cib=True),
Test("Set a non-existent attribute for a resource element",
"crm_resource -r dummy --set-parameter=description -v test_description --element",
update_cib=True),
Test("Set an existent attribute for a resource element",
"crm_resource -r dummy --set-parameter=description -v test_description --element",
update_cib=True),
Test("Delete an existent attribute for a resource element",
"crm_resource -r dummy -d description --element",
update_cib=True),
Test("Delete a non-existent attribute for a resource element",
"crm_resource -r dummy -d description --element",
update_cib=True),
Test("Create a resource attribute", "crm_resource -r dummy -p delay -v 10s",
update_cib=True),
make_test_group("List the configured resources", "crm_resource -L",
update_cib=True),
Test("Implicitly list the configured resources", "crm_resource"),
Test("List IDs of instantiated resources", "crm_resource -l"),
make_test_group("Show XML configuration of resource", "crm_resource -q -r dummy"),
Test("Require a destination when migrating a resource that is stopped",
"crm_resource -r dummy -M",
update_cib=True, expected_rc=ExitStatus.USAGE),
Test("Don't support migration to non-existent locations",
"crm_resource -r dummy -M -N i.do.not.exist",
update_cib=True, expected_rc=ExitStatus.NOSUCH),
Test("Create a fencing resource",
"""cibadmin -C -o resources --xml-text ''""",
update_cib=True),
Test("Bring resources online", "crm_simulate --live-check --in-place",
update_cib=True),
Test("Try to move a resource to its existing location",
"crm_resource -r dummy --move --node node1",
update_cib=True, expected_rc=ExitStatus.EXISTS),
Test("Try to move a resource that doesn't exist",
"crm_resource -r xyz --move --node node1",
expected_rc=ExitStatus.NOSUCH),
Test("Move a resource from its existing location",
"crm_resource -r dummy --move",
update_cib=True),
Test("Clear out constraints generated by --move",
"crm_resource -r dummy --clear",
update_cib=True),
Test("Ban a resource on unknown node",
"crm_resource -r dummy -B -N host1",
expected_rc=ExitStatus.NOSUCH),
Test("Create two more nodes and bring them online",
"crm_simulate --live-check --in-place --node-up=node2 --node-up=node3",
update_cib=True),
Test("Ban dummy from node1", "crm_resource -r dummy -B -N node1",
update_cib=True),
Test("Show where a resource is running", "crm_resource -r dummy -W"),
Test("Show constraints on a resource", "crm_resource -a -r dummy"),
ValidatingTest("Ban dummy from node2",
"crm_resource -r dummy -B -N node2 --output-as=xml",
update_cib=True),
Test("Relocate resources due to ban",
"crm_simulate --live-check --in-place -S",
update_cib=True),
ValidatingTest("Move dummy to node1",
"crm_resource -r dummy -M -N node1 --output-as=xml",
update_cib=True),
Test("Clear implicit constraints for dummy on node2",
"crm_resource -r dummy -U -N node2",
update_cib=True),
Test("Drop the status section",
"cibadmin -R -o status --xml-text ''"),
Test("Create a clone",
"""cibadmin -C -o resources --xml-text ''"""),
Test("Create a resource meta attribute",
"crm_resource -r test-primitive --meta -p is-managed -v false",
update_cib=True),
Test("Create a resource meta attribute in the primitive",
"crm_resource -r test-primitive --meta -p is-managed -v false --force",
update_cib=True),
Test("Update resource meta attribute with duplicates",
"crm_resource -r test-clone --meta -p is-managed -v true",
update_cib=True),
Test("Update resource meta attribute with duplicates (force clone)",
"crm_resource -r test-clone --meta -p is-managed -v true --force",
update_cib=True),
Test("Update child resource meta attribute with duplicates",
"crm_resource -r test-primitive --meta -p is-managed -v false",
update_cib=True),
Test("Delete resource meta attribute with duplicates",
"crm_resource -r test-clone --meta -d is-managed",
update_cib=True),
Test("Delete resource meta attribute in parent",
"crm_resource -r test-primitive --meta -d is-managed",
update_cib=True),
Test("Create a resource meta attribute in the primitive",
"crm_resource -r test-primitive --meta -p is-managed -v false --force",
update_cib=True),
Test("Update existing resource meta attribute",
"crm_resource -r test-clone --meta -p is-managed -v true",
update_cib=True),
Test("Create a resource meta attribute in the parent",
"crm_resource -r test-clone --meta -p is-managed -v true --force",
update_cib=True),
Test("Delete resource parent meta attribute (force)",
"crm_resource -r test-clone --meta -d is-managed --force",
update_cib=True),
# Restore meta-attributes before running this test
Test("Delete resource child meta attribute",
"crm_resource -r test-primitive --meta -d is-managed",
setup=["crm_resource -r test-primitive --meta -p is-managed -v true --force",
"crm_resource -r test-clone --meta -p is-managed -v true --force"],
update_cib=True),
Test("Create the dummy-group resource group",
"""cibadmin -C -o resources --xml-text '"""
""""""
""""""
"""'""",
update_cib=True),
Test("Create a resource meta attribute in dummy1",
"crm_resource -r dummy1 --meta -p is-managed -v true",
update_cib=True),
Test("Create a resource meta attribute in dummy-group",
"crm_resource -r dummy-group --meta -p is-managed -v false",
update_cib=True),
Test("Delete the dummy-group resource group",
"cibadmin -D -o resources --xml-text ''",
update_cib=True),
Test("Specify a lifetime when moving a resource",
"crm_resource -r dummy --move --node node2 --lifetime=PT1H",
update_cib=True),
Test("Try to move a resource previously moved with a lifetime",
"crm_resource -r dummy --move --node node1",
update_cib=True),
Test("Ban dummy from node1 for a short time",
"crm_resource -r dummy -B -N node1 --lifetime=PT1S",
update_cib=True),
Test("Remove expired constraints",
"sleep 2 && crm_resource --clear --expired",
update_cib=True),
# Clear has already been tested elsewhere, but we need to get rid of the
# constraints so testing delete works. It won't delete if there's still
# a reference to the resource somewhere.
Test("Clear all implicit constraints for dummy",
"crm_resource -r dummy -U",
update_cib=True),
Test("Set a node health strategy",
"crm_attribute -n node-health-strategy -v migrate-on-red",
update_cib=True),
Test("Set a node health attribute",
"crm_attribute -N node3 -n '#health-cts-cli' -v red",
update_cib=True),
ValidatingTest("Show why a resource is not running on an unhealthy node",
"crm_resource -N node3 -Y -r dummy --output-as=xml"),
Test("Delete a resource",
"crm_resource -D -r dummy -t primitive",
update_cib=True),
]
constraint_tests = []
for rsc in ["prim1", "prim2", "prim3", "prim4", "prim5", "prim6", "prim7",
"prim8", "prim9", "prim10", "prim11", "prim12", "prim13",
"group", "clone"]:
constraint_tests.extend([
make_test_group(f"Check locations and constraints for {rsc}",
f"crm_resource -a -r {rsc}"),
make_test_group(f"Recursively check locations and constraints for {rsc}",
f"crm_resource -A -r {rsc}"),
])
constraint_tests.extend([
Test("Check locations and constraints for group member (referring to group)",
"crm_resource -a -r gr2"),
Test("Check locations and constraints for group member (without referring to group)",
"crm_resource -a -r gr2 --force"),
])
colocation_tests = [
ValidatingTest("Set a meta-attribute for primitive and resources colocated with it",
"crm_resource -r prim5 --meta --set-parameter=target-role -v Stopped --recursive --output-as=xml"),
Test("Set a meta-attribute for group and resource colocated with it",
"crm_resource -r group --meta --set-parameter=target-role -v Stopped --recursive"),
ValidatingTest("Set a meta-attribute for clone and resource colocated with it",
"crm_resource -r clone --meta --set-parameter=target-role -v Stopped --recursive --output-as=xml"),
]
digest_tests = [
ValidatingTest("Show resource digests",
"crm_resource --digests -r rsc1 -N node1 --output-as=xml"),
Test("Show resource digests with overrides",
"crm_resource --digests -r rsc1 -N node1 --output-as=xml CRM_meta_interval=10000 CRM_meta_timeout=20000"),
make_test_group("Show resource operations", "crm_resource --list-operations"),
]
basic2_tests = [
make_test_group("List a promotable clone resource",
"crm_resource --locate -r promotable-clone"),
make_test_group("List the primitive of a promotable clone resource",
"crm_resource --locate -r promotable-rsc"),
make_test_group("List a single instance of a promotable clone resource",
"crm_resource --locate -r promotable-rsc:0"),
make_test_group("List another instance of a promotable clone resource",
"crm_resource --locate -r promotable-rsc:1"),
Test("Try to move an instance of a cloned resource",
"crm_resource -r promotable-rsc:0 --move --node cluster01",
expected_rc=ExitStatus.INVALID_PARAM),
]
basic_tests_setup = [
"crm_attribute -n no-quorum-policy -v ignore",
"crm_simulate --live-check --in-place --node-up=node1"
]
return options_tests + [
ShadowTestGroup(basic_tests, setup=basic_tests_setup),
TestGroup(constraint_tests, env={"CIB_file": f"{cts_cli_data}/constraints.xml"}),
TestGroup(colocation_tests, cib_gen=partial(copy_existing_cib, f"{cts_cli_data}/constraints.xml")),
TestGroup(digest_tests, env={"CIB_file": f"{cts_cli_data}/crm_resource_digests.xml"}),
TestGroup(basic2_tests, env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"}),
ValidatingTest("Check that CIB_file=\"-\" works - crm_resource",
"crm_resource --digests -r rsc1 -N node1 --output-as=xml",
env={"CIB_file": "-"},
stdin=pathlib.Path(f"{cts_cli_data}/crm_resource_digests.xml")),
]
class CrmTicketRegressionTest(RegressionTest):
"""A class for testing crm_ticket."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_ticket"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
basic_tests = [
Test("Default ticket granted state",
"crm_ticket -t ticketA -G granted -d false"),
Test("Set ticket granted state", "crm_ticket -t ticketA -r --force",
update_cib=True),
make_test_group("List ticket IDs", "crm_ticket -w"),
make_test_group("Query ticket state", "crm_ticket -t ticketA -q"),
make_test_group("Query ticket granted state",
"crm_ticket -t ticketA -G granted"),
Test("Delete ticket granted state",
"crm_ticket -t ticketA -D granted --force",
update_cib=True),
Test("Make a ticket standby", "crm_ticket -t ticketA -s",
update_cib=True),
Test("Query ticket standby state", "crm_ticket -t ticketA -G standby"),
Test("Activate a ticket", "crm_ticket -t ticketA -a",
update_cib=True),
make_test_group("List ticket details", "crm_ticket -L -t ticketA"),
Test("Add a second ticket", "crm_ticket -t ticketB -G granted -d false",
update_cib=True),
Test("Set second ticket granted state",
"crm_ticket -t ticketB -r --force",
update_cib=True),
make_test_group("List tickets", "crm_ticket -l"),
Test("Delete second ticket",
"""cibadmin --delete --xml-text ''""",
update_cib=True),
Test("Delete ticket standby state", "crm_ticket -t ticketA -D standby",
update_cib=True),
Test("Add a constraint to a ticket",
"""cibadmin -C -o constraints --xml-text ''""",
update_cib=True),
make_test_group("Query ticket constraints", "crm_ticket -t ticketA -c"),
Test("Delete ticket constraint",
"""cibadmin --delete --xml-text ''""",
update_cib=True),
]
basic_tests_setup = [
"""cibadmin -C -o crm_config --xml-text ''""",
"""cibadmin -C -o resources --xml-text ''"""
]
return [
ShadowTestGroup(basic_tests, setup=basic_tests_setup),
]
class CrmadminRegressionTest(RegressionTest):
"""A class for testing crmadmin."""
@property
def name(self):
"""Return the name of this regression test."""
return "crmadmin"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
basic_tests = [
make_test_group("List all nodes", "crmadmin -N"),
make_test_group("Minimally list all nodes", "crmadmin -N -q"),
Test("List all nodes as bash exports", "crmadmin -N -B"),
make_test_group("List cluster nodes",
"crmadmin -N cluster"),
make_test_group("List guest nodes",
"crmadmin -N guest"),
make_test_group("List remote nodes",
"crmadmin -N remote"),
make_test_group("List cluster,remote nodes",
"crmadmin -N cluster,remote"),
make_test_group("List guest,remote nodes",
"crmadmin -N guest,remote"),
]
return [
TestGroup(basic_tests,
env={"CIB_file": f"{cts_cli_data}/crmadmin-cluster-remote-guest-nodes.xml"}),
Test("Check that CIB_file=\"-\" works", "crmadmin -N",
env={"CIB_file": "-"},
stdin=pathlib.Path(f"{cts_cli_data}/crmadmin-cluster-remote-guest-nodes.xml")),
]
class CrmShadowRegressionTest(RegressionTest):
"""A class for testing crm_shadow."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_shadow"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
no_instance_tests = [
make_test_group("Get active shadow instance (no active instance)",
"crm_shadow --which",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Get active shadow instance's file name (no active instance)",
"crm_shadow --file",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Get active shadow instance's contents (no active instance)",
"crm_shadow --display",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Get active shadow instance's diff (no active instance)",
"crm_shadow --diff",
expected_rc=ExitStatus.NOSUCH),
]
# Create new shadow instance based on active CIB
# Don't use create_shadow_cib() here; test the creation sequence explicitly
new_instance_tests = [
make_test_group("Create copied shadow instance",
f"crm_shadow --create {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force"),
# Query shadow instance based on active CIB
make_test_group("Get active shadow instance (copied)",
"crm_shadow --which"),
make_test_group("Get active shadow instance's file name (copied)",
"crm_shadow --file"),
make_test_group("Get active shadow instance's contents (copied)",
"crm_shadow --display"),
make_test_group("Get active shadow instance's diff (copied)",
"crm_shadow --diff"),
]
# Make some changes to the shadow file
modify_cib = """export CIB_file=$(crm_shadow --file) && """ \
"""cibadmin --modify --xml-text '' && """ \
"""cibadmin --delete --xml-text '' && """ \
"""cibadmin --create -o resources --xml-text '' && """ \
"""cibadmin --create -o status --xml-text ''"""
more_tests = [
# We can't use make_test_group() here because we only want to run
# the modify_cib setup code once, and make_test_group will pass all
# kwargs to every instance it creates.
Test("Get active shadow instance's diff (after changes)",
"crm_shadow --diff",
setup=modify_cib, expected_rc=ExitStatus.ERROR),
ValidatingTest("Get active shadow instance's diff (after changes)",
"crm_shadow --diff --output-as=xml",
expected_rc=ExitStatus.ERROR),
TestGroup([
# Commit the modified shadow CIB to a temp active CIB file
Test("Commit shadow instance",
f"crm_shadow --commit {SHADOW_NAME}",
expected_rc=ExitStatus.USAGE),
Test("Commit shadow instance (force)",
f"crm_shadow --commit {SHADOW_NAME} --force"),
Test("Get active shadow instance's diff (after commit)",
"crm_shadow --diff",
expected_rc=ExitStatus.ERROR),
Test("Commit shadow instance (force) (all)",
f"crm_shadow --commit {SHADOW_NAME} --force --all"),
Test("Get active shadow instance's diff (after commit all)",
"crm_shadow --diff",
expected_rc=ExitStatus.ERROR),
], cib_gen=partial(copy_existing_cib, f"{cts_cli_data}/crm_mon.xml")),
TestGroup([
# Repeat sequence with XML output
ValidatingTest("Commit shadow instance",
f"crm_shadow --commit {SHADOW_NAME} --output-as=xml",
expected_rc=ExitStatus.USAGE),
ValidatingTest("Commit shadow instance (force)",
f"crm_shadow --commit {SHADOW_NAME} --force --output-as=xml"),
ValidatingTest("Get active shadow instance's diff (after commit)",
"crm_shadow --diff --output-as=xml",
expected_rc=ExitStatus.ERROR),
ValidatingTest("Commit shadow instance (force) (all)",
f"crm_shadow --commit {SHADOW_NAME} --force --all --output-as=xml"),
ValidatingTest("Get active shadow instance's diff (after commit all)",
"crm_shadow --diff --output-as=xml",
expected_rc=ExitStatus.ERROR),
# Commit an inactive shadow instance with no active instance
make_test_group("Commit shadow instance (no active instance)",
f"crm_shadow --commit {SHADOW_NAME}",
env={"CIB_shadow": None},
expected_rc=ExitStatus.USAGE),
make_test_group("Commit shadow instance (no active instance) (force)",
f"crm_shadow --commit {SHADOW_NAME} --force",
env={"CIB_shadow": None}),
# Commit an inactive shadow instance with an active instance
make_test_group("Commit shadow instance (mismatch)",
f"crm_shadow --commit {SHADOW_NAME}",
env={"CIB_shadow": "nonexistent_shadow"},
expected_rc=ExitStatus.USAGE),
make_test_group("Commit shadow instance (mismatch) (force)",
f"crm_shadow --commit {SHADOW_NAME} --force",
env={"CIB_shadow": "nonexistent_shadow"}),
# Commit an active shadow instance whose shadow file is missing
make_test_group("Commit shadow instance (nonexistent shadow file)",
"crm_shadow --commit nonexistent_shadow",
env={"CIB_shadow": "nonexistent_shadow"},
expected_rc=ExitStatus.USAGE),
make_test_group("Commit shadow instance (nonexistent shadow file) (force)",
"crm_shadow --commit nonexistent_shadow --force",
env={"CIB_shadow": "nonexistent_shadow"},
expected_rc=ExitStatus.NOSUCH),
make_test_group("Get active shadow instance's diff (nonexistent shadow file)",
"crm_shadow --diff",
env={"CIB_shadow": "nonexistent_shadow"},
expected_rc=ExitStatus.NOSUCH),
# Commit an active shadow instance when the CIB file is missing
make_test_group("Commit shadow instance (nonexistent CIB file)",
f"crm_shadow --commit {SHADOW_NAME}",
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"},
expected_rc=ExitStatus.USAGE),
make_test_group("Commit shadow instance (nonexistent CIB file) (force)",
f"crm_shadow --commit {SHADOW_NAME} --force",
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"},
expected_rc=ExitStatus.NOSUCH),
make_test_group("Get active shadow instance's diff (nonexistent CIB file)",
"crm_shadow --diff",
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"},
expected_rc=ExitStatus.NOSUCH),
], cib_gen=partial(copy_existing_cib, f"{cts_cli_data}/crm_mon.xml")),
]
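        # An illustrative sketch (assumed behavior, not executed) of the
        # make_test_group() fan-out mentioned above: a call such as
        #
        #     make_test_group("Get diff", "crm_shadow --diff", setup=modify_cib)
        #
        # expands to roughly
        #
        #     [Test("Get diff", "crm_shadow --diff", setup=modify_cib),
        #      ValidatingTest("Get diff", "crm_shadow --diff --output-as=xml",
        #                     setup=modify_cib)]
        #
        # so a setup= kwarg would run once per generated test, which is why the
        # Test/ValidatingTest pair above passes setup= to only the first one.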
delete_1_tests = [
# Delete an active shadow instance
Test("Delete shadow instance", f"crm_shadow --delete {SHADOW_NAME}",
expected_rc=ExitStatus.USAGE),
Test("Delete shadow instance (force)", f"crm_shadow --delete {SHADOW_NAME} --force"),
ShadowTestGroup([
ValidatingTest("Delete shadow instance",
f"crm_shadow --delete {SHADOW_NAME} --output-as=xml",
expected_rc=ExitStatus.USAGE),
ValidatingTest("Delete shadow instance (force)",
f"crm_shadow --delete {SHADOW_NAME} --force --output-as=xml"),
])
]
delete_2_tests = [
# Delete an inactive shadow instance with no active instance
Test("Delete shadow instance (no active instance)",
f"crm_shadow --delete {SHADOW_NAME}",
expected_rc=ExitStatus.USAGE),
Test("Delete shadow instance (no active instance) (force)",
f"crm_shadow --delete {SHADOW_NAME} --force"),
]
delete_3_tests = [
ValidatingTest("Delete shadow instance (no active instance)",
f"crm_shadow --delete {SHADOW_NAME} --output-as=xml",
expected_rc=ExitStatus.USAGE),
ValidatingTest("Delete shadow instance (no active instance) (force)",
f"crm_shadow --delete {SHADOW_NAME} --force --output-as=xml"),
]
delete_4_tests = [
# Delete an inactive shadow instance with an active instance
Test("Delete shadow instance (mismatch)",
f"crm_shadow --delete {SHADOW_NAME}",
expected_rc=ExitStatus.USAGE),
Test("Delete shadow instance (mismatch) (force)",
f"crm_shadow --delete {SHADOW_NAME} --force"),
]
delete_5_tests = [
ValidatingTest("Delete shadow instance (mismatch)",
f"crm_shadow --delete {SHADOW_NAME} --output-as=xml",
expected_rc=ExitStatus.USAGE),
ValidatingTest("Delete shadow instance (mismatch) (force)",
f"crm_shadow --delete {SHADOW_NAME} --force --output-as=xml"),
# Delete an active shadow instance whose shadow file is missing
Test("Delete shadow instance (nonexistent shadow file)",
"crm_shadow --delete nonexistent_shadow",
expected_rc=ExitStatus.USAGE),
Test("Delete shadow instance (nonexistent shadow file) (force)",
"crm_shadow --delete nonexistent_shadow --force"),
ValidatingTest("Delete shadow instance (nonexistent shadow file)",
"crm_shadow --delete nonexistent_shadow --output-as=xml",
expected_rc=ExitStatus.USAGE),
ValidatingTest("Delete shadow instance (nonexistent shadow file) (force)",
"crm_shadow --delete nonexistent_shadow --force --output-as=xml"),
]
delete_6_tests = [
# Delete an active shadow instance when the CIB file is missing
Test("Delete shadow instance (nonexistent CIB file)",
f"crm_shadow --delete {SHADOW_NAME}",
expected_rc=ExitStatus.USAGE),
Test("Delete shadow instance (nonexistent CIB file) (force)",
f"crm_shadow --delete {SHADOW_NAME} --force"),
]
delete_7_tests = [
ValidatingTest("Delete shadow instance (nonexistent CIB file)",
f"crm_shadow --delete {SHADOW_NAME} --output-as=xml",
expected_rc=ExitStatus.USAGE),
ValidatingTest("Delete shadow instance (nonexistent CIB file) (force)",
f"crm_shadow --delete {SHADOW_NAME} --force --output-as=xml"),
]
create_1_tests = [
# Create new shadow instance based on active CIB with no instance active
make_test_group("Create copied shadow instance (no active instance)",
f"crm_shadow --create {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force",
env={"CIB_shadow": None}),
# Create new shadow instance based on active CIB with other instance active
make_test_group("Create copied shadow instance (mismatch)",
f"crm_shadow --create {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force",
env={"CIB_shadow": "nonexistent_shadow"}),
# Create new shadow instance based on CIB (shadow file already exists)
make_test_group("Create copied shadow instance (file already exists)",
f"crm_shadow --create {SHADOW_NAME} --batch",
expected_rc=ExitStatus.CANTCREAT),
make_test_group("Create copied shadow instance (file already exists) (force)",
f"crm_shadow --create {SHADOW_NAME} --batch --force"),
# Create new shadow instance based on active CIB when the CIB file is missing
make_test_group("Create copied shadow instance (nonexistent CIB file) (force)",
f"crm_shadow --create {SHADOW_NAME} --batch --force",
expected_rc=ExitStatus.NOSUCH,
setup=f"crm_shadow --delete {SHADOW_NAME} --force",
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"}),
]
create_2_tests = [
# Create new empty shadow instance
make_test_group("Create empty shadow instance",
f"crm_shadow --create-empty {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force"),
# Create empty shadow instance with no active instance
make_test_group("Create empty shadow instance (no active instance)",
f"crm_shadow --create-empty {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force",
env={"CIB_shadow": None}),
# Create empty shadow instance with other instance active
make_test_group("Create empty shadow instance (mismatch)",
f"crm_shadow --create-empty {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force",
env={"CIB_shadow": "nonexistent_shadow"}),
# Create empty shadow instance when the CIB file is missing
make_test_group("Create empty shadow instance (nonexistent CIB file)",
f"crm_shadow --create-empty {SHADOW_NAME} --batch",
setup=f"crm_shadow --delete {SHADOW_NAME} --force",
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"}),
# Create empty shadow instance (shadow file already exists)
make_test_group("Create empty shadow instance (file already exists)",
f"crm_shadow --create-empty {SHADOW_NAME} --batch",
expected_rc=ExitStatus.CANTCREAT),
make_test_group("Create empty shadow instance (file already exists) (force)",
f"crm_shadow --create-empty {SHADOW_NAME} --batch --force"),
# Query shadow instance with an empty CIB.
# --which and --file queries were done earlier.
TestGroup([
make_test_group("Get active shadow instance's contents (empty CIB)",
"crm_shadow --display"),
make_test_group("Get active shadow instance's diff (empty CIB)",
"crm_shadow --diff",
expected_rc=ExitStatus.ERROR),
], setup=delete_shadow_resource_defaults),
]
reset_1_tests = [
Test("Resetting active shadow instance to active CIB requires force",
f"crm_shadow --reset {SHADOW_NAME} --batch",
expected_rc=ExitStatus.USAGE),
Test("Reset active shadow instance to active CIB",
f"crm_shadow --reset {SHADOW_NAME} --batch --force"),
Test("Active shadow instance no different from active CIB after reset",
"crm_shadow --diff"),
Test("Active shadow instance differs from active CIB after change",
"crm_shadow --diff",
setup="crm_attribute -n admin_epoch -v 99",
expected_rc=ExitStatus.ERROR),
ValidatingTest("Reset active shadow instance to active CIB",
f"crm_shadow --reset {SHADOW_NAME} --batch --force --output-as=xml"),
ValidatingTest("Active shadow instance no different from active CIB after reset",
"crm_shadow --diff --output-as=xml"),
ValidatingTest("Active shadow instance differs from active CIB after change",
"crm_shadow --diff --output-as=xml",
setup="crm_attribute -n admin_epoch -v 199",
expected_rc=ExitStatus.ERROR),
make_test_group("Reset shadow instance to active CIB with nonexistent shadow file",
f"crm_shadow --reset {SHADOW_NAME} --batch --force",
setup=f"crm_shadow --delete {SHADOW_NAME} --force"),
Test("Active shadow instance no different from active CIB after force-reset",
"crm_shadow --diff"),
]
reset_2_tests = [
make_test_group("Reset inactive shadow instance (none active) to active CIB",
f"crm_shadow --reset {SHADOW_NAME} --force --batch"),
]
reset_3_tests = [
make_test_group("Reset inactive shadow instance while another instance active",
f"crm_shadow --reset {SHADOW_NAME} --batch --force"),
]
reset_4_tests = [
make_test_group("Reset shadow instance with nonexistent CIB",
f"crm_shadow --reset {SHADOW_NAME} --batch --force",
expected_rc=ExitStatus.NOSUCH),
]
# Switch shadow instances
switch_tests = [
make_test_group("Switch to new shadow instance",
f"crm_shadow --switch {SHADOW_NAME} --batch"),
TestGroup([
make_test_group("Switch to nonexistent shadow instance",
f"crm_shadow --switch {SHADOW_NAME} --batch",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Switch to nonexistent shadow instance (force)",
f"crm_shadow --switch {SHADOW_NAME} --batch --force",
expected_rc=ExitStatus.NOSUCH),
], setup=f"crm_shadow --delete {SHADOW_NAME} --force"),
]
return no_instance_tests + [
ShadowTestGroup(new_instance_tests + more_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"},
create=False),
ShadowTestGroup(delete_1_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"}),
ShadowTestGroup(delete_2_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml",
"CIB_shadow": None}),
ShadowTestGroup(delete_3_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml",
"CIB_shadow": None}),
ShadowTestGroup(delete_4_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml",
"CIB_shadow": "nonexistent_shadow"}),
ShadowTestGroup(delete_5_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml",
"CIB_shadow": "nonexistent_shadow"}),
ShadowTestGroup(delete_6_tests,
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"}),
ShadowTestGroup(delete_7_tests,
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"}),
ShadowTestGroup(create_1_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"},
create=False),
ShadowTestGroup(create_2_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"},
create=False),
ShadowTestGroup(reset_1_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"}),
ShadowTestGroup(reset_2_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml",
"CIB_shadow": None}),
ShadowTestGroup(reset_3_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml",
"CIB_shadow": "nonexistent_shadow"}),
ShadowTestGroup(reset_4_tests,
env={"CIB_file": f"{cts_cli_data}/nonexistent_cib.xml"}),
ShadowTestGroup(switch_tests,
env={"CIB_shadow": "nonexistent_shadow"},
create_empty=True),
]
class CrmVerifyRegressionTest(RegressionTest):
"""A class for testing crm_verify."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_verify"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
invalid_tests = [
make_test_group("Verify a file-specified invalid configuration",
f"crm_verify --xml-file {cts_cli_data}/crm_verify_invalid_bz.xml",
expected_rc=ExitStatus.CONFIG),
make_test_group("Verify a file-specified invalid configuration (verbose)",
f"crm_verify --xml-file {cts_cli_data}/crm_verify_invalid_bz.xml --verbose",
expected_rc=ExitStatus.CONFIG),
make_test_group("Verify a file-specified invalid configuration (quiet)",
f"crm_verify --xml-file {cts_cli_data}/crm_verify_invalid_bz.xml --quiet",
expected_rc=ExitStatus.CONFIG),
ValidatingTest("Verify another file-specified invalid configuration",
f"crm_verify --xml-file {cts_cli_data}/crm_verify_invalid_no_stonith.xml --output-as=xml",
expected_rc=ExitStatus.CONFIG),
]
with open(f"{test_home}/cli/crm_mon.xml", encoding="utf-8") as f:
cib_contents = f.read()
valid_tests = [
ValidatingTest("Verify a file-specified valid configuration",
f"crm_verify --xml-file {cts_cli_data}/crm_mon.xml --output-as=xml"),
ValidatingTest("Verify a piped-in valid configuration",
"crm_verify -p --output-as=xml",
stdin=pathlib.Path(f"{cts_cli_data}/crm_mon.xml")),
ValidatingTest("Verbosely verify a file-specified valid configuration",
f"crm_verify --xml-file {cts_cli_data}/crm_mon.xml --output-as=xml --verbose"),
ValidatingTest("Verbosely verify a piped-in valid configuration",
"crm_verify -p --output-as=xml --verbose",
stdin=pathlib.Path(f"{cts_cli_data}/crm_mon.xml")),
ValidatingTest("Verify a string-supplied valid configuration",
f"crm_verify -X '{cib_contents}' --output-as=xml"),
ValidatingTest("Verbosely verify a string-supplied valid configuration",
f"crm_verify -X '{cib_contents}' --output-as=xml --verbose"),
]
return invalid_tests + valid_tests
class CrmSimulateRegressionTest(RegressionTest):
"""A class for testing crm_simulate."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_simulate"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
good_cib = """
"""
bad_cib = good_cib.replace("start", "break")
bad_version_cib = good_cib.replace("pacemaker-1.2", "pacemaker-9999.0")
recoverable_cib = good_cib.replace("", "")
no_version_cib = good_cib.replace('validate-with="pacemaker-1.2" ', "")
        no_version_bad_cib = no_version_cib.replace('epoch="3"', 'epoch="30"').replace("start", "break")
basic_tests = [
Test("Show allocation scores with crm_simulate",
f"crm_simulate -x {cts_cli_data}/crm_mon.xml --show-scores --output-as=xml"),
Test("Show utilization with crm_simulate",
f"crm_simulate -x {cts_cli_data}/crm_mon.xml --show-utilization"),
Test("Simulate injecting a failure",
f"crm_simulate -x {cts_cli_data}/crm_mon.xml -S -i ping_monitor_10000@cluster02=1"),
Test("Simulate bringing a node down",
f"crm_simulate -x {cts_cli_data}/crm_mon.xml -S --node-down=cluster01"),
Test("Simulate a node failing",
f"crm_simulate -x {cts_cli_data}/crm_mon.xml -S --node-fail=cluster02"),
Test("Run crm_simulate with invalid CIB (enum violation)",
"crm_simulate -p -S",
stdin=bad_cib,
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema"},
expected_rc=ExitStatus.CONFIG),
Test("Run crm_simulate with invalid CIB (unrecognized validate-with)",
"crm_simulate -p -S",
stdin=bad_version_cib,
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema"},
expected_rc=ExitStatus.CONFIG),
Test("Run crm_simulate with invalid, but possibly recoverable CIB (valid with X.Y+1)",
"crm_simulate -p -S",
stdin=recoverable_cib,
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema"}),
Test("Run crm_simulate with valid CIB, but without validate-with attribute",
"crm_simulate -p -S",
stdin=no_version_cib,
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema"},
expected_rc=ExitStatus.CONFIG),
Test("Run crm_simulate with invalid CIB, also without validate-with attribute",
"crm_simulate -p -S",
stdin=no_version_bad_cib,
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema"},
expected_rc=ExitStatus.CONFIG),
]
return [
ShadowTestGroup(basic_tests, create=False,
env={"CIB_shadow": None}),
]
class CrmDiffRegressionTest(RegressionTest):
"""A class for testing crm_diff."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_diff"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
old_file = f"{cts_cli_data}/crm_diff_old.xml"
new_file = f"{cts_cli_data}/crm_diff_new.xml"
patch_file = f"{cts_cli_data}/crm_diff_patchset.xml"
cib_patch_file = f"{cts_cli_data}/crm_diff_patchset_cib.xml"
# Enclose the strings in quotes now rather than in the command lines
with open(f"{cts_cli_data}/crm_diff_old.xml", "r") as file:
old_str = f"'{file.read()}'"
with open(f"{cts_cli_data}/crm_diff_new.xml", "r") as file:
new_str = f"'{file.read()}'"
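        # Note that the patchset-creation tests below expect
        # ExitStatus.ERROR: when the old and new XML differ, crm_diff exits
        # nonzero (much as diff(1) exits 1 when its inputs differ).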
return [
make_test_group("Create an XML patchset from files",
f"crm_diff -o {old_file} -n {new_file}",
expected_rc=ExitStatus.ERROR),
make_test_group("Create an XML patchset from strings",
f"crm_diff -O {old_str} -N {new_str}",
expected_rc=ExitStatus.ERROR),
make_test_group("Create an XML patchset from old file, new string",
f"crm_diff -o {old_file} -N {new_str}",
expected_rc=ExitStatus.ERROR),
make_test_group("Create an XML patchset from old string, new file",
f"crm_diff -O {old_str} -n {new_file}",
expected_rc=ExitStatus.ERROR),
make_test_group("Create an XML patchset as CIB",
f"crm_diff -o {old_file} -n {new_file} --cib",
expected_rc=ExitStatus.ERROR),
make_test_group("Create an XML patchset with no versions",
f"crm_diff -o {old_file} -n {new_file} --no-version",
expected_rc=ExitStatus.ERROR),
make_test_group("Create an XML patchset as CIB, with no versions",
f"crm_diff -o {old_file} -n {new_file} --cib --no-version",
expected_rc=ExitStatus.USAGE),
# Patch must be a file (cannot be a string).
#
# patch_file was generated using the following command:
#
# # crm_diff -o {old_file} -n {new_file}
#
make_test_group("Apply an XML patchset to a file",
f"crm_diff -o {old_file} -p {patch_file}"),
make_test_group("Apply an XML patchset to a string",
f"crm_diff -O {old_str} -p {patch_file}"),
make_test_group("Apply an XML patchset as CIB",
f"crm_diff -o {old_file} -p {patch_file} --cib"),
make_test_group("Apply an XML patchset with no versions",
f"crm_diff -o {old_file} -p {patch_file} --no-version"),
make_test_group("Apply an XML patchset as CIB, with no versions",
f"crm_diff -o {old_file} -p {patch_file} --cib --no-version",
expected_rc=ExitStatus.USAGE),
# cib_patch_file was generated using the following command:
#
# # crm_diff -o {old_file} -n {new_file} --cib
#
# Thus a digest was added to the patchset, and attribute position
# changes were ignored.
#
# @FIXME Currently these all fail due to digest mismatch. The issue
# goes back to at least Pacemaker 1.1.24. However, note that they
# fail with a generic error code, not a digest error code.
#
            # It seems reasonable that a patchset generated by crm_diff should
            # be possible to apply to the old XML using crm_diff.
make_test_group("Apply an XML patchset generated as CIB",
f"crm_diff -o {old_file} -p {cib_patch_file}",
expected_rc=ExitStatus.ERROR),
make_test_group("Apply an XML patchset generated as CIB, as CIB",
f"crm_diff -o {old_file} -p {cib_patch_file} --cib",
expected_rc=ExitStatus.ERROR),
make_test_group("Apply an XML patchset generated as CIB, with no versions",
f"crm_diff -o {old_file} -p {cib_patch_file} --no-version",
expected_rc=ExitStatus.ERROR),
# @TODO We could add tests where the old and new CIBs have the same
# version info. In that case, at the time of writing, generating a
# patchset with --cib and then trying to apply it will result in
# ExitStatus.OLD.
]
class CrmMonRegressionTest(RegressionTest):
"""A class for testing crm_mon."""
@property
def name(self):
"""Return the name of this regression test."""
return "crm_mon"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
basic_tests = [
make_test_group("Basic output", "crm_mon -1"),
make_test_group("Output without node section",
"crm_mon -1 --exclude=nodes"),
# The next test doesn't need to be performed for other output formats. It's
# really just a test to make sure that blank lines are correct.
Test("Output with only the node section",
"crm_mon -1 --exclude=all --include=nodes"),
# XML includes everything already so there's no need for a complete test
Test("Complete text output", "crm_mon -1 --include=all"),
# XML includes detailed output already
Test("Complete text output with detail", "crm_mon -1R --include=all"),
Test("Complete brief text output", "crm_mon -1 --include=all --brief"),
Test("Complete text output grouped by node",
"crm_mon -1 --include=all --group-by-node"),
# XML does not have a brief output option
Test("Complete brief text output grouped by node",
"crm_mon -1 --include=all --group-by-node --brief"),
ValidatingTest("Output grouped by node",
"crm_mon --output-as=xml --group-by-node"),
make_test_group("Complete output filtered by node",
"crm_mon -1 --include=all --node=cluster01"),
make_test_group("Complete output filtered by tag",
"crm_mon -1 --include=all --node=even-nodes"),
make_test_group("Complete output filtered by resource tag",
"crm_mon -1 --include=all --resource=fencing-rscs"),
make_test_group("Output filtered by node that doesn't exist",
"crm_mon -1 --node=blah"),
Test("Basic text output with inactive resources", "crm_mon -1 -r"),
# XML already includes inactive resources
Test("Basic text output with inactive resources, filtered by node",
"crm_mon -1 -r --node=cluster02"),
make_test_group("Complete output filtered by primitive resource",
"crm_mon -1 --include=all --resource=Fencing"),
make_test_group("Complete output filtered by group resource",
"crm_mon -1 --include=all --resource=exim-group"),
Test("Complete text output filtered by group resource member",
"crm_mon -1 --include=all --resource=Public-IP"),
ValidatingTest("Output filtered by group resource member",
"crm_mon --output-as=xml --resource=Email"),
make_test_group("Complete output filtered by clone resource",
"crm_mon -1 --include=all --resource=ping-clone"),
make_test_group("Complete output filtered by clone resource instance",
"crm_mon -1 --include=all --resource=ping"),
Test("Complete text output filtered by exact clone resource instance",
"crm_mon -1 --include=all --show-detail --resource=ping:0"),
ValidatingTest("Output filtered by exact clone resource instance",
"crm_mon --output-as=xml --resource=ping:1"),
make_test_group("Output filtered by resource that doesn't exist",
"crm_mon -1 --resource=blah"),
Test("Basic text output with inactive resources, filtered by tag",
"crm_mon -1 -r --resource=inactive-rscs"),
Test("Basic text output with inactive resources, filtered by bundle resource",
"crm_mon -1 -r --resource=httpd-bundle"),
ValidatingTest("Output filtered by inactive bundle resource",
"crm_mon --output-as=xml --resource=httpd-bundle"),
Test("Basic text output with inactive resources, filtered by bundled IP address resource",
"crm_mon -1 -r --resource=httpd-bundle-ip-192.168.122.131"),
ValidatingTest("Output filtered by bundled IP address resource",
"crm_mon --output-as=xml --resource=httpd-bundle-ip-192.168.122.132"),
Test("Basic text output with inactive resources, filtered by bundled container",
"crm_mon -1 -r --resource=httpd-bundle-docker-1"),
ValidatingTest("Output filtered by bundled container",
"crm_mon --output-as=xml --resource=httpd-bundle-docker-2"),
Test("Basic text output with inactive resources, filtered by bundle connection",
"crm_mon -1 -r --resource=httpd-bundle-0"),
ValidatingTest("Output filtered by bundle connection",
"crm_mon --output-as=xml --resource=httpd-bundle-0"),
Test("Basic text output with inactive resources, filtered by bundled primitive resource",
"crm_mon -1 -r --resource=httpd"),
ValidatingTest("Output filtered by bundled primitive resource",
"crm_mon --output-as=xml --resource=httpd"),
Test("Complete text output, filtered by clone name in cloned group",
"crm_mon -1 --include=all --show-detail --resource=mysql-clone-group"),
ValidatingTest("Output, filtered by clone name in cloned group",
"crm_mon --output-as=xml --resource=mysql-clone-group"),
Test("Complete text output, filtered by group name in cloned group",
"crm_mon -1 --include=all --show-detail --resource=mysql-group"),
ValidatingTest("Output, filtered by group name in cloned group",
"crm_mon --output-as=xml --resource=mysql-group"),
Test("Complete text output, filtered by exact group instance name in cloned group",
"crm_mon -1 --include=all --show-detail --resource=mysql-group:1"),
ValidatingTest("Output, filtered by exact group instance name in cloned group",
"crm_mon --output-as=xml --resource=mysql-group:1"),
Test("Complete text output, filtered by primitive name in cloned group",
"crm_mon -1 --include=all --show-detail --resource=mysql-proxy"),
ValidatingTest("Output, filtered by primitive name in cloned group",
"crm_mon --output-as=xml --resource=mysql-proxy"),
Test("Complete text output, filtered by exact primitive instance name in cloned group",
"crm_mon -1 --include=all --show-detail --resource=mysql-proxy:1"),
ValidatingTest("Output, filtered by exact primitive instance name in cloned group",
"crm_mon --output-as=xml --resource=mysql-proxy:1"),
]
partial_tests = [
Test("Output of partially active resources", "crm_mon -1 --show-detail"),
ValidatingTest("Output of partially active resources", "crm_mon --output-as=xml"),
Test("Output of partially active resources, with inactive resources",
"crm_mon -1 -r --show-detail"),
# XML already includes inactive resources
Test("Complete brief text output, with inactive resources",
"crm_mon -1 -r --include=all --brief --show-detail"),
# XML does not have a brief output option
Test("Text output of partially active group", "crm_mon -1 --resource=partially-active-group"),
Test("Text output of partially active group, with inactive resources",
"crm_mon -1 --resource=partially-active-group -r"),
Test("Text output of active member of partially active group",
"crm_mon -1 --resource=dummy-1"),
Test("Text output of inactive member of partially active group",
"crm_mon -1 --resource=dummy-2 --show-detail"),
Test("Complete brief text output grouped by node, with inactive resources",
"crm_mon -1 -r --include=all --group-by-node --brief --show-detail"),
Test("Text output of partially active resources, with inactive resources, filtered by node",
"crm_mon -1 -r --node=cluster01"),
ValidatingTest("Output of partially active resources, filtered by node",
"crm_mon --output-as=xml --node=cluster01"),
]
unmanaged_tests = [
make_test_group("Output of active unmanaged resource on offline node",
"crm_mon -1"),
Test("Brief text output of active unmanaged resource on offline node",
"crm_mon -1 --brief"),
Test("Brief text output of active unmanaged resource on offline node, grouped by node",
"crm_mon -1 --brief --group-by-node"),
]
maint1_tests = [
make_test_group("Output of all resources with maintenance-mode enabled",
"crm_mon -1 -r",
setup="crm_attribute -n maintenance-mode -v true",
teardown="crm_attribute -n maintenance-mode -v false"),
make_test_group("Output of all resources with maintenance enabled for a node",
"crm_mon -1 -r",
setup="crm_attribute -n maintenance -N cluster02 -v true",
teardown="crm_attribute -n maintenance -N cluster02 -v false"),
]
maint2_tests = [
# The fence resource is excluded, for comparison
make_test_group("Output of all resources with maintenance meta attribute true",
"crm_mon -1 -r"),
]
t180_tests = [
Test("Text output of guest node's container on different node from its remote resource",
"crm_mon -1"),
Test("Complete text output of guest node's container on different node from its remote resource",
"crm_mon -1 --show-detail"),
]
return [
TestGroup(basic_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon.xml"}),
Test("Check that CIB_file=\"-\" works", "crm_mon -1",
env={"CIB_file": "-"},
stdin=pathlib.Path(f"{cts_cli_data}/crm_mon.xml")),
TestGroup(partial_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon-partial.xml"}),
TestGroup(unmanaged_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon-unmanaged.xml"}),
TestGroup(maint1_tests,
cib_gen=partial(copy_existing_cib, f"{cts_cli_data}/crm_mon.xml")),
TestGroup(maint2_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon-rsc-maint.xml"}),
TestGroup(t180_tests,
env={"CIB_file": f"{cts_cli_data}/crm_mon-T180.xml"}),
]
class AclsRegressionTest(RegressionTest):
"""A class for testing access control lists."""
@property
def name(self):
"""Return the name of this regression test."""
return "acls"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
acl_cib = """
"""
basic_tests = [
Test("Configure some ACLs", "cibadmin -M -o acls -p",
update_cib=True, stdin=acl_cib),
Test("Enable ACLs", "crm_attribute -n enable-acl -v true",
update_cib=True),
Test("Set cluster option", "crm_attribute -n no-quorum-policy -v ignore",
update_cib=True),
Test("New ACL role",
"""cibadmin --create -o acls --xml-text ''""",
update_cib=True),
Test("New ACL target",
"""cibadmin --create -o acls --xml-text ''""",
update_cib=True),
Test("Another ACL role",
"""cibadmin --create -o acls --xml-text ''""",
update_cib=True),
Test("Another ACL target",
"""cibadmin --create -o acls --xml-text ''""",
update_cib=True),
Test("Updated ACL",
"""cibadmin --replace -o acls --xml-text ''""",
update_cib=True),
]
no_acl_tests = [
Test("unknownguy: Query configuration", "cibadmin -Q",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("unknownguy: Set enable-acl",
"crm_attribute -n enable-acl -v false",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("unknownguy: Set stonith-enabled",
"crm_attribute -n stonith-enabled -v false",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("unknownguy: Create a resource",
"""cibadmin -C -o resources --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
]
deny_cib_tests = [
Test("l33t-haxor: Query configuration",
"cibadmin -Q",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("l33t-haxor: Set enable-acl",
"crm_attribute -n enable-acl -v false",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("l33t-haxor: Set stonith-enabled",
"crm_attribute -n stonith-enabled -v false",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("l33t-haxor: Create a resource",
"""cibadmin -C -o resources --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
]
observer_tests = [
Test("niceguy: Query configuration", "cibadmin -Q"),
Test("niceguy: Set enable-acl",
"crm_attribute -n enable-acl -v false",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("niceguy: Set stonith-enabled",
"crm_attribute -n stonith-enabled -v false",
update_cib=True),
Test("niceguy: Create a resource",
"""cibadmin -C -o resources --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("root: Query configuration", "cibadmin -Q",
env={"CIB_user": "root"}),
Test("root: Set stonith-enabled", "crm_attribute -n stonith-enabled -v true",
update_cib=True, env={"CIB_user": "root"}),
Test("root: Create a resource",
"""cibadmin -C -o resources --xml-text ''""",
update_cib=True, env={"CIB_user": "root"}),
# For use with later tests
Test("root: Create another resource (with description)",
"""cibadmin -C -o resources --xml-text ''""",
update_cib=True, env={"CIB_user": "root"}),
]
deny_cib_2_tests = [
Test("l33t-haxor: Create a resource meta attribute",
"crm_resource -r dummy --meta -p target-role -v Stopped",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("l33t-haxor: Query a resource meta attribute",
"crm_resource -r dummy --meta -g target-role",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
Test("l33t-haxor: Remove a resource meta attribute",
"crm_resource -r dummy --meta -d target-role",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
]
observer_2_tests = [
Test("niceguy: Create a resource meta attribute",
"crm_resource -r dummy --meta -p target-role -v Stopped",
update_cib=True),
Test("niceguy: Query a resource meta attribute",
"crm_resource -r dummy --meta -g target-role",
update_cib=True),
Test("niceguy: Remove a resource meta attribute",
"crm_resource -r dummy --meta -d target-role",
update_cib=True),
Test("niceguy: Create a resource meta attribute",
"crm_resource -r dummy --meta -p target-role -v Started",
update_cib=True),
]
read_meta_tests = [
Test("badidea: Query configuration - implied deny", "cibadmin -Q"),
]
deny_cib_3_tests = [
Test("betteridea: Query configuration - explicit deny", "cibadmin -Q"),
]
replace_tests = [
TestGroup([
AclTest("niceguy: Replace - remove acls",
"cibadmin --replace -p",
setup="cibadmin --delete --xml-text ''",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
AclTest("niceguy: Replace - create resource",
"cibadmin --replace -p",
setup="""cibadmin -C -o resources --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
AclTest("niceguy: Replace - modify attribute (deny)",
"cibadmin --replace -p",
setup="crm_attribute -n enable-acl -v false",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
AclTest("niceguy: Replace - delete attribute (deny)",
"cibadmin --replace -p",
setup="""cibadmin --replace --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
AclTest("niceguy: Replace - create attribute (deny)",
"cibadmin --replace -p",
setup="""cibadmin --modify --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
], env={"CIB_user": "niceguy"}),
# admin role
TestGroup([
AclTest("bob: Replace - create attribute (direct allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''"""),
AclTest("bob: Replace - modify attribute (direct allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''"""),
AclTest("bob: Replace - delete attribute (direct allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --replace -o resources --xml-text ''"""),
], env={"CIB_user": "bob"}),
# super_user role
TestGroup([
AclTest("joe: Replace - create attribute (inherited allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''"""),
AclTest("joe: Replace - modify attribute (inherited allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''"""),
AclTest("joe: Replace - delete attribute (inherited allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --replace -o resources --xml-text ''"""),
], env={"CIB_user": "joe"}),
# rsc_writer role
TestGroup([
AclTest("mike: Replace - create attribute (allow overrides deny)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''"""),
AclTest("mike: Replace - modify attribute (allow overrides deny)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''"""),
AclTest("mike: Replace - delete attribute (allow overrides deny)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --replace -o resources --xml-text ''"""),
# Create an additional resource for deny-overrides-allow testing
AclTest("mike: Create another resource",
"""cibadmin -C -o resources --xml-text ''""",
update_cib=True),
], env={"CIB_user": "mike"}),
# rsc_denied role
TestGroup([
AclTest("chris: Replace - create attribute (deny overrides allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
AclTest("chris: Replace - modify attribute (deny overrides allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --modify --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
AclTest("chris: Replace - delete attribute (deny overrides allow)",
"cibadmin --replace -o resources -p",
setup="""cibadmin --replace -o resources --xml-text ''""",
expected_rc=ExitStatus.INSUFFICIENT_PRIV),
], env={"CIB_user": "chris"}),
]
loop_tests = [
# no ACL
TestGroup(no_acl_tests, env={"CIB_user": "unknownguy"}),
# deny /cib permission
TestGroup(deny_cib_tests, env={"CIB_user": "l33t-haxor"}),
# observer role
TestGroup(observer_tests, env={"CIB_user": "niceguy"}),
# deny /cib permission
TestGroup(deny_cib_2_tests, env={"CIB_user": "l33t-haxor"}),
# observer role
TestGroup(observer_2_tests, env={"CIB_user": "niceguy"}),
# read //meta_attributes
TestGroup(read_meta_tests, env={"CIB_user": "badidea"}),
# deny /cib, read //meta_attributes
TestGroup(deny_cib_3_tests, env={"CIB_user": "betteridea"}),
] + replace_tests
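        # For reference, a minimal sketch of the shape of Pacemaker ACL XML
        # (hypothetical IDs, not the test data used above):
        #
        #   <acl_role id="observer">
        #     <acl_permission id="observer-read" kind="read" xpath="/cib"/>
        #   </acl_role>
        #   <acl_target id="niceguy">
        #     <role id="observer"/>
        #   </acl_target>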
return [
ShadowTestGroup(basic_tests + [
TestGroup(loop_tests,
env={"PCMK_trace_functions": "pcmk__check_acl,pcmk__apply_creation_acl"})]),
]
class ValidityRegressionTest(RegressionTest):
"""A class for testing CIB validity."""
@property
def name(self):
"""Return the name of this regression test."""
return "validity"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
basic_tests = [
# sanitize_output() strips out validate-with, so there's no point in
# outputting the CIB after tests that modify it
Test("Try to set unrecognized validate-with",
"cibadmin -M --xml-text ''",
expected_rc=ExitStatus.CONFIG),
Test("Try to remove validate-with attribute",
"cibadmin -R -p",
stdin=StdinCmd("""cibadmin -Q | sed 's#validate-with="[^"]*"##'"""),
expected_rc=ExitStatus.CONFIG),
Test("Try to use rsc_order first-action value disallowed by schema",
"cibadmin -M -o constraints --xml-text ''",
expected_rc=ExitStatus.CONFIG, update_cib=True),
Test("Try to use configuration legal only with schema after configured one",
"cibadmin -C -o configuration --xml-text ''",
expected_rc=ExitStatus.CONFIG, update_cib=True),
Test("Disable schema validation",
"cibadmin -M --xml-text ''",
expected_rc=ExitStatus.OK),
Test("Set invalid rsc_order first-action value (schema validation disabled)",
"cibadmin -M -o constraints --xml-text ''",
expected_rc=ExitStatus.OK, update_cib=True),
Test("Run crm_simulate with invalid rsc_order first-action "
"(schema validation disabled)",
"crm_simulate -SL",
expected_rc=ExitStatus.OK),
]
basic_tests_setup = [
"""cibadmin -C -o resources --xml-text ''""",
"""cibadmin -C -o resources --xml-text ''""",
"""cibadmin -C -o constraints --xml-text ''""",
]
return [
ShadowTestGroup(basic_tests, validate_with="pacemaker-1.2",
setup=basic_tests_setup,
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema,invert_action"}),
]
class UpgradeRegressionTest(RegressionTest):
"""A class for testing upgrading the CIB."""
@property
def name(self):
"""Return the name of this regression test."""
return "upgrade"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
resource_cib = """
"""
basic_tests = [
Test("Set stonith-enabled=false", "crm_attribute -n stonith-enabled -v false",
update_cib=True),
Test("Configure the initial resource", "cibadmin -M -o resources -p",
update_cib=True, stdin=resource_cib),
Test("Upgrade to latest CIB schema (trigger 2.10.xsl + the wrapping)",
"cibadmin --upgrade --force -V -V",
update_cib=True),
Test("Query a resource instance attribute (shall survive)",
"crm_resource -r mySmartFuse -g requires",
update_cib=True),
]
return [
ShadowTestGroup(basic_tests, validate_with="pacemaker-2.10",
env={"PCMK_trace_functions": "apply_upgrade,pcmk__update_schema"})
]
class RulesRegressionTest(RegressionTest):
"""A class for testing support for CIB rules."""
@property
def name(self):
"""Return the name of this regression test."""
return "rules"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
tomorrow = datetime.now() + timedelta(days=1)
rule_cib = f"""
"""
usage_tests = [
make_test_group("crm_rule given no arguments", "crm_rule",
expected_rc=ExitStatus.USAGE),
make_test_group("crm_rule given no rule to check", "crm_rule -c",
expected_rc=ExitStatus.USAGE),
make_test_group("crm_rule given invalid input XML",
"crm_rule -c -r blahblah -X invalidxml",
expected_rc=ExitStatus.DATAERR),
make_test_group("crm_rule given invalid input XML on stdin",
"crm_rule -c -r blahblah -X -",
stdin=StdinCmd("echo invalidxml"),
expected_rc=ExitStatus.DATAERR),
]
basic_tests = [
make_test_group("Try to check a rule that doesn't exist",
"crm_rule -c -r blahblah",
expected_rc=ExitStatus.NOSUCH),
make_test_group("Try to check a rule that has too many date_expressions",
"crm_rule -c -r cli-rule-too-many-date-expressions",
expected_rc=ExitStatus.UNIMPLEMENT_FEATURE),
make_test_group("Verify basic rule is expired",
"crm_rule -c -r cli-prefer-rule-dummy-expired",
expected_rc=ExitStatus.EXPIRED),
make_test_group("Verify basic rule worked in the past",
"crm_rule -c -r cli-prefer-rule-dummy-expired -d 20180101"),
make_test_group("Verify basic rule is not yet in effect",
"crm_rule -c -r cli-prefer-rule-dummy-not-yet",
expected_rc=ExitStatus.NOT_YET_IN_EFFECT),
make_test_group("Verify date_spec rule with years has expired",
"crm_rule -c -r cli-prefer-rule-dummy-date_spec-only-years",
expected_rc=ExitStatus.EXPIRED),
make_test_group("Verify multiple rules at once",
"crm_rule -c -r cli-prefer-rule-dummy-not-yet -r cli-prefer-rule-dummy-date_spec-only-years",
expected_rc=ExitStatus.EXPIRED),
make_test_group("Verify date_spec rule with years is in effect",
"crm_rule -c -r cli-prefer-rule-dummy-date_spec-only-years -d 20190201"),
make_test_group("Try to check a rule whose date_spec does not contain years=",
"crm_rule -c -r cli-prefer-rule-dummy-date_spec-without-years",
expected_rc=ExitStatus.UNIMPLEMENT_FEATURE),
make_test_group("Try to check a rule with no date_expression",
"crm_rule -c -r cli-no-date_expression-rule",
expected_rc=ExitStatus.UNIMPLEMENT_FEATURE),
]
return usage_tests + [
TestGroup(basic_tests, cib_gen=partial(write_cib, rule_cib))
]
class FeatureSetRegressionTest(RegressionTest):
"""A class for testing support for version-specific features."""
@property
def name(self):
"""Return the name of this regression test."""
return "feature_set"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
basic_tests = [
# Import the test CIB
Test("Import the test CIB",
f"cibadmin --replace --xml-file {cts_cli_data}/crm_mon-feature_set.xml",
update_cib=True),
Test("Complete text output, no mixed status",
"crm_mon -1 --show-detail"),
ValidatingTest("Output, no mixed status", "crm_mon --output-as=xml"),
# Modify the CIB to fake that the cluster has mixed versions
Test("Fake inconsistent feature set",
"crm_attribute --node=cluster02 --name=#feature-set --update=3.15.0 --lifetime=reboot",
update_cib=True),
Test("Complete text output, mixed status",
"crm_mon -1 --show-detail"),
ValidatingTest("Output, mixed status", "crm_mon --output-as=xml"),
]
return [
ShadowTestGroup(basic_tests),
]
# Tests that depend on resource agents and must be run in an installed
# environment
class AgentRegressionTest(RegressionTest):
"""A class for testing resource agents."""
@property
def name(self):
"""Return the name of this regression test."""
return "agents"
@property
def tests(self):
"""A list of Test instances to be run as part of this regression test."""
return [
make_test_group("Validate a valid resource configuration",
"crm_resource --validate --class ocf --provider pacemaker --agent Dummy"),
# Make the Dummy configuration invalid (op_sleep can't be a generic string)
make_test_group("Validate an invalid resource configuration",
"crm_resource --validate --class ocf --provider pacemaker --agent Dummy",
expected_rc=ExitStatus.NOT_CONFIGURED,
env={"OCF_RESKEY_op_sleep": "asdf"}),
]
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Command line tool regression tests",
epilog=f"Default tests: {' '.join(default_tests)}\n"
"Other tests: agents (must be run in an installed environment)")
parser.add_argument("-j", "--jobs", metavar="JOBS", default=cpu_count() - 1, type=int,
help="The number of tests to run simultaneously")
parser.add_argument("-p", "--path", metavar="DIR", action="append",
help="Look for executables in DIR (may be specified multiple times)")
parser.add_argument("-r", "--run-only", metavar="TEST", choices=default_tests + ["tools"] + other_tests,
action="append",
help="Run only specified tests (may be specified multiple times)")
parser.add_argument("-s", "--save", action="store_true",
help="Save actual output as expected output")
parser.add_argument("-v", "--valgrind", action="store_true",
help="Run all commands under valgrind")
parser.add_argument("-V", "--verbose", action="store_true",
help="Display any differences from expected output")
args = parser.parse_args()
if args.path is None:
args.path = []
return args
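# Typical invocations (illustrative; assumes this script is installed as
# cts-cli):
#
#   cts-cli                     # run the default tests
#   cts-cli -r crm_mon -V       # run one suite, showing diffs from expected
#   cts-cli -s -r crm_diff      # save actual output as the new expected output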
def setup_environment(valgrind):
"""Set various environment variables needed for operation."""
if valgrind:
os.environ["G_SLICE"] = "always-malloc"
# Ensure all command output is in portable locale for comparison
os.environ["LC_ALL"] = "C"
# Log test errors to stderr
os.environ["PCMK_stderr"] = "1"
# Because we will change the value of PCMK_trace_functions and then reset it
# back to some initial value at various points, it's easiest to assume it is
# defined but empty by default
if "PCMK_trace_functions" not in os.environ:
os.environ["PCMK_trace_functions"] = ""
-def path_prepend(p):
- """Add another directory to the front of $PATH."""
- old = os.environ["PATH"]
- os.environ["PATH"] = f"{p}:{old}"
-
-
-def setup_path(opts_path):
- """Set the PATH environment variable appropriately for the tests."""
- srcdir = os.path.dirname(test_home)
-
- # Add any search paths given on the command line
- for p in opts_path:
- path_prepend(p)
-
- if os.path.exists(f"{srcdir}/tools/crm_simulate"):
- print(f"Using local binaries from: {srcdir}")
-
- path_prepend(f"{srcdir}/tools")
-
- for daemon in ["based", "controld", "fenced", "schedulerd"]:
- path_prepend(f"{srcdir}/daemons/{daemon}")
-
- print(f"Using local schemas from: {srcdir}/xml")
- os.environ["PCMK_schema_directory"] = f"{srcdir}/xml"
- else:
- path_prepend(BuildOptions.DAEMON_DIR)
- os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR
-
-
def _run_one(valgrind, r):
"""Run and return a TestGroup object."""
# See comments in run_regression_tests.
r.run(valgrind=valgrind)
return r
def run_regression_tests(regs, jobs, valgrind=False):
"""Run the given tests and return the modified objects."""
executed = []
with Pool(processes=jobs) as pool:
        # What we really want to do here is:
        #     pool.map(lambda r: r.run(), regs)
        #
        # However, multiprocessing pickles the callable (and its arguments)
        # that it hands to each worker process, and python can't pickle a
        # lambda (nor a function nested within this one).  Thus, we need the
        # module-level _run_one wrapper just to call run().  Further, because
        # run() executes in a separate process, _run_one must return the
        # modified object and we must collect those returns here; otherwise
        # the rest of the program would keep using the originals, untouched
        # by the run.
executed = pool.map(partial(_run_one, valgrind), regs)
return executed
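# An illustrative sketch (not part of the suite) of the pickling constraint
# described above:
#
#   import pickle
#   from functools import partial
#
#   pickle.dumps(lambda r: r.run())         # raises pickle.PicklingError
#   pickle.dumps(partial(_run_one, False))  # fine: _run_one is module-level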
def results(regs, save, verbose):
"""Print the output from each regression test, returning the number whose output differs."""
output_differs = 0
if verbose:
print("\n\nResults")
sys.stdout.flush()
for r in regs:
r.write()
if save:
dest = f"{test_home}/cli/regression.{r.name}.exp"
copyfile(r.results_file, dest)
r.diff(verbose)
if not r.identical:
output_differs += 1
return output_differs
def summary(regs, output_differs, verbose):
"""Print the summary output for the entire test run."""
test_failures = 0
test_successes = 0
for r in regs:
test_failures += r.failures
test_successes += r.successes
print("\n\nSummary")
sys.stdout.flush()
# First, print all the Passed/Failed lines from each Test run.
for r in regs:
print("\n".join(r.summary))
fmt = PluralFormatter()
# Then, print information specific to each result possibility. Basically,
# if there were failures then we print the output differences, leave the
# failed output files in place, and exit with an error. Otherwise, clean up
# anything that passed.
if test_failures > 0 and output_differs > 0:
print(fmt.format("{0} {0:plural,test} failed; see output in:",
test_failures))
for r in regs:
r.process_results(verbose)
return ExitStatus.ERROR
if test_failures > 0:
print(fmt.format("{0} {0:plural,test} failed", test_failures))
for r in regs:
r.process_results(verbose)
return ExitStatus.ERROR
if output_differs:
print(fmt.format("{0} {0:plural,test} passed but output was "
"unexpected; see output in:", test_successes))
for r in regs:
r.process_results(verbose)
return ExitStatus.DIGEST
print(fmt.format("{0} {0:plural,test} passed", test_successes))
for r in regs:
r.cleanup()
return ExitStatus.OK
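# PluralFormatter extends string.Formatter with a "plural,<word>" format spec,
# as used in summary() above.  A minimal sketch of the idea (hypothetical
# implementation; the real class comes from earlier in this file):
#
#   import string
#
#   class PluralFormatter(string.Formatter):
#       def format_field(self, value, spec):
#           if spec.startswith("plural,"):
#               word = spec.split(",", 1)[1]
#               return word if value == 1 else word + "s"
#           return super().format_field(value, spec)
#
#   # PluralFormatter().format("{0} {0:plural,test} passed", 2)
#   # -> "2 tests passed"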
regression_classes = [
AccessRenderRegressionTest,
DaemonsRegressionTest,
DatesRegressionTest,
ErrorCodeRegressionTest,
CibadminRegressionTest,
CrmAttributeRegressionTest,
CrmStandbyRegressionTest,
CrmResourceRegressionTest,
CrmTicketRegressionTest,
CrmadminRegressionTest,
CrmShadowRegressionTest,
CrmVerifyRegressionTest,
CrmSimulateRegressionTest,
CrmDiffRegressionTest,
CrmMonRegressionTest,
AclsRegressionTest,
ValidityRegressionTest,
UpgradeRegressionTest,
RulesRegressionTest,
FeatureSetRegressionTest,
AgentRegressionTest,
]
def main():
"""Run command line regression tests as specified by arguments."""
opts = build_options()
setup_environment(opts.valgrind)
- setup_path(opts.path)
+ set_cts_path(extra=opts.path)
# Filter the list of all regression test classes to include only those that
# were requested on the command line. If empty, this defaults to default_tests.
if not opts.run_only:
opts.run_only = default_tests
if opts.run_only == ["tools"]:
opts.run_only = tools_tests
regs = []
for cls in regression_classes:
obj = cls()
if obj.name in opts.run_only:
regs.append(obj)
regs = run_regression_tests(regs, max(1, opts.jobs), valgrind=opts.valgrind)
output_differs = results(regs, opts.save, opts.verbose)
rc = summary(regs, output_differs, opts.verbose)
sys.exit(rc)
if __name__ == "__main__":
main()
# vim: set filetype=python:
diff --git a/cts/cts-exec.in b/cts/cts-exec.in
index 26833da8f3..c423ff154a 100644
--- a/cts/cts-exec.in
+++ b/cts/cts-exec.in
@@ -1,931 +1,903 @@
#!@PYTHON@
"""Regression tests for Pacemaker's pacemaker-execd."""
# pylint doesn't like the module name "cts-execd" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2012-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import stat
import sys
import subprocess
import shutil
import tempfile
-# Where to find test binaries
-# Prefer the source tree if available
-TEST_DIR = sys.path[0]
-
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully, but
# being able to see --help output can still be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
+from pacemaker._cts.environment import set_cts_path
from pacemaker._cts.process import killall, exit_if_proc_running, stdout_from_command
from pacemaker._cts.test import Test, Tests
# File permissions for executable scripts we create
EXECMODE = stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
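# EXECMODE is equivalent to mode 0o555 (r-xr-xr-x).  Illustrative use, with a
# hypothetical path for a generated test script:
#
#   os.chmod("/tmp/pacemaker-cts-dummy-agent", EXECMODE)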
-def update_path():
- # pylint: disable=protected-access
- """Set the PATH environment variable appropriately for the tests."""
- new_path = os.environ['PATH']
-
- if os.path.exists(f"{TEST_DIR}/cts-exec.in"):
- print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR} ({TEST_DIR})")
- # For pacemaker-execd, cts-exec-helper, and pacemaker-remoted
- new_path = f"{BuildOptions._BUILD_DIR}/daemons/execd:{new_path}"
- new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" # For crm_resource
- # For pacemaker-fenced
- new_path = f"{BuildOptions._BUILD_DIR}/daemons/fenced:{new_path}"
- # For cts-support
- new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}"
-
- else:
- print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {TEST_DIR})")
- # For cts-exec-helper, cts-support, pacemaker-execd, pacemaker-fenced,
- # and pacemaker-remoted
- new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
-
- print(f'Using PATH="{new_path}"')
- os.environ['PATH'] = new_path
-
-
class ExecTest(Test):
"""Executor for a single pacemaker-execd regression test."""
def __init__(self, name, description, **kwargs):
"""Create a new ExecTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
Keyword arguments:
tls -- Enable pacemaker-remoted.
"""
Test.__init__(self, name, description, **kwargs)
self.tls = kwargs.get("tls", False)
# If we are going to run the stonith resource tests, we will need to
# launch and track Corosync and pacemaker-fenced.
self._corosync = None
self._fencer = None
self._is_stonith_test = "stonith" in self.name
if self.tls:
self._daemon_location = "pacemaker-remoted"
else:
self._daemon_location = "pacemaker-execd"
if self._is_stonith_test:
self._corosync = Corosync(self.verbose, self.logdir, "cts-exec")
self._test_tool_location = "cts-exec-helper"
def _kill_daemons(self):
killall([
"corosync",
"pacemaker-fenced",
"lt-pacemaker-fenced",
"pacemaker-execd",
"lt-pacemaker-execd",
"cts-exec-helper",
"lt-cts-exec-helper",
"pacemaker-remoted",
])
def _start_daemons(self):
if self._corosync:
self._corosync.start(kill_first=True)
# pylint: disable=consider-using-with
self._fencer = subprocess.Popen(["pacemaker-fenced", "-s"])
cmd = [self._daemon_location, "-l", self.logpath]
if self.verbose:
cmd += ["-V"]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
def clean_environment(self):
"""Clean up the host after running a test."""
if self._daemon_process:
self._daemon_process.terminate()
self._daemon_process.wait()
if self.verbose:
print("Daemon Output Start")
with open(self.logpath, "rt", errors="replace", encoding="utf-8") as logfile:
for line in logfile:
print(line.strip())
print("Daemon Output End")
if self._corosync:
self._fencer.terminate()
self._fencer.wait()
self._corosync.stop()
self._daemon_process = None
self._fencer = None
self._corosync = None
def add_cmd(self, cmd=None, **kwargs):
"""Add a cts-exec-helper command to be executed as part of this test."""
if cmd is None:
cmd = self._test_tool_location
if cmd == self._test_tool_location:
if self.verbose:
kwargs["args"] += " -V "
if self.tls:
kwargs["args"] += " -S "
kwargs["validate"] = False
kwargs["check_rng"] = False
kwargs["check_stderr"] = False
Test.add_cmd(self, cmd, **kwargs)
def run(self):
"""Execute this test."""
if self.tls and self._is_stonith_test:
self._result_txt = f"SKIPPED - '{self.name}' - disabled when testing pacemaker_remote"
print(self._result_txt)
return
Test.run(self)
class ExecTests(Tests):
"""Collection of all pacemaker-execd regression tests."""
def __init__(self, **kwargs):
"""
Create a new ExecTests instance.
Keyword arguments:
tls -- Enable pacemaker-remoted.
"""
Tests.__init__(self, **kwargs)
self.tls = kwargs.get("tls", False)
self._action_timeout = "-t 9000"
self._installed_files = []
self._rsc_classes = self._setup_rsc_classes()
print(f"Testing resource classes {self._rsc_classes!r}")
if "lsb" in self._rsc_classes:
service_agent = "LSBDummy"
elif "systemd" in self._rsc_classes:
service_agent = "pacemaker-cts-dummyd@3"
else:
service_agent = "unsupported"
self._common_cmds = {
"ocf_reg_line": f'-c register_rsc -r ocf_test_rsc {self._action_timeout} -C ocf -P pacemaker -T Dummy',
"ocf_reg_event": '-l "NEW_EVENT event_type:register rsc_id:ocf_test_rsc action:none rc:ok op_status:Done"',
"ocf_unreg_line": f'-c unregister_rsc -r ocf_test_rsc {self._action_timeout} ',
"ocf_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:ocf_test_rsc action:none rc:ok op_status:Done"',
"ocf_start_line": f'-c exec -r ocf_test_rsc -a start {self._action_timeout} ',
"ocf_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:start rc:ok op_status:Done" ',
"ocf_stop_line": f'-c exec -r ocf_test_rsc -a stop {self._action_timeout} ',
"ocf_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:stop rc:ok op_status:Done" ',
"ocf_monitor_line": f'-c exec -r ocf_test_rsc -a monitor -i 2s {self._action_timeout} ',
"ocf_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout} ',
"ocf_cancel_line": f'-c cancel -r ocf_test_rsc -a monitor -i 2s {self._action_timeout} ',
"ocf_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"systemd_reg_line": f'-c register_rsc -r systemd_test_rsc {self._action_timeout} -C systemd -T pacemaker-cts-dummyd@3',
"systemd_reg_event": '-l "NEW_EVENT event_type:register rsc_id:systemd_test_rsc action:none rc:ok op_status:Done"',
"systemd_unreg_line": f'-c unregister_rsc -r systemd_test_rsc {self._action_timeout} ',
"systemd_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:systemd_test_rsc action:none rc:ok op_status:Done"',
"systemd_start_line": f'-c exec -r systemd_test_rsc -a start {self._action_timeout} ',
"systemd_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:start rc:ok op_status:Done" ',
"systemd_stop_line": f'-c exec -r systemd_test_rsc -a stop {self._action_timeout} ',
"systemd_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:stop rc:ok op_status:Done" ',
"systemd_monitor_line": f'-c exec -r systemd_test_rsc -a monitor -i 2s {self._action_timeout} ',
"systemd_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:monitor rc:ok op_status:Done" -t 15000 ',
"systemd_cancel_line": f'-c cancel -r systemd_test_rsc -a monitor -i 2s {self._action_timeout} ',
"systemd_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"service_reg_line": f"-c register_rsc -r service_test_rsc {self._action_timeout} -C service -T {service_agent}",
"service_reg_event": '-l "NEW_EVENT event_type:register rsc_id:service_test_rsc action:none rc:ok op_status:Done"',
"service_unreg_line": f'-c unregister_rsc -r service_test_rsc {self._action_timeout} ',
"service_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:service_test_rsc action:none rc:ok op_status:Done"',
"service_start_line": f'-c exec -r service_test_rsc -a start {self._action_timeout} ',
"service_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:start rc:ok op_status:Done" ',
"service_stop_line": f'-c exec -r service_test_rsc -a stop {self._action_timeout} ',
"service_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:stop rc:ok op_status:Done" ',
"service_monitor_line": f'-c exec -r service_test_rsc -a monitor -i 2s {self._action_timeout} ',
"service_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout} ',
"service_cancel_line": f'-c cancel -r service_test_rsc -a monitor -i 2s {self._action_timeout} ',
"service_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"lsb_reg_line": f'-c register_rsc -r lsb_test_rsc {self._action_timeout} -C lsb -T LSBDummy',
"lsb_reg_event": '-l "NEW_EVENT event_type:register rsc_id:lsb_test_rsc action:none rc:ok op_status:Done" ',
"lsb_unreg_line": f'-c unregister_rsc -r lsb_test_rsc {self._action_timeout} ',
"lsb_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:lsb_test_rsc action:none rc:ok op_status:Done"',
"lsb_start_line": f'-c exec -r lsb_test_rsc -a start {self._action_timeout} ',
"lsb_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:start rc:ok op_status:Done" ',
"lsb_stop_line": f'-c exec -r lsb_test_rsc -a stop {self._action_timeout} ',
"lsb_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:stop rc:ok op_status:Done" ',
"lsb_monitor_line": f'-c exec -r lsb_test_rsc -a status -i 2s {self._action_timeout} ',
"lsb_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:Done" {self._action_timeout} ',
"lsb_cancel_line": f'-c cancel -r lsb_test_rsc -a status -i 2s {self._action_timeout} ',
"lsb_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:Cancelled" ',
"stonith_reg_line": f'-c register_rsc -r stonith_test_rsc {self._action_timeout} -C stonith -P pacemaker -T fence_dummy',
"stonith_reg_event": '-l "NEW_EVENT event_type:register rsc_id:stonith_test_rsc action:none rc:ok op_status:Done" ',
"stonith_unreg_line": f'-c unregister_rsc -r stonith_test_rsc {self._action_timeout} ',
"stonith_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:stonith_test_rsc action:none rc:ok op_status:Done"',
"stonith_start_line": f'-c exec -r stonith_test_rsc -a start {self._action_timeout} ',
"stonith_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:start rc:ok op_status:Done" ',
"stonith_stop_line": f'-c exec -r stonith_test_rsc -a stop {self._action_timeout} ',
"stonith_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:stop rc:ok op_status:Done" ',
"stonith_monitor_line": f'-c exec -r stonith_test_rsc -a monitor -i 2s {self._action_timeout} ',
"stonith_monitor_event": f'-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout} ',
"stonith_cancel_line": f'-c cancel -r stonith_test_rsc -a monitor -i 2s {self._action_timeout} ',
"stonith_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Cancelled" ',
}
def _setup_rsc_classes(self):
"""Determine which resource classes are supported."""
classes = stdout_from_command(["crm_resource", "--list-standards"])
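# e.g. typically one standard per line: ocf, stonith, service, systemd, lsb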
# Strip trailing empty line
classes = classes[:-1]
if self.tls:
classes.remove("stonith")
if "systemd" in classes:
try:
# This code doesn't need this import, but pacemaker-cts-dummyd
# does, so ensure the dependency is available rather than letting
# all systemd tests fail.
# pylint: disable=import-outside-toplevel,unused-import
import systemd.daemon
except ImportError:
print("Python systemd bindings not found.")
print("The tests for systemd class are not going to be run.")
classes.remove("systemd")
return classes
def new_test(self, name, description):
"""Create a named test."""
test = ExecTest(name, description, verbose=self.verbose, tls=self.tls,
timeout=self.timeout, force_wait=self.force_wait,
logdir=self.logdir)
self._tests.append(test)
return test
def setup_environment(self):
"""Prepare the host before executing any tests."""
if BuildOptions.REMOTE_ENABLED:
# @TODO Use systemctl when available, and use the subprocess module
# with an argument array instead of os.system()
os.system("service pacemaker_remote stop")
self.cleanup_environment()
# @TODO Support the option of using specified existing certificates
authkey = f"{BuildOptions.PACEMAKER_CONFIG_DIR}/authkey"
if self.tls and not os.path.isfile(authkey):
print(f"Installing {authkey} ...")
# @TODO Use os.mkdir() instead
os.system(f"mkdir -p {BuildOptions.PACEMAKER_CONFIG_DIR}")
# @TODO Use the subprocess module with an argument array instead
os.system(f"dd if=/dev/urandom of={authkey} bs=4096 count=1")
self._installed_files.append(authkey)
# If we're in the build directory, install agents if not already installed
# pylint: disable=protected-access
if os.path.exists(f"{BuildOptions._BUILD_DIR}/cts/cts-exec.in"):
if not os.path.exists(f"{BuildOptions.OCF_RA_INSTALL_DIR}/pacemaker"):
# @TODO remember which components were created and remove them
os.makedirs(f"{BuildOptions.OCF_RA_INSTALL_DIR}/pacemaker", 0o755)
for agent in ["Dummy", "Stateful", "ping"]:
agent_source = f"{BuildOptions._BUILD_DIR}/extra/resources/{agent}"
agent_dest = f"{BuildOptions.OCF_RA_INSTALL_DIR}/pacemaker/{agent}"
if not os.path.exists(agent_dest):
print(f"Installing {agent_dest} ...")
shutil.copyfile(agent_source, agent_dest)
os.chmod(agent_dest, EXECMODE)
self._installed_files.append(agent_dest)
subprocess.call(["cts-support", "install"])
def cleanup_environment(self):
"""Clean up the host after executing desired tests."""
for installed_file in self._installed_files:
print(f"Removing {installed_file} ...")
os.remove(installed_file)
subprocess.call(["cts-support", "uninstall"])
def _build_cmd_str(self, rsc, ty):
"""Construct a command string for the given resource and type."""
return f"{self._common_cmds[f'{rsc}_{ty}_line']} {self._common_cmds[f'{rsc}_{ty}_event']}"
def build_generic_tests(self):
"""Register tests that apply to all resource classes."""
common_cmds = self._common_cmds
# register/unregister tests
for rsc in self._rsc_classes:
test = self.new_test(f"generic_registration_{rsc}",
f"Simple resource registration test for {rsc} standard")
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# start/stop tests
for rsc in self._rsc_classes:
test = self.new_test(f"generic_start_stop_{rsc}",
f"Simple start and stop test for {rsc} standard")
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# monitor cancel test
for rsc in self._rsc_classes:
test = self.new_test(f"generic_monitor_cancel_{rsc}",
f"Simple monitor cancel test for {rsc} standard")
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
# If this happens the monitor did not actually cancel correctly
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"],
expected_exitcode=ExitStatus.TIMEOUT)
# If this happens the monitor did not actually cancel correctly
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"],
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# monitor duplicate test
for rsc in self._rsc_classes:
test = self.new_test(f"generic_monitor_duplicate_{rsc}",
f"Test creation and canceling of duplicate monitors for {rsc} standard")
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
# Add the duplicate monitors
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# verify we still get update events
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
# Cancel the monitor; if the duplicate merged with the original, we should no longer see monitor updates
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
# If this happens the monitor did not actually cancel correctly
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"],
expected_exitcode=ExitStatus.TIMEOUT)
# If this happens the monitor did not actually cancel correctly
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"],
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# stop implies cancel test
for rsc in self._rsc_classes:
test = self.new_test(f"generic_stop_implies_cancel_{rsc}",
f"Verify stopping a resource implies cancel of recurring ops for {rsc} standard")
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
# If this happens the monitor did not actually cancel correctly
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"],
expected_exitcode=ExitStatus.TIMEOUT)
# If this happens the monitor did not actually cancel correctly
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"],
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
def build_multi_rsc_tests(self):
"""Register complex tests that involve managing multiple resouces of different types."""
common_cmds = self._common_cmds
# Do not use service and systemd at the same time; they refer to the same resource.
# Register, start, monitor, stop, and unregister resources of each type at the same time
test = self.new_test("multi_rsc_start_stop_all_including_stonith",
"Start, monitor, and stop resources of multiple types and classes")
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
for rsc in self._rsc_classes:
# If this fails, that means the monitor is not being rescheduled
test.add_cmd(args=common_cmds[f"{rsc}_monitor_event"])
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
def build_negative_tests(self):
"""Register tests related to how pacemaker-execd handles failures."""
# ocf start timeout test
test = self.new_test("ocf_start_timeout", "Force start timeout to occur, verify start failure.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# -t must be less than self._action_timeout
test.add_cmd(args='-c exec -r test_rsc -a start -k op_sleep -v 5 -t 1000 -w')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:Error occurred op_status:Timed out" '
f'{self._action_timeout}')
test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# stonith start timeout test
test = self.new_test("stonith_start_timeout", "Force start timeout to occur, verify start failure.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C stonith -P pacemaker -T fence_dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done"')
# -t must be less than self._action_timeout
test.add_cmd(args='-c exec -r test_rsc -a start -k monitor_delay -v 30 -t 1000 -w')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:Error occurred op_status:Timed out" '
f'{self._action_timeout}')
test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# stonith component fail
test = self.new_test("stonith_component_fail", "Kill stonith component after pacemaker-execd connects")
test.add_cmd(args=self._build_cmd_str("stonith", "reg"))
test.add_cmd(args=self._build_cmd_str("stonith", "start"))
test.add_cmd(args='-c exec -r stonith_test_rsc -a monitor -i 600s '
'-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Done" '
f'{self._action_timeout}')
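# Killing the fencer mid-operation should cause the in-flight stonith
# monitor to complete with an error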
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:Error occurred op_status:error" -t 15000',
kill="killall -9 -q pacemaker-fenced lt-pacemaker-fenced")
test.add_cmd(args=self._build_cmd_str("stonith", "unreg"))
# monitor fail for ocf resources
test = self.new_test("monitor_fail_ocf", "Force ocf monitor to fail, verify failure is reported.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" '
f'{self._action_timeout}')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" '
f'{self._action_timeout}')
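# Removing the Dummy agent's state file makes the next monitor report "not running"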
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout} ',
kill=f"rm -f {BuildOptions.LOCAL_STATE_DIR}/run/Dummy-test_rsc.state")
test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" '
f'{self._action_timeout}', expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" '
f'{self._action_timeout}', expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# verify notify changes only for monitor operation
test = self.new_test("monitor_changes_only", "Verify when flag is set, only monitor changes are notified.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} -o '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout} '
' -o -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
kill=f"rm -f {BuildOptions.LOCAL_STATE_DIR}/run/Dummy-test_rsc.state")
test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done"')
# monitor fail for systemd resource
if "systemd" in self._rsc_classes:
test = self.new_test("monitor_fail_systemd", "Force systemd monitor to fail, verify failure is reported..")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C systemd -T pacemaker-cts-dummyd@3 {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
kill="pkill -9 -f pacemaker-cts-dummyd")
test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Cancel non-existent operation on a resource
test = self.new_test("cancel_non_existent_op", "Attempt to cancel the wrong monitor operation, verify expected failure")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ')
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}')
# interval is wrong, should fail
test.add_cmd(args=f'-c cancel -r test_rsc -a monitor -i 2s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ',
expected_exitcode=ExitStatus.ERROR)
# action name is wrong, should fail
test.add_cmd(args=f'-c cancel -r test_rsc -a stop -i 1s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Attempt to invoke non-existent rsc id
test = self.new_test("invoke_non_existent_rsc", "Attempt to perform operations on a non-existent rsc id.")
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:error op_status:Done" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 6s {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c cancel -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Register and start a resource that doesn't exist, systemd
if "systemd" in self._rsc_classes:
test = self.new_test("start_uninstalled_systemd", "Register uninstalled systemd agent, try to start, verify expected failure")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C systemd -T this_is_fake1234 {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Register and start a resource that doesn't exist, ocf
test = self.new_test("start_uninstalled_ocf", "Register uninstalled ocf agent, try to start, verify expected failure.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T this_is_fake1234 {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Register ocf with non-existent provider
test = self.new_test("start_ocf_bad_provider", "Register ocf agent with a non-existent provider, verify expected failure.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pancakes -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Register ocf with empty provider field
test = self.new_test("start_ocf_no_provider", "Register ocf agent with a no provider, verify expected failure.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Error" ',
expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
def build_stress_tests(self):
"""Register stress tests."""
timeout = "-t 20000"
iterations = 25
test = self.new_test("ocf_stress", "Verify OCF agent handling works under load")
for i in range(iterations):
test.add_cmd(args=f'-c register_rsc -r rsc_{i} {timeout} -C ocf -P heartbeat -T Dummy -l "NEW_EVENT event_type:register rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a start {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:start rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a monitor {timeout} -i 1s '
f'-l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:monitor rc:ok op_status:Done"')
for i in range(iterations):
test.add_cmd(args=f'-c exec -r rsc_{i} -a stop {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:stop rc:ok op_status:Done"')
test.add_cmd(args=f'-c unregister_rsc -r rsc_{i} {timeout} -l "NEW_EVENT event_type:unregister rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
if "systemd" in self._rsc_classes:
test = self.new_test("systemd_stress", "Verify systemd dbus connection works under load")
for i in range(iterations):
test.add_cmd(args=f'-c register_rsc -r rsc_{i} {timeout} -C systemd -T pacemaker-cts-dummyd@3 -l "NEW_EVENT event_type:register rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a start {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:start rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r rsc_{i} -a monitor {timeout} -i 1s '
f'-l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:monitor rc:ok op_status:Done"')
for i in range(iterations):
test.add_cmd(args=f'-c exec -r rsc_{i} -a stop {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_{i} action:stop rc:ok op_status:Done"')
test.add_cmd(args=f'-c unregister_rsc -r rsc_{i} {timeout} -l "NEW_EVENT event_type:unregister rsc_id:rsc_{i} action:none rc:ok op_status:Done"')
iterations = 9
timeout = "-t 30000"
# Verify that in-flight collisions with recurring ops are properly handled in series
test = self.new_test("rsc_inflight_collision", "Verify recurring ops do not collide with other operations for the same rsc.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args=f'-c exec -r test_rsc -a start {timeout} -k op_sleep -v 1 -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done"')
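# Each monitor below uses a distinct interval (1000ms through 1008ms), so
# each is a separate recurring op, while op_sleep=2 forces executions to
# overlap and collide in-flight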
for i in range(iterations):
test.add_cmd(args=f'-c exec -r test_rsc -a monitor {timeout} -i 100{i}ms -k op_sleep -v 2 '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"')
test.add_cmd(args=f'-c exec -r test_rsc -a stop {timeout} -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done"')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {timeout} -l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done"')
def build_custom_tests(self):
"""Register tests that target specific cases."""
# verify resource temporary folder is created and used by OCF agents
test = self.new_test("rsc_tmp_dir", "Verify creation and use of rsc temporary state directory")
test.add_cmd("ls", args=f"-al {BuildOptions.RSC_TMP_DIR}")
test.add_cmd(args='-c register_rsc -r test_rsc -P heartbeat -C ocf -T Dummy '
f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args='-c exec -r test_rsc -a start -t 4000')
test.add_cmd("ls", args=f"-al {BuildOptions.RSC_TMP_DIR}")
test.add_cmd("ls", args=f"{BuildOptions.RSC_TMP_DIR}/Dummy-test_rsc.state")
test.add_cmd(args='-c exec -r test_rsc -a stop -t 4000')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# start delay then stop test
test = self.new_test("start_delay", "Verify start delay works as expected.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args='-c exec -r test_rsc -s 6000 -a start -w -t 6000')
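# With a 6s start delay, the start result must not arrive within the first
# 2s, but must arrive within a further 6s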
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" -t 2000',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" -t 6000')
test.add_cmd(args=f'-c exec -r test_rsc -a stop {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:Done" ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# start delay, but cancel before it gets a chance to start
test = self.new_test("start_delay_cancel", "Using start_delay, start a rsc, but cancel the start op before execution.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
f'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" {self._action_timeout}')
test.add_cmd(args='-c exec -r test_rsc -s 5000 -a start -w -t 4000')
test.add_cmd(args=f'-c cancel -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" -t 5000',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# Register a bunch of resources, verify we can get info on them
test = self.new_test("verify_get_rsc_info", "Register multiple resources, verify retrieval of rsc info.")
if "systemd" in self._rsc_classes:
test.add_cmd(args=f'-c register_rsc -r rsc1 -C systemd -T pacemaker-cts-dummyd@3 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc1 ')
test.add_cmd(args=f'-c unregister_rsc -r rsc1 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc1 ', expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc2 ')
test.add_cmd(args=f'-c unregister_rsc -r rsc2 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc2 ', expected_exitcode=ExitStatus.ERROR)
# Register duplicate, verify only one entry exists and can still be removed
test = self.new_test("duplicate_registration", "Register resource multiple times, verify only one entry exists and can be removed.")
test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker {self._action_timeout}')
test.add_cmd(args="-c get_rsc_info -r rsc2 ",
stdout_match="id:rsc2 class:ocf provider:pacemaker type:Dummy")
test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker {self._action_timeout}')
test.add_cmd(args="-c get_rsc_info -r rsc2 ",
stdout_match="id:rsc2 class:ocf provider:pacemaker type:Dummy")
test.add_cmd(args=f'-c register_rsc -r rsc2 -C ocf -T Stateful -P pacemaker {self._action_timeout}')
test.add_cmd(args="-c get_rsc_info -r rsc2 ",
stdout_match="id:rsc2 class:ocf provider:pacemaker type:Stateful")
test.add_cmd(args=f'-c unregister_rsc -r rsc2 {self._action_timeout}')
test.add_cmd(args='-c get_rsc_info -r rsc2 ', expected_exitcode=ExitStatus.ERROR)
# verify the option to only send notification to the original client
test = self.new_test("notify_orig_client_only", "Verify option to only send notifications to the client originating the action.")
test.add_cmd(args=f'-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy {self._action_timeout} '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a start {self._action_timeout} '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Done" ')
test.add_cmd(args=f'-c exec -r test_rsc -a monitor -i 1s {self._action_timeout} -n '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done"')
# this will fail because the monitor notifications should only go to the original caller, which no longer exists.
test.add_cmd(args=f'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:Done" {self._action_timeout}',
expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s -t 6000 ')
test.add_cmd(args=f'-c unregister_rsc -r test_rsc {self._action_timeout} '
'-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:Done" ')
# get metadata
test = self.new_test("get_ocf_metadata", "Retrieve metadata for a resource")
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T Dummy",
stdout_match="resource-agent name=\"Dummy\"")
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T Stateful")
test.add_cmd(args="-c metadata -P pacemaker -T Stateful", expected_exitcode=ExitStatus.ERROR)
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T fake_agent", expected_exitcode=ExitStatus.ERROR)
# get stonith metadata
test = self.new_test("get_stonith_metadata", "Retrieve stonith metadata for a resource")
test.add_cmd(args="-c metadata -C stonith -P pacemaker -T fence_dummy",
stdout_match="resource-agent name=\"fence_dummy\"")
# get lsb metadata
if "lsb" in self._rsc_classes:
test = self.new_test("get_lsb_metadata",
"Retrieve metadata for an LSB resource")
test.add_cmd(args="-c metadata -C lsb -T LSBDummy",
stdout_match="resource-agent name='LSBDummy'")
# get systemd metadata
if "systemd" in self._rsc_classes:
test = self.new_test("get_systemd_metadata", "Retrieve metadata for a resource")
test.add_cmd(args="-c metadata -C systemd -T pacemaker-cts-dummyd@",
stdout_match="resource-agent name=\"pacemaker-cts-dummyd@\"")
# get ocf providers
test = self.new_test("list_ocf_providers",
"Retrieve list of available resource providers, verifies pacemaker is a provider.")
test.add_cmd(args="-c list_ocf_providers ", stdout_match="pacemaker")
test.add_cmd(args="-c list_ocf_providers -T ping", stdout_match="pacemaker")
# Verify agents only exist in their lists
test = self.new_test("verify_agent_lists", "Verify the agent lists contain the right data.")
if "ocf" in self._rsc_classes:
test.add_cmd(args="-c list_agents ", stdout_match="Stateful")
test.add_cmd(args="-c list_agents -C ocf", stdout_match="Stateful",
stdout_no_match="pacemaker-cts-dummyd@|fence_dummy")
if "service" in self._rsc_classes:
test.add_cmd(args="-c list_agents -C service", stdout_match="",
stdout_no_match="Stateful|fence_dummy")
if "lsb" in self._rsc_classes:
test.add_cmd(args="-c list_agents", stdout_match="LSBDummy")
test.add_cmd(args="-c list_agents -C lsb", stdout_match="LSBDummy",
stdout_no_match="pacemaker-cts-dummyd@|Stateful|fence_dummy")
test.add_cmd(args="-c list_agents -C service", stdout_match="LSBDummy")
if "systemd" in self._rsc_classes:
test.add_cmd(args="-c list_agents ", stdout_match="pacemaker-cts-dummyd@") # systemd
test.add_cmd(args="-c list_agents -C systemd", stdout_match="", stdout_no_match="Stateful") # should not exist
test.add_cmd(args="-c list_agents -C systemd", stdout_match="pacemaker-cts-dummyd@")
test.add_cmd(args="-c list_agents -C systemd", stdout_match="", stdout_no_match="fence_dummy") # should not exist
if "stonith" in self._rsc_classes:
test.add_cmd(args="-c list_agents -C stonith", stdout_match="fence_dummy") # stonith
test.add_cmd(args="-c list_agents -C stonith", stdout_match="", # should not exist
stdout_no_match="pacemaker-cts-dummyd@")
test.add_cmd(args="-c list_agents -C stonith", stdout_match="", stdout_no_match="Stateful") # should not exist
test.add_cmd(args="-c list_agents ", stdout_match="fence_dummy")
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-execd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
f"\t {sys.argv[0]} --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
f"\t {sys.argv[0]} --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-t", "--timeout", type=float, default=2,
help="Up to how many seconds each test case waits for the daemon to "
"be initialized. Defaults to 2. The value 0 means no limit.")
parser.add_argument("-w", "--force-wait", action="store_true",
help="Each test case waits the default/specified --timeout for the "
"daemon without tracking the log")
if BuildOptions.REMOTE_ENABLED:
parser.add_argument("-R", "--pacemaker-remote", action="store_true",
help="Test pacemaker-remoted binary instead of pacemaker-execd")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run pacemaker-execd regression tests as specified by arguments."""
- update_path()
+ set_cts_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
# --pacemaker-remote is only registered when remote support is enabled,
# so guard the attribute lookup
if getattr(opts, "pacemaker_remote", False):
exit_if_proc_running("pacemaker-remoted")
else:
exit_if_proc_running("corosync")
exit_if_proc_running("pacemaker-execd")
exit_if_proc_running("pacemaker-fenced")
# Create a temporary directory for log files (the directory will
# automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-exec-") as logdir:
tests = ExecTests(verbose=opts.verbose, tls=getattr(opts, "pacemaker_remote", False),
timeout=opts.timeout, force_wait=opts.force_wait,
logdir=logdir)
tests.build_generic_tests()
tests.build_multi_rsc_tests()
tests.build_negative_tests()
tests.build_custom_tests()
tests.build_stress_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
tests.setup_environment()
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment()
tests.exit()
if __name__ == "__main__":
main()
# vim: set filetype=python:
diff --git a/cts/cts-fencing.in b/cts/cts-fencing.in
index c6563ea313..04323310c9 100644
--- a/cts/cts-fencing.in
+++ b/cts/cts-fencing.in
@@ -1,927 +1,905 @@
#!@PYTHON@
"""Regression tests for Pacemaker's fencer."""
# pylint doesn't like the module name "cts-fencing" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2012-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import sys
import subprocess
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
-from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync, localname
+from pacemaker._cts.environment import set_cts_path
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
-TEST_DIR = sys.path[0]
-
-
-def update_path():
- """Set the PATH environment variable appropriately for the tests."""
- new_path = os.environ['PATH']
- if os.path.exists(f"{TEST_DIR}/cts-fencing.in"):
- # pylint: disable=protected-access
- print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR} ({TEST_DIR})")
- # For pacemaker-fenced and cts-fence-helper
- new_path = f"{BuildOptions._BUILD_DIR}/daemons/fenced:{new_path}"
- new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}" # For stonith_admin
- new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}" # For cts-support
-
- else:
- print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {TEST_DIR})")
- # For pacemaker-fenced, cts-fence-helper, and cts-support
- new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
-
- print(f'Using PATH="{new_path}"')
- os.environ['PATH'] = new_path
-
class FenceTest(Test):
"""Executor for a single test."""
def __init__(self, name, description, **kwargs):
"""
Create a new FenceTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
"""
Test.__init__(self, name, description, **kwargs)
self._daemon_location = "pacemaker-fenced"
def _kill_daemons(self):
killall(["pacemakerd", "pacemaker-fenced"])
def _start_daemons(self):
cmd = ["pacemaker-fenced", "--stand-alone", "--logfile", self.logpath]
if self.verbose:
cmd += ["-V"]
s = " ".join(cmd)
print(f"Starting {s}")
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
class FenceTests(Tests):
"""Collection of all fencing regression tests."""
def __init__(self, **kwargs):
"""Create a new FenceTests instance."""
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-fencing")
def new_test(self, name, description):
"""Create a named test."""
test = FenceTest(name, description, verbose=self.verbose,
timeout=self.timeout, force_wait=self.force_wait,
logdir=self.logdir)
self._tests.append(test)
return test
def build_api_sanity_tests(self):
"""Register tests to verify basic API usage."""
verbose_arg = ""
if self.verbose:
verbose_arg = "-V"
test = self.new_test("low_level_api_test", "Sanity-test client API")
test.add_cmd("cts-fence-helper", args=f"-t {verbose_arg}", validate=False)
test = self.new_test("low_level_api_mainloop_test",
"Sanity-test client API using mainloop")
test.add_cmd("cts-fence-helper", args=f"-m {verbose_arg}", validate=False)
def build_custom_timeout_tests(self):
"""Register tests to verify custom timeout usage."""
# custom timeout without topology
test = self.new_test("custom_timeout_1",
"Verify per device timeouts work as expected without using topology")
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# base timeout is 5+1+4 = 10s; the fencer scales the total by 1.2, hence 12s
test.add_log_pattern("Total timeout set to 12s")
# custom timeout _WITH_ topology
test = self.new_test("custom_timeout_2",
"Verify per device timeouts work as expected _WITH_ topology")
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1000ms')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4000s')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# base timeout is 5+1+4000 = 4006s; scaled by 1.2, hence 4807s
test.add_log_pattern("Total timeout set to 4807s")
def build_fence_merge_tests(self):
"""Register tests to verify when fence operations should be merged."""
# Simple test that overlapping fencing operations get merged
test = self.new_test("custom_merge_single",
"Verify overlapping identical fencing operations are merged, no fencing levels used")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
# one merger will happen
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
# the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
# Test that multiple mergers occur
test = self.new_test("custom_merge_multiple",
"Verify multiple overlapping identical fencing operations are merged")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o delay=2 -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
# 4 mergers should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
# the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
# Test that multiple mergers occur with topologies used
test = self.new_test("custom_merge_with_topology",
"Verify multiple overlapping identical fencing operations are merged with fencing levels")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
# 4 mergers should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
# the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
def build_fence_no_merge_tests(self):
"""Register tests to verify when fence operations should not be merged."""
test = self.new_test("custom_no_merge",
"Verify differing fencing operations are not merged")
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 10", no_wait=True)
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client",
negative=True)
def build_standalone_tests(self):
"""Register a grab bag of tests."""
# test what happens when all devices time out
test = self.new_test("fence_multi_device_failure",
"Verify that when all devices time out, a fencing failure is returned")
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 2", expected_exitcode=ExitStatus.TIMEOUT)
test.add_log_pattern("Total timeout set to 7s")
test.add_log_pattern("targeting node3 using false1 returned ")
test.add_log_pattern("targeting node3 using false2 returned ")
test.add_log_pattern("targeting node3 using false3 returned ")
# test what happens when multiple devices can fence a node, but the first device fails
test = self.new_test("fence_device_failure_rollover",
"Verify that when one fence device fails for a node, the others are tried")
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 18s")
# test what happens when we try to use a missing fence-agent
test = self.new_test("fence_missing_agent",
"Verify proper error-handling when using a non-existent fence-agent")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_missing -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5", expected_exitcode=ExitStatus.NOSUCH)
test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 5")
# simple topology test for one device
test = self.new_test("topology_simple",
"Verify all fencing devices at a level are used")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 6s")
test.add_log_pattern("targeting node3 using true returned 0")
# add topology, delete topology, verify fencing still works
test = self.new_test("topology_add_remove",
"Verify fencing occurrs after all topology levels are removed")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 6s")
test.add_log_pattern("targeting node3 using true returned 0")
# test what happens when the first fencing level has multiple devices
test = self.new_test("topology_device_fails",
"Verify if one device in a level fails, the other is tried")
test.add_cmd("stonith_admin",
args='--output-as=xml -R false -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
test.add_log_pattern("Total timeout set to 48s")
test.add_log_pattern("targeting node3 using false returned 1")
test.add_log_pattern("targeting node3 using true returned 0")
# test what happens when the first fencing level fails
test = self.new_test("topology_multi_level_fails",
"Verify if one level fails, the next leve is tried")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 3")
test.add_log_pattern("Total timeout set to 21s")
test.add_log_pattern("targeting node3 using false1 returned 1")
test.add_log_pattern("targeting node3 using false2 returned 1")
test.add_log_pattern("targeting node3 using true3 returned 0")
test.add_log_pattern("targeting node3 using true4 returned 0")
# test what happens when the first fencing level has devices that were never registered
test = self.new_test("topology_missing_devices",
"Verify topology can continue with missing devices")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# Test what happens if multiple fencing levels are defined, and then the first one is removed
test = self.new_test("topology_level_removal",
"Verify level removal works")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
# Now remove level 2, verify none of the devices in level two are hit
test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
test.add_log_pattern("Total timeout set to 96s")
test.add_log_pattern("targeting node3 using false1 returned 1")
test.add_log_pattern("targeting node3 using false2 returned ",
negative=True)
test.add_log_pattern("targeting node3 using true3 returned 0")
test.add_log_pattern("targeting node3 using true4 returned 0")
# Test targeting a topology level by node name pattern
test = self.new_test("topology_level_pattern",
"Verify targeting topology by node name pattern works")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
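# A topology target beginning with '@' is treated as a node-name
# pattern (regex) rather than a literal host name.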
test.add_cmd("stonith_admin", args="--output-as=xml -r '@node.*' -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("targeting node3 using true returned 0")
# test allowing commas and semicolons as delimiters in pcmk_host_list
test = self.new_test("host_list_delimiters",
"Verify commas and semicolons can be used as pcmk_host_list delimiters")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1,node2,node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=pcmk1;pcmk2;pcmk3"')
test.add_cmd("stonith_admin", args="stonith_admin --output-as=xml -F node2 -t 5")
test.add_cmd("stonith_admin", args="stonith_admin --output-as=xml -F pcmk3 -t 5")
test.add_log_pattern("targeting node2 using true1 returned 0")
test.add_log_pattern("targeting pcmk3 using true2 returned 0")
# test that the fencer builds the correct list of devices that can fence a node
test = self.new_test("list_devices",
"Verify list of devices that can fence a node is correct")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -l node1 -V",
stdout_match="true2", stdout_no_match="true1")
test.add_cmd("stonith_admin", args="--output-as=xml -l node1 -V",
stdout_match="true3", stdout_no_match="true1")
# simple test of device monitor
test = self.new_test("monitor", "Verify device is reachable")
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
test.add_cmd("stonith_admin", args="--output-as=xml -Q false1")
test.add_cmd("stonith_admin", args="--output-as=xml -Q true2", expected_exitcode=ExitStatus.NOSUCH)
# Verify that on failure, monitoring is retried for the duration of the timeout period
test = self.new_test("monitor_timeout",
"Verify monitor uses duration of timeout period given")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1 -t 5", expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("Attempt 2 to execute")
# Verify that on failure, monitoring is retried until the max retry count or the timeout is hit
test = self.new_test("monitor_timeout_max_retries",
"Verify monitor retries until max retry value or timeout is hit")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1 -t 15", expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times")
# simple register test
test = self.new_test("register",
"Verify devices can be registered and un-registered")
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1", expected_exitcode=ExitStatus.NOSUCH)
# simple reboot test
test = self.new_test("reboot", "Verify devices can be rebooted")
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -B node3 -t 5")
test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1", expected_exitcode=ExitStatus.NOSUCH)
# test fencing history
test = self.new_test("fence_history",
"Verify last fencing operation is returned")
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5 -V")
test.add_cmd("stonith_admin", args="--output-as=xml -H node3",
stdout_match='action="off" target="node3" .* status="success"')
# simple test of dynamic list query
test = self.new_test("dynamic_list_query",
"Verify dynamic list of fencing devices can be retrieved")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -l fake_port_1",
stdout_match='count="3"')
# fence using dynamic list query
test = self.new_test("fence_dynamic_list_query",
"Verify dynamic list of fencing devices can be retrieved")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -F fake_port_1 -t 5 -V")
# simple test of query using status action
test = self.new_test("status_query",
"Verify dynamic list of fencing devices can be retrieved")
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args="--output-as=xml -l fake_port_1",
stdout_match='count="3"')
# test what happens when no reboot action is advertised
test = self.new_test("no_reboot_support",
"Verify reboot action defaults to off when no reboot action is advertised by agent")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy_no_reboot -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
test.add_log_pattern("does not support reboot")
test.add_log_pattern("using true1 returned 0")
# make sure reboot is used when reboot action is advertised
test = self.new_test("with_reboot_support",
"Verify reboot action can be used when metadata advertises it")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
test.add_log_pattern("does not advertise support for 'reboot', performing 'off'",
negative=True)
test.add_log_pattern("using true1 returned 0")
# make sure all fencing delays are applied correctly and taken into account by fencing timeouts with topology
test = self.new_test("topology_delays",
"Verify all fencing delays are applied correctly and taken into account by fencing timeouts with topology")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
# Resulting "random" delay will always be 1 since (rand() % (delay_max - delay_base)) is always 0 here
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1 -o pcmk_delay_max=2')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 --delay 1")
# Total fencing timeout takes all fencing delays into account
test.add_log_pattern("Total timeout set to 582s")
# Fencing timeout for the first device takes the requested fencing delay
# and pcmk_delay_base into account
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true1 .*146s.*",
regex=True)
# Requested fencing delay is applied only for the first device in the
# first level, with the static delay from pcmk_delay_base added
test.add_log_pattern("Delaying 'off' action targeting node3 using true1 for 2s | timeout=120s requested_delay=1s base=1s max=1s")
# Fencing timeout no longer takes the requested fencing delay into account for further devices
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using false1 .*145s.*",
regex=True)
# Requested fencing delay is no longer applied for further devices
test.add_log_pattern("Delaying 'off' action targeting node3 using false1 for 1s | timeout=120s requested_delay=0s base=1s max=1s")
# Fencing timeout takes pcmk_delay_max into account
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true2 .*146s.*",
regex=True)
test.add_log_pattern("Delaying 'off' action targeting node3 using true2 for 1s | timeout=120s requested_delay=0s base=1s max=2s")
test.add_log_pattern("Delaying 'off' action targeting node3 using true3",
negative=True)
def build_unfence_tests(self):
"""Register tests that verify unfencing."""
our_uname = localname()
# verify unfencing using automatic unfencing
test = self.new_test("unfence_required_1",
"Verify require unfencing on all devices when automatic=true in agent's metadata")
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname}"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname}"')
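# stonith_admin -U requests unfencing (an 'on' action) of the given node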
test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
# both devices should be executed
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
# verify unfencing using automatic unfencing fails if any of the required agents fail
test = self.new_test("unfence_required_2",
"Verify require unfencing on all devices when automatic=true in agent's metadata")
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname}"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=fail -o "pcmk_host_list={our_uname}"')
test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 6", expected_exitcode=ExitStatus.ERROR)
# verify unfencing using automatic devices with topology
test = self.new_test("unfence_required_3",
"Verify require unfencing on all devices even when at different topology levels")
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v true1")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true2")
test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
# verify unfencing using automatic devices with topology
test = self.new_test("unfence_required_4",
"Verify all required devices are executed even with topology levels fail")
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true3 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R true4 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
test.add_cmd('stonith_admin',
args=f'--output-as=xml -R false4 -a fence_dummy -o mode=fail -o "pcmk_host_list={our_uname} node3"')
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v true1")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v false1")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v false2")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true2")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v false3")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true3")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 3 -v false4")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 4 -v true4")
test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
test.add_log_pattern("using true3 returned 0")
test.add_log_pattern("using true4 returned 0")
def build_unfence_on_target_tests(self):
"""Register tests that verify unfencing that runs on the target."""
our_uname = localname()
# verify unfencing using on_target device
test = self.new_test("unfence_on_target_1",
"Verify unfencing with on_target = true")
test.add_cmd("stonith_admin",
args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname}"')
test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
test.add_log_pattern("(on) to be executed on target")
# verify failure of unfencing using on_target device
test = self.new_test("unfence_on_target_2",
"Verify failure unfencing with on_target = true")
test.add_cmd("stonith_admin",
args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname} node_fake_1234"')
test.add_cmd("stonith_admin", args="--output-as=xml -U node_fake_1234 -t 3", expected_exitcode=ExitStatus.NOSUCH)
test.add_log_pattern("(on) to be executed on target")
# verify unfencing using on_target device with topology
test = self.new_test("unfence_on_target_3",
"Verify unfencing with on_target = true using topology")
test.add_cmd("stonith_admin",
args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd("stonith_admin",
args=f'--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname} node3"')
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 1 -v true1")
test.add_cmd("stonith_admin", args=f"--output-as=xml -r {our_uname} -i 2 -v true2")
test.add_cmd("stonith_admin", args=f"--output-as=xml -U {our_uname} -t 3")
test.add_log_pattern("(on) to be executed on target")
# verify unfencing using on_target device with topology fails when target node doesn't exist
test = self.new_test("unfence_on_target_4",
"Verify unfencing failure with on_target = true using topology")
test.add_cmd("stonith_admin",
args=f'--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname} node_fake"')
test.add_cmd("stonith_admin",
args=f'--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list={our_uname} node_fake"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -U node_fake -t 3", expected_exitcode=ExitStatus.NOSUCH)
test.add_log_pattern("(on) to be executed on target")
def build_remap_tests(self):
"""Register tests that verify remapping of reboots to off-on."""
test = self.new_test("remap_simple",
"Verify sequential topology reboot is remapped to all-off-then-all-on")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
'-o pcmk_off_timeout=1 -o pcmk_reboot_timeout=10')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
'-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
# fence_dummy sets "on" as an on_target action
test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("remap_simple_off",
"Verify sequential topology reboot skips 'on' if "
"pcmk_reboot_action=off or agent doesn't support "
"'on'")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o mode=pass "
"-o pcmk_host_list=node_fake -o pcmk_off_timeout=1 "
"-o pcmk_reboot_timeout=10 -o pcmk_reboot_action=off")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true2 -a fence_dummy_no_on "
"-o mode=pass -o pcmk_host_list=node_fake "
"-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20")
test.add_cmd("stonith_admin",
args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
# "on" should be skipped
test.add_log_pattern("Not turning node_fake back on using "
"true1 because the device is configured "
"to stay off")
test.add_log_pattern("Not turning node_fake back on using true2"
" because the agent doesn't support 'on'")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("remap_automatic",
"Verify remapped topology reboot skips automatic 'on'")
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence '
'-o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence '
'-o "mode=pass" -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test.add_log_pattern("perform 'on' action targeting node_fake using",
negative=True)
test.add_log_pattern("'on' failure",
negative=True)
test = self.new_test("remap_complex_1",
"Verify remapped topology reboot in second level works if non-remapped first level fails")
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("remap_complex_2",
"Verify remapped topology reboot failure in second level proceeds to third level")
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v false2 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 3 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using false2")
test.add_log_pattern("Attempted to execute agent fence_dummy (off) the maximum number of times")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test.add_log_pattern("perform 'reboot' action targeting node_fake using true2")
test.add_log_pattern("node_fake with true3",
negative=True)
def build_query_tests(self):
"""Run stonith_admin --metadata for the fence_dummy agent and check command output."""
test = self.new_test("get_metadata",
"Run stonith_admin --metadata for the fence_dummy agent")
test.add_cmd("stonith_admin", args="--output-as=xml -a fence_dummy --metadata",
stdout_match='…')
…
if int(last_part) >= 240:
self._logger.log(f"Could not determine an offset for IPaddr resources. Upper bound is too high: {self['IPBase']} {last_part}")
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""")
def _filter_nodes(self):
"""
Filter the list of cluster nodes.
If --limit-nodes is given, keep that many nodes from the front of the
list of cluster nodes and drop the rest.
"""
if self["node-limit"] > 0:
if len(self["nodes"]) > self["node-limit"]:
self._logger.log(f"Limiting the number of nodes configured={len(self['nodes'])} "
f"(max={self['node-limit']})")
while len(self["nodes"]) > self["node-limit"]:
self["nodes"].pop(len(self["nodes"]) - 1)
def _validate(self):
"""Check that we were given all required command line parameters."""
if not self["nodes"]:
raise ValueError("No nodes specified!")
def _discover(self):
"""Probe cluster nodes to figure out how to log and manage services."""
self._target = random.Random().choice(self["nodes"])
exerciser = socket.gethostname()
# Use the IP where possible to avoid name lookup failures
for ip in socket.gethostbyname_ex(exerciser)[2]:
if ip != "127.0.0.1":
exerciser = ip
break
self["cts-exerciser"] = exerciser
self._detect_systemd()
self._detect_syslog()
self._detect_at_boot()
self._detect_ip_offset()
def _parse_args(self, argv):
"""
Parse and validate command line parameters.
Set the appropriate values in the environment dictionary. If argv is
None, use sys.argv instead.
"""
if not argv:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(epilog=f"{sys.argv[0]} -g virt1 -r --stonith ssh --schema pacemaker-2.0 500")
grp1 = parser.add_argument_group("Common options")
grp1.add_argument("-g", "--dsh-group", "--group",
metavar="GROUP", dest="group",
help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)")
grp1.add_argument("-l", "--limit-nodes",
type=int, default=0,
metavar="MAX",
help="Only use the first MAX cluster nodes supplied with --nodes")
grp1.add_argument("--benchmark",
action="store_true",
help="Add timing information")
grp1.add_argument("--list", "--list-tests",
action="store_true", dest="list_tests",
help="List the valid tests")
grp1.add_argument("--nodes",
metavar="NODES",
help="List of cluster nodes separated by whitespace")
grp1.add_argument("--stack",
default="corosync",
metavar="STACK",
help="Which cluster stack is installed")
grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
grp2.add_argument("-L", "--logfile",
metavar="PATH",
help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)")
grp2.add_argument("--at-boot", "--cluster-starts-at-boot",
choices=["1", "0", "yes", "no"],
help="Does the cluster software start at boot time?")
grp2.add_argument("--facility", "--syslog-facility",
default="daemon",
metavar="NAME",
help="Which syslog facility to log to")
grp2.add_argument("--ip", "--test-ip-base",
metavar="IP",
help="Offset for generated IP address resources")
grp3 = parser.add_argument_group("Options for release testing")
grp3.add_argument("-r", "--populate-resources",
action="store_true",
help="Generate a sample configuration")
grp3.add_argument("--choose",
metavar="NAME",
help="Run only the named tests, separated by whitespace")
grp3.add_argument("--fencing", "--stonith",
choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
default="1",
help="What fencing agent to use")
grp3.add_argument("--once",
action="store_true",
help="Run all valid tests once")
grp4 = parser.add_argument_group("Additional (less common) options")
grp4.add_argument("-c", "--clobber-cib",
action="store_true",
help="Erase any existing configuration")
grp4.add_argument("-y", "--yes",
action="store_true", dest="always_continue",
help="Continue to run whenever prompted")
grp4.add_argument("--boot",
action="store_true",
help="")
grp4.add_argument("--cib-filename",
metavar="PATH",
help="Install the given CIB file to the cluster")
grp4.add_argument("--experimental-tests",
action="store_true",
help="Include experimental tests")
grp4.add_argument("--loop-minutes",
type=int, default=60,
help="")
grp4.add_argument("--no-loop-tests",
action="store_true",
help="Don't run looping/time-based tests")
grp4.add_argument("--no-unsafe-tests",
action="store_true",
help="Don't run tests that are unsafe for use with ocfs2/drbd")
grp4.add_argument("--notification-agent",
metavar="PATH",
default="/var/lib/pacemaker/notify.sh",
help="Script to configure for Pacemaker alerts")
grp4.add_argument("--notification-recipient",
metavar="R",
default="/var/lib/pacemaker/notify.log",
help="Recipient to pass to alert script")
grp4.add_argument("--oprofile",
metavar="NODES",
help="List of cluster nodes to run oprofile on")
grp4.add_argument("--outputfile",
metavar="PATH",
help="Location to write logs to")
grp4.add_argument("--qarsh",
action="store_true",
help="Use QARSH to access nodes instead of SSH")
grp4.add_argument("--schema",
metavar="SCHEMA",
default=f"pacemaker-{BuildOptions.CIB_SCHEMA_VERSION}",
help="Create a CIB conforming to the given schema")
grp4.add_argument("--seed",
metavar="SEED",
help="Use the given string as the random number seed")
grp4.add_argument("--set",
action="append",
metavar="ARG",
default=[],
help="Set key=value pairs (can be specified multiple times)")
grp4.add_argument("--stonith-args",
metavar="ARGS",
default="hostlist=all,livedangerously=yes",
help="")
grp4.add_argument("--stonith-type",
metavar="TYPE",
default="external/ssh",
help="")
grp4.add_argument("--trunc",
action="store_true", dest="truncate",
help="Truncate log file before starting")
grp4.add_argument("--valgrind-procs",
metavar="PROCS",
default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd",
help="Run valgrind against the given space-separated list of processes")
grp4.add_argument("--warn-inactive",
action="store_true",
help="Warn if a resource is assigned to an inactive node")
parser.add_argument("iterations",
nargs='?',
type=int, default=1,
help="Number of tests to run")
args = parser.parse_args(args=argv)
# Set values on this object based on what happened with command line
# processing. This has to be done in several blocks.
# These values can always be set. They get a default from the add_argument
# calls, only do one thing, and they do not have any side effects.
self["ClobberCIB"] = args.clobber_cib
self["ListTests"] = args.list_tests
self["Schema"] = args.schema
self["Stack"] = args.stack
self["SyslogFacility"] = args.facility
self["TruncateLog"] = args.truncate
self["at-boot"] = args.at_boot in ["1", "yes"]
self["benchmark"] = args.benchmark
self["continue"] = args.always_continue
self["experimental-tests"] = args.experimental_tests
self["iterations"] = args.iterations
self["loop-minutes"] = args.loop_minutes
self["loop-tests"] = not args.no_loop_tests
self["notification-agent"] = args.notification_agent
self["notification-recipient"] = args.notification_recipient
self["node-limit"] = args.limit_nodes
self["stonith-params"] = args.stonith_args
self["stonith-type"] = args.stonith_type
self["unsafe-tests"] = not args.no_unsafe_tests
self["valgrind-procs"] = args.valgrind_procs
self["warn-inactive"] = args.warn_inactive
# Nodes and groups are mutually exclusive, so their defaults cannot be
# set in their add_argument calls. Additionally, groups does more than
# just set a value. Here, set nodes first and then if a group is
# specified, override the previous nodes value.
if args.nodes:
self["nodes"] = args.nodes.split(" ")
else:
self["nodes"] = []
if args.group:
self["OutputFile"] = f"{os.environ['HOME']}/cluster-{args.dsh_group}.log"
LogFactory().add_file(self["OutputFile"], "CTS")
dsh_file = f"{os.environ['HOME']}/.dsh/group/{args.dsh_group}"
if os.path.isfile(dsh_file):
self["nodes"] = []
with open(dsh_file, "r", encoding="utf-8") as f:
for line in f:
stripped = line.strip()
if not stripped.startswith('#'):
self["nodes"].append(stripped)
else:
print(f"Unknown DSH group: {args.dsh_group}")
# Everything else either can't have a default set in an add_argument
# call (likely because we don't want to always have a value set for it)
# or it does something fancier than just set a single value. However,
# order does not matter for these as long as the user doesn't provide
# conflicting arguments on the command line. So just do everything
# alphabetically.
if args.boot:
self["scenario"] = "boot"
if args.cib_filename:
self["CIBfilename"] = args.cib_filename
else:
self["CIBfilename"] = None
if args.choose:
self["scenario"] = "sequence"
self["tests"].extend(args.choose.split())
self["iterations"] = len(self["tests"])
if args.fencing:
if args.fencing in ["0", "no"]:
self["DoFencing"] = False
else:
self["DoFencing"] = True
if args.fencing in ["rhcs", "virt", "xvm"]:
self["stonith-type"] = "fence_xvm"
elif args.fencing == "scsi":
self["stonith-type"] = "fence_scsi"
elif args.fencing in ["lha", "ssh"]:
self["stonith-params"] = "hostlist=all,livedangerously=yes"
self["stonith-type"] = "external/ssh"
elif args.fencing == "openstack":
self["stonith-type"] = "fence_openstack"
print("Obtaining OpenStack credentials from the current environment")
region = os.environ['OS_REGION_NAME']
tenant = os.environ['OS_TENANT_NAME']
auth = os.environ['OS_AUTH_URL']
user = os.environ['OS_USERNAME']
password = os.environ['OS_PASSWORD']
self["stonith-params"] = f"region={region},tenant={tenant},auth={auth},user={user},password={password}"
elif args.fencing == "rhevm":
self["stonith-type"] = "fence_rhevm"
print("Obtaining RHEV-M credentials from the current environment")
user = os.environ['RHEVM_USERNAME']
password = os.environ['RHEVM_PASSWORD']
server = os.environ['RHEVM_SERVER']
port = os.environ['RHEVM_PORT']
self["stonith-params"] = f"login={user},passwd={password},ipaddr={server},ipport={port},ssl=1,shell_timeout=10"
if args.ip:
self["CIBResource"] = True
self["ClobberCIB"] = True
self["IPBase"] = args.ip
if args.logfile == "journal":
self["LogAuditDisabled"] = True
self["log_kind"] = LogKind.JOURNAL
elif args.logfile:
self["LogAuditDisabled"] = True
self["LogFileName"] = args.logfile
self["log_kind"] = LogKind.REMOTE_FILE
else:
# We can't set this as the default on the parser.add_argument call
# for this option because then args.logfile will be set, which means
# the above branch will be taken and those other values will also be
# set.
self["LogFileName"] = "/var/log/messages"
if args.once:
self["scenario"] = "all-once"
if args.oprofile:
self["oprofile"] = args.oprofile.split(" ")
else:
self["oprofile"] = []
if args.outputfile:
self["OutputFile"] = args.outputfile
LogFactory().add_file(self["OutputFile"])
if args.populate_resources:
self["CIBResource"] = True
self["ClobberCIB"] = True
if args.qarsh:
self._rsh.enable_qarsh()
for kv in args.set:
(name, value) = kv.split("=")
self[name] = value
print(f"Setting {name} = {value}")
class EnvFactory:
"""A class for constructing a singleton instance of an Environment object."""
instance = None
# pylint: disable=invalid-name
def getInstance(self, args=None):
"""
Return the previously created instance of Environment.
If no instance exists, create a new instance and return that.
"""
if not EnvFactory.instance:
EnvFactory.instance = Environment(args)
return EnvFactory.instance
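# Typical usage (sketch): all callers share one Environment instance, e.g.
#   env = EnvFactory().getInstance(sys.argv[1:])
#   nodes = env["nodes"]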
+
+
+def set_cts_path(extra=None):
+ """Set the PATH environment variable appropriately for the tests."""
+ new_path = os.environ['PATH']
+
+ # Add any search paths given on the command line
+ if extra is not None:
+ for p in extra:
+ new_path = f"{p}:{new_path}"
+
+ cwd = os.getcwd()
+
+ if os.path.exists(f"{cwd}/cts/cts-attrd.in"):
+ # pylint: disable=protected-access
+ print(f"Running tests from the source tree: {BuildOptions._BUILD_DIR}")
+
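+    # Prepend the build directories so freshly built daemons, tools, and
+    # CTS support scripts shadow any installed copies.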
+ for d in glob(f"{BuildOptions._BUILD_DIR}/daemons/*/"):
+ new_path = f"{d}:{new_path}"
+
+ new_path = f"{BuildOptions._BUILD_DIR}/tools:{new_path}"
+ new_path = f"{BuildOptions._BUILD_DIR}/cts/support:{new_path}"
+
+ print(f"Using local schemas from: {cwd}/xml")
+ os.environ["PCMK_schema_directory"] = f"{cwd}/xml"
+
+ else:
+ print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {cwd})")
+ new_path = f"{BuildOptions.DAEMON_DIR}:{new_path}"
+ os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR
+
+ print(f'Using PATH="{new_path}"')
+ os.environ['PATH'] = new_path