diff --git a/cts/cts-attrd.in b/cts/cts-attrd.in
index 5037e91d02..fafb162e37 100644
--- a/cts/cts-attrd.in
+++ b/cts/cts-attrd.in
@@ -1,366 +1,366 @@
#!@PYTHON@
"""Regression tests for Pacemaker's attribute daemon."""
# pylint doesn't like the module name "cts-attrd", which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import subprocess
import sys
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
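# sys.path[0] is the directory containing this script; used below to detect a source-tree run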
TEST_DIR = sys.path[0]
def update_path():
"""Set the PATH environment variable appropriately for the tests."""
new_path = os.environ['PATH']
if os.path.exists("%s/cts-attrd.in" % TEST_DIR):
# pylint: disable=protected-access
print("Running tests from the source tree: %s (%s)" % (BuildOptions._BUILD_DIR, TEST_DIR))
# For pacemaker-attrd
new_path = "%s/daemons/attrd:%s" % (BuildOptions._BUILD_DIR, new_path)
else:
print("Running tests from the install tree: %s (not %s)" % (BuildOptions.DAEMON_DIR, TEST_DIR))
# For pacemaker-attrd
new_path = "%s:%s" % (BuildOptions.DAEMON_DIR, new_path)
print('Using PATH="%s"' % new_path)
os.environ['PATH'] = new_path
class AttributeTest(Test):
"""Executor for a single test."""
def __init__(self, name, description, **kwargs):
"""
Create a new AttributeTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
"""
Test.__init__(self, name, description, **kwargs)
self._daemon_location = "pacemaker-attrd"
self._enable_corosync = True
def _kill_daemons(self):
killall([self._daemon_location])
def _start_daemons(self):
if self.verbose:
print("Starting %s" % self._daemon_location)
cmd = [self._daemon_location, "-s", "-l", self.logpath]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
class AttributeTests(Tests):
"""Collection of all attribute regression tests."""
def __init__(self, **kwargs):
"""Create a new AttributeTests instance."""
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-attrd")
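# pacemaker-attrd needs a cluster layer to connect to; setup_environment() starts corosync first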
def new_test(self, name, description):
"""Create a named test."""
test = AttributeTest(name, description, verbose=self.verbose, logdir=self.logdir)
self._tests.append(test)
return test
def setup_environment(self, use_corosync):
"""Prepare the host before executing any tests."""
if use_corosync:
self._corosync.start(kill_first=True)
def cleanup_environment(self, use_corosync):
"""Clean up the host after executing desired tests."""
if use_corosync:
self._corosync.stop()
def build_basic_tests(self):
"""Add basic tests - setting, querying, updating, and deleting attributes."""
test = self.new_test("set_attr_1",
"Set and query an attribute")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="111"')
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
# Setting the delay on an attribute that doesn't exist fails, but the failure is
# not passed back to attrd_updater.
test = self.new_test("set_attr_2",
"Set an attribute's delay")
test.add_cmd("attrd_updater", args="--name AAA -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Processed update-delay request from client .*: Error \(Attribute AAA does not exist\)",
regex=True)
test = self.new_test("set_attr_3",
"Set and query an attribute's delay and value")
test.add_cmd("attrd_updater", args="--name AAA -B 111 -d 5 --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="111"')
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111 \| from .* with 5s write delay",
regex=True)
test = self.new_test("set_attr_4",
"Update an attribute that does not exist with a delay")
test.add_cmd("attrd_updater", args="--name BBB -U 999 -d 10 --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="999"')
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 999 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_1",
"Update an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 333 --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="333"')
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("update_attr_2",
"Update an attribute using a delay other than its default")
test.add_cmd("attrd_updater", args="--name BBB -U 777 -d 10 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 888 -d 7 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 777 -> 888 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_delay_1",
"Update the delay of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test = self.new_test("update_attr_delay_2",
"Update the delay and value of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -B 333 -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("missing_attr_1",
"Query an attribute that does not exist")
test.add_cmd_expected_fail("attrd_updater", args="--name NOSUCH --output-as=xml",
- exitcode=ExitStatus.CONFIG)
+ expected_exitcode=ExitStatus.CONFIG)
test = self.new_test("delete_attr_1",
"Delete an existing attribute")
test.add_cmd("attrd_updater", args="--name CCC -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name CCC -D --output-as=xml")
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
test = self.new_test("missing_attr_2",
"Delete an attribute that does not exist")
test.add_cmd("attrd_updater", args="--name NOSUCH2 -D --output-as=xml")
test = self.new_test("attr_in_set_1",
"Set and query an attribute in a specific set")
test.add_cmd("attrd_updater", args="--name DDD -U 555 --set=foo --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name DDD -Q --output-as=xml",
stdout_match='name="DDD" value="555"')
test.add_log_pattern("Processed 1 private change for DDD (set foo)")
def build_multiple_query_tests(self):
"""Add tests that set and query an attribute across multiple nodes."""
# NOTE: These tests make use of the fact that nothing in attrd actually
# cares about whether a node exists when you set or query an attribute.
# It just keeps creating new hash tables for each node you ask it about.
test = self.new_test("multi_query_1",
"Query an attribute set across multiple nodes")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --node cluster1 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -U 222 --node cluster2 --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -QA --output-as=xml",
stdout_match=r'\n.*')
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --node=cluster1 --output-as=xml",
stdout_match='')
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
stdout_match='')
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -QA --output-as=xml",
stdout_match=r'\n.*',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
stdout_match='',
env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
def build_regex_tests(self):
"""Add tests that use regexes."""
test = self.new_test("regex_update_1",
"Update attributes using a regex")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd("attrd_updater", args="--name ABB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'A.*' -U 333 --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="333"')
test.add_cmd_check_stdout("attrd_updater", args="--name ABB -Q --output-as=xml",
stdout_match='name="ABB" value="333"')
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: 111 -> 333",
regex=True)
test = self.new_test("regex_delete_1",
"Delete attributes using a regex")
test.add_cmd("attrd_updater", args="--name XAX -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name XBX -U 555 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'X[A|B]X' -D --output-as=xml")
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: \(unset\) -> 555",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: 555 -> \(unset\)",
regex=True)
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
def build_utilization_tests(self):
"""Add tests that involve utilization attributes."""
test = self.new_test("utilization_1",
"Set and query a utilization attribute")
test.add_cmd("attrd_updater", args="--name AAA -U ABC -z --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="ABC"')
test.add_log_pattern(r"Setting AAA\[.*\] in utilization: \(unset\) -> ABC",
regex=True)
def build_sync_point_tests(self):
"""Add tests that involve sync points."""
test = self.new_test("local_sync_point",
"Wait for a local sync point")
test.add_cmd("attrd_updater", args="--name AAA -U 123 --wait=local --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
stdout_match='name="AAA" value="123"')
test.add_log_pattern(r"Alerting client .* for reached local sync point",
regex=True)
test = self.new_test("cluster_sync_point",
"Wait for a cluster-wide sync point")
test.add_cmd("attrd_updater", args="--name BBB -U 456 --wait=cluster --output-as=xml")
test.add_cmd_check_stdout("attrd_updater", args="--name BBB -Q --output-as=xml",
stdout_match='name="BBB" value="456"')
test.add_log_pattern(r"Alerting client .* for reached cluster sync point",
regex=True)
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-attrd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
"\t " + sys.argv[0] + " --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
"\t " + sys.argv[0] + " --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run attrd regression tests as specified by arguments."""
update_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
exit_if_proc_running("pacemaker-attrd")
# Create a temporary directory for log files (the directory and its
# contents will automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-attrd-") as logdir:
tests = AttributeTests(verbose=opts.verbose, logdir=logdir)
tests.build_basic_tests()
tests.build_multiple_query_tests()
tests.build_regex_tests()
tests.build_utilization_tests()
tests.build_sync_point_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
try:
tests.setup_environment(True)
except TimeoutError:
print("corosync did not start in time, exiting")
sys.exit(ExitStatus.TIMEOUT)
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment(True)
tests.exit()
if __name__ == "__main__":
main()
diff --git a/cts/cts-exec.in b/cts/cts-exec.in
index 3f33c6b1e6..3f6579013b 100644
--- a/cts/cts-exec.in
+++ b/cts/cts-exec.in
@@ -1,983 +1,989 @@
#!@PYTHON@
"""Regression tests for Pacemaker's pacemaker-execd."""
# pylint doesn't like the module name "cts-exec", which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2012-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import stat
import sys
import subprocess
import shutil
import tempfile
# Where to find test binaries
# Prefer the source tree if available
TEST_DIR = sys.path[0]
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.process import killall, exit_if_proc_running, stdout_from_command
from pacemaker._cts.test import Test, Tests
# File permissions for executable scripts we create
EXECMODE = stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
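# (equivalent to mode 0o555, i.e. r-xr-xr-x)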
def update_path():
# pylint: disable=protected-access
"""Set the PATH environment variable appropriately for the tests."""
new_path = os.environ['PATH']
if os.path.exists("%s/cts-exec.in" % TEST_DIR):
print("Running tests from the source tree: %s (%s)" % (BuildOptions._BUILD_DIR, TEST_DIR))
# For pacemaker-execd, cts-exec-helper, and pacemaker-remoted
new_path = "%s/daemons/execd:%s" % (BuildOptions._BUILD_DIR, new_path)
new_path = "%s/tools:%s" % (BuildOptions._BUILD_DIR, new_path) # For crm_resource
# For pacemaker-fenced
new_path = "%s/daemons/fenced:%s" % (BuildOptions._BUILD_DIR, new_path)
# For cts-support
new_path = "%s/cts/support:%s" % (BuildOptions._BUILD_DIR, new_path)
else:
print("Running tests from the install tree: %s (not %s)" % (BuildOptions.DAEMON_DIR, TEST_DIR))
# For cts-exec-helper, cts-support, pacemaker-execd, pacemaker-fenced,
# and pacemaker-remoted
new_path = "%s:%s" % (BuildOptions.DAEMON_DIR, new_path)
print('Using PATH="%s"' % new_path)
os.environ['PATH'] = new_path
class ExecTest(Test):
"""Executor for a single pacemaker-execd regression test."""
def __init__(self, name, description, **kwargs):
"""Create a new ExecTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
Keyword arguments:
tls -- Enable pacemaker-remoted.
"""
Test.__init__(self, name, description, **kwargs)
self.tls = kwargs.get("tls", False)
if self.tls:
self._daemon_location = "pacemaker-remoted"
else:
self._daemon_location = "pacemaker-execd"
self._test_tool_location = "cts-exec-helper"
# We additionally need to keep track of a stonith process.
self._stonith_process = None
def _new_cmd(self, cmd, args, exitcode, **kwargs):
"""Add a command to be executed as part of this test."""
if self.verbose and cmd == self._test_tool_location:
args += " -V "
if (cmd == self._test_tool_location) and self.tls:
args += " -S "
kwargs["validate"] = False
kwargs["check_rng"] = False
kwargs["check_stderr"] = False
Test._new_cmd(self, cmd, args, exitcode, **kwargs)
def _kill_daemons(self):
killall([
"pacemaker-fenced",
"lt-pacemaker-fenced",
"pacemaker-execd",
"lt-pacemaker-execd",
"cts-exec-helper",
"lt-cts-exec-helper",
"pacemaker-remoted",
])
def _start_daemons(self):
if not self.tls:
# pylint: disable=consider-using-with
self._stonith_process = subprocess.Popen(["pacemaker-fenced", "-s"])
cmd = [self._daemon_location, "-l", self.logpath]
if self.verbose:
cmd += ["-V"]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
def clean_environment(self):
"""Clean up the host after running a test."""
if self._daemon_process:
self._daemon_process.terminate()
self._daemon_process.wait()
if self.verbose:
print("Daemon Output Start")
with open(self.logpath, "rt", errors="replace", encoding="utf-8") as logfile:
for line in logfile:
print(line.strip())
print("Daemon Output End")
if self._stonith_process:
self._stonith_process.terminate()
self._stonith_process.wait()
self._daemon_process = None
self._stonith_process = None
def add_cmd(self, cmd=None, **kwargs):
"""Add a cts-exec-helper command to be executed as part of this test."""
if cmd is None:
cmd = self._test_tool_location
self._new_cmd(cmd, kwargs.pop("args"), ExitStatus.OK, **kwargs)
def add_cmd_and_kill(self, cmd=None, **kwargs):
"""Add a cts-exec-helper command and system command to be executed as part of this test."""
if cmd is None:
cmd = self._test_tool_location
self._new_cmd(cmd, kwargs.pop("args"), ExitStatus.OK, **kwargs)
def add_cmd_check_stdout(self, cmd=None, **kwargs):
"""Add a command with expected output to be executed as part of this test."""
if cmd is None:
cmd = self._test_tool_location
self._new_cmd(cmd, kwargs.pop("args"), ExitStatus.OK, **kwargs)
def add_cmd_expected_fail(self, cmd=None, **kwargs):
"""Add a cts-exec-helper command to be executed as part of this test and expected to fail."""
if cmd is None:
cmd = self._test_tool_location
- self._new_cmd(cmd, kwargs.pop("args"), kwargs.pop("exitcode", ExitStatus.ERROR), **kwargs)
+ self._new_cmd(cmd, kwargs.pop("args"), kwargs.pop("expected_exitcode", ExitStatus.ERROR), **kwargs)
def add_sys_cmd(self, cmd, args):
"""Add a simple command to be executed as part of this test."""
self._new_cmd(cmd, args, ExitStatus.OK)
def run(self):
"""Execute this test."""
if self.tls and "stonith" in self.name:
self._result_txt = "SKIPPED - '%s' - disabled when testing pacemaker_remote" % (self.name)
print(self._result_txt)
return
Test.run(self)
class ExecTests(Tests):
"""Collection of all pacemaker-execd regression tests."""
def __init__(self, **kwargs):
"""
Create a new ExecTests instance.
Keyword arguments:
tls -- Enable pacemaker-remoted.
"""
Tests.__init__(self, **kwargs)
self.tls = kwargs.get("tls", False)
self._action_timeout = " -t 9000 "
self._installed_files = []
self._rsc_classes = self._setup_rsc_classes()
print("Testing resource classes %r" % self._rsc_classes)
self._common_cmds = {
"ocf_reg_line": '-c register_rsc -r ocf_test_rsc ' + self._action_timeout + ' -C ocf -P pacemaker -T Dummy',
"ocf_reg_event": '-l "NEW_EVENT event_type:register rsc_id:ocf_test_rsc action:none rc:ok op_status:complete"',
"ocf_unreg_line": '-c unregister_rsc -r ocf_test_rsc ' + self._action_timeout,
"ocf_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:ocf_test_rsc action:none rc:ok op_status:complete"',
"ocf_start_line": '-c exec -r ocf_test_rsc -a start ' + self._action_timeout,
"ocf_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:start rc:ok op_status:complete" ',
"ocf_stop_line": '-c exec -r ocf_test_rsc -a stop ' + self._action_timeout,
"ocf_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:stop rc:ok op_status:complete" ',
"ocf_monitor_line": '-c exec -r ocf_test_rsc -a monitor -i 2s ' + self._action_timeout,
"ocf_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
"ocf_cancel_line": '-c cancel -r ocf_test_rsc -a monitor -i 2s ' + self._action_timeout,
"ocf_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:ocf_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"systemd_reg_line": '-c register_rsc -r systemd_test_rsc ' + self._action_timeout + ' -C systemd -T pacemaker-cts-dummyd@3',
"systemd_reg_event": '-l "NEW_EVENT event_type:register rsc_id:systemd_test_rsc action:none rc:ok op_status:complete"',
"systemd_unreg_line": '-c unregister_rsc -r systemd_test_rsc ' + self._action_timeout,
"systemd_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:systemd_test_rsc action:none rc:ok op_status:complete"',
"systemd_start_line": '-c exec -r systemd_test_rsc -a start ' + self._action_timeout,
"systemd_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:start rc:ok op_status:complete" ',
"systemd_stop_line": '-c exec -r systemd_test_rsc -a stop ' + self._action_timeout,
"systemd_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:stop rc:ok op_status:complete" ',
"systemd_monitor_line": '-c exec -r systemd_test_rsc -a monitor -i 2s ' + self._action_timeout,
"systemd_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:monitor rc:ok op_status:complete" -t 15000 ',
"systemd_cancel_line": '-c cancel -r systemd_test_rsc -a monitor -i 2s ' + self._action_timeout,
"systemd_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:systemd_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"upstart_reg_line": '-c register_rsc -r upstart_test_rsc ' + self._action_timeout + ' -C upstart -T pacemaker-cts-dummyd',
"upstart_reg_event": '-l "NEW_EVENT event_type:register rsc_id:upstart_test_rsc action:none rc:ok op_status:complete"',
"upstart_unreg_line": '-c unregister_rsc -r upstart_test_rsc ' + self._action_timeout,
"upstart_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:upstart_test_rsc action:none rc:ok op_status:complete"',
"upstart_start_line": '-c exec -r upstart_test_rsc -a start ' + self._action_timeout,
"upstart_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:upstart_test_rsc action:start rc:ok op_status:complete" ',
"upstart_stop_line": '-c exec -r upstart_test_rsc -a stop ' + self._action_timeout,
"upstart_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:upstart_test_rsc action:stop rc:ok op_status:complete" ',
"upstart_monitor_line": '-c exec -r upstart_test_rsc -a monitor -i 2s ' + self._action_timeout,
"upstart_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:upstart_test_rsc action:monitor rc:ok op_status:complete" -t 15000',
"upstart_cancel_line": '-c cancel -r upstart_test_rsc -a monitor -i 2s ' + self._action_timeout,
"upstart_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:upstart_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"service_reg_line": '-c register_rsc -r service_test_rsc ' + self._action_timeout + ' -C service -T LSBDummy',
"service_reg_event": '-l "NEW_EVENT event_type:register rsc_id:service_test_rsc action:none rc:ok op_status:complete"',
"service_unreg_line": '-c unregister_rsc -r service_test_rsc ' + self._action_timeout,
"service_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:service_test_rsc action:none rc:ok op_status:complete"',
"service_start_line": '-c exec -r service_test_rsc -a start ' + self._action_timeout,
"service_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:start rc:ok op_status:complete" ',
"service_stop_line": '-c exec -r service_test_rsc -a stop ' + self._action_timeout,
"service_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:stop rc:ok op_status:complete" ',
"service_monitor_line": '-c exec -r service_test_rsc -a monitor -i 2s ' + self._action_timeout,
"service_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
"service_cancel_line": '-c cancel -r service_test_rsc -a monitor -i 2s ' + self._action_timeout,
"service_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:service_test_rsc action:monitor rc:ok op_status:Cancelled" ',
"lsb_reg_line": '-c register_rsc -r lsb_test_rsc ' + self._action_timeout + ' -C lsb -T LSBDummy',
"lsb_reg_event": '-l "NEW_EVENT event_type:register rsc_id:lsb_test_rsc action:none rc:ok op_status:complete" ',
"lsb_unreg_line": '-c unregister_rsc -r lsb_test_rsc ' + self._action_timeout,
"lsb_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:lsb_test_rsc action:none rc:ok op_status:complete"',
"lsb_start_line": '-c exec -r lsb_test_rsc -a start ' + self._action_timeout,
"lsb_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:start rc:ok op_status:complete" ',
"lsb_stop_line": '-c exec -r lsb_test_rsc -a stop ' + self._action_timeout,
"lsb_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:stop rc:ok op_status:complete" ',
"lsb_monitor_line": '-c exec -r lsb_test_rsc -a status -i 2s ' + self._action_timeout,
"lsb_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:complete" ' + self._action_timeout,
"lsb_cancel_line": '-c cancel -r lsb_test_rsc -a status -i 2s ' + self._action_timeout,
"lsb_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:lsb_test_rsc action:status rc:ok op_status:Cancelled" ',
"stonith_reg_line": '-c register_rsc -r stonith_test_rsc ' + self._action_timeout + ' -C stonith -P pacemaker -T fence_dummy',
"stonith_reg_event": '-l "NEW_EVENT event_type:register rsc_id:stonith_test_rsc action:none rc:ok op_status:complete" ',
"stonith_unreg_line": '-c unregister_rsc -r stonith_test_rsc ' + self._action_timeout,
"stonith_unreg_event": '-l "NEW_EVENT event_type:unregister rsc_id:stonith_test_rsc action:none rc:ok op_status:complete"',
"stonith_start_line": '-c exec -r stonith_test_rsc -a start ' + self._action_timeout,
"stonith_start_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:start rc:ok op_status:complete" ',
"stonith_stop_line": '-c exec -r stonith_test_rsc -a stop ' + self._action_timeout,
"stonith_stop_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:stop rc:ok op_status:complete" ',
"stonith_monitor_line": '-c exec -r stonith_test_rsc -a monitor -i 2s ' + self._action_timeout,
"stonith_monitor_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
"stonith_cancel_line": '-c cancel -r stonith_test_rsc -a monitor -i 2s ' + self._action_timeout,
"stonith_cancel_event": '-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:Cancelled" ',
}
def _setup_rsc_classes(self):
"""Determine which resource classes are supported."""
classes = stdout_from_command(["crm_resource", "--list-standards"])
# Strip trailing empty line
classes = classes[:-1]
if self.tls:
classes.remove("stonith")
if "nagios" in classes:
classes.remove("nagios")
if "systemd" in classes:
try:
# This code doesn't need this import, but pacemaker-cts-dummyd
# does, so ensure the dependency is available rather than cause
# all systemd tests to fail.
# pylint: disable=import-outside-toplevel,unused-import
import systemd.daemon
except ImportError:
print("Python systemd bindings not found.")
print("The tests for systemd class are not going to be run.")
classes.remove("systemd")
return classes
def new_test(self, name, description):
"""Create a named test."""
test = ExecTest(name, description, verbose=self.verbose, tls=self.tls,
timeout=self.timeout, force_wait=self.force_wait,
logdir=self.logdir)
self._tests.append(test)
return test
def setup_environment(self):
"""Prepare the host before executing any tests."""
if BuildOptions.REMOTE_ENABLED:
os.system("service pacemaker_remote stop")
self.cleanup_environment()
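# pacemaker-remoted authenticates TLS connections with a shared key, so create one if missing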
if self.tls and not os.path.isfile("/etc/pacemaker/authkey"):
print("Installing /etc/pacemaker/authkey ...")
os.system("mkdir -p /etc/pacemaker")
os.system("dd if=/dev/urandom of=/etc/pacemaker/authkey bs=4096 count=1")
self._installed_files.append("/etc/pacemaker/authkey")
# If we're in build directory, install agents if not already installed
# pylint: disable=protected-access
if os.path.exists("%s/cts/cts-exec.in" % BuildOptions._BUILD_DIR):
if not os.path.exists("%s/pacemaker" % BuildOptions.OCF_RA_INSTALL_DIR):
# @TODO remember which components were created and remove them
os.makedirs("%s/pacemaker" % BuildOptions.OCF_RA_INSTALL_DIR, 0o755)
for agent in ["Dummy", "Stateful", "ping"]:
agent_source = "%s/extra/resources/%s" % (BuildOptions._BUILD_DIR, agent)
agent_dest = "%s/pacemaker/%s" % (BuildOptions.OCF_RA_INSTALL_DIR, agent)
if not os.path.exists(agent_dest):
print("Installing %s ..." % agent_dest)
shutil.copyfile(agent_source, agent_dest)
os.chmod(agent_dest, EXECMODE)
self._installed_files.append(agent_dest)
subprocess.call(["cts-support", "install"])
def cleanup_environment(self):
"""Clean up the host after executing desired tests."""
for installed_file in self._installed_files:
print("Removing %s ..." % installed_file)
os.remove(installed_file)
subprocess.call(["cts-support", "uninstall"])
def _build_cmd_str(self, rsc, ty):
"""Construct a command string for the given resource and type."""
return "%s %s" % (self._common_cmds["%s_%s_line" % (rsc, ty)], self._common_cmds["%s_%s_event" % (rsc, ty)])
def build_generic_tests(self):
"""Register tests that apply to all resource classes."""
common_cmds = self._common_cmds
# register/unregister tests
for rsc in self._rsc_classes:
test = self.new_test("generic_registration_%s" % rsc,
"Simple resource registration test for %s standard" % rsc)
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# start/stop tests
for rsc in self._rsc_classes:
test = self.new_test("generic_start_stop_%s" % rsc, "Simple start and stop test for %s standard" % rsc)
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# monitor cancel test
for rsc in self._rsc_classes:
test = self.new_test("generic_monitor_cancel_%s" % rsc,
"Simple monitor cancel test for %s standard" % rsc)
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
# If this happens the monitor did not actually cancel correctly
- test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc], exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc],
+ expected_exitcode=ExitStatus.TIMEOUT)
# If this happens the monitor did not actually cancel correctly
- test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc], exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc],
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# monitor duplicate test
for rsc in self._rsc_classes:
test = self.new_test("generic_monitor_duplicate_%s" % rsc,
"Test creation and canceling of duplicate monitors for %s standard" % rsc)
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
# Add the duplicate monitors
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# verify we still get update events
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
# cancel the monitor; if the duplicate merged with the original, we should no longer see monitor updates
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
# If this happens the monitor did not actually cancel correctly
- test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc], exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc],
+ expected_exitcode=ExitStatus.TIMEOUT)
# If this happens the monitor did not actually cancel correctly
- test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc], exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc],
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
# stop implies cancel test
for rsc in self._rsc_classes:
test = self.new_test("generic_stop_implies_cancel_%s" % rsc,
"Verify stopping a resource implies cancel of recurring ops for %s standard" % rsc)
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
# If this fails, that means the monitor may not be getting rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
# If this happens the monitor did not actually cancel correctly
- test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc], exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc],
+ expected_exitcode=ExitStatus.TIMEOUT)
# If this happens the monitor did not actually cancel correctly
- test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc], exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail(args=common_cmds["%s_monitor_event" % rsc],
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
def build_multi_rsc_tests(self):
"""Register complex tests that involve managing multiple resouces of different types."""
common_cmds = self._common_cmds
# Do not use service and systemd at the same time; they are the same resource.
# register start monitor stop unregister resources of each type at the same time
test = self.new_test("multi_rsc_start_stop_all",
"Start, monitor, and stop resources of multiple types and classes")
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "reg"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "start"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "monitor"))
for rsc in self._rsc_classes:
# If this fails, that means the monitor is not being rescheduled
test.add_cmd(args=common_cmds["%s_monitor_event" % rsc])
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "cancel"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "stop"))
for rsc in self._rsc_classes:
test.add_cmd(args=self._build_cmd_str(rsc, "unreg"))
def build_negative_tests(self):
"""Register tests related to how pacemaker-execd handles failures."""
# ocf start timeout test
test = self.new_test("ocf_start_timeout", "Force start timeout to occur, verify start failure.")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# -t must be less than self._action_timeout
test.add_cmd(args='-c exec -r test_rsc -a start -k op_sleep -v 5 -t 1000 -w')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:error op_status:Timed Out" '
+ self._action_timeout)
test.add_cmd(args='-c exec -r test_rsc -a stop ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:complete" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# stonith start timeout test
test = self.new_test("stonith_start_timeout", "Force start timeout to occur, verify start failure.")
test.add_cmd(args='-c register_rsc -r test_rsc -C stonith -P pacemaker -T fence_dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete"')
# -t must be less than self._action_timeout
test.add_cmd(args='-c exec -r test_rsc -a start -k monitor_delay -v 30 -t 1000 -w')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:error op_status:Timed Out" '
+ self._action_timeout)
test.add_cmd(args='-c exec -r test_rsc -a stop ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:complete" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# stonith component fail
test = self.new_test("stonith_component_fail", "Kill stonith component after pacemaker-execd connects")
test.add_cmd(args=self._build_cmd_str("stonith", "reg"))
test.add_cmd(args=self._build_cmd_str("stonith", "start"))
test.add_cmd(args='-c exec -r stonith_test_rsc -a monitor -i 600s '
'-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:ok op_status:complete" '
+ self._action_timeout)
test.add_cmd_and_kill(args='-l "NEW_EVENT event_type:exec_complete rsc_id:stonith_test_rsc action:monitor rc:error op_status:error" -t 15000',
kill="killall -9 -q pacemaker-fenced lt-pacemaker-fenced")
test.add_cmd(args=self._build_cmd_str("stonith", "unreg"))
# monitor fail for ocf resources
test = self.new_test("monitor_fail_ocf", "Force ocf monitor to fail, verify failure is reported.")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete"')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete"'
+ self._action_timeout)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete"'
+ self._action_timeout)
test.add_cmd_and_kill(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete" ' + self._action_timeout,
kill="rm -f %s/run/Dummy-test_rsc.state" % BuildOptions.LOCAL_STATE_DIR)
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete" '
- + self._action_timeout, exitcode=ExitStatus.TIMEOUT)
+ + self._action_timeout, expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" '
- + self._action_timeout, exitcode=ExitStatus.TIMEOUT)
+ + self._action_timeout, expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# verify notify changes only for monitor operation
test = self.new_test("monitor_changes_only", "Verify when flag is set, only monitor changes are notified.")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout + ' -o '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout
+ ' -o -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ')
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd_and_kill(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete"' + self._action_timeout,
kill='rm -f %s/run/Dummy-test_rsc.state' % BuildOptions.LOCAL_STATE_DIR)
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete"')
# monitor fail for systemd resource
if "systemd" in self._rsc_classes:
test = self.new_test("monitor_fail_systemd", "Force systemd monitor to fail, verify failure is reported..")
test.add_cmd(args='-c register_rsc -r test_rsc -C systemd -T pacemaker-cts-dummyd@3 ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd_and_kill(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete"' + self._action_timeout,
kill="pkill -9 -f pacemaker-cts-dummyd")
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# monitor fail for upstart resource
if "upstart" in self._rsc_classes:
test = self.new_test("monitor_fail_upstart", "Force upstart monitor to fail, verify failure is reported")
test.add_cmd(args='-c register_rsc -r test_rsc -C upstart -T pacemaker-cts-dummyd ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd_and_kill(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete"' + self._action_timeout,
kill='killall -9 -q dd')
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Cancel non-existent operation on a resource
test = self.new_test("cancel_non_existent_op", "Attempt to cancel the wrong monitor operation, verify expected failure")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ')
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout)
# interval is wrong, should fail
test.add_cmd_expected_fail(args='-c cancel -r test_rsc -a monitor -i 2s' + self._action_timeout
+ ' -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
# action name is wrong, should fail
test.add_cmd_expected_fail(args='-c cancel -r test_rsc -a stop -i 1s' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:not running op_status:Cancelled" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Attempt to invoke non-existent rsc id
test = self.new_test("invoke_non_existent_rsc", "Attempt to perform operations on a non-existent rsc id.")
test.add_cmd_expected_fail(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:error op_status:complete" ')
test.add_cmd_expected_fail(args='-c exec -r test_rsc -a stop ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:complete" ')
test.add_cmd_expected_fail(args='-c exec -r test_rsc -a monitor -i 6s ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ')
test.add_cmd_expected_fail(args='-c cancel -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Register and start a resource that doesn't exist, systemd
if "systemd" in self._rsc_classes:
test = self.new_test("start_uninstalled_systemd", "Register uninstalled systemd agent, try to start, verify expected failure")
test.add_cmd(args='-c register_rsc -r test_rsc -C systemd -T this_is_fake1234 ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
if "upstart" in self._rsc_classes:
test = self.new_test("start_uninstalled_upstart", "Register uninstalled upstart agent, try to start, verify expected failure")
test.add_cmd(args='-c register_rsc -r test_rsc -C upstart -T this_is_fake1234 ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Register and start a resource that doesn't exist, ocf
test = self.new_test("start_uninstalled_ocf", "Register uninstalled ocf agent, try to start, verify expected failure.")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T this_is_fake1234 ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Register ocf with non-existent provider
test = self.new_test("start_ocf_bad_provider", "Register ocf agent with a non-existent provider, verify expected failure.")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pancakes -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:not installed op_status:Not installed" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Register ocf with empty provider field
test = self.new_test("start_ocf_no_provider", "Register ocf agent with a no provider, verify expected failure.")
test.add_cmd_expected_fail(args='-c register_rsc -r test_rsc -C ocf -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd_expected_fail(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Error" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
def build_stress_tests(self):
"""Register stress tests."""
timeout = "-t 20000"
iterations = 25
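# Register, start, and begin monitoring all 25 resources, then stop and unregister them all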
test = self.new_test("ocf_stress", "Verify OCF agent handling works under load")
for i in range(iterations):
test.add_cmd(args='-c register_rsc -r rsc_%s %s -C ocf -P heartbeat -T Dummy -l "NEW_EVENT event_type:register rsc_id:rsc_%s action:none rc:ok op_status:complete"' % (i, timeout, i))
test.add_cmd(args='-c exec -r rsc_%s -a start %s -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_%s action:start rc:ok op_status:complete"' % (i, timeout, i))
test.add_cmd(args='-c exec -r rsc_%s -a monitor %s -i 1s '
'-l "NEW_EVENT event_type:exec_complete rsc_id:rsc_%s action:monitor rc:ok op_status:complete"' % (i, timeout, i))
for i in range(iterations):
test.add_cmd(args='-c exec -r rsc_%s -a stop %s -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_%s action:stop rc:ok op_status:complete"' % (i, timeout, i))
test.add_cmd(args='-c unregister_rsc -r rsc_%s %s -l "NEW_EVENT event_type:unregister rsc_id:rsc_%s action:none rc:ok op_status:complete"' % (i, timeout, i))
if "systemd" in self._rsc_classes:
test = self.new_test("systemd_stress", "Verify systemd dbus connection works under load")
for i in range(iterations):
test.add_cmd(args='-c register_rsc -r rsc_%s %s -C systemd -T pacemaker-cts-dummyd@3 -l "NEW_EVENT event_type:register rsc_id:rsc_%s action:none rc:ok op_status:complete"' % (i, timeout, i))
test.add_cmd(args='-c exec -r rsc_%s -a start %s -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_%s action:start rc:ok op_status:complete"' % (i, timeout, i))
test.add_cmd(args='-c exec -r rsc_%s -a monitor %s -i 1s '
'-l "NEW_EVENT event_type:exec_complete rsc_id:rsc_%s action:monitor rc:ok op_status:complete"' % (i, timeout, i))
for i in range(iterations):
test.add_cmd(args='-c exec -r rsc_%s -a stop %s -l "NEW_EVENT event_type:exec_complete rsc_id:rsc_%s action:stop rc:ok op_status:complete"' % (i, timeout, i))
test.add_cmd(args='-c unregister_rsc -r rsc_%s %s -l "NEW_EVENT event_type:unregister rsc_id:rsc_%s action:none rc:ok op_status:complete"' % (i, timeout, i))
iterations = 9
timeout = "-t 30000"
# Verify recurring op in-flight collision is handled in series properly
test = self.new_test("rsc_inflight_collision", "Verify recurring ops do not collide with other operations for the same rsc.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd(args='-c exec -r test_rsc -a start %s -k op_sleep -v 1 -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete"' % timeout)
for i in range(iterations):
test.add_cmd(args='-c exec -r test_rsc -a monitor %s -i 100%dms -k op_sleep -v 2 '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete"' % (timeout, i))
test.add_cmd(args='-c exec -r test_rsc -a stop %s -l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:complete"' % timeout)
test.add_cmd(args='-c unregister_rsc -r test_rsc %s -l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete"' % timeout)
def build_custom_tests(self):
"""Register tests that target specific cases."""
# verify resource temporary folder is created and used by OCF agents
test = self.new_test("rsc_tmp_dir", "Verify creation and use of rsc temporary state directory")
test.add_sys_cmd("ls", "-al %s" % BuildOptions.RSC_TMP_DIR)
test.add_cmd(args='-c register_rsc -r test_rsc -P heartbeat -C ocf -T Dummy '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd(args='-c exec -r test_rsc -a start -t 4000')
test.add_sys_cmd("ls", "-al %s" % BuildOptions.RSC_TMP_DIR)
test.add_sys_cmd("ls", "%s/Dummy-test_rsc.state" % BuildOptions.RSC_TMP_DIR)
test.add_cmd(args='-c exec -r test_rsc -a stop -t 4000')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# start delay then stop test
test = self.new_test("start_delay", "Verify start delay works as expected.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd(args='-c exec -r test_rsc -s 6000 -a start -w -t 6000')
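# The start above carries a 6s delay, so the 2s listen below is expected
# to time out before the start executes, while the follow-up 6s listen
# should catch the completion event.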
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" -t 2000',
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" -t 6000')
test.add_cmd(args='-c exec -r test_rsc -a stop ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:stop rc:ok op_status:complete" ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# start delay, but cancel before it gets a chance to start
test = self.new_test("start_delay_cancel", "Using start_delay, start a rsc, but cancel the start op before execution.")
test.add_cmd(args='-c register_rsc -r test_rsc -P pacemaker -C ocf -T Dummy '
'-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ' + self._action_timeout)
test.add_cmd(args='-c exec -r test_rsc -s 5000 -a start -w -t 4000')
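# The 5s start delay leaves a window in which the start op can be
# cancelled before it executes; the expected-fail listen afterwards
# confirms that no start completion event is ever emitted.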
test.add_cmd(args='-c cancel -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:Cancelled" ')
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" -t 5000',
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# Register a bunch of resources, verify we can get info on them
test = self.new_test("verify_get_rsc_info", "Register multiple resources, verify retrieval of rsc info.")
if "systemd" in self._rsc_classes:
test.add_cmd(args='-c register_rsc -r rsc1 -C systemd -T pacemaker-cts-dummyd@3 ' + self._action_timeout)
test.add_cmd(args='-c get_rsc_info -r rsc1 ')
test.add_cmd(args='-c unregister_rsc -r rsc1 ' + self._action_timeout)
test.add_cmd_expected_fail(args='-c get_rsc_info -r rsc1 ')
if "upstart" in self._rsc_classes:
test.add_cmd(args='-c register_rsc -r rsc1 -C upstart -T pacemaker-cts-dummyd ' + self._action_timeout)
test.add_cmd(args='-c get_rsc_info -r rsc1 ')
test.add_cmd(args='-c unregister_rsc -r rsc1 ' + self._action_timeout)
test.add_cmd_expected_fail(args='-c get_rsc_info -r rsc1 ')
test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker ' + self._action_timeout)
test.add_cmd(args='-c get_rsc_info -r rsc2 ')
test.add_cmd(args='-c unregister_rsc -r rsc2 ' + self._action_timeout)
test.add_cmd_expected_fail(args='-c get_rsc_info -r rsc2 ')
# Register duplicate, verify only one entry exists and can still be removed
test = self.new_test("duplicate_registration", "Register resource multiple times, verify only one entry exists and can be removed.")
test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker ' + self._action_timeout)
test.add_cmd_check_stdout(args="-c get_rsc_info -r rsc2 ",
match="id:rsc2 class:ocf provider:pacemaker type:Dummy")
test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Dummy -P pacemaker ' + self._action_timeout)
test.add_cmd_check_stdout(args="-c get_rsc_info -r rsc2 ",
match="id:rsc2 class:ocf provider:pacemaker type:Dummy")
test.add_cmd(args='-c register_rsc -r rsc2 -C ocf -T Stateful -P pacemaker ' + self._action_timeout)
test.add_cmd_check_stdout(args="-c get_rsc_info -r rsc2 ",
match="id:rsc2 class:ocf provider:pacemaker type:Stateful")
test.add_cmd(args='-c unregister_rsc -r rsc2 ' + self._action_timeout)
test.add_cmd_expected_fail(args='-c get_rsc_info -r rsc2 ')
# verify the option to only send notification to the original client
test = self.new_test("notify_orig_client_only", "Verify option to only send notifications to the client originating the action.")
test.add_cmd(args='-c register_rsc -r test_rsc -C ocf -P pacemaker -T Dummy ' + self._action_timeout
+ '-l "NEW_EVENT event_type:register rsc_id:test_rsc action:none rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a start ' + self._action_timeout
+ '-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:start rc:ok op_status:complete" ')
test.add_cmd(args='-c exec -r test_rsc -a monitor -i 1s ' + self._action_timeout + ' -n '
'-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete"')
# this will fail because the monitor notifications should only go to the original caller, which no longer exists.
test.add_cmd_expected_fail(args='-l "NEW_EVENT event_type:exec_complete rsc_id:test_rsc action:monitor rc:ok op_status:complete" ' + self._action_timeout,
- exitcode=ExitStatus.TIMEOUT)
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_cmd(args='-c cancel -r test_rsc -a monitor -i 1s -t 6000 ')
test.add_cmd(args='-c unregister_rsc -r test_rsc ' + self._action_timeout
+ '-l "NEW_EVENT event_type:unregister rsc_id:test_rsc action:none rc:ok op_status:complete" ')
# get metadata
test = self.new_test("get_ocf_metadata", "Retrieve metadata for a resource")
test.add_cmd_check_stdout(args="-c metadata -C ocf -P pacemaker -T Dummy",
match="resource-agent name=\"Dummy\"")
test.add_cmd(args="-c metadata -C ocf -P pacemaker -T Stateful")
test.add_cmd_expected_fail(args="-c metadata -P pacemaker -T Stateful")
test.add_cmd_expected_fail(args="-c metadata -C ocf -P pacemaker -T fake_agent")
# get metadata
test = self.new_test("get_lsb_metadata", "Retrieve metadata for a resource")
test.add_cmd_check_stdout(args="-c metadata -C lsb -T LSBDummy",
match="resource-agent name='LSBDummy'")
# get stonith metadata
test = self.new_test("get_stonith_metadata", "Retrieve stonith metadata for a resource")
test.add_cmd_check_stdout(args="-c metadata -C stonith -P pacemaker -T fence_dummy",
match="resource-agent name=\"fence_dummy\"")
# get metadata
if "systemd" in self._rsc_classes:
test = self.new_test("get_systemd_metadata", "Retrieve metadata for a resource")
test.add_cmd_check_stdout(args="-c metadata -C systemd -T pacemaker-cts-dummyd@",
match="resource-agent name=\"pacemaker-cts-dummyd@\"")
# get metadata
if "upstart" in self._rsc_classes:
test = self.new_test("get_upstart_metadata", "Retrieve metadata for a resource")
test.add_cmd_check_stdout(args="-c metadata -C upstart -T pacemaker-cts-dummyd",
match="resource-agent name=\"pacemaker-cts-dummyd\"")
# get ocf providers
test = self.new_test("list_ocf_providers",
"Retrieve list of available resource providers, verifies pacemaker is a provider.")
test.add_cmd_check_stdout(args="-c list_ocf_providers ", match="pacemaker")
test.add_cmd_check_stdout(args="-c list_ocf_providers -T ping", match="pacemaker")
# Verify agents only exist in their lists
test = self.new_test("verify_agent_lists", "Verify the agent lists contain the right data.")
test.add_cmd_check_stdout(args="-c list_agents ", match="Stateful") # ocf
test.add_cmd_check_stdout(args="-c list_agents -C ocf", match="Stateful")
test.add_cmd_check_stdout(args="-c list_agents -C lsb", match="", stdout_no_match="Stateful") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C service", match="", stdout_no_match="Stateful") # should not exist
test.add_cmd_check_stdout(args="-c list_agents ", match="LSBDummy") # init.d
test.add_cmd_check_stdout(args="-c list_agents -C lsb", match="LSBDummy")
test.add_cmd_check_stdout(args="-c list_agents -C service", match="LSBDummy")
test.add_cmd_check_stdout(args="-c list_agents -C ocf", match="", stdout_no_match="pacemaker-cts-dummyd@") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C ocf", match="", stdout_no_match="pacemaker-cts-dummyd@") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C lsb", match="", stdout_no_match="fence_dummy") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C service", match="", stdout_no_match="fence_dummy") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C ocf", match="", stdout_no_match="fence_dummy") # should not exist
if "systemd" in self._rsc_classes:
test.add_cmd_check_stdout(args="-c list_agents ", match="pacemaker-cts-dummyd@") # systemd
test.add_cmd_check_stdout(args="-c list_agents -C service", match="LSBDummy")
test.add_cmd_check_stdout(args="-c list_agents -C systemd", match="", stdout_no_match="Stateful") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C systemd", match="pacemaker-cts-dummyd@")
test.add_cmd_check_stdout(args="-c list_agents -C systemd", match="", stdout_no_match="fence_dummy") # should not exist
if "upstart" in self._rsc_classes:
test.add_cmd_check_stdout(args="-c list_agents ", match="pacemaker-cts-dummyd") # upstart
test.add_cmd_check_stdout(args="-c list_agents -C service", match="LSBDummy")
test.add_cmd_check_stdout(args="-c list_agents -C upstart", match="", stdout_no_match="Stateful") # should not exist
test.add_cmd_check_stdout(args="-c list_agents -C upstart", match="pacemaker-cts-dummyd")
test.add_cmd_check_stdout(args="-c list_agents -C upstart", match="", stdout_no_match="fence_dummy") # should not exist
if "stonith" in self._rsc_classes:
test.add_cmd_check_stdout(args="-c list_agents -C stonith", match="fence_dummy") # stonith
test.add_cmd_check_stdout(args="-c list_agents -C stonith", match="", # should not exist
stdout_no_match="pacemaker-cts-dummyd@")
test.add_cmd_check_stdout(args="-c list_agents -C stonith", match="", stdout_no_match="Stateful") # should not exist
test.add_cmd_check_stdout(args="-c list_agents ", match="fence_dummy")
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-execd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
"\t " + sys.argv[0] + " --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
"\t " + sys.argv[0] + " --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-t", "--timeout", type=float, default=2,
help="Up to how many seconds each test case waits for the daemon to "
"be initialized. Defaults to 2. The value 0 means no limit.")
parser.add_argument("-w", "--force-wait", action="store_true",
help="Each test case waits the default/specified --timeout for the "
"daemon without tracking the log")
if BuildOptions.REMOTE_ENABLED:
parser.add_argument("-R", "--pacemaker-remote", action="store_true",
help="Test pacemaker-remoted binary instead of pacemaker-execd")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run pacemaker-execd regression tests as specified by arguments."""
update_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
# opts.pacemaker_remote exists only when remote support is built in
if getattr(opts, "pacemaker_remote", False):
daemon_name = "pacemaker-remoted"
else:
daemon_name = "pacemaker-execd"
exit_if_proc_running(daemon_name)
# Create a temporary directory for log files (the directory will
# automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-exec-") as logdir:
tests = ExecTests(verbose=opts.verbose, tls=getattr(opts, "pacemaker_remote", False),
timeout=opts.timeout, force_wait=opts.force_wait,
logdir=logdir)
tests.build_generic_tests()
tests.build_multi_rsc_tests()
tests.build_negative_tests()
tests.build_custom_tests()
tests.build_stress_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
tests.setup_environment()
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment()
tests.exit()
if __name__ == "__main__":
main()
diff --git a/cts/cts-fencing.in b/cts/cts-fencing.in
index 77f4de7000..deca939d06 100644
--- a/cts/cts-fencing.in
+++ b/cts/cts-fencing.in
@@ -1,1120 +1,1119 @@
#!@PYTHON@
""" Regression tests for Pacemaker's fencer
"""
__copyright__ = "Copyright 2012-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import sys
import subprocess
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that this doesn't necessarily mean the tests will run successfully,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync, localname
from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
TEST_DIR = sys.path[0]
def update_path():
""" Set the PATH environment variable appropriately for the tests """
new_path = os.environ['PATH']
if os.path.exists("%s/cts-fencing.in" % TEST_DIR):
print("Running tests from the source tree: %s (%s)" % (BuildOptions._BUILD_DIR, TEST_DIR))
# For pacemaker-fenced and cts-fence-helper
new_path = "%s/daemons/fenced:%s" % (BuildOptions._BUILD_DIR, new_path)
new_path = "%s/tools:%s" % (BuildOptions._BUILD_DIR, new_path) # For stonith_admin
new_path = "%s/cts/support:%s" % (BuildOptions._BUILD_DIR, new_path) # For cts-support
else:
print("Running tests from the install tree: %s (not %s)" % (BuildOptions.DAEMON_DIR, TEST_DIR))
# For pacemaker-fenced, cts-fence-helper, and cts-support
new_path = "%s:%s" % (BuildOptions.DAEMON_DIR, new_path)
print('Using PATH="%s"' % new_path)
os.environ['PATH'] = new_path
class FenceTest(Test):
""" Executor for a single test """
def __init__(self, name, description, **kwargs):
Test.__init__(self, name, description, **kwargs)
if kwargs.get("with_cpg", False):
self._enable_corosync = True
self._daemon_options = ["-c"]
else:
self._enable_corosync = False
self._daemon_options = ["-s"]
self._daemon_location = "pacemaker-fenced"
def _kill_daemons(self):
killall(["pacemakerd", "pacemaker-fenced"])
def _start_daemons(self):
if self.verbose:
self._daemon_options += ["-V"]
print("Starting %s with %s" % (self._daemon_location, self._daemon_options))
cmd = ["pacemaker-fenced", "-l", self.logpath] + self._daemon_options
self._daemon_process = subprocess.Popen(cmd)
class FenceTests(Tests):
""" Collection of all fencing regression tests """
def __init__(self, **kwargs):
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-fencing")
def new_test(self, name, description, with_cpg=False):
""" Create a named test """
test = FenceTest(name, description, verbose=self.verbose, with_cpg=with_cpg,
timeout=self.timeout, force_wait=self.force_wait,
logdir=self.logdir)
self._tests.append(test)
return test
def run_cpg_only(self):
""" Run all corosync-enabled tests """
for test in self._tests:
if test._enable_corosync:
test.run()
def run_no_cpg(self):
""" Run all standalone tests """
for test in self._tests:
if not test._enable_corosync:
test.run()
def build_api_sanity_tests(self):
""" Register tests to verify basic API usage """
verbose_arg = ""
if self.verbose:
verbose_arg = "-V"
test = self.new_test("standalone_low_level_api_test", "Sanity test client api in standalone mode.")
test.add_cmd("cts-fence-helper", args="-t %s" % verbose_arg, validate=False)
test = self.new_test("cpg_low_level_api_test", "Sanity test client api using mainloop and cpg.", True)
test.add_cmd("cts-fence-helper", args="-m %s" % verbose_arg, validate=False)
def build_custom_timeout_tests(self):
""" Register tests to verify custom timeout usage """
# custom timeout without topology
test = self.new_test("cpg_custom_timeout_1",
"Verify per device timeouts work as expected without using topology.", True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# per-device timeouts sum to 5+1+4 = 10 (the fencer pads this; hence 12s below)
test.add_log_pattern("Total timeout set to 12s")
# custom timeout _WITH_ topology
test = self.new_test("cpg_custom_timeout_2",
"Verify per device timeouts work as expected _WITH_ topology.", True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1000ms')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4000s')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# per-device timeouts sum to 5+1+4000 = 4006 (padded to 4807s below)
test.add_log_pattern("Total timeout set to 4807s")
def build_fence_merge_tests(self):
""" Register tests to verify when fence operations should be merged """
### Simple test that overlapping fencing operations get merged
test = self.new_test("cpg_custom_merge_single",
"Verify overlapping identical fencing operations are merged, no fencing levels used.", True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### one merge will happen
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the patterns below signify that both the original and the duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
### Test that multiple mergers occur
test = self.new_test("cpg_custom_merge_multiple",
"Verify multiple overlapping identical fencing operations are merged", True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o delay=2 -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### 4 merges should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the patterns below signify that the original and all duplicate operations completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
### Test that multiple mergers occur with topologies used
test = self.new_test("cpg_custom_merge_with_topology",
"Verify multiple overlapping identical fencing operations are merged with fencing levels.",
True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### 4 merges should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the patterns below signify that the original and all duplicate operations completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
def build_fence_no_merge_tests(self):
""" Register tests to verify when fence operations should not be merged """
test = self.new_test("cpg_custom_no_merge",
"Verify differing fencing operations are not merged", True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node2 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client",
negative=True)
def build_standalone_tests(self):
""" Register a grab bag of tests that can be executed in standalone or corosync mode """
test_types = [
{
"prefix" : "standalone",
"use_cpg" : False,
},
{
"prefix" : "cpg",
"use_cpg" : True,
},
]
# test what happens when all devices timeout
for test_type in test_types:
test = self.new_test("%s_fence_multi_device_failure" % test_type["prefix"],
"Verify that all devices timeout, a fencing failure is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
if test_type["use_cpg"]:
- test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 2", exitcode=ExitStatus.TIMEOUT)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 2",
+ expected_exitcode=ExitStatus.TIMEOUT)
test.add_log_pattern("Total timeout set to 7s")
else:
- test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 2", exitcode=ExitStatus.ERROR)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 2",
+ expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("targeting node3 using false1 returned ")
test.add_log_pattern("targeting node3 using false2 returned ")
test.add_log_pattern("targeting node3 using false3 returned ")
# test what happens when multiple devices can fence a node, but the first device fails.
for test_type in test_types:
test = self.new_test("%s_fence_device_failure_rollover" % test_type["prefix"],
"Verify that when one fence device fails for a node, the others are tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
if test_type["use_cpg"]:
test.add_log_pattern("Total timeout set to 18s")
# test what happens when we try to use a missing fence-agent.
for test_type in test_types:
test = self.new_test("%s_fence_missing_agent" % test_type["prefix"],
"Verify proper error-handling when using a non-existent fence-agent.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_missing -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node2")
- test.add_cmd_expected_fail("stonith_admin",
- args="--output-as=xml -F node3 -t 5",
- exitcode=ExitStatus.NOSUCH)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 5",
+ expected_exitcode=ExitStatus.NOSUCH)
test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 5")
# simple topology test for one device
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_simple" % test_type["prefix"],
"Verify all fencing devices at a level are used.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 6s")
test.add_log_pattern("targeting node3 using true returned 0")
# add topology, delete topology, verify fencing still works
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_add_remove" % test_type["prefix"],
"Verify fencing occurrs after all topology levels are removed",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 6s")
test.add_log_pattern("targeting node3 using true returned 0")
# test what happens when the first fencing level has multiple devices.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_device_fails" % test_type["prefix"],
"Verify if one device in a level fails, the other is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R false -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
test.add_log_pattern("Total timeout set to 48s")
test.add_log_pattern("targeting node3 using false returned 1")
test.add_log_pattern("targeting node3 using true returned 0")
# test what happens when the first fencing level fails.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_multi_level_fails" % test_type["prefix"],
"Verify if one level fails, the next leve is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 3")
test.add_log_pattern("Total timeout set to 21s")
test.add_log_pattern("targeting node3 using false1 returned 1")
test.add_log_pattern("targeting node3 using false2 returned 1")
test.add_log_pattern("targeting node3 using true3 returned 0")
test.add_log_pattern("targeting node3 using true4 returned 0")
# test what happens when the first fencing level has devices that no one has registered
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_missing_devices" % test_type["prefix"],
"Verify topology can continue with missing devices.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# Test what happens if multiple fencing levels are defined, and then the first one is removed.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_level_removal" % test_type["prefix"],
"Verify level removal works.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
# Now remove level 2, verify none of the devices in level two are hit.
test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
test.add_log_pattern("Total timeout set to 96s")
test.add_log_pattern("targeting node3 using false1 returned 1")
test.add_log_pattern("targeting node3 using false2 returned ",
negative=True)
test.add_log_pattern("targeting node3 using true3 returned 0")
test.add_log_pattern("targeting node3 using true4 returned 0")
# Test targeting a topology level by node name pattern.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_level_pattern" % test_type["prefix"],
"Verify targeting topology by node name pattern works.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r '@node.*' -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("targeting node3 using true returned 0")
# test allowing commas and semicolons as delimiters in pcmk_host_list
for test_type in test_types:
test = self.new_test("%s_host_list_delimiters" % test_type["prefix"],
"Verify commas and semicolons can be used as pcmk_host_list delimiters",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1,node2,node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=pcmk1;pcmk2;pcmk3"')
test.add_cmd("stonith_admin", args="stonith_admin --output-as=xml -F node2 -t 5")
test.add_cmd("stonith_admin", args="stonith_admin --output-as=xml -F pcmk3 -t 5")
test.add_log_pattern("targeting node2 using true1 returned 0")
test.add_log_pattern("targeting pcmk3 using true2 returned 0")
# test that the fencer builds the correct list of devices that can fence a node.
for test_type in test_types:
test = self.new_test("%s_list_devices" % test_type["prefix"],
"Verify list of devices that can fence a node is correct",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l node1 -V",
stdout_match="true2", stdout_no_match="true1")
test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l node1 -V",
stdout_match="true3", stdout_no_match="true1")
# simple test of device monitor
for test_type in test_types:
test = self.new_test("%s_monitor" % test_type["prefix"],
"Verify device is reachable", test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
test.add_cmd("stonith_admin", args="--output-as=xml -Q false1")
- test.add_cmd_expected_fail("stonith_admin",
- args="--output-as=xml -Q true2",
- exitcode=ExitStatus.NOSUCH)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true2",
+ expected_exitcode=ExitStatus.NOSUCH)
# Verify monitor occurs for duration of timeout period on failure
for test_type in test_types:
test = self.new_test("%s_monitor_timeout" % test_type["prefix"],
"Verify monitor uses duration of timeout period given.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
- test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1 -t 5", exitcode=ExitStatus.ERROR)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1 -t 5",
+ expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("Attempt 2 to execute")
# Verify monitor occurs for duration of timeout period on failure, but stops at max retries
for test_type in test_types:
test = self.new_test("%s_monitor_timeout_max_retries" % test_type["prefix"],
"Verify monitor retries until max retry value or timeout is hit.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
- test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1 -t 15", exitcode=ExitStatus.ERROR)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1 -t 15",
+ expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times")
# simple register test
for test_type in test_types:
test = self.new_test("%s_register" % test_type["prefix"],
"Verify devices can be registered and un-registered",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
- test.add_cmd_expected_fail("stonith_admin",
- args="--output-as=xml -Q true1",
- exitcode=ExitStatus.NOSUCH)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1",
+ expected_exitcode=ExitStatus.NOSUCH)
# simple reboot test
for test_type in test_types:
test = self.new_test("%s_reboot" % test_type["prefix"],
"Verify devices can be rebooted",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -B node3 -t 5")
test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
- test.add_cmd_expected_fail("stonith_admin",
- args="--output-as=xml -Q true1",
- exitcode=ExitStatus.NOSUCH)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1",
+ expected_exitcode=ExitStatus.NOSUCH)
# test fencing history.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_fence_history" % test_type["prefix"],
"Verify last fencing operation is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5 -V")
test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -H node3",
stdout_match='action="off" target="node3" .* status="success"')
# simple test of dynamic list query
for test_type in test_types:
test = self.new_test("%s_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l fake_port_1",
stdout_match='count="3"')
# fence using dynamic list query
for test_type in test_types:
test = self.new_test("%s_fence_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -F fake_port_1 -t 5 -V")
# simple test of query using status action
for test_type in test_types:
test = self.new_test("%s_status_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l fake_port_1",
stdout_match='count="3"')
# test what happens when no reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_no_reboot_support" % test_type["prefix"],
"Verify reboot action defaults to off when no reboot action is advertised by agent.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy_no_reboot -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
test.add_log_pattern("does not support reboot")
test.add_log_pattern("using true1 returned 0")
# make sure reboot is used when reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_with_reboot_support" % test_type["prefix"],
"Verify reboot action can be used when metadata advertises it.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
test.add_log_pattern("does not advertise support for 'reboot', performing 'off'",
negative=True)
test.add_log_pattern("using true1 returned 0")
# make sure all fencing delays are applied correctly and taken into account by fencing timeouts with topology
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_delays" % test_type["prefix"],
"Verify all fencing delays are applied correctly and taken into account by fencing timeouts with topology.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
# Resulting "random" delay will always be 1 since (rand() % (delay_max - delay_base)) is always 0 here.
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1 -o pcmk_delay_max=2')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 --delay 1")
# Total fencing timeout takes all fencing delays into account.
test.add_log_pattern("Total timeout set to 582s")
# Fencing timeout for the first device takes the requested fencing delay into account.
# Fencing timeout also takes pcmk_delay_base into account.
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true1 .*146s.*",
regex=True)
# Requested fencing delay is applied only for the first device in the first level.
# Static delay from pcmk_delay_base is added.
test.add_log_pattern("Delaying 'off' action targeting node3 using true1 for 2s | timeout=120s requested_delay=1s base=1s max=1s")
# Fencing timeout no longer takes the requested fencing delay into account for further devices.
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using false1 .*145s.*",
regex=True)
# Requested fencing delay is no longer applied for further devices.
test.add_log_pattern("Delaying 'off' action targeting node3 using false1 for 1s | timeout=120s requested_delay=0s base=1s max=1s")
# Fencing timeout takes pcmk_delay_max into account.
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true2 .*146s.*",
regex=True)
test.add_log_pattern("Delaying 'off' action targeting node3 using true2 for 1s | timeout=120s requested_delay=0s base=1s max=2s")
test.add_log_pattern("Delaying 'off' action targeting node3 using true3",
negative=True)
def build_nodeid_tests(self):
""" Register tests that use a corosync node id """
our_uname = localname()
### verify nodeid is supplied when nodeid is in the metadata parameters
test = self.new_test("cpg_supply_nodeid",
"Verify nodeid is given when fence agent has nodeid as parameter", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -F %s -t 3" % our_uname)
test.add_log_pattern("as nodeid with fence action 'off' targeting %s" % (our_uname))
### verify nodeid is _NOT_ supplied when nodeid is not in the metadata parameters
test = self.new_test("cpg_do_not_supply_nodeid",
"Verify nodeid is _NOT_ given when fence agent does not have nodeid as parameter",
True)
# use a host name that won't be in corosync.conf
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=regr-test')
test.add_cmd("stonith_admin", args="--output-as=xml -F regr-test -t 3")
test.add_log_pattern("as nodeid with fence action 'off' targeting regr-test",
negative=True)
### verify nodeid use doesn't explode standalone mode
test = self.new_test("standalone_do_not_supply_nodeid",
"Verify nodeid in metadata parameter list doesn't kill standalone mode",
False)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -F %s -t 3" % our_uname)
test.add_log_pattern("as nodeid with fence action 'off' targeting %s" % our_uname,
negative=True)
def build_unfence_tests(self):
""" Register tests that verify unfencing """
our_uname = localname()
### verify unfencing using automatic unfencing
test = self.new_test("cpg_unfence_required_1",
"Verify require unfencing on all devices when automatic=true in agent's metadata",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
# both devices should be executed
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
### verify unfencing using automatic unfencing fails if any of the required agents fail
test = self.new_test("cpg_unfence_required_2",
"Verify require unfencing on all devices when automatic=true in agent's metadata",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=fail -o "pcmk_host_list=%s"' % our_uname)
- test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U %s -t 6" % our_uname, exitcode=ExitStatus.ERROR)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U %s -t 6" % our_uname,
+ expected_exitcode=ExitStatus.ERROR)
### verify unfencing using automatic devices with topology
test = self.new_test("cpg_unfence_required_3",
"Verify require unfencing on all devices even when at different topology levels",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v true1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
### verify unfencing using automatic devices with topology
test = self.new_test("cpg_unfence_required_4",
"Verify all required devices are executed even with topology levels fail.",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true3 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true4 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false4 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v true1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v false1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v false2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v false3" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true3" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 3 -v false4" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 4 -v true4" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
test.add_log_pattern("using true3 returned 0")
test.add_log_pattern("using true4 returned 0")
def build_unfence_on_target_tests(self):
""" Register tests that verify unfencing that runs on the target """
our_uname = localname()
### verify unfencing using on_target device
test = self.new_test("cpg_unfence_on_target_1",
"Verify unfencing with on_target = true", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("(on) to be executed on target")
### verify failure of unfencing using on_target device
test = self.new_test("cpg_unfence_on_target_2",
"Verify failure unfencing with on_target = true",
True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node_fake_1234"' % our_uname)
- test.add_cmd_expected_fail("stonith_admin",
- args="--output-as=xml -U node_fake_1234 -t 3",
- exitcode=ExitStatus.NOSUCH)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U node_fake_1234 -t 3",
+ expected_exitcode=ExitStatus.NOSUCH)
test.add_log_pattern("(on) to be executed on target")
### verify unfencing using on_target device with topology
test = self.new_test("cpg_unfence_on_target_3",
"Verify unfencing with on_target = true using topology",
True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v true1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("(on) to be executed on target")
### verify unfencing using on_target device with topology fails when target node doesn't exist
test = self.new_test("cpg_unfence_on_target_4",
"Verify unfencing failure with on_target = true using topology",
True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node_fake"' % our_uname)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node_fake"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true2")
- test.add_cmd_expected_fail("stonith_admin",
- args="--output-as=xml -U node_fake -t 3",
- exitcode=ExitStatus.NOSUCH)
+ test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U node_fake -t 3",
+ expected_exitcode=ExitStatus.NOSUCH)
test.add_log_pattern("(on) to be executed on target")
def build_remap_tests(self):
""" Register tests that verify remapping of reboots to off-on """
test = self.new_test("cpg_remap_simple",
"Verify sequential topology reboot is remapped to all-off-then-all-on", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
'-o pcmk_off_timeout=1 -o pcmk_reboot_timeout=10')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
'-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
# fence_dummy sets "on" as an on_target action
test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("cpg_remap_simple_off",
"Verify sequential topology reboot skips 'on' if "
"pcmk_reboot_action=off or agent doesn't support "
"'on'", True)
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o mode=pass "
"-o pcmk_host_list=node_fake -o pcmk_off_timeout=1 "
"-o pcmk_reboot_timeout=10 -o pcmk_reboot_action=off")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true2 -a fence_dummy_no_on "
"-o mode=pass -o pcmk_host_list=node_fake "
"-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20")
test.add_cmd("stonith_admin",
args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
# "on" should be skipped
test.add_log_pattern("Not turning node_fake back on using "
"true1 because the device is configured "
"to stay off")
test.add_log_pattern("Not turning node_fake back on using true2"
" because the agent doesn't support 'on'")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("cpg_remap_automatic",
"Verify remapped topology reboot skips automatic 'on'", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence '
'-o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence '
'-o "mode=pass" -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test.add_log_pattern("perform 'on' action targeting node_fake using",
negative=True)
test.add_log_pattern("'on' failure",
negative=True)
test = self.new_test("cpg_remap_complex_1",
"Verify remapped topology reboot in second level works if non-remapped first level fails",
True)
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("cpg_remap_complex_2",
"Verify remapped topology reboot failure in second level proceeds to third level",
True)
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v false2 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 3 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using false2")
test.add_log_pattern("Attempted to execute agent fence_dummy (off) the maximum number of times")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test.add_log_pattern("perform 'reboot' action targeting node_fake using true2")
test.add_log_pattern("node_fake with true3",
negative=True)
def build_query_tests(self):
""" run stonith_admin --metadata for the fence_dummy agent and check command output """
test = self.new_test("get_metadata",
"Run stonith_admin --metadata for the fence_dummy agent", True)
test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -a fence_dummy --metadata",
stdout_match='
if n_failed_matches > 0 or n_negative_matches > 0:
msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches)
self.exitcode = ExitStatus.ERROR
def _new_cmd(self, cmd, args, exitcode, **kwargs):
"""
Add a command to be executed as part of this test.
Arguments:
cmd -- The program to run.
args -- Command line arguments to pass to cmd, as a string.
exitcode -- The expected exit code of cmd. This can be used to
run a command that is expected to fail.
Keyword arguments:
stdout_match -- If not None, a string that is expected to be
present in the stdout of cmd. This can be a
regular expression.
no_wait -- Do not wait for cmd to complete.
stdout_no_match -- If not None, a string that is expected to be
absent from the stdout of cmd. This can be a
regular expression.
kill -- A command to be run after cmd, typically in
order to kill a failed process. This should be
the entire command line including arguments as
a single string.
validate -- If True, the output of cmd will be passed to
xmllint for validation. If validation fails,
XmlValidationError will be raised.
check_rng -- If True and validate is True, command output
will additionally be checked against the
api-result.rng file.
check_stderr -- If True, the stderr of cmd will be included in
output.
env -- If not None, variables to set in the environment
"""
if cmd is None:
raise ValueError("cmd cannot be None")
self._cmds.append(
{
"args": args,
"check_rng": kwargs.get("check_rng", True),
"check_stderr": kwargs.get("check_stderr", True),
"cmd": cmd,
"expected_exitcode": exitcode,
"kill": kwargs.get("kill"),
"no_wait": kwargs.get("no_wait", False),
"stdout_match": kwargs.get("stdout_match"),
"stdout_no_match": kwargs.get("stdout_no_match"),
"validate": kwargs.get("validate", True),
"env": kwargs.get("env"),
}
)
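# For example, the call
#   test.add_cmd_expected_fail("stonith_admin",
#                              args="--output-as=xml -U node_fake -t 3",
#                              expected_exitcode=ExitStatus.NOSUCH)
# from the unfencing tests above ends up here as a dict with "cmd", "args",
# and "expected_exitcode" filled in and every other key at its default.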
def _start_daemons(self):
"""Start any necessary daemons in preparation for executing the test."""
raise NotImplementedError("_start_daemons not provided by subclass")
#
# PUBLIC METHODS
#
def add_cmd(self, cmd=None, **kwargs):
"""Add a simple command to be executed as part of this test."""
self._new_cmd(cmd, kwargs.pop("args", ""), ExitStatus.OK,
validate=kwargs.get("validate", True),
check_rng=kwargs.get("check_rng", True),
check_stderr=kwargs.get("check_stderr", True),
env=kwargs.get("env"))
def add_cmd_and_kill(self, cmd=None, **kwargs):
"""Add a command and system command to be executed as part of this test."""
self._new_cmd(cmd, kwargs.pop("args", ""), ExitStatus.OK,
kill=kwargs.get("kill"))
def add_cmd_check_stdout(self, cmd=None, **kwargs):
"""Add a simple command with expected output to be executed as part of this test."""
self._new_cmd(cmd, kwargs.pop("args", ""), ExitStatus.OK,
stdout_match=kwargs.get("match"),
stdout_no_match=kwargs.get("stdout_no_match"),
env=kwargs.get("env"))
def add_cmd_expected_fail(self, cmd=None, **kwargs):
"""Add a command that is expected to fail to be executed as part of this test."""
- self._new_cmd(cmd, kwargs.pop("args", ""), kwargs.get("exitcode", ExitStatus.ERROR))
+ self._new_cmd(cmd, kwargs.pop("args", ""), kwargs.get("expected_exitcode", ExitStatus.ERROR))
def add_cmd_no_wait(self, cmd=None, **kwargs):
"""Add a simple command to be executed (without waiting) as part of this test."""
self._new_cmd(cmd, kwargs.pop("args", ""), ExitStatus.OK,
no_wait=kwargs.get("no_wait", True))
def add_log_pattern(self, pattern, negative=False, regex=False):
"""Add a pattern that should appear in the test's logs."""
self._patterns.append(Pattern(pattern, negative=negative, regex=regex))
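# For example, the remap tests above pair positive and negative patterns:
#   test.add_log_pattern("perform 'off' action targeting node_fake using true1")
#   test.add_log_pattern("'on' failure", negative=True)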
def _signal_dict(self):
"""Return a dictionary mapping signal numbers to their names."""
# FIXME: When we support Python >= 3.5, this function can be replaced with:
# signal.Signals(self._daemon_process.returncode).name
return {
getattr(signal, _signame): _signame
for _signame in dir(signal)
if _signame.startswith("SIG") and not _signame.startswith("SIG_")
}
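# For example, a daemon killed by SIGKILL exits with returncode -9, and
# _signal_dict()[9] == "SIGKILL"; clean_environment uses this to name the
# signal when reporting an abnormal daemon exit.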
def clean_environment(self):
"""Clean up the host after executing a test."""
if self._daemon_process:
if self._daemon_process.poll() is None:
self._daemon_process.terminate()
self._daemon_process.wait()
else:
rc = self._daemon_process.returncode
signame = self._signal_dict().get(-rc, "RET=%s" % rc)
msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)."
self._result_txt = msg % (self.name, self._daemon_location, signame)
self.exitcode = ExitStatus.ERROR
self._daemon_process = None
self._daemon_output = ""
# The default utf-8 encoding would error out if (e.g.) memory corruption
# makes the daemon output arbitrary 8-bit values. That output is still
# interesting for debugging, and we'd like the regression test to get
# through the full set of test cases, so read the log as ISO-8859-1.
with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile:
for line in logfile.readlines():
self._daemon_output += line
if self.verbose:
print("Daemon Output Start")
print(self._daemon_output)
print("Daemon Output End")
def print_result(self, filler):
"""Print the result of the last test execution."""
print("%s%s" % (filler, self._result_txt))
def run(self):
"""Execute this test."""
i = 1
self.start_environment()
if self.verbose:
print("\n--- START TEST - %s" % self.name)
self._result_txt = "SUCCESS - '%s'" % (self.name)
self.exitcode = ExitStatus.OK
for cmd in self._cmds:
try:
self.run_cmd(cmd)
except ExitCodeError as e:
print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
self.set_error(i, cmd)
break
except OutputNotFoundError as e:
print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
self.set_error(i, cmd)
break
except OutputFoundError as e:
print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_no_match'], e))
self.set_error(i, cmd)
break
except XmlValidationError as e:
print("Step %d FAILED - xmllint failed: %s" % (i, e))
self.set_error(i, cmd)
break
if self.verbose:
print("Step %d SUCCESS" % (i))
i += 1
self.clean_environment()
if self.exitcode == ExitStatus.OK:
self._match_log_patterns()
print(self._result_txt)
if self.verbose:
print("--- END TEST - %s\n" % self.name)
self.executed = True
def run_cmd(self, args):
"""Execute a command as part of this test."""
cmd = shlex.split(args['args'])
cmd.insert(0, args['cmd'])
if self.verbose:
print("\n\nRunning: %s" % " ".join(cmd))
# FIXME: Using "with" here breaks fencing merge tests.
# pylint: disable=consider-using-with
if args['env']:
new_env = os.environ.copy()
new_env.update(args['env'])
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=new_env)
else:
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if args['kill']:
if self.verbose:
print("Also running: %s" % args['kill'])
# Typically, the kill argument is used to detect some sort of
# failure. Without yielding for a few seconds here, the process
# launched earlier that is listening for the failure may not have
# time to connect to pacemaker-execd.
time.sleep(2)
subprocess.Popen(shlex.split(args['kill']))
if not args['no_wait']:
test.wait()
else:
return ExitStatus.OK
output = pipe_communicate(test, check_stderr=args['check_stderr'])
if self.verbose:
print(output)
if test.returncode != args['expected_exitcode']:
raise ExitCodeError(test.returncode)
if args['stdout_match'] is not None and \
re.search(args['stdout_match'], output) is None:
raise OutputNotFoundError(output)
if args['stdout_no_match'] is not None and \
re.search(args['stdout_no_match'], output) is not None:
raise OutputFoundError(output)
if args['validate']:
if args['check_rng']:
rng_file = "%s/api/api-result.rng" % rng_directory()
else:
rng_file = None
cmd = find_validator(rng_file)
if not cmd:
raise XmlValidationError("Could not find validator for %s" % rng_file)
if self.verbose:
print("\nRunning: %s" % " ".join(cmd))
with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator:
output = pipe_communicate(validator, check_stderr=True, stdin=output)
if self.verbose:
print(output)
if validator.returncode != 0:
raise XmlValidationError(output)
return ExitStatus.OK
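# Note the order of checks above: the exit code is compared first, then
# stdout_match/stdout_no_match are applied, and only then (when
# validate=True) is the XML output piped through xmllint, optionally
# against api-result.rng.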
def set_error(self, step, cmd):
"""Record failure of this test."""
msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
self.exitcode = ExitStatus.ERROR
def start_environment(self):
"""Prepare the host for executing a test."""
if os.path.exists(self.logpath):
os.remove(self.logpath)
self._kill_daemons()
self._start_daemons()
logfile = None
init_time = time.time()
update_time = init_time
while True:
# FIXME: Eventually use 'with' here, which seems complicated given
# everything happens in a loop.
# pylint: disable=consider-using-with
time.sleep(0.1)
if not self.force_wait and logfile is None \
and os.path.exists(self.logpath):
logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1")
if not self.force_wait and logfile is not None:
for line in logfile.readlines():
if "successfully started" in line:
return
now = time.time()
if self.timeout > 0 and (now - init_time) >= self.timeout:
if not self.force_wait:
print("\tDaemon %s doesn't seem to have been initialized within %fs."
"\n\tConsider specifying a longer '--timeout' value."
% (self._daemon_location, self.timeout))
return
if self.verbose and (now - update_time) >= 5:
print("Waiting for %s to be initialized: %fs ..."
% (self._daemon_location, now - init_time))
update_time = now
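# The loop above polls the daemon log every 0.1s for "successfully started";
# with force_wait set, it instead waits out the full timeout before
# returning.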
class Tests:
"""The base class for a collection of regression tests."""
def __init__(self, **kwargs):
"""
Create a new Tests instance.
This method must be provided by all subclasses, which must call
Tests.__init__ first.
Keyword arguments:
force_wait -- Wait the full timeout for daemons to start instead
of returning as soon as the log reports a
successful start.
logdir -- The base directory under which to create a directory
to store output and temporary data.
timeout -- How long to wait for daemons to be initialized
before giving up.
verbose -- Whether to print additional information, including
verbose command output and daemon log files.
"""
self.force_wait = kwargs.get("force_wait", False)
self.logdir = kwargs.get("logdir", "/tmp")
self.timeout = kwargs.get("timeout", 2)
self.verbose = kwargs.get("verbose", False)
self._tests = []
def exit(self):
"""Exit (with error status code if any test failed)."""
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
sys.exit(ExitStatus.ERROR)
sys.exit(ExitStatus.OK)
def print_list(self):
"""List all registered tests."""
print("\n==== %d TESTS FOUND ====" % len(self._tests))
print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION"))
print("%35s - %s" % ("--------------------", "--------------------"))
for test in self._tests:
print("%35s - %s" % (test.name, test.description))
print("==== END OF LIST ====\n")
def print_results(self):
"""Print summary of results of executed tests."""
failures = 0
success = 0
print("\n\n======= FINAL RESULTS ==========")
print("\n--- FAILURE RESULTS:")
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
failures += 1
test.print_result(" ")
else:
success += 1
if failures == 0:
print(" None")
print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures))
def run_single(self, name):
"""Run a single named test."""
for test in self._tests:
if test.name == name:
test.run()
break
def run_tests(self):
"""Run all tests."""
for test in self._tests:
test.run()
def run_tests_matching(self, pattern):
"""Run all tests whose name matches a pattern."""
for test in self._tests:
if pattern in test.name:
test.run()
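# A minimal driver sketch, assuming a Tests subclass whose build_tests()
# method registers tests via new_test() (both the subclass and method names
# are hypothetical here):
#
#   tests = MyDaemonTests(verbose=True)
#   tests.build_tests()
#   tests.print_list()
#   tests.run_tests()
#   tests.print_results()
#   tests.exit()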