diff --git a/cts/cts-attrd.in b/cts/cts-attrd.in
index fafb162e37..1805b61ad4 100644
--- a/cts/cts-attrd.in
+++ b/cts/cts-attrd.in
@@ -1,366 +1,366 @@
#!@PYTHON@
"""Regression tests for Pacemaker's attribute daemon."""
# pylint doesn't like the module name "cts-attrd" which is an invalid complaint for this file
# but probably something we want to continue warning about elsewhere
# pylint: disable=invalid-name
# pacemaker imports need to come after we modify sys.path, which pylint will complain about.
# pylint: disable=wrong-import-position
__copyright__ = "Copyright 2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import subprocess
import sys
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that while this doesn't necessarily mean it will successfully run tests,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
# pylint: disable=comparison-of-constants,comparison-with-itself,condition-evals-to-constant
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
TEST_DIR = sys.path[0]
def update_path():
"""Set the PATH environment variable appropriately for the tests."""
new_path = os.environ['PATH']
if os.path.exists("%s/cts-attrd.in" % TEST_DIR):
# pylint: disable=protected-access
print("Running tests from the source tree: %s (%s)" % (BuildOptions._BUILD_DIR, TEST_DIR))
# For pacemaker-attrd
new_path = "%s/daemons/attrd:%s" % (BuildOptions._BUILD_DIR, new_path)
else:
print("Running tests from the install tree: %s (not %s)" % (BuildOptions.DAEMON_DIR, TEST_DIR))
# For pacemaker-attrd
new_path = "%s:%s" % (BuildOptions.DAEMON_DIR, new_path)
print('Using PATH="%s"' % new_path)
os.environ['PATH'] = new_path
class AttributeTest(Test):
"""Executor for a single test."""
def __init__(self, name, description, **kwargs):
"""
Create a new AttributeTest instance.
Arguments:
name -- A unique name for this test. This can be used on the
command line to specify that only a specific test should
be executed.
description -- A meaningful description for the test.
"""
Test.__init__(self, name, description, **kwargs)
self._daemon_location = "pacemaker-attrd"
self._enable_corosync = True
def _kill_daemons(self):
killall([self._daemon_location])
def _start_daemons(self):
if self.verbose:
print("Starting %s" % self._daemon_location)
cmd = [self._daemon_location, "-s", "-l", self.logpath]
# pylint: disable=consider-using-with
self._daemon_process = subprocess.Popen(cmd)
class AttributeTests(Tests):
"""Collection of all attribute regression tests."""
def __init__(self, **kwargs):
"""Create a new AttributeTests instance."""
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-attrd")
def new_test(self, name, description):
"""Create a named test."""
test = AttributeTest(name, description, verbose=self.verbose, logdir=self.logdir)
self._tests.append(test)
return test
def setup_environment(self, use_corosync):
"""Prepare the host before executing any tests."""
if use_corosync:
self._corosync.start(kill_first=True)
def cleanup_environment(self, use_corosync):
"""Clean up the host after executing desired tests."""
if use_corosync:
self._corosync.stop()
def build_basic_tests(self):
"""Add basic tests - setting, querying, updating, and deleting attributes."""
test = self.new_test("set_attr_1",
"Set and query an attribute")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
- stdout_match='name="AAA" value="111"')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
+ stdout_match='name="AAA" value="111"')
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
# Setting the delay on an attribute that doesn't exist fails, but the failure is
# not passed back to attrd_updater.
test = self.new_test("set_attr_2",
"Set an attribute's delay")
test.add_cmd("attrd_updater", args="--name AAA -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Processed update-delay request from client .*: Error \(Attribute AAA does not exist\)",
regex=True)
test = self.new_test("set_attr_3",
"Set and query an attribute's delay and value")
test.add_cmd("attrd_updater", args="--name AAA -B 111 -d 5 --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
- stdout_match='name="AAA" value="111"')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
+ stdout_match='name="AAA" value="111"')
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111 \| from .* with 5s write delay",
regex=True)
test = self.new_test("set_attr_4",
"Update an attribute that does not exist with a delay")
test.add_cmd("attrd_updater", args="--name BBB -U 999 -d 10 --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name BBB -Q --output-as=xml",
- stdout_match='name="BBB" value="999"')
+ test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
+ stdout_match='name="BBB" value="999"')
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 999 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_1",
"Update an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 333 --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name BBB -Q --output-as=xml",
- stdout_match='name="BBB" value="333"')
+ test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
+ stdout_match='name="BBB" value="333"')
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("update_attr_2",
"Update an attribute using a delay other than its default")
test.add_cmd("attrd_updater", args="--name BBB -U 777 -d 10 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -U 888 -d 7 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 777 -> 888 \| from .* with 10s write delay",
regex=True)
test = self.new_test("update_attr_delay_1",
"Update the delay of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -Y -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test = self.new_test("update_attr_delay_2",
"Update the delay and value of an attribute that already exists")
test.add_cmd("attrd_updater", args="--name BBB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="--name BBB -B 333 -d 5 --output-as=xml")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern("Update attribute BBB delay to 5000ms (5)")
test.add_log_pattern(r"Setting BBB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test = self.new_test("missing_attr_1",
"Query an attribute that does not exist")
test.add_cmd_expected_fail("attrd_updater", args="--name NOSUCH --output-as=xml",
expected_exitcode=ExitStatus.CONFIG)
test = self.new_test("delete_attr_1",
"Delete an existing attribute")
test.add_cmd("attrd_updater", args="--name CCC -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name CCC -D --output-as=xml")
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting CCC\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
test = self.new_test("missing_attr_2",
"Delete an attribute that does not exist")
test.add_cmd("attrd_updater", args="--name NOSUCH2 -D --output-as=xml")
test = self.new_test("attr_in_set_1",
"Set and query an attribute in a specific set")
test.add_cmd("attrd_updater", args="--name DDD -U 555 --set=foo --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name DDD -Q --output-as=xml",
- stdout_match='name="DDD" value="555"')
+ test.add_cmd("attrd_updater", args="--name DDD -Q --output-as=xml",
+ stdout_match='name="DDD" value="555"')
test.add_log_pattern("Processed 1 private change for DDD (set foo)")
def build_multiple_query_tests(self):
"""Add tests that set and query an attribute across multiple nodes."""
# NOTE: These tests make use of the fact that nothing in attrd actually
# cares about whether a node exists when you set or query an attribute.
# It just keeps creating new hash tables for each node you ask it about.
test = self.new_test("multi_query_1",
"Query an attribute set across multiple nodes")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --node cluster1 --output-as=xml")
test.add_cmd("attrd_updater", args="--name AAA -U 222 --node cluster2 --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -QA --output-as=xml",
- stdout_match=r'\n.*')
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --node=cluster1 --output-as=xml",
- stdout_match='')
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
- stdout_match='')
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -QA --output-as=xml",
- stdout_match=r'\n.*',
- env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
- stdout_match='',
- env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
- stdout_match='',
- env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
+ test.add_cmd("attrd_updater", args="--name AAA -QA --output-as=xml",
+ stdout_match=r'\n.*')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster1 --output-as=xml",
+ stdout_match='')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
+ stdout_match='')
+ test.add_cmd("attrd_updater", args="--name AAA -QA --output-as=xml",
+ stdout_match=r'\n.*',
+ env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
+ test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
+ stdout_match='',
+ env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
+ test.add_cmd("attrd_updater", args="--name AAA -Q --node=cluster2 --output-as=xml",
+ stdout_match='',
+ env={"OCF_RESKEY_CRM_meta_on_node": "cluster1"})
def build_regex_tests(self):
"""Add tests that use regexes."""
test = self.new_test("regex_update_1",
"Update attributes using a regex")
test.add_cmd("attrd_updater", args="--name AAA -U 111 --output-as=xml")
test.add_cmd("attrd_updater", args="--name ABB -U 222 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'A.*' -U 333 --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
- stdout_match='name="AAA" value="333"')
- test.add_cmd_check_stdout("attrd_updater", args="--name ABB -Q --output-as=xml",
- stdout_match='name="ABB" value="333"')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
+ stdout_match='name="AAA" value="333"')
+ test.add_cmd("attrd_updater", args="--name ABB -Q --output-as=xml",
+ stdout_match='name="ABB" value="333"')
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: \(unset\) -> 111",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: \(unset\) -> 222",
regex=True)
test.add_log_pattern(r"Setting ABB\[.*\] in instance_attributes: 222 -> 333",
regex=True)
test.add_log_pattern(r"Setting AAA\[.*\] in instance_attributes: 111 -> 333",
regex=True)
test = self.new_test("regex_delete_1",
"Delete attributes using a regex")
test.add_cmd("attrd_updater", args="--name XAX -U 444 --output-as=xml")
test.add_cmd("attrd_updater", args="--name XBX -U 555 --output-as=xml")
test.add_cmd("attrd_updater", args="-P 'X[A|B]X' -D --output-as=xml")
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: \(unset\) -> 444",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: \(unset\) -> 555",
regex=True)
test.add_log_pattern(r"Setting XBX\[.*\] in instance_attributes: 555 -> \(unset\)",
regex=True)
test.add_log_pattern(r"Setting XAX\[.*\] in instance_attributes: 444 -> \(unset\)",
regex=True)
def build_utilization_tests(self):
"""Add tests that involve utilization attributes."""
test = self.new_test("utilization_1",
"Set and query a utilization attribute")
test.add_cmd("attrd_updater", args="--name AAA -U ABC -z --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
- stdout_match='name="AAA" value="ABC"')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
+ stdout_match='name="AAA" value="ABC"')
test.add_log_pattern(r"Setting AAA\[.*\] in utilization: \(unset\) -> ABC",
regex=True)
def build_sync_point_tests(self):
"""Add tests that involve sync points."""
test = self.new_test("local_sync_point",
"Wait for a local sync point")
test.add_cmd("attrd_updater", args="--name AAA -U 123 --wait=local --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name AAA -Q --output-as=xml",
- stdout_match='name="AAA" value="123"')
+ test.add_cmd("attrd_updater", args="--name AAA -Q --output-as=xml",
+ stdout_match='name="AAA" value="123"')
test.add_log_pattern(r"Alerting client .* for reached local sync point",
regex=True)
test = self.new_test("cluster_sync_point",
"Wait for a cluster-wide sync point")
test.add_cmd("attrd_updater", args="--name BBB -U 456 --wait=cluster --output-as=xml")
- test.add_cmd_check_stdout("attrd_updater", args="--name BBB -Q --output-as=xml",
- stdout_match='name="BBB" value="456"')
+ test.add_cmd("attrd_updater", args="--name BBB -Q --output-as=xml",
+ stdout_match='name="BBB" value="456"')
test.add_log_pattern(r"Alerting client .* for reached cluster sync point",
regex=True)
def build_options():
"""Handle command line arguments."""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Run pacemaker-attrd regression tests",
epilog="Example: Run only the test 'start_stop'\n"
"\t " + sys.argv[0] + " --run-only start_stop\n\n"
"Example: Run only the tests with the string 'systemd' present in them\n"
"\t " + sys.argv[0] + " --run-only-pattern systemd")
parser.add_argument("-l", "--list-tests", action="store_true",
help="Print out all registered tests")
parser.add_argument("-p", "--run-only-pattern", metavar='PATTERN',
help="Run only tests matching the given pattern")
parser.add_argument("-r", "--run-only", metavar='TEST',
help="Run a specific test")
parser.add_argument("-V", "--verbose", action="store_true",
help="Verbose output")
args = parser.parse_args()
return args
def main():
"""Run attrd regression tests as specified by arguments."""
update_path()
# Ensure all command output is in portable locale for comparison
os.environ['LC_ALL'] = "C"
opts = build_options()
exit_if_proc_running("pacemaker-attrd")
# Create a temporary directory for log files (the directory and its
# contents will automatically be erased when done)
with tempfile.TemporaryDirectory(prefix="cts-attrd-") as logdir:
tests = AttributeTests(verbose=opts.verbose, logdir=logdir)
tests.build_basic_tests()
tests.build_multiple_query_tests()
tests.build_regex_tests()
tests.build_utilization_tests()
tests.build_sync_point_tests()
if opts.list_tests:
tests.print_list()
sys.exit(ExitStatus.OK)
print("Starting ...")
try:
tests.setup_environment(True)
except TimeoutError:
print("corosync did not start in time, exiting")
sys.exit(ExitStatus.TIMEOUT)
if opts.run_only_pattern:
tests.run_tests_matching(opts.run_only_pattern)
tests.print_results()
elif opts.run_only:
tests.run_single(opts.run_only)
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment(True)
tests.exit()
if __name__ == "__main__":
main()
diff --git a/cts/cts-fencing.in b/cts/cts-fencing.in
index deca939d06..c93180a46f 100644
--- a/cts/cts-fencing.in
+++ b/cts/cts-fencing.in
@@ -1,1119 +1,1119 @@
#!@PYTHON@
""" Regression tests for Pacemaker's fencer
"""
__copyright__ = "Copyright 2012-2023 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import sys
import subprocess
import tempfile
# These imports allow running from a source checkout after running `make`.
# Note that while this doesn't necessarily mean it will successfully run tests,
# but being able to see --help output can be useful.
if os.path.exists("@abs_top_srcdir@/python"):
sys.path.insert(0, "@abs_top_srcdir@/python")
if os.path.exists("@abs_top_builddir@/python") and "@abs_top_builddir@" != "@abs_top_srcdir@":
sys.path.insert(0, "@abs_top_builddir@/python")
from pacemaker.buildoptions import BuildOptions
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.corosync import Corosync, localname
from pacemaker._cts.errors import ExitCodeError, OutputFoundError, OutputNotFoundError, XmlValidationError
from pacemaker._cts.process import killall, exit_if_proc_running
from pacemaker._cts.test import Test, Tests
TEST_DIR = sys.path[0]
def update_path():
""" Set the PATH environment variable appropriately for the tests """
new_path = os.environ['PATH']
if os.path.exists("%s/cts-fencing.in" % TEST_DIR):
print("Running tests from the source tree: %s (%s)" % (BuildOptions._BUILD_DIR, TEST_DIR))
# For pacemaker-fenced and cts-fence-helper
new_path = "%s/daemons/fenced:%s" % (BuildOptions._BUILD_DIR, new_path)
new_path = "%s/tools:%s" % (BuildOptions._BUILD_DIR, new_path) # For stonith_admin
new_path = "%s/cts/support:%s" % (BuildOptions._BUILD_DIR, new_path) # For cts-support
else:
print("Running tests from the install tree: %s (not %s)" % (BuildOptions.DAEMON_DIR, TEST_DIR))
# For pacemaker-fenced, cts-fence-helper, and cts-support
new_path = "%s:%s" % (BuildOptions.DAEMON_DIR, new_path)
print('Using PATH="%s"' % new_path)
os.environ['PATH'] = new_path
class FenceTest(Test):
""" Executor for a single test """
def __init__(self, name, description, **kwargs):
Test.__init__(self, name, description, **kwargs)
if kwargs.get("with_cpg", False):
self._enable_corosync = True
self._daemon_options = ["-c"]
else:
self._enable_corosync = False
self._daemon_options = ["-s"]
self._daemon_location = "pacemaker-fenced"
def _kill_daemons(self):
killall(["pacemakerd", "pacemaker-fenced"])
def _start_daemons(self):
if self.verbose:
self._daemon_options += ["-V"]
print("Starting %s with %s" % (self._daemon_location, self._daemon_options))
cmd = ["pacemaker-fenced", "-l", self.logpath] + self._daemon_options
self._daemon_process = subprocess.Popen(cmd)
class FenceTests(Tests):
""" Collection of all fencing regression tests """
def __init__(self, **kwargs):
Tests.__init__(self, **kwargs)
self._corosync = Corosync(self.verbose, self.logdir, "cts-fencing")
def new_test(self, name, description, with_cpg=False):
""" Create a named test """
test = FenceTest(name, description, verbose=self.verbose, with_cpg=with_cpg,
timeout=self.timeout, force_wait=self.force_wait,
logdir=self.logdir)
self._tests.append(test)
return test
def run_cpg_only(self):
""" Run all corosync-enabled tests """
for test in self._tests:
if test._enable_corosync:
test.run()
def run_no_cpg(self):
""" Run all standalone tests """
for test in self._tests:
if not test._enable_corosync:
test.run()
def build_api_sanity_tests(self):
""" Register tests to verify basic API usage """
verbose_arg = ""
if self.verbose:
verbose_arg = "-V"
test = self.new_test("standalone_low_level_api_test", "Sanity test client api in standalone mode.")
test.add_cmd("cts-fence-helper", args="-t %s" % verbose_arg, validate=False)
test = self.new_test("cpg_low_level_api_test", "Sanity test client api using mainloop and cpg.", True)
test.add_cmd("cts-fence-helper", args="-m %s" % verbose_arg, validate=False)
def build_custom_timeout_tests(self):
""" Register tests to verify custom timeout usage """
# custom timeout without topology
test = self.new_test("cpg_custom_timeout_1",
"Verify per device timeouts work as expected without using topology.", True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# timeout is 5+1+4 = 10
test.add_log_pattern("Total timeout set to 12s")
# custom timeout _WITH_ topology
test = self.new_test("cpg_custom_timeout_2",
"Verify per device timeouts work as expected _WITH_ topology.", True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 -o pcmk_off_timeout=1000ms')
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 -o pcmk_off_timeout=4000s')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# timeout is 5+1+4000 = 4006
test.add_log_pattern("Total timeout set to 4807s")
def build_fence_merge_tests(self):
""" Register tests to verify when fence operations should be merged """
### Simple test that overlapping fencing operations get merged
test = self.new_test("cpg_custom_merge_single",
"Verify overlapping identical fencing operations are merged, no fencing levels used.", True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### one merger will happen
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
### Test that multiple mergers occur
test = self.new_test("cpg_custom_merge_multiple",
"Verify multiple overlapping identical fencing operations are merged", True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o delay=2 -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### 4 mergers should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
### Test that multiple mergers occur with topologies used
test = self.new_test("cpg_custom_merge_with_topology",
"Verify multiple overlapping identical fencing operations are merged with fencing levels.",
True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
### 4 mergers should occur
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
test.add_log_pattern("Operation 'off' targeting node3 by ")
def build_fence_no_merge_tests(self):
""" Register tests to verify when fence operations should not be merged """
test = self.new_test("cpg_custom_no_merge",
"Verify differing fencing operations are not merged", True)
test.add_cmd("stonith_admin", args="--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node3 node2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true1")
test.add_cmd_no_wait("stonith_admin", args="--output-as=xml -F node2 -t 10")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 10")
test.add_log_pattern("Merging fencing action 'off' targeting node3 originating from client",
negative=True)
def build_standalone_tests(self):
""" Register a grab bag of tests that can be executed in standalone or corosync mode """
test_types = [
{
"prefix" : "standalone",
"use_cpg" : False,
},
{
"prefix" : "cpg",
"use_cpg" : True,
},
]
# test what happens when all devices timeout
for test_type in test_types:
test = self.new_test("%s_fence_multi_device_failure" % test_type["prefix"],
"Verify that all devices timeout, a fencing failure is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
if test_type["use_cpg"]:
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 2",
expected_exitcode=ExitStatus.TIMEOUT)
test.add_log_pattern("Total timeout set to 7s")
else:
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 2",
expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("targeting node3 using false1 returned ")
test.add_log_pattern("targeting node3 using false2 returned ")
test.add_log_pattern("targeting node3 using false3 returned ")
# test what happens when multiple devices can fence a node, but the first device fails.
for test_type in test_types:
test = self.new_test("%s_fence_device_failure_rollover" % test_type["prefix"],
"Verify that when one fence device fails for a node, the others are tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
if test_type["use_cpg"]:
test.add_log_pattern("Total timeout set to 18s")
# test what happens when we try to use a missing fence-agent.
for test_type in test_types:
test = self.new_test("%s_fence_missing_agent" % test_type["prefix"],
"Verify proper error-handling when using a non-existent fence-agent.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_missing -o mode=pass -o pcmk_host_list=node3")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node2")
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -F node3 -t 5",
expected_exitcode=ExitStatus.NOSUCH)
test.add_cmd("stonith_admin", args="--output-as=xml -F node2 -t 5")
# simple topology test for one device
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_simple" % test_type["prefix"],
"Verify all fencing devices at a level are used.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 6s")
test.add_log_pattern("targeting node3 using true returned 0")
# add topology, delete topology, verify fencing still works
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_add_remove" % test_type["prefix"],
"Verify fencing occurrs after all topology levels are removed",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 1")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("Total timeout set to 6s")
test.add_log_pattern("targeting node3 using true returned 0")
# test what happens when the first fencing level has multiple devices.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_device_fails" % test_type["prefix"],
"Verify if one device in a level fails, the other is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R false -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
test.add_log_pattern("Total timeout set to 48s")
test.add_log_pattern("targeting node3 using false returned 1")
test.add_log_pattern("targeting node3 using true returned 0")
# test what happens when the first fencing level fails.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_multi_level_fails" % test_type["prefix"],
"Verify if one level fails, the next leve is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 3")
test.add_log_pattern("Total timeout set to 21s")
test.add_log_pattern("targeting node3 using false1 returned 1")
test.add_log_pattern("targeting node3 using false2 returned 1")
test.add_log_pattern("targeting node3 using true3 returned 0")
test.add_log_pattern("targeting node3 using true4 returned 0")
# test what happens when the first fencing level had devices that no one has registered
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_missing_devices" % test_type["prefix"],
"Verify topology can continue with missing devices.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
# Test what happens if multiple fencing levels are defined, and then the first one is removed.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_level_removal" % test_type["prefix"],
"Verify level removal works.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true4 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 3 -v true4")
# Now remove level 2, verify none of the devices in level two are hit.
test.add_cmd("stonith_admin", args="--output-as=xml -d node3 -i 2")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 20")
test.add_log_pattern("Total timeout set to 96s")
test.add_log_pattern("targeting node3 using false1 returned 1")
test.add_log_pattern("targeting node3 using false2 returned ",
negative=True)
test.add_log_pattern("targeting node3 using true3 returned 0")
test.add_log_pattern("targeting node3 using true4 returned 0")
# Test targeting a topology level by node name pattern.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_level_pattern" % test_type["prefix"],
"Verify targeting topology by node name pattern works.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r '@node.*' -i 1 -v true")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5")
test.add_log_pattern("targeting node3 using true returned 0")
# test allowing commas and semicolons as delimiters in pcmk_host_list
for test_type in test_types:
test = self.new_test("%s_host_list_delimiters" % test_type["prefix"],
"Verify commas and semicolons can be used as pcmk_host_list delimiters",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1,node2,node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=pcmk1;pcmk2;pcmk3"')
test.add_cmd("stonith_admin", args="stonith_admin --output-as=xml -F node2 -t 5")
test.add_cmd("stonith_admin", args="stonith_admin --output-as=xml -F pcmk3 -t 5")
test.add_log_pattern("targeting node2 using true1 returned 0")
test.add_log_pattern("targeting pcmk3 using true2 returned 0")
# test the stonith builds the correct list of devices that can fence a node.
for test_type in test_types:
test = self.new_test("%s_list_devices" % test_type["prefix"],
"Verify list of devices that can fence a node is correct",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
- test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l node1 -V",
- stdout_match="true2", stdout_no_match="true1")
- test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l node1 -V",
- stdout_match="true3", stdout_no_match="true1")
+ test.add_cmd("stonith_admin", args="--output-as=xml -l node1 -V",
+ stdout_match="true2", stdout_no_match="true1")
+ test.add_cmd("stonith_admin", args="--output-as=xml -l node1 -V",
+ stdout_match="true3", stdout_no_match="true1")
# simple test of device monitor
for test_type in test_types:
test = self.new_test("%s_monitor" % test_type["prefix"],
"Verify device is reachable", test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
test.add_cmd("stonith_admin", args="--output-as=xml -Q false1")
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true2",
expected_exitcode=ExitStatus.NOSUCH)
# Verify monitor occurs for duration of timeout period on failure
for test_type in test_types:
test = self.new_test("%s_monitor_timeout" % test_type["prefix"],
"Verify monitor uses duration of timeout period given.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1 -t 5",
expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("Attempt 2 to execute")
# Verify monitor occurs for duration of timeout period on failure, but stops at max retries
for test_type in test_types:
test = self.new_test("%s_monitor_timeout_max_retries" % test_type["prefix"],
"Verify monitor retries until max retry value or timeout is hit.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=fail -o monitor_mode=fail -o pcmk_host_list=node3')
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1 -t 15",
expected_exitcode=ExitStatus.ERROR)
test.add_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times")
# simple register test
for test_type in test_types:
test = self.new_test("%s_register" % test_type["prefix"],
"Verify devices can be registered and un-registered",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -Q true1")
test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1",
expected_exitcode=ExitStatus.NOSUCH)
# simple reboot test
for test_type in test_types:
test = self.new_test("%s_reboot" % test_type["prefix"],
"Verify devices can be rebooted",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -B node3 -t 5")
test.add_cmd("stonith_admin", args="--output-as=xml -D true1")
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -Q true1",
expected_exitcode=ExitStatus.NOSUCH)
# test fencing history.
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_fence_history" % test_type["prefix"],
"Verify last fencing operation is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node3')
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 -t 5 -V")
- test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -H node3",
- stdout_match='action="off" target="node3" .* status="success"')
+ test.add_cmd("stonith_admin", args="--output-as=xml -H node3",
+ stdout_match='action="off" target="node3" .* status="success"')
# simple test of dynamic list query
for test_type in test_types:
test = self.new_test("%s_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
- test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l fake_port_1",
- stdout_match='count="3"')
+ test.add_cmd("stonith_admin", args="--output-as=xml -l fake_port_1",
+ stdout_match='count="3"')
# fence using dynamic list query
for test_type in test_types:
test = self.new_test("%s_fence_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args="--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", args="--output-as=xml -F fake_port_1 -t 5 -V")
# simple test of query using status action
for test_type in test_types:
test = self.new_test("%s_status_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_check=status')
- test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -l fake_port_1",
- stdout_match='count="3"')
+ test.add_cmd("stonith_admin", args="--output-as=xml -l fake_port_1",
+ stdout_match='count="3"')
# test what happens when no reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_no_reboot_support" % test_type["prefix"],
"Verify reboot action defaults to off when no reboot action is advertised by agent.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy_no_reboot -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
test.add_log_pattern("does not support reboot")
test.add_log_pattern("using true1 returned 0")
# make sure reboot is used when reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_with_reboot_support" % test_type["prefix"],
"Verify reboot action can be used when metadata advertises it.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -B node1 -t 5 -V")
test.add_log_pattern("does not advertise support for 'reboot', performing 'off'",
negative=True)
test.add_log_pattern("using true1 returned 0")
# make sure all fencing delays are applied correctly and taken into account by fencing timeouts with topology
for test_type in test_types:
if not test_type["use_cpg"]:
continue
test = self.new_test("%s_topology_delays" % test_type["prefix"],
"Verify all fencing delays are applied correctly and taken into account by fencing timeouts with topology.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
test.add_cmd("stonith_admin",
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1')
# Resulting "random" delay will always be 1 since (rand() % (delay_max - delay_base)) is always 0 here.
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3" -o pcmk_delay_base=1 -o pcmk_delay_max=2')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -r node3 -i 2 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -F node3 --delay 1")
# Total fencing timeout takes all fencing delays into account.
test.add_log_pattern("Total timeout set to 582s")
# Fencing timeout for the first device takes the requested fencing delay into account.
# Fencing timeout also takes pcmk_delay_base into account.
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true1 .*146s.*",
regex=True)
# Requested fencing delay is applied only for the first device in the first level.
# Static delay from pcmk_delay_base is added.
test.add_log_pattern("Delaying 'off' action targeting node3 using true1 for 2s | timeout=120s requested_delay=1s base=1s max=1s")
# Fencing timeout no longer takes the requested fencing delay into account for further devices.
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using false1 .*145s.*",
regex=True)
# Requested fencing delay is no longer applied for further devices.
test.add_log_pattern("Delaying 'off' action targeting node3 using false1 for 1s | timeout=120s requested_delay=0s base=1s max=1s")
# Fencing timeout takes pcmk_delay_max into account.
test.add_log_pattern(r"Requesting that .* perform 'off' action targeting node3 using true2 .*146s.*",
regex=True)
test.add_log_pattern("Delaying 'off' action targeting node3 using true2 for 1s | timeout=120s requested_delay=0s base=1s max=2s")
test.add_log_pattern("Delaying 'off' action targeting node3 using true3",
negative=True)
def build_nodeid_tests(self):
""" Register tests that use a corosync node id """
our_uname = localname()
### verify nodeid is supplied when nodeid is in the metadata parameters
test = self.new_test("cpg_supply_nodeid",
"Verify nodeid is given when fence agent has nodeid as parameter", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -F %s -t 3" % our_uname)
test.add_log_pattern("as nodeid with fence action 'off' targeting %s" % (our_uname))
### verify nodeid is _NOT_ supplied when nodeid is not in the metadata parameters
test = self.new_test("cpg_do_not_supply_nodeid",
"Verify nodeid is _NOT_ given when fence agent does not have nodeid as parameter",
True)
# use a host name that won't be in corosync.conf
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=regr-test')
test.add_cmd("stonith_admin", args="--output-as=xml -F regr-test -t 3")
test.add_log_pattern("as nodeid with fence action 'off' targeting regr-test",
negative=True)
### verify nodeid use doesn't explode standalone mode
test = self.new_test("standalone_do_not_supply_nodeid",
"Verify nodeid in metadata parameter list doesn't kill standalone mode",
False)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -F %s -t 3" % our_uname)
test.add_log_pattern("as nodeid with fence action 'off' targeting %s" % our_uname,
negative=True)
def build_unfence_tests(self):
""" Register tests that verify unfencing """
our_uname = localname()
### verify unfencing using automatic unfencing
test = self.new_test("cpg_unfence_required_1",
"Verify require unfencing on all devices when automatic=true in agent's metadata",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
# both devices should be executed
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
### verify unfencing using automatic unfencing fails if any of the required agents fail
test = self.new_test("cpg_unfence_required_2",
"Verify require unfencing on all devices when automatic=true in agent's metadata",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=fail -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U %s -t 6" % our_uname,
expected_exitcode=ExitStatus.ERROR)
### verify unfencing using automatic devices with topology
test = self.new_test("cpg_unfence_required_3",
"Verify require unfencing on all devices even when at different topology levels",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v true1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
### verify unfencing using automatic devices with topology
test = self.new_test("cpg_unfence_required_4",
"Verify all required devices are executed even with topology levels fail.",
True)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true3 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R true4 -a fence_dummy_auto_unfence -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false3 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd('stonith_admin',
args='--output-as=xml -R false4 -a fence_dummy -o mode=fail -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v true1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v false1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v false2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v false3" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true3" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 3 -v false4" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 4 -v true4" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("using true1 returned 0")
test.add_log_pattern("using true2 returned 0")
test.add_log_pattern("using true3 returned 0")
test.add_log_pattern("using true4 returned 0")
def build_unfence_on_target_tests(self):
""" Register tests that verify unfencing that runs on the target """
our_uname = localname()
### verify unfencing using on_target device
test = self.new_test("cpg_unfence_on_target_1",
"Verify unfencing with on_target = true", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("(on) to be executed on target")
### verify failure of unfencing using on_target device
test = self.new_test("cpg_unfence_on_target_2",
"Verify failure unfencing with on_target = true",
True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node_fake_1234"' % our_uname)
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U node_fake_1234 -t 3",
expected_exitcode=ExitStatus.NOSUCH)
test.add_log_pattern("(on) to be executed on target")
### verify unfencing using on_target device with topology
test = self.new_test("cpg_unfence_on_target_3",
"Verify unfencing with on_target = true using topology",
True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node3"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 1 -v true1" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r %s -i 2 -v true2" % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -U %s -t 3" % our_uname)
test.add_log_pattern("(on) to be executed on target")
### verify unfencing using on_target device with topology fails when target node doesn't exist
test = self.new_test("cpg_unfence_on_target_4",
"Verify unfencing failure with on_target = true using topology",
True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node_fake"' % our_uname)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o "pcmk_host_list=%s node_fake"' % our_uname)
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true2")
test.add_cmd_expected_fail("stonith_admin", args="--output-as=xml -U node_fake -t 3",
expected_exitcode=ExitStatus.NOSUCH)
test.add_log_pattern("(on) to be executed on target")
def build_remap_tests(self):
""" Register tests that verify remapping of reboots to off-on """
test = self.new_test("cpg_remap_simple",
"Verify sequential topology reboot is remapped to all-off-then-all-on", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
'-o pcmk_off_timeout=1 -o pcmk_reboot_timeout=10')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake '
'-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
# fence_dummy sets "on" as an on_target action
test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("cpg_remap_simple_off",
"Verify sequential topology reboot skips 'on' if "
"pcmk_reboot_action=off or agent doesn't support "
"'on'", True)
test.add_cmd("stonith_admin",
args="--output-as=xml -R true1 -a fence_dummy -o mode=pass "
"-o pcmk_host_list=node_fake -o pcmk_off_timeout=1 "
"-o pcmk_reboot_timeout=10 -o pcmk_reboot_action=off")
test.add_cmd("stonith_admin",
args="--output-as=xml -R true2 -a fence_dummy_no_on "
"-o mode=pass -o pcmk_host_list=node_fake "
"-o pcmk_off_timeout=2 -o pcmk_reboot_timeout=20")
test.add_cmd("stonith_admin",
args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_log_pattern("Total timeout set to 3s for peer's fencing targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
# "on" should be skipped
test.add_log_pattern("Not turning node_fake back on using "
"true1 because the device is configured "
"to stay off")
test.add_log_pattern("Not turning node_fake back on using true2"
" because the agent doesn't support 'on'")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("cpg_remap_automatic",
"Verify remapped topology reboot skips automatic 'on'", True)
test.add_cmd("stonith_admin",
args='--output-as=xml -R true1 -a fence_dummy_auto_unfence '
'-o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin",
args='--output-as=xml -R true2 -a fence_dummy_auto_unfence '
'-o "mode=pass" -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test.add_log_pattern("perform 'on' action targeting node_fake using",
negative=True)
test.add_log_pattern("'on' failure",
negative=True)
test = self.new_test("cpg_remap_complex_1",
"Verify remapped topology reboot in second level works if non-remapped first level fails",
True)
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using true2")
test.add_log_pattern("Remapped 'off' targeting node_fake complete, remapping to 'on'")
test.add_log_pattern("Ignoring true1 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Ignoring true2 'on' failure (no capable peers) targeting node_fake")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test = self.new_test("cpg_remap_complex_2",
"Verify remapped topology reboot failure in second level proceeds to third level",
True)
test.add_cmd("stonith_admin", args='--output-as=xml -R false1 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R false2 -a fence_dummy -o mode=fail -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true1 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true2 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args='--output-as=xml -R true3 -a fence_dummy -o mode=pass -o pcmk_host_list=node_fake')
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 2 -v true1 -v false2 -v true3")
test.add_cmd("stonith_admin", args="--output-as=xml -r node_fake -i 3 -v true2")
test.add_cmd("stonith_admin", args="--output-as=xml -B node_fake -t 5")
test.add_log_pattern("perform 'reboot' action targeting node_fake using false1")
test.add_log_pattern("Remapping multiple-device reboot targeting node_fake")
test.add_log_pattern("perform 'off' action targeting node_fake using true1")
test.add_log_pattern("perform 'off' action targeting node_fake using false2")
test.add_log_pattern("Attempted to execute agent fence_dummy (off) the maximum number of times")
test.add_log_pattern("Undoing remap of reboot targeting node_fake")
test.add_log_pattern("perform 'reboot' action targeting node_fake using true2")
test.add_log_pattern("node_fake with true3",
negative=True)
def build_query_tests(self):
""" run stonith_admin --metadata for the fence_dummy agent and check command output """
test = self.new_test("get_metadata",
"Run stonith_admin --metadata for the fence_dummy agent", True)
- test.add_cmd_check_stdout("stonith_admin", args="--output-as=xml -a fence_dummy --metadata",
- stdout_match=' 0 or n_negative_matches > 0:
msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
self._result_txt = msg % (self.name, n_failed_matches, len(self._patterns), n_negative_matches)
self.exitcode = ExitStatus.ERROR
def _new_cmd(self, cmd, args, exitcode, **kwargs):
"""
Add a command to be executed as part of this test.
Arguments:
cmd -- The program to run.
args -- Commands line arguments to pass to cmd, as a string.
exitcode -- The expected exit code of cmd. This can be used to
run a command that is expected to fail.
Keyword arguments:
stdout_match -- If not None, a string that is expected to be
present in the stdout of cmd. This can be a
regular expression.
no_wait -- Do not wait for cmd to complete.
stdout_no_match -- If not None, a string that is expected to be
missing in the stdout of cmd. This can be a
regualr expression.
kill -- A command to be run after cmd, typically in
order to kill a failed process. This should be
the entire command line including arguments as
a single string.
validate -- If True, the output of cmd will be passed to
xmllint for validation. If validation fails,
XmlValidationError will be raised.
check_rng -- If True and validate is True, command output
will additionally be checked against the
api-result.rng file.
check_stderr -- If True, the stderr of cmd will be included in
output.
env -- If not None, variables to set in the environment
"""
if cmd is None:
raise ValueError("cmd cannot be None")
self._cmds.append(
{
"args": args,
"check_rng": kwargs.get("check_rng", True),
"check_stderr": kwargs.get("check_stderr", True),
"cmd": cmd,
"expected_exitcode": exitcode,
"kill": kwargs.get("kill"),
"no_wait": kwargs.get("no_wait", False),
"stdout_match": kwargs.get("stdout_match"),
"stdout_no_match": kwargs.get("stdout_no_match"),
"validate": kwargs.get("validate", True),
"env": kwargs.get("env"),
}
)
def _start_daemons(self):
"""Start any necessary daemons in preparation for executing the test."""
raise NotImplementedError("_start_daemons not provided by subclass")
#
# PUBLIC METHODS
#
def add_cmd(self, cmd=None, **kwargs):
"""
Add a command to be executed as part of this test.
Arguments:
cmd -- The program to run.
Keyword arguments:
args -- Commands line arguments to pass to cmd, as a string.
check_rng -- If True and validate is True, command output will
additionally be checked against the api-result.rng file.
check_stderr -- If True, the stderr of cmd will be included in output.
env -- If not None, variables to set in the environment
expected_exitcode -- The expected exit code of cmd. This can be used to run
a command that is expected to fail.
kill -- A command to be run after cmd, typically in order to
kill a failed process. This should be the entire
command line including arguments as a single string.
no_wait -- Do not wait for cmd to complete.
stdout_match -- If not None, a string that is expected to be present
in the stdout of cmd. This can be a regular
expression.
stdout_no_match -- If not None, a string that is expected to be missing
in the stdout of cmd. This can be a regular
expression.
validate -- If True, the output of cmd will be passed to xmllint
for validation. If validation fails,
XmlValidationError will be raised.
"""
if cmd is None:
raise ValueError("cmd cannot be None")
self._cmds.append(
{
"args": kwargs.get("args", ""),
"check_rng": kwargs.get("check_rng", True),
"check_stderr": kwargs.get("check_stderr", True),
"cmd": cmd,
"expected_exitcode": kwargs.get("expected_exitcode", ExitStatus.OK),
"kill": kwargs.get("kill"),
"no_wait": kwargs.get("no_wait", False),
"stdout_match": kwargs.get("stdout_match"),
"stdout_no_match": kwargs.get("stdout_no_match"),
"validate": kwargs.get("validate", True),
"env": kwargs.get("env"),
}
)
- def add_cmd_check_stdout(self, cmd=None, **kwargs):
- """Add a simple command with expected output to be executed as part of this test."""
- self._new_cmd(cmd, kwargs.pop("args", ""), ExitStatus.OK,
- stdout_match=kwargs.get("stdout_match"),
- stdout_no_match=kwargs.get("stdout_no_match"),
- env=kwargs.get("env"))
-
def add_cmd_expected_fail(self, cmd=None, **kwargs):
"""Add a command that is expected to fail to be executed as part of this test."""
self._new_cmd(cmd, kwargs.pop("args", ""), kwargs.get("expected_exitcode", ExitStatus.ERROR))
def add_cmd_no_wait(self, cmd=None, **kwargs):
"""Add a simple command to be executed (without waiting) as part of this test."""
self._new_cmd(cmd, kwargs.pop("args", ""), ExitStatus.OK,
no_wait=kwargs.get("no_wait", True))
def add_log_pattern(self, pattern, negative=False, regex=False):
"""Add a pattern that should appear in the test's logs."""
self._patterns.append(Pattern(pattern, negative=negative, regex=regex))
def _signal_dict(self):
"""Return a dictionary mapping signal numbers to their names."""
# FIXME: When we support python >= 3.5, this function can be replaced with:
# signal.Signals(self.daemon_process.returncode).name
return {
getattr(signal, _signame): _signame
for _signame in dir(signal)
if _signame.startswith("SIG") and not _signame.startswith("SIG_")
}
def clean_environment(self):
"""Clean up the host after executing a test."""
if self._daemon_process:
if self._daemon_process.poll() is None:
self._daemon_process.terminate()
self._daemon_process.wait()
else:
rc = self._daemon_process.returncode
signame = self._signal_dict().get(-rc, "RET=%s" % rc)
msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)."
self._result_txt = msg % (self.name, self._daemon_location, signame)
self.exitcode = ExitStatus.ERROR
self._daemon_process = None
self._daemon_output = ""
# the default for utf-8 encoding would error out if e.g. memory corruption
# makes fenced output any kind of 8 bit value - while still interesting
# for debugging and we'd still like the regression-test to go over the
# full set of test-cases
with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile:
for line in logfile.readlines():
self._daemon_output += line
if self.verbose:
print("Daemon Output Start")
print(self._daemon_output)
print("Daemon Output End")
def print_result(self, filler):
"""Print the result of the last test execution."""
print("%s%s" % (filler, self._result_txt))
def run(self):
"""Execute this test."""
i = 1
self.start_environment()
if self.verbose:
print("\n--- START TEST - %s" % self.name)
self._result_txt = "SUCCESS - '%s'" % (self.name)
self.exitcode = ExitStatus.OK
for cmd in self._cmds:
try:
self.run_cmd(cmd)
except ExitCodeError as e:
print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
self.set_error(i, cmd)
break
except OutputNotFoundError as e:
print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
self.set_error(i, cmd)
break
except OutputFoundError as e:
print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_no_match'], e))
self.set_error(i, cmd)
break
except XmlValidationError as e:
print("Step %d FAILED - xmllint failed: %s" % (i, e))
self.set_error(i, cmd)
break
if self.verbose:
print("Step %d SUCCESS" % (i))
i += 1
self.clean_environment()
if self.exitcode == ExitStatus.OK:
self._match_log_patterns()
print(self._result_txt)
if self.verbose:
print("--- END TEST - %s\n" % self.name)
self.executed = True
def run_cmd(self, args):
"""Execute a command as part of this test."""
cmd = shlex.split(args['args'])
cmd.insert(0, args['cmd'])
if self.verbose:
print("\n\nRunning: %s" % " ".join(cmd))
# FIXME: Using "with" here breaks fencing merge tests.
# pylint: disable=consider-using-with
if args['env']:
new_env = os.environ.copy()
new_env.update(args['env'])
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=new_env)
else:
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if args['kill']:
if self.verbose:
print("Also running: %s" % args['kill'])
# Typically, the kill argument is used to detect some sort of
# failure. Without yielding for a few seconds here, the process
# launched earlier that is listening for the failure may not have
# time to connect to pacemaker-execd.
time.sleep(2)
subprocess.Popen(shlex.split(args['kill']))
if not args['no_wait']:
test.wait()
else:
return ExitStatus.OK
output = pipe_communicate(test, check_stderr=args['check_stderr'])
if self.verbose:
print(output)
if test.returncode != args['expected_exitcode']:
raise ExitCodeError(test.returncode)
if args['stdout_match'] is not None and \
re.search(args['stdout_match'], output) is None:
raise OutputNotFoundError(output)
if args['stdout_no_match'] is not None and \
re.search(args['stdout_no_match'], output) is not None:
raise OutputFoundError(output)
if args['validate']:
if args['check_rng']:
rng_file = "%s/api/api-result.rng" % rng_directory()
else:
rng_file = None
cmd = find_validator(rng_file)
if not cmd:
raise XmlValidationError("Could not find validator for %s" % rng_file)
if self.verbose:
print("\nRunning: %s" % " ".join(cmd))
with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as validator:
output = pipe_communicate(validator, check_stderr=True, stdin=output)
if self.verbose:
print(output)
if validator.returncode != 0:
raise XmlValidationError(output)
return ExitStatus.OK
def set_error(self, step, cmd):
"""Record failure of this test."""
msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
self._result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
self.exitcode = ExitStatus.ERROR
def start_environment(self):
"""Prepare the host for executing a test."""
if os.path.exists(self.logpath):
os.remove(self.logpath)
self._kill_daemons()
self._start_daemons()
logfile = None
init_time = time.time()
update_time = init_time
while True:
# FIXME: Eventually use 'with' here, which seems complicated given
# everything happens in a loop.
# pylint: disable=consider-using-with
time.sleep(0.1)
if not self.force_wait and logfile is None \
and os.path.exists(self.logpath):
logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1")
if not self.force_wait and logfile is not None:
for line in logfile.readlines():
if "successfully started" in line:
return
now = time.time()
if self.timeout > 0 and (now - init_time) >= self.timeout:
if not self.force_wait:
print("\tDaemon %s doesn't seem to have been initialized within %fs."
"\n\tConsider specifying a longer '--timeout' value."
% (self._daemon_location, self.timeout))
return
if self.verbose and (now - update_time) >= 5:
print("Waiting for %s to be initialized: %fs ..."
% (self._daemon_location, now - init_time))
update_time = now
class Tests:
"""The base class for a collection of regression tests."""
def __init__(self, **kwargs):
"""
Create a new Tests instance.
This method must be provided by all subclasses, which must call
Tests.__init__ first.
Keywork arguments:
force_wait --
logdir -- The base directory under which to create a directory
to store output and temporary data.
timeout -- How long to wait for the test to complete.
verbose -- Whether to print additional information, including
verbose command output and daemon log files.
"""
self.force_wait = kwargs.get("force_wait", False)
self.logdir = kwargs.get("logdir", "/tmp")
self.timeout = kwargs.get("timeout", 2)
self.verbose = kwargs.get("verbose", False)
self._tests = []
def exit(self):
"""Exit (with error status code if any test failed)."""
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
sys.exit(ExitStatus.ERROR)
sys.exit(ExitStatus.OK)
def print_list(self):
"""List all registered tests."""
print("\n==== %d TESTS FOUND ====" % len(self._tests))
print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION"))
print("%35s - %s" % ("--------------------", "--------------------"))
for test in self._tests:
print("%35s - %s" % (test.name, test.description))
print("==== END OF LIST ====\n")
def print_results(self):
"""Print summary of results of executed tests."""
failures = 0
success = 0
print("\n\n======= FINAL RESULTS ==========")
print("\n--- FAILURE RESULTS:")
for test in self._tests:
if not test.executed:
continue
if test.exitcode != ExitStatus.OK:
failures += 1
test.print_result(" ")
else:
success += 1
if failures == 0:
print(" None")
print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures))
def run_single(self, name):
"""Run a single named test."""
for test in self._tests:
if test.name == name:
test.run()
break
def run_tests(self):
"""Run all tests."""
for test in self._tests:
test.run()
def run_tests_matching(self, pattern):
"""Run all tests whose name matches a pattern."""
for test in self._tests:
if test.name.count(pattern) != 0:
test.run()