Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F2825100
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
68 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/cts/cts-fencing.in b/cts/cts-fencing.in
index ce5331f34d..0a484bebce 100644
--- a/cts/cts-fencing.in
+++ b/cts/cts-fencing.in
@@ -1,1442 +1,1437 @@
#!@PYTHON@
""" Regression tests for Pacemaker's fencer
"""
# Pacemaker targets compatibility with Python 2.7 and 3.2+
from __future__ import print_function, unicode_literals, absolute_import, division
__copyright__ = "Copyright 2012-2018 Andrew Beekhof <andrew@beekhof.net>"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import io
import os
import sys
import subprocess
import shlex
import time
# Where to find test binaries
# Prefer the source tree if available
# BUILD_DIR holds an @...@ placeholder (presumably substituted at build time --
# confirm against the build system); TEST_DIR is sys.path[0], i.e. the
# directory of the script that was used to invoke the interpreter.
BUILD_DIR = "@abs_top_builddir@"
TEST_DIR = sys.path[0]
+AUTOGEN_COROSYNC_TEMPLATE = """
+totem {
+ version: 2
+ cluster_name: cts-fencing
+ crypto_cipher: none
+ crypto_hash: none
+ transport: udp
+}
+
+nodelist {
+ node {
+ nodeid: 1
+ name: %s
+ ring0_addr: 127.0.0.1
+ }
+}
+
+logging {
+ debug: off
+ to_syslog: no
+ to_stderr: no
+ to_logfile: yes
+ logfile: @CRM_LOG_DIR@/corosync.test.log
+}
+"""
+
# These values must be kept in sync with include/crm/crm.h
class CrmExit(object):
    """ Named exit status codes used by the commands under test """

    # Generic and resource-agent style results
    OK = 0
    ERROR = 1
    INVALID_PARAM = 2
    UNIMPLEMENT_FEATURE = 3
    INSUFFICIENT_PRIV = 4
    NOT_INSTALLED = 5
    NOT_CONFIGURED = 6
    NOT_RUNNING = 7

    # 64-78 range
    USAGE = 64
    DATAERR = 65
    NOINPUT = 66
    NOUSER = 67
    NOHOST = 68
    UNAVAILABLE = 69
    SOFTWARE = 70
    OSERR = 71
    OSFILE = 72
    CANTCREAT = 73
    IOERR = 74
    TEMPFAIL = 75
    PROTOCOL = 76
    NOPERM = 77
    CONFIG = 78

    # 100+ range
    FATAL = 100
    PANIC = 101
    DISCONNECT = 102
    SOLO = 103
    DIGEST = 104
    NOSUCH = 105
    QUORUM = 106
    UNSAFE = 107
    EXISTS = 108
    MULTIPLE = 109
    OLD = 110

    # Remaining codes
    TIMEOUT = 124
    MAX = 255
def update_path():
    """ Prepend the directories holding the test binaries to PATH.

    When cts-fencing.in sits next to this script the build tree is used;
    otherwise the installed daemon directory is used.
    """
    search_path = os.environ['PATH']
    in_source_tree = os.path.exists("%s/cts-fencing.in" % TEST_DIR)

    if in_source_tree:
        print("Running tests from the source tree: %s (%s)" % (BUILD_DIR, TEST_DIR))
        # Each entry is prepended in turn, so the last one listed ends up first:
        #   daemons/fenced -> pacemaker-fenced and cts-fence-helper
        #   tools          -> stonith_admin
        #   cts            -> cts-support
        for subdir in ("daemons/fenced", "tools", "cts"):
            search_path = "%s/%s:%s" % (BUILD_DIR, subdir, search_path)
    else:
        print("Running tests from the install tree: @CRM_DAEMON_DIR@ (not %s)" % TEST_DIR)
        # For pacemaker-fenced, cts-fence-helper, and cts-support
        search_path = "@CRM_DAEMON_DIR@:%s" % (search_path)

    print('Using PATH="{}"'.format(search_path))
    os.environ['PATH'] = search_path
def pipe_output(pipes, stdout=True, stderr=False):
    """ Wrapper to get text output from pipes regardless of Python version

    Arguments:
    pipes  -- object with a communicate() method returning a
              (stdout_data, stderr_data) pair, e.g. a subprocess.Popen
    stdout -- whether to include the captured standard output
    stderr -- whether to include the captured standard error

    Returns the requested streams concatenated (stdout first) as text.
    A stream that was not captured (communicate() gave None) is skipped.
    """
    captured = pipes.communicate()
    wanted = (
        (stdout, captured[0], sys.stdout),
        (stderr, captured[1], sys.stderr),
    )

    chunks = []
    for enabled, data, stream in wanted:
        if not enabled or data is None:
            continue
        if isinstance(data, bytes):
            # The stream's encoding can be None or absent (redirected or
            # replaced stream), so fall back to UTF-8 instead of crashing.
            encoding = getattr(stream, "encoding", None) or "utf-8"
            data = data.decode(encoding)
        chunks.append(data)
    return "".join(chunks)
def output_from_command(command):
    """ Execute command and return its standard output as a list of lines

    command is split with shlex and run without a shell. The result is the
    captured output split on newlines, so it always has at least one element.
    """
    test = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
    # Do not wait() before reading: with stdout=PIPE the child can block on a
    # full pipe and never exit. pipe_output() calls communicate(), which
    # drains the pipe and waits for the process.
    return pipe_output(test).split("\n")
def localname():
    """ Return the uname of the local host

    Falls back to "localhost" when "uname -n" prints nothing.
    """
    lines = output_from_command("uname -n")
    # output_from_command() returns at least [''], so the list itself is
    # always truthy; the first line is what has to be checked.
    our_uname = lines[0].strip() if lines else ""
    return our_uname or "localhost"
def killall(process):
    """ Send SIGKILL to every instance of the named process and wait for killall to exit """
    argv = shlex.split("killall -9 -q %s" % process)
    subprocess.Popen(argv, stdout=subprocess.PIPE).wait()
class TestError(Exception):
    """ Base class for exceptions in this module """
    pass


class ExitCodeError(TestError):
    """ Exception raised when command exit status is unexpected """

    def __init__(self, exit_code):
        super(ExitCodeError, self).__init__(exit_code)
        self.exit_code = exit_code

    def __str__(self):
        return repr(self.exit_code)


class OutputNotFoundError(TestError):
    """ Exception raised when command output does not contain wanted string """

    def __init__(self, output):
        # The parameter used to be named exit_code while the body assigned
        # the undefined name "output", so constructing this raised NameError.
        super(OutputNotFoundError, self).__init__(output)
        self.output = output

    def __str__(self):
        return repr(self.output)


class OutputFoundError(TestError):
    """ Exception raised when command output contains unwanted string """

    def __init__(self, output):
        # Same fix as OutputNotFoundError: store the argument actually passed.
        super(OutputFoundError, self).__init__(output)
        self.output = output

    def __str__(self):
        return repr(self.output)
class Test(object):
    """ Executor for a single test

    A test is a list of commands run in order against a freshly started
    pacemaker-fenced, plus log patterns that must (or must not) appear in
    the daemon's log afterwards.
    """

    def __init__(self, name, description, verbose=0, with_cpg=0):
        """ Create a test

        Arguments:
        name        -- test name (used for selection and in results)
        description -- human-readable description
        verbose     -- if true, echo commands, their output and the daemon log
        with_cpg    -- if true, start the fencer with "-c" and mark the test
                       as corosync-enabled; otherwise start it with "-s"
        """
        self.name = name
        self.description = description
        self.cmds = []
        self.verbose = verbose

        self.result_txt = ""
        self.cmd_tool_output = ""
        self.result_exitcode = CrmExit.OK

        if with_cpg:
            self.stonith_options = "-c"
            self.enable_corosync = 1
        else:
            self.stonith_options = "-s"
            self.enable_corosync = 0

        self.stonith_process = None
        self.stonith_output = ""
        self.stonith_patterns = []
        self.negative_stonith_patterns = []

        self.executed = 0

    def __new_cmd(self, cmd, args, exitcode, stdout_match="", no_wait=0, stdout_negative_match="", kill=None):
        """ Add a command to be executed as part of this test """
        self.cmds.append(
            {
                "cmd" : cmd,
                "kill" : kill,
                "args" : args,
                "expected_exitcode" : exitcode,
                "stdout_match" : stdout_match,
                "stdout_negative_match" : stdout_negative_match,
                "no_wait" : no_wait,
            }
        )

    def start_environment(self):
        """ Prepare the host for executing a test """

        # Make sure we are in full control
        killall("pacemakerd")
        killall("pacemaker-fenced")

        # Build the option string locally: appending to self.stonith_options
        # would add another " -V" every time the same test is run.
        options = self.stonith_options
        if self.verbose:
            options = options + " -V"
            print("Starting pacemaker-fenced with %s" % options)

        if os.path.exists("/tmp/stonith-regression.log"):
            os.remove('/tmp/stonith-regression.log')

        cmd = "pacemaker-fenced %s -l /tmp/stonith-regression.log" % options
        self.stonith_process = subprocess.Popen(shlex.split(cmd))

        # Fixed delay after launching the daemon, before any command is run
        time.sleep(1)

    def clean_environment(self):
        """ Clean up the host after executing a test

        Stops the daemon, loads its log into self.stonith_output and removes
        the log file.
        """
        if self.stonith_process:
            self.stonith_process.terminate()
            self.stonith_process.wait()

        self.stonith_output = ""
        self.stonith_process = None

        # Context manager so the handle is closed before the file is removed
        with io.open('/tmp/stonith-regression.log', 'rt') as logfile:
            self.stonith_output = logfile.read()

        if self.verbose:
            print("Daemon Output Start")
            print(self.stonith_output)
            print("Daemon Output End")
        os.remove('/tmp/stonith-regression.log')

    def add_stonith_log_pattern(self, pattern):
        """ Add a log pattern to expect from this test """
        self.stonith_patterns.append(pattern)

    def add_stonith_neg_log_pattern(self, pattern):
        """ Add a log pattern that should not occur with this test """
        self.negative_stonith_patterns.append(pattern)

    def add_cmd(self, cmd, args):
        """ Add a simple command to be executed as part of this test """
        self.__new_cmd(cmd, args, CrmExit.OK, "")

    def add_cmd_no_wait(self, cmd, args):
        """ Add a simple command to be executed (without waiting) as part of this test """
        self.__new_cmd(cmd, args, CrmExit.OK, "", 1)

    def add_cmd_check_stdout(self, cmd, args, match, no_match=""):
        """ Add a simple command with expected output to be executed as part of this test """
        self.__new_cmd(cmd, args, CrmExit.OK, match, 0, no_match)

    def add_expected_fail_cmd(self, cmd, args, exitcode=CrmExit.ERROR):
        """ Add a command to be executed as part of this test and expected to fail """
        self.__new_cmd(cmd, args, exitcode, "")

    def get_exitcode(self):
        """ Return the exit status of the last test execution """
        return self.result_exitcode

    def print_result(self, filler):
        """ Print the result of the last test execution """
        print("%s%s" % (filler, self.result_txt))

    def run_cmd(self, args):
        """ Execute a command as part of this test

        args is one of the dictionaries built by __new_cmd().

        Raises ExitCodeError, OutputNotFoundError or OutputFoundError when
        the command's exit status or combined stdout/stderr is not as
        expected. Commands flagged no_wait are started and not checked.
        """
        cmd = shlex.split(args['args'])
        cmd.insert(0, args['cmd'])

        if self.verbose:
            print("\n\nRunning: "+" ".join(cmd))
        test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if args['kill']:
            if self.verbose:
                print("Also running: "+args['kill'])
            subprocess.Popen(shlex.split(args['kill']))

        if args['no_wait'] != 0:
            return CrmExit.OK

        # pipe_output() uses communicate(), which reads both pipes while
        # waiting for exit. Calling wait() first could deadlock once the
        # child fills a pipe buffer.
        output = pipe_output(test, stderr=True)
        if self.verbose:
            print(output)

        if test.returncode != args['expected_exitcode']:
            raise ExitCodeError(test.returncode)

        if args['stdout_match'] != "" and output.count(args['stdout_match']) == 0:
            raise OutputNotFoundError(output)

        if args['stdout_negative_match'] != "" and output.count(args['stdout_negative_match']) != 0:
            raise OutputFoundError(output)

    def count_negative_matches(self, outline):
        """ Return 1 if a line matches patterns that shouldn't have occurred """
        count = 0
        for line in self.negative_stonith_patterns:
            if outline.count(line):
                count = 1
                if self.verbose:
                    print("This pattern should not have matched = '%s'" % (line))
        return count

    def match_stonith_patterns(self):
        """ Check test output for expected patterns

        Each log line can satisfy at most one pending expected pattern, so a
        pattern registered N times needs N matching lines. On failure the
        result text and exit code of this test are updated.
        """
        if not self.stonith_patterns and not self.negative_stonith_patterns:
            return

        negative_matches = 0
        # Work on a copy: satisfied patterns are removed as they are found,
        # which must not empty the test's registered pattern list.
        pats = list(self.stonith_patterns)
        total_patterns = len(pats)

        for line in self.stonith_output.split("\n"):
            negative_matches = negative_matches + self.count_negative_matches(line)
            for cur, pat in enumerate(pats):
                if line.count(pat):
                    del pats[cur]
                    break

        if pats or negative_matches:
            if self.verbose:
                for pat in pats:
                    print("Pattern Not Matched = '%s'" % pat)

            msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
            self.result_txt = msg % (self.name, len(pats), total_patterns, negative_matches)
            self.result_exitcode = CrmExit.ERROR

    def set_error(self, step, cmd):
        """ Record failure of this test """
        msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
        self.result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
        self.result_exitcode = CrmExit.ERROR

    def run(self):
        """ Execute this test.

        Runs the commands in order and stops at the first failing step, then
        checks the daemon log patterns if every step succeeded. Always
        returns 0; the outcome is available through get_exitcode().
        """
        self.start_environment()

        if self.verbose:
            print("\n--- START TEST - %s" % self.name)

        self.result_txt = "SUCCESS - '%s'" % (self.name)
        self.result_exitcode = CrmExit.OK

        for i, cmd in enumerate(self.cmds, 1):
            try:
                self.run_cmd(cmd)
            except ExitCodeError as e:
                print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
                self.set_error(i, cmd)
                break
            except OutputNotFoundError as e:
                print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
                self.set_error(i, cmd)
                break
            except OutputFoundError as e:
                print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e))
                self.set_error(i, cmd)
                break
            if self.verbose:
                print("Step %d SUCCESS" % (i))

        self.clean_environment()

        # Log patterns are only meaningful when every command step passed
        if self.result_exitcode == CrmExit.OK:
            self.match_stonith_patterns()

        print(self.result_txt)
        if self.verbose:
            print("--- END TEST - %s\n" % self.name)

        self.executed = 1
        return 0
class Tests(object):
""" Collection of all fencing regression tests """
def __init__(self, verbose=0):
self.tests = []
self.verbose = verbose
- self.autogen_corosync_cfg = 0
- if not os.path.exists("/etc/corosync/corosync.conf"):
- self.autogen_corosync_cfg = 1
+ self.autogen_corosync_cfg = not os.path.exists("/etc/corosync/corosync.conf")
def new_test(self, name, description, with_cpg=0):
    """ Create a named test, register it with this collection and return it """
    created = Test(name, description, self.verbose, with_cpg)
    self.tests.append(created)
    return created
def print_list(self):
    """ Print a table with the name and description of every registered test """
    row = "%35s - %s"

    print("\n==== %d TESTS FOUND ====" % (len(self.tests)))
    print(row % ("TEST NAME", "TEST DESCRIPTION"))
    print(row % ("--------------------", "--------------------"))
    for entry in self.tests:
        print(row % (entry.name, entry.description))
    print("==== END OF LIST ====\n")
def start_corosync(self):
    """ Launch corosync, wait for the launcher to exit, then pause for 10 seconds """
    if self.verbose:
        print("Starting corosync")

    launcher = subprocess.Popen("corosync", stdout=subprocess.PIPE)
    launcher.wait()
    time.sleep(10)
def run_single(self, name):
    """ Run the first registered test with exactly this name, if there is one """
    chosen = next((candidate for candidate in self.tests if candidate.name == name), None)
    if chosen is not None:
        chosen.run()
def run_tests_matching(self, pattern):
    """ Run every test whose name contains pattern as a substring """
    for candidate in self.tests:
        if pattern in candidate.name:
            candidate.run()
def run_cpg_only(self):
    """ Run only the tests flagged as corosync-enabled """
    for selected in (case for case in self.tests if case.enable_corosync):
        selected.run()
def run_no_cpg(self):
    """ Run only the tests that are not flagged as corosync-enabled """
    for selected in (case for case in self.tests if not case.enable_corosync):
        selected.run()
def run_tests(self):
    """ Run every registered test in registration order """
    for case in self.tests:
        case.run()
def exit(self):
    """ Terminate the process: error status if any executed test failed, OK otherwise """
    any_failed = any(case.get_exitcode() != CrmExit.OK
                     for case in self.tests if case.executed != 0)
    sys.exit(CrmExit.ERROR if any_failed else CrmExit.OK)
def print_results(self):
    """ Print the failed tests followed by pass/fail totals (executed tests only) """
    passed = 0
    failed = 0

    print("\n\n======= FINAL RESULTS ==========")
    print("\n--- FAILURE RESULTS:")

    for case in self.tests:
        if case.executed == 0:
            continue
        if case.get_exitcode() == CrmExit.OK:
            passed += 1
            continue
        failed += 1
        case.print_result(" ")

    if not failed:
        print(" None")

    print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (passed, failed))
def build_api_sanity_tests(self):
    """ Register the cts-fence-helper API sanity tests (standalone and cpg) """
    verbose_arg = "-V" if self.verbose else ""

    standalone = self.new_test("standalone_low_level_api_test", "Sanity test client api in standalone mode.")
    standalone.add_cmd("cts-fence-helper", "-t %s" % (verbose_arg))

    with_cpg = self.new_test("cpg_low_level_api_test", "Sanity test client api using mainloop and cpg.", 1)
    with_cpg.add_cmd("cts-fence-helper", "-m %s" % (verbose_arg))
def build_custom_timeout_tests(self):
    """ Register tests checking the total timeout logged with per-device timeouts """
    register_false1 = '-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node1 node2 node3"'
    register_true1 = '-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=1"'

    # custom timeout without topology
    plain = self.new_test("cpg_custom_timeout_1",
                          "Verify per device timeouts work as expected without using topology.", 1)
    for admin_args in (
            register_false1,
            register_true1,
            '-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=4"',
            "-F node3 -t 2",
    ):
        plain.add_cmd("stonith_admin", admin_args)
    # timeout is 2+1+4 = 7
    plain.add_stonith_log_pattern("Total timeout set to 7")

    # custom timeout _WITH_ topology
    leveled = self.new_test("cpg_custom_timeout_2",
                            "Verify per device timeouts work as expected _WITH_ topology.", 1)
    for admin_args in (
            register_false1,
            register_true1,
            '-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=4000"',
            "-r node3 -i 1 -v false1",
            "-r node3 -i 2 -v true1",
            "-r node3 -i 3 -v false2",
            "-F node3 -t 2",
    ):
        leveled.add_cmd("stonith_admin", admin_args)
    # timeout is 2+1+4000 = 4003
    leveled.add_stonith_log_pattern("Total timeout set to 4003")
def build_fence_merge_tests(self):
    """ Register tests checking that overlapping identical fence requests get merged """
    fence_node3 = "-F node3 -t 10"
    merged_msg = "Merging stonith action off for node node3 originating from client"
    finished_msg = "Operation off of node3 by"

    register_false1 = '-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3"'
    register_true1 = '-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3" '
    register_false2 = '-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3"'

    def overlap(test, duplicates):
        """ Queue duplicate fence requests plus one waited-for request, then the expected log lines """
        for _ in range(duplicates):
            test.add_cmd_no_wait("stonith_admin", fence_node3)
        test.add_cmd("stonith_admin", fence_node3)
        # one merger is expected per duplicate request
        for _ in range(duplicates):
            test.add_stonith_log_pattern(merged_msg)
        # the original and every duplicate operation must be reported complete
        for _ in range(duplicates + 1):
            test.add_stonith_log_pattern(finished_msg)

    # Simple test that overlapping fencing operations get merged
    single = self.new_test("cpg_custom_merge_single",
                           "Verify overlapping identical fencing operations are merged, no fencing levels used.", 1)
    for admin_args in (register_false1, register_true1, register_false2):
        single.add_cmd("stonith_admin", admin_args)
    overlap(single, 1)

    # Test that multiple mergers occur
    multiple = self.new_test("cpg_custom_merge_multiple",
                             "Verify multiple overlapping identical fencing operations are merged", 1)
    for admin_args in (
            register_false1,
            '-R true1 -a fence_dummy -o "mode=pass" -o "delay=2" -o "pcmk_host_list=node3" ',
            register_false2,
    ):
        multiple.add_cmd("stonith_admin", admin_args)
    overlap(multiple, 4)

    # Test that multiple mergers occur with topologies used
    leveled = self.new_test("cpg_custom_merge_with_topology",
                            "Verify multiple overlapping identical fencing operations are merged with fencing levels.",
                            1)
    for admin_args in (
            register_false1,
            register_true1,
            register_false2,
            "-r node3 -i 1 -v false1",
            "-r node3 -i 1 -v false2",
            "-r node3 -i 2 -v true1",
    ):
        leveled.add_cmd("stonith_admin", admin_args)
    overlap(leveled, 4)
def build_fence_no_merge_tests(self):
    """ Register a test checking that fence requests for different nodes are not merged """
    test = self.new_test("cpg_custom_no_merge",
                         "Verify differing fencing operations are not merged", 1)

    for admin_args in (
            '-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3 node2"',
            '-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3 node2" ',
            '-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3 node2"',
            "-r node3 -i 1 -v false1",
            "-r node3 -i 1 -v false2",
            "-r node3 -i 2 -v true1",
    ):
        test.add_cmd("stonith_admin", admin_args)

    # Requests target different nodes, so no merge message may be logged
    test.add_cmd_no_wait("stonith_admin", "-F node2 -t 10")
    test.add_cmd("stonith_admin", "-F node3 -t 10")
    test.add_stonith_neg_log_pattern("Merging stonith action off for node node3 originating from client")
def build_standalone_tests(self):
""" Register a grab bag of tests that can be executed in standalone or corosync mode """
test_types = [
{
"prefix" : "standalone",
"use_cpg" : 0,
},
{
"prefix" : "cpg",
"use_cpg" : 1,
},
]
# test what happens when all devices timeout
for test_type in test_types:
test = self.new_test("%s_fence_multi_device_failure" % test_type["prefix"],
"Verify that all devices timeout, a fencing failure is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false3 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
if test_type["use_cpg"] == 1:
test.add_expected_fail_cmd("stonith_admin", "-F node3 -t 2", CrmExit.TIMEOUT)
test.add_stonith_log_pattern("Total timeout set to 6")
else:
test.add_expected_fail_cmd("stonith_admin", "-F node3 -t 2", CrmExit.ERROR)
test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: ")
test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: ")
test.add_stonith_log_pattern("for host 'node3' with device 'false3' returned: ")
# test what happens when multiple devices can fence a node, but the first device fails.
for test_type in test_types:
test = self.new_test("%s_fence_device_failure_rollover" % test_type["prefix"],
"Verify that when one fence device fails for a node, the others are tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-F node3 -t 2")
if test_type["use_cpg"] == 1:
test.add_stonith_log_pattern("Total timeout set to 6")
# simple topology test for one device
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_simple" % test_type["prefix"],
"Verify all fencing devices at a level are used.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true")
test.add_cmd("stonith_admin", "-F node3 -t 2")
test.add_stonith_log_pattern("Total timeout set to 2")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# add topology, delete topology, verify fencing still works
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_add_remove" % test_type["prefix"],
"Verify fencing occurrs after all topology levels are removed",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true")
test.add_cmd("stonith_admin", "-d node3 -i 1")
test.add_cmd("stonith_admin", "-F node3 -t 2")
test.add_stonith_log_pattern("Total timeout set to 2")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# test what happens when the first fencing level has multiple devices.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_device_fails" % test_type["prefix"],
"Verify if one device in a level fails, the other is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R false -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true")
test.add_cmd("stonith_admin", "-F node3 -t 20")
test.add_stonith_log_pattern("Total timeout set to 40")
test.add_stonith_log_pattern("for host 'node3' with device 'false' returned: -201")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# test what happens when the first fencing level fails.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_multi_level_fails" % test_type["prefix"],
"Verify if one level fails, the next leve is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", "-F node3 -t 3")
test.add_stonith_log_pattern("Total timeout set to 18")
test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -201")
test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: -201")
test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0")
test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0")
# test what happens when the first fencing level had devices that no one has registered
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_missing_devices" % test_type["prefix"],
"Verify topology can continue with missing devices.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", "-F node3 -t 2")
# Test what happens if multiple fencing levels are defined, and then the first one is removed.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_level_removal" % test_type["prefix"],
"Verify level removal works.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")
# Now remove level 2, verify none of the devices in level two are hit.
test.add_cmd("stonith_admin", "-d node3 -i 2")
test.add_cmd("stonith_admin", "-F node3 -t 20")
test.add_stonith_log_pattern("Total timeout set to 8")
test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -201")
test.add_stonith_neg_log_pattern("for host 'node3' with device 'false2' returned: ")
test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0")
test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0")
# Test targeting a topology level by node name pattern.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_level_pattern" % test_type["prefix"],
"Verify targeting topology by node name pattern works.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"""-R true -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node1 node2 node3" """)
test.add_cmd("stonith_admin", """-r '@node.*' -i 1 -v true""")
test.add_cmd("stonith_admin", "-F node3 -t 2")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# test allowing commas and semicolons as delimiters in pcmk_host_list
for test_type in test_types:
test = self.new_test("%s_host_list_delimiters" % test_type["prefix"],
"Verify commas and semicolons can be used as pcmk_host_list delimiters",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"""-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node1,node2,node3" """)
test.add_cmd("stonith_admin",
"""-R true2 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=pcmk1;pcmk2;pcmk3" """)
test.add_cmd("stonith_admin", "stonith_admin -F node2 -t 2")
test.add_cmd("stonith_admin", "stonith_admin -F pcmk3 -t 2")
test.add_stonith_log_pattern("for host 'node2' with device 'true1' returned: 0")
test.add_stonith_log_pattern("for host 'pcmk3' with device 'true2' returned: 0")
# test the stonith builds the correct list of devices that can fence a node.
for test_type in test_types:
test = self.new_test("%s_list_devices" % test_type["prefix"],
"Verify list of devices that can fence a node is correct",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd_check_stdout("stonith_admin", "-l node1 -V", "true2", "true1")
test.add_cmd_check_stdout("stonith_admin", "-l node1 -V", "true3", "true1")
# simple test of device monitor
for test_type in test_types:
test = self.new_test("%s_monitor" % test_type["prefix"],
"Verify device is reachable", test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-Q true1")
test.add_cmd("stonith_admin", "-Q false1")
test.add_expected_fail_cmd("stonith_admin", "-Q true2", CrmExit.ERROR)
# Verify monitor occurs for duration of timeout period on failure
for test_type in test_types:
test = self.new_test("%s_monitor_timeout" % test_type["prefix"],
"Verify monitor uses duration of timeout period given.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
'-R true1 -a fence_dummy -o "mode=fail" -o "monitor_mode=fail" -o "pcmk_host_list=node3"')
test.add_expected_fail_cmd("stonith_admin", "-Q true1 -t 5", CrmExit.ERROR)
test.add_stonith_log_pattern("Attempt 2 to execute")
# Verify monitor occurs for duration of timeout period on failure, but stops at max retries
for test_type in test_types:
test = self.new_test("%s_monitor_timeout_max_retries" % test_type["prefix"],
"Verify monitor retries until max retry value or timeout is hit.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
'-R true1 -a fence_dummy -o "mode=fail" -o "monitor_mode=fail" -o "pcmk_host_list=node3"')
test.add_expected_fail_cmd("stonith_admin", "-Q true1 -t 15", CrmExit.ERROR)
test.add_stonith_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times")
# simple register test
for test_type in test_types:
test = self.new_test("%s_register" % test_type["prefix"],
"Verify devices can be registered and un-registered",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-Q true1")
test.add_cmd("stonith_admin", "-D true1")
test.add_expected_fail_cmd("stonith_admin", "-Q true1", CrmExit.ERROR)
# simple reboot test
for test_type in test_types:
test = self.new_test("%s_reboot" % test_type["prefix"],
"Verify devices can be rebooted",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-B node3 -t 2")
test.add_cmd("stonith_admin", "-D true1")
test.add_expected_fail_cmd("stonith_admin", "-Q true1", CrmExit.ERROR)
# test fencing history.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_fence_history" % test_type["prefix"],
"Verify last fencing operation is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-F node3 -t 2 -V")
test.add_cmd_check_stdout("stonith_admin", "-H node3", "was able to turn off node node3", "")
# simple test of dynamic list query
for test_type in test_types:
test = self.new_test("%s_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd_check_stdout("stonith_admin", "-l fake_port_1", "3 devices found")
# fence using dynamic list query
for test_type in test_types:
test = self.new_test("%s_fence_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-F fake_port_1 -t 5 -V")
# simple test of query using status action
for test_type in test_types:
test = self.new_test("%s_status_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"")
test.add_cmd("stonith_admin", "-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"")
test.add_cmd("stonith_admin", "-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"")
test.add_cmd_check_stdout("stonith_admin", "-l fake_port_1", "3 devices found")
# test what happens when no reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_no_reboot_support" % test_type["prefix"],
"Verify reboot action defaults to off when no reboot action is advertised by agent.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy_no_reboot -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-B node1 -t 5 -V")
test.add_stonith_log_pattern("does not advertise support for 'reboot', performing 'off'")
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
# make sure reboot is used when reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_with_reboot_support" % test_type["prefix"],
"Verify reboot action can be used when metadata advertises it.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-B node1 -t 5 -V")
test.add_stonith_neg_log_pattern("does not advertise support for 'reboot', performing 'off'")
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
def build_nodeid_tests(self):
    """ Register tests that check when a corosync node id is passed to the agent """

    local_node = localname()

    register_fmt = '-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=%s"'
    fence_fmt = "-F %s -t 3"
    nodeid_msg_fmt = "For stonith action (off) for victim %s, adding nodeid"

    # (test name, description, use corosync?, victim, nodeid message expected?)
    scenarios = [
        # nodeid is supplied when nodeid is in the metadata parameters
        ("cpg_supply_nodeid",
         "Verify nodeid is given when fence agent has nodeid as parameter",
         1, local_node, True),

        # nodeid is _NOT_ supplied when the victim has no node id;
        # the host name used here won't be in corosync.conf
        ("cpg_do_not_supply_nodeid",
         "Verify nodeid is _NOT_ given when fence agent does not have nodeid as parameter",
         1, "regr-test", False),

        # nodeid use doesn't explode standalone mode
        ("standalone_do_not_supply_nodeid",
         "Verify nodeid in metadata parameter list doesn't kill standalone mode",
         0, local_node, False),
    ]

    for name, description, use_cpg, victim, expect_nodeid in scenarios:
        test = self.new_test(name, description, use_cpg)
        test.add_cmd("stonith_admin", register_fmt % victim)
        test.add_cmd("stonith_admin", fence_fmt % victim)

        nodeid_msg = nodeid_msg_fmt % victim
        if expect_nodeid:
            test.add_stonith_log_pattern(nodeid_msg)
        else:
            test.add_stonith_neg_log_pattern(nodeid_msg)
def build_unfence_tests(self):
    """ Register tests that exercise unfencing with auto-unfence devices """

    node = localname()
    node_and_peer = "%s node3" % node
    auto_agent = "fence_dummy_auto_unfence"
    unfence_args = "-U %s -t 3" % node

    def register(test, device, agent, mode, hosts):
        """ Queue registration of one fence device """
        test.add_cmd('stonith_admin',
                     '-R %s -a %s -o "mode=%s" -o "pcmk_host_list=%s"'
                     % (device, agent, mode, hosts))

    def add_level(test, index, device):
        """ Queue addition of a device to a topology level of the local node """
        test.add_cmd("stonith_admin", "-r %s -i %d -v %s" % (node, index, device))

    def expect_ok(test, devices):
        """ Require a successful-result log line for each device """
        for device in devices:
            test.add_stonith_log_pattern("with device '%s' returned: 0 (OK)" % device)

    ### verify unfencing using automatic unfencing
    test = self.new_test("cpg_unfence_required_1",
                         "Verify require unfencing on all devices when automatic=true in agent's metadata",
                         1)
    for device in ("true1", "true2"):
        register(test, device, auto_agent, "pass", node)
    test.add_cmd("stonith_admin", unfence_args)
    # both devices should be executed
    expect_ok(test, ("true1", "true2"))

    ### verify unfencing using automatic unfencing fails if any of the required agents fail
    test = self.new_test("cpg_unfence_required_2",
                         "Verify require unfencing on all devices when automatic=true in agent's metadata",
                         1)
    register(test, "true1", auto_agent, "pass", node)
    register(test, "true2", auto_agent, "fail", node)
    test.add_expected_fail_cmd("stonith_admin", "-U %s -t 6" % node, CrmExit.ERROR)

    ### verify unfencing using automatic devices with topology
    test = self.new_test("cpg_unfence_required_3",
                         "Verify require unfencing on all devices even when at different topology levels",
                         1)
    for device in ("true1", "true2"):
        register(test, device, auto_agent, "pass", node_and_peer)
    add_level(test, 1, "true1")
    add_level(test, 2, "true2")
    test.add_cmd("stonith_admin", unfence_args)
    expect_ok(test, ("true1", "true2"))

    ### verify unfencing using automatic devices with topology
    test = self.new_test("cpg_unfence_required_4",
                         "Verify all required devices are executed even with topology levels fail.",
                         1)
    passing = ("true1", "true2", "true3", "true4")
    for device in passing:
        register(test, device, auto_agent, "pass", node_and_peer)
    for device in ("false1", "false2", "false3", "false4"):
        register(test, device, "fence_dummy", "fail", node_and_peer)
    topology = (
        (1, "true1"), (1, "false1"),
        (2, "false2"), (2, "true2"), (2, "false3"), (2, "true3"),
        (3, "false4"),
        (4, "true4"),
    )
    for index, device in topology:
        add_level(test, index, device)
    test.add_cmd("stonith_admin", unfence_args)
    expect_ok(test, passing)
def build_unfence_on_target_tests(self):
    """ Register tests for unfencing that must be executed on the target itself """

    local = localname()

    register_fmt = '-R %s -a fence_dummy -o "mode=pass" -o "pcmk_host_list=%s"'
    level_fmt = "-r %s -i %d -v %s"
    unfence_fmt = "-U %s -t 3"
    on_target_msg = "(on) to be executed on the target node"

    ### verify unfencing using on_target device
    test = self.new_test("cpg_unfence_on_target_1",
                         "Verify unfencing with on_target = true", 1)
    test.add_cmd("stonith_admin", register_fmt % ("true1", local))
    test.add_cmd("stonith_admin", unfence_fmt % local)
    test.add_stonith_log_pattern(on_target_msg)

    ### verify failure of unfencing using on_target device
    test = self.new_test("cpg_unfence_on_target_2",
                         "Verify failure unfencing with on_target = true",
                         1)
    hosts = "%s node_fake_1234" % local
    test.add_cmd("stonith_admin", register_fmt % ("true1", hosts))
    test.add_expected_fail_cmd("stonith_admin", unfence_fmt % "node_fake_1234",
                               CrmExit.ERROR)
    test.add_stonith_log_pattern(on_target_msg)

    ### verify unfencing using on_target device with topology
    test = self.new_test("cpg_unfence_on_target_3",
                         "Verify unfencing with on_target = true using topology",
                         1)
    hosts = "%s node3" % local
    for device in ("true1", "true2"):
        test.add_cmd("stonith_admin", register_fmt % (device, hosts))
    for index, device in ((1, "true1"), (2, "true2")):
        test.add_cmd("stonith_admin", level_fmt % (local, index, device))
    test.add_cmd("stonith_admin", unfence_fmt % local)
    test.add_stonith_log_pattern(on_target_msg)

    ### verify unfencing using on_target device with topology fails when victim node doesn't exist
    test = self.new_test("cpg_unfence_on_target_4",
                         "Verify unfencing failure with on_target = true using topology",
                         1)
    hosts = "%s node_fake" % local
    for device in ("true1", "true2"):
        test.add_cmd("stonith_admin", register_fmt % (device, hosts))
    for index, device in ((1, "true1"), (2, "true2")):
        test.add_cmd("stonith_admin", level_fmt % ("node_fake", index, device))
    test.add_expected_fail_cmd("stonith_admin", unfence_fmt % "node_fake",
                               CrmExit.ERROR)
    test.add_stonith_log_pattern(on_target_msg)
def build_remap_tests(self):
    """ Register tests covering the remapping of topology reboots to off-then-on """

    # Log messages shared by several of the tests below
    remapping = "Remapping multiple-device reboot of node_fake"
    off_complete = "Remapped off of node_fake complete, remapping to on"
    undoing = "Undoing remap of reboot of node_fake"
    op_fmt = "perform op 'node_fake %s' with '%s'"
    ignored_on_fmt = "Ignoring %s 'on' failure (no capable peers) for node_fake"

    def register(test, device, mode="pass", agent="fence_dummy", extra=""):
        """ Queue registration of a device for node_fake (argument keeps its trailing space) """
        test.add_cmd("stonith_admin",
                     '-R %s -a %s -o "mode=%s" -o "pcmk_host_list=node_fake" %s'
                     % (device, agent, mode, extra))

    def expect(test, patterns):
        """ Require each of the given patterns in the fencer log, in order """
        for pattern in patterns:
            test.add_stonith_log_pattern(pattern)

    test = self.new_test("cpg_remap_simple",
                         "Verify sequential topology reboot is remapped to all-off-then-all-on", 1)
    register(test, "true1",
             extra='-o "pcmk_off_timeout=1" -o "pcmk_reboot_timeout=10" ')
    register(test, "true2",
             extra='-o "pcmk_off_timeout=2" -o "pcmk_reboot_timeout=20" ')
    test.add_cmd("stonith_admin", "-r node_fake -i 1 -v true1 -v true2")
    test.add_cmd("stonith_admin", "-B node_fake -t 5")
    expect(test, [
        remapping,
        # timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
        "Total timeout set to 3 for peer's fencing of node_fake",
        op_fmt % ("off", "true1"),
        op_fmt % ("off", "true2"),
        off_complete,
        # fence_dummy sets "on" as an on_target action
        ignored_on_fmt % "true1",
        ignored_on_fmt % "true2",
        undoing,
    ])

    test = self.new_test("cpg_remap_automatic",
                         "Verify remapped topology reboot skips automatic 'on'", 1)
    for device in ("true1", "true2"):
        register(test, device, agent="fence_dummy_auto_unfence")
    test.add_cmd("stonith_admin", "-r node_fake -i 1 -v true1 -v true2")
    test.add_cmd("stonith_admin", "-B node_fake -t 5")
    expect(test, [
        remapping,
        op_fmt % ("off", "true1"),
        op_fmt % ("off", "true2"),
        off_complete,
        undoing,
    ])
    test.add_stonith_neg_log_pattern("perform op 'node_fake on' with")
    test.add_stonith_neg_log_pattern("'on' failure")

    test = self.new_test("cpg_remap_complex_1",
                         "Verify remapped topology reboot in second level works if non-remapped first level fails",
                         1)
    register(test, "false1", mode="fail")
    register(test, "true1")
    register(test, "true2")
    test.add_cmd("stonith_admin", "-r node_fake -i 1 -v false1")
    test.add_cmd("stonith_admin", "-r node_fake -i 2 -v true1 -v true2")
    test.add_cmd("stonith_admin", "-B node_fake -t 5")
    expect(test, [
        op_fmt % ("reboot", "false1"),
        remapping,
        op_fmt % ("off", "true1"),
        op_fmt % ("off", "true2"),
        off_complete,
        ignored_on_fmt % "true1",
        ignored_on_fmt % "true2",
        undoing,
    ])

    test = self.new_test("cpg_remap_complex_2",
                         "Verify remapped topology reboot failure in second level proceeds to third level",
                         1)
    for device in ("false1", "false2"):
        register(test, device, mode="fail")
    for device in ("true1", "true2", "true3"):
        register(test, device)
    test.add_cmd("stonith_admin", "-r node_fake -i 1 -v false1")
    test.add_cmd("stonith_admin", "-r node_fake -i 2 -v true1 -v false2 -v true3")
    test.add_cmd("stonith_admin", "-r node_fake -i 3 -v true2")
    test.add_cmd("stonith_admin", "-B node_fake -t 5")
    expect(test, [
        op_fmt % ("reboot", "false1"),
        remapping,
        op_fmt % ("off", "true1"),
        op_fmt % ("off", "false2"),
        "Attempted to execute agent fence_dummy (off) the maximum number of times",
        undoing,
        op_fmt % ("reboot", "true2"),
    ])
    test.add_stonith_neg_log_pattern("node_fake with true3")
def setup_environment(self, use_corosync):
    """ Prepare the host before executing any tests

    Arguments:
    use_corosync -- if true, take control of corosync: optionally write a
                    generated single-node corosync.conf (when
                    self.autogen_corosync_cfg is set), kill any running
                    corosync, and start a fresh one

    The cts-support helper is asked to install its test fixtures regardless
    of whether corosync is used.
    """

    if use_corosync:
        if self.autogen_corosync_cfg:
            # The context manager closes the file even if the write fails,
            # so a partially written config is at least flushed and released
            with io.open("/etc/corosync/corosync.conf", "w") as corosync_cfg:
                corosync_cfg.write(AUTOGEN_COROSYNC_TEMPLATE % (localname()))

        ### make sure we are in control ###
        killall("corosync")
        self.start_corosync()

    subprocess.call(["cts-support", "install"])
def cleanup_environment(self, use_corosync):
    """ Clean up the host after executing desired tests

    Arguments:
    use_corosync -- if true, stop corosync, dump and delete its test log
                    (verbose mode only), and delete the generated
                    corosync.conf (when self.autogen_corosync_cfg is set)

    The cts-support helper is asked to uninstall its test fixtures
    regardless of whether corosync was used.
    """

    if use_corosync:
        killall("corosync")

        corosync_log = '@CRM_LOG_DIR@/corosync.test.log'
        if self.verbose and os.path.exists(corosync_log):
            print("Corosync output")
            # Close the log before deleting it rather than leaking the handle
            with io.open(corosync_log, 'rt') as logfile:
                for line in logfile:
                    print(line.strip())
            os.remove(corosync_log)

        if self.autogen_corosync_cfg:
            # Tolerate a missing file, as the previous "rm -f" did, so that
            # cleanup still completes if setup never got to write the config
            if os.path.exists("/etc/corosync/corosync.conf"):
                os.remove("/etc/corosync/corosync.conf")

    subprocess.call(["cts-support", "uninstall"])
class TestOptions(object):
    """ Option handler

    Parsed settings are kept in self.options, a dictionary keyed by the long
    option name without its leading dashes. Flags are stored as 0 or 1;
    options that take a value are stored as strings ("" when not given).
    """

    def __init__(self):
        self.options = {}
        self.options['list-tests'] = 0
        self.options['run-all'] = 1
        self.options['run-only'] = ""
        self.options['run-only-pattern'] = ""
        self.options['verbose'] = 0
        self.options['invalid-arg'] = ""
        self.options['cpg-only'] = 0
        self.options['no-cpg'] = 0
        self.options['show-usage'] = 0

    def build_options(self, argv):
        """ Set options based on command-line arguments

        Arguments:
        argv -- full argument list, including the program name at index 0

        An option that requires a value but is given as the last argument is
        recorded in options['invalid-arg'] and turns on 'show-usage' instead
        of raising IndexError. The first unrecognized argument is recorded in
        options['invalid-arg'] as well, but is otherwise still ignored.
        """

        flags = {
            "-h": 'show-usage', "--help": 'show-usage',
            "-l": 'list-tests', "--list-tests": 'list-tests',
            "-V": 'verbose', "--verbose": 'verbose',
            "-n": 'no-cpg', "--no-cpg": 'no-cpg',
            "-c": 'cpg-only', "--cpg-only": 'cpg-only',
        }
        valued = {
            "-r": 'run-only', "--run-only": 'run-only',
            "-p": 'run-only-pattern', "--run-only-pattern": 'run-only-pattern',
        }

        pending = iter(argv[1:])
        for arg in pending:
            if arg in flags:
                self.options[flags[arg]] = 1

            elif arg in valued:
                # The value is whatever follows, even if it looks like an option
                value = next(pending, None)
                if value is None:
                    self.options['invalid-arg'] = arg
                    self.options['show-usage'] = 1
                else:
                    self.options[valued[arg]] = value

            elif not self.options['invalid-arg']:
                self.options['invalid-arg'] = arg

    def show_usage(self):
        """ Show command usage """

        if self.options['invalid-arg']:
            print("Invalid or incomplete option: %s" % self.options['invalid-arg'])

        print("usage: " + sys.argv[0] + " [options]")
        print("If no options are provided, all tests will run")
        print("Options:")
        print("\t [--help | -h] Show usage")
        print("\t [--list-tests | -l] Print out all registered tests.")
        print("\t [--cpg-only | -c] Only run tests that require corosync.")
        print("\t [--no-cpg | -n] Only run tests that do not require corosync")
        print("\t [--run-only | -r 'testname'] Run a specific test")
        print("\t [--verbose | -V] Verbose output")
        print("\t [--run-only-pattern | -p 'string'] Run only tests containing the string value")
        print("\n\tExample: Run only the test 'start_stop'")
        print("\t\t " + sys.argv[0] + " --run-only start_stop")
        print("\n\tExample: Run only the tests with the string 'systemd' present in them")
        print("\t\t " + sys.argv[0] + " --run-only-pattern systemd")
def main(argv):
    """ Run fencing regression tests as specified by arguments

    Arguments:
    argv -- full command line, including the program name at index 0

    Listing tests or showing usage exits immediately without touching the
    host. Otherwise the environment is set up, the selected tests are run,
    and the environment is cleaned up again even if running the tests raises.
    """

    update_path()

    opts = TestOptions()
    opts.build_options(argv)

    tests = Tests(opts.options['verbose'])
    tests.build_standalone_tests()
    tests.build_custom_timeout_tests()
    tests.build_api_sanity_tests()
    tests.build_fence_merge_tests()
    tests.build_fence_no_merge_tests()
    tests.build_unfence_tests()
    tests.build_unfence_on_target_tests()
    tests.build_nodeid_tests()
    tests.build_remap_tests()

    if opts.options['list-tests']:
        tests.print_list()
        sys.exit(CrmExit.OK)
    elif opts.options['show-usage']:
        opts.show_usage()
        sys.exit(CrmExit.OK)

    print("Starting ...")

    use_corosync = 0 if opts.options['no-cpg'] else 1

    tests.setup_environment(use_corosync)

    # Once setup has succeeded, always undo it: otherwise an exception or
    # interrupt while running tests would leave corosync running and the
    # installed test fixtures (and any generated corosync.conf) behind
    try:
        if opts.options['run-only-pattern'] != "":
            tests.run_tests_matching(opts.options['run-only-pattern'])
        elif opts.options['run-only'] != "":
            tests.run_single(opts.options['run-only'])
        elif opts.options['no-cpg']:
            tests.run_no_cpg()
        elif opts.options['cpg-only']:
            tests.run_cpg_only()
        else:
            tests.run_tests()

        tests.print_results()
    finally:
        tests.cleanup_environment(use_corosync)

    tests.exit()
# Run the regression tests only when executed as a script, not when imported
if __name__ == "__main__":
    main(sys.argv)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jan 25, 11:26 AM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1322358
Default Alt Text
(68 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment