Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/cts/cts-fencing.in b/cts/cts-fencing.in
index ce5331f34d..0a484bebce 100644
--- a/cts/cts-fencing.in
+++ b/cts/cts-fencing.in
@@ -1,1442 +1,1437 @@
#!@PYTHON@
""" Regression tests for Pacemaker's fencer
"""
# Pacemaker targets compatibility with Python 2.7 and 3.2+
from __future__ import print_function, unicode_literals, absolute_import, division
__copyright__ = "Copyright 2012-2018 Andrew Beekhof <andrew@beekhof.net>"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import io
import os
import sys
import subprocess
import shlex
import time
# Where to find test binaries
# Prefer the source tree if available
BUILD_DIR = "@abs_top_builddir@"
TEST_DIR = sys.path[0]
+AUTOGEN_COROSYNC_TEMPLATE = """
+totem {
+ version: 2
+ cluster_name: cts-fencing
+ crypto_cipher: none
+ crypto_hash: none
+ transport: udp
+}
+
+nodelist {
+ node {
+ nodeid: 1
+ name: %s
+ ring0_addr: 127.0.0.1
+ }
+}
+
+logging {
+ debug: off
+ to_syslog: no
+ to_stderr: no
+ to_logfile: yes
+ logfile: @CRM_LOG_DIR@/corosync.test.log
+}
+"""
+
# These values must be kept in sync with include/crm/crm.h
class CrmExit(object):
OK = 0
ERROR = 1
INVALID_PARAM = 2
UNIMPLEMENT_FEATURE = 3
INSUFFICIENT_PRIV = 4
NOT_INSTALLED = 5
NOT_CONFIGURED = 6
NOT_RUNNING = 7
USAGE = 64
DATAERR = 65
NOINPUT = 66
NOUSER = 67
NOHOST = 68
UNAVAILABLE = 69
SOFTWARE = 70
OSERR = 71
OSFILE = 72
CANTCREAT = 73
IOERR = 74
TEMPFAIL = 75
PROTOCOL = 76
NOPERM = 77
CONFIG = 78
FATAL = 100
PANIC = 101
DISCONNECT = 102
SOLO = 103
DIGEST = 104
NOSUCH = 105
QUORUM = 106
UNSAFE = 107
EXISTS = 108
MULTIPLE = 109
OLD = 110
TIMEOUT = 124
MAX = 255
def update_path():
""" Set the PATH environment variable appropriately for the tests """
new_path = os.environ['PATH']
if os.path.exists("%s/cts-fencing.in" % TEST_DIR):
print("Running tests from the source tree: %s (%s)" % (BUILD_DIR, TEST_DIR))
# For pacemaker-fenced and cts-fence-helper
new_path = "%s/daemons/fenced:%s" % (BUILD_DIR, new_path)
new_path = "%s/tools:%s" % (BUILD_DIR, new_path) # For stonith_admin
new_path = "%s/cts:%s" % (BUILD_DIR, new_path) # For cts-support
else:
print("Running tests from the install tree: @CRM_DAEMON_DIR@ (not %s)" % TEST_DIR)
# For pacemaker-fenced, cts-fence-helper, and cts-support
new_path = "@CRM_DAEMON_DIR@:%s" % (new_path)
print('Using PATH="{}"'.format(new_path))
os.environ['PATH'] = new_path
def pipe_output(pipes, stdout=True, stderr=False):
""" Wrapper to get text output from pipes regardless of Python version """
output = ""
pipe_outputs = pipes.communicate()
if sys.version_info < (3,):
if stdout:
output = output + pipe_outputs[0]
if stderr:
output = output + pipe_outputs[1]
else:
if stdout:
output = output + pipe_outputs[0].decode(sys.stdout.encoding)
if stderr:
output = output + pipe_outputs[1].decode(sys.stderr.encoding)
return output
def output_from_command(command):
""" Execute command and return its standard output """
test = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
test.wait()
return pipe_output(test).split("\n")
def localname():
""" Return the uname of the local host """
our_uname = output_from_command("uname -n")
if our_uname:
our_uname = our_uname[0]
else:
our_uname = "localhost"
return our_uname
def killall(process):
""" Kill all instances of a process """
cmd = shlex.split("killall -9 -q %s" % process)
test = subprocess.Popen(cmd, stdout=subprocess.PIPE)
test.wait()
class TestError(Exception):
""" Base class for exceptions in this module """
pass
class ExitCodeError(TestError):
""" Exception raised when command exit status is unexpected """
def __init__(self, exit_code):
self.exit_code = exit_code
def __str__(self):
return repr(self.exit_code)
class OutputNotFoundError(TestError):
""" Exception raised when command output does not contain wanted string """
def __init__(self, exit_code):
self.output = output
def __str__(self):
return repr(self.output)
class OutputFoundError(TestError):
""" Exception raised when command output contains unwanted string """
def __init__(self, exit_code):
self.output = output
def __str__(self):
return repr(self.output)
class Test(object):
""" Executor for a single test """
def __init__(self, name, description, verbose=0, with_cpg=0):
self.name = name
self.description = description
self.cmds = []
self.verbose = verbose
self.result_txt = ""
self.cmd_tool_output = ""
self.result_exitcode = CrmExit.OK
if with_cpg:
self.stonith_options = "-c"
self.enable_corosync = 1
else:
self.stonith_options = "-s"
self.enable_corosync = 0
self.stonith_process = None
self.stonith_output = ""
self.stonith_patterns = []
self.negative_stonith_patterns = []
self.executed = 0
def __new_cmd(self, cmd, args, exitcode, stdout_match="", no_wait=0, stdout_negative_match="", kill=None):
""" Add a command to be executed as part of this test """
self.cmds.append(
{
"cmd" : cmd,
"kill" : kill,
"args" : args,
"expected_exitcode" : exitcode,
"stdout_match" : stdout_match,
"stdout_negative_match" : stdout_negative_match,
"no_wait" : no_wait,
}
)
def start_environment(self):
""" Prepare the host for executing a test """
# Make sure we are in full control
killall("pacemakerd")
killall("pacemaker-fenced")
if self.verbose:
self.stonith_options = self.stonith_options + " -V"
print("Starting pacemaker-fenced with %s" % self.stonith_options)
if os.path.exists("/tmp/stonith-regression.log"):
os.remove('/tmp/stonith-regression.log')
cmd = "pacemaker-fenced %s -l /tmp/stonith-regression.log" % self.stonith_options
self.stonith_process = subprocess.Popen(shlex.split(cmd))
time.sleep(1)
def clean_environment(self):
""" Clean up the host after executing a test """
if self.stonith_process:
self.stonith_process.terminate()
self.stonith_process.wait()
self.stonith_output = ""
self.stonith_process = None
logfile = io.open('/tmp/stonith-regression.log', 'rt')
for line in logfile.readlines():
self.stonith_output = self.stonith_output + line
if self.verbose:
print("Daemon Output Start")
print(self.stonith_output)
print("Daemon Output End")
os.remove('/tmp/stonith-regression.log')
def add_stonith_log_pattern(self, pattern):
""" Add a log pattern to expect from this test """
self.stonith_patterns.append(pattern)
def add_stonith_neg_log_pattern(self, pattern):
""" Add a log pattern that should not occur with this test """
self.negative_stonith_patterns.append(pattern)
def add_cmd(self, cmd, args):
""" Add a simple command to be executed as part of this test """
self.__new_cmd(cmd, args, CrmExit.OK, "")
def add_cmd_no_wait(self, cmd, args):
""" Add a simple command to be executed (without waiting) as part of this test """
self.__new_cmd(cmd, args, CrmExit.OK, "", 1)
def add_cmd_check_stdout(self, cmd, args, match, no_match=""):
""" Add a simple command with expected output to be executed as part of this test """
self.__new_cmd(cmd, args, CrmExit.OK, match, 0, no_match)
def add_expected_fail_cmd(self, cmd, args, exitcode=CrmExit.ERROR):
""" Add a command to be executed as part of this test and expected to fail """
self.__new_cmd(cmd, args, exitcode, "")
def get_exitcode(self):
""" Return the exit status of the last test execution """
return self.result_exitcode
def print_result(self, filler):
""" Print the result of the last test execution """
print("%s%s" % (filler, self.result_txt))
def run_cmd(self, args):
""" Execute a command as part of this test """
cmd = shlex.split(args['args'])
cmd.insert(0, args['cmd'])
if self.verbose:
print("\n\nRunning: "+" ".join(cmd))
test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if args['kill']:
if self.verbose:
print("Also running: "+args['kill'])
subprocess.Popen(shlex.split(args['kill']))
if args['no_wait'] == 0:
test.wait()
else:
return CrmExit.OK
output = pipe_output(test, stderr=True)
if self.verbose:
print(output)
if test.returncode != args['expected_exitcode']:
raise ExitCodeError(test.returncode)
if args['stdout_match'] != "" and output.count(args['stdout_match']) == 0:
raise OutputNotFoundError(output)
if args['stdout_negative_match'] != "" and output.count(args['stdout_negative_match']) != 0:
raise OutputFoundError(output)
def count_negative_matches(self, outline):
""" Return 1 if a line matches patterns that shouldn't have occurred """
count = 0
for line in self.negative_stonith_patterns:
if outline.count(line):
count = 1
if self.verbose:
print("This pattern should not have matched = '%s" % (line))
return count
def match_stonith_patterns(self):
""" Check test output for expected patterns """
negative_matches = 0
cur = 0
pats = self.stonith_patterns
total_patterns = len(self.stonith_patterns)
if len(self.stonith_patterns) == 0 and len(self.negative_stonith_patterns) == 0:
return
for line in self.stonith_output.split("\n"):
negative_matches = negative_matches + self.count_negative_matches(line)
if len(pats) == 0:
continue
cur = -1
for pat in pats:
cur = cur + 1
if line.count(pats[cur]):
del pats[cur]
break
if len(pats) > 0 or negative_matches:
if self.verbose:
for pat in pats:
print("Pattern Not Matched = '%s'" % pat)
msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
self.result_txt = msg % (self.name, len(pats), total_patterns, negative_matches)
self.result_exitcode = CrmExit.ERROR
def set_error(self, step, cmd):
""" Record failure of this test """
msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
self.result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
self.result_exitcode = CrmExit.ERROR
def run(self):
""" Execute this test. """
res = 0
i = 1
self.start_environment()
if self.verbose:
print("\n--- START TEST - %s" % self.name)
self.result_txt = "SUCCESS - '%s'" % (self.name)
self.result_exitcode = CrmExit.OK
for cmd in self.cmds:
try:
self.run_cmd(cmd)
except ExitCodeError as e:
print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
self.set_error(i, cmd);
break
except OutputNotFoundError as e:
print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
self.set_error(i, cmd);
break
except OutputFoundError as e:
print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e))
self.set_error(i, cmd);
break
if self.verbose:
print("Step %d SUCCESS" % (i))
i = i + 1
self.clean_environment()
if self.result_exitcode == CrmExit.OK:
self.match_stonith_patterns()
print(self.result_txt)
if self.verbose:
print("--- END TEST - %s\n" % self.name)
self.executed = 1
return res
class Tests(object):
""" Collection of all fencing regression tests """
def __init__(self, verbose=0):
self.tests = []
self.verbose = verbose
- self.autogen_corosync_cfg = 0
- if not os.path.exists("/etc/corosync/corosync.conf"):
- self.autogen_corosync_cfg = 1
+ self.autogen_corosync_cfg = not os.path.exists("/etc/corosync/corosync.conf")
def new_test(self, name, description, with_cpg=0):
""" Create a named test """
test = Test(name, description, self.verbose, with_cpg)
self.tests.append(test)
return test
def print_list(self):
""" List all registered tests """
print("\n==== %d TESTS FOUND ====" % (len(self.tests)))
print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION"))
print("%35s - %s" % ("--------------------", "--------------------"))
for test in self.tests:
print("%35s - %s" % (test.name, test.description))
print("==== END OF LIST ====\n")
def start_corosync(self):
""" Start the corosync process """
if self.verbose:
print("Starting corosync")
test = subprocess.Popen("corosync", stdout=subprocess.PIPE)
test.wait()
time.sleep(10)
def run_single(self, name):
""" Run a single named test """
for test in self.tests:
if test.name == name:
test.run()
break
def run_tests_matching(self, pattern):
""" Run all tests whose name matches a pattern """
for test in self.tests:
if test.name.count(pattern) != 0:
test.run()
def run_cpg_only(self):
""" Run all corosync-enabled tests """
for test in self.tests:
if test.enable_corosync:
test.run()
def run_no_cpg(self):
""" Run all standalone tests """
for test in self.tests:
if not test.enable_corosync:
test.run()
def run_tests(self):
""" Run all tests """
for test in self.tests:
test.run()
def exit(self):
""" Exit (with error status code if any test failed) """
for test in self.tests:
if test.executed == 0:
continue
if test.get_exitcode() != CrmExit.OK:
sys.exit(CrmExit.ERROR)
sys.exit(CrmExit.OK)
def print_results(self):
""" Print summary of results of executed tests """
failures = 0
success = 0
print("\n\n======= FINAL RESULTS ==========")
print("\n--- FAILURE RESULTS:")
for test in self.tests:
if test.executed == 0:
continue
if test.get_exitcode() != CrmExit.OK:
failures = failures + 1
test.print_result(" ")
else:
success = success + 1
if failures == 0:
print(" None")
print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures))
def build_api_sanity_tests(self):
""" Register tests to verify basic API usage """
verbose_arg = ""
if self.verbose:
verbose_arg = "-V"
test = self.new_test("standalone_low_level_api_test", "Sanity test client api in standalone mode.")
test.add_cmd("cts-fence-helper", "-t %s" % (verbose_arg))
test = self.new_test("cpg_low_level_api_test", "Sanity test client api using mainloop and cpg.", 1)
test.add_cmd("cts-fence-helper", "-m %s" % (verbose_arg))
def build_custom_timeout_tests(self):
""" Register tests to verify custom timeout usage """
# custom timeout without topology
test = self.new_test("cpg_custom_timeout_1",
"Verify per device timeouts work as expected without using topology.", 1)
test.add_cmd('stonith_admin',
'-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
'-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=1"')
test.add_cmd('stonith_admin',
'-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=4"')
test.add_cmd("stonith_admin", "-F node3 -t 2")
# timeout is 2+1+4 = 7
test.add_stonith_log_pattern("Total timeout set to 7")
# custom timeout _WITH_ topology
test = self.new_test("cpg_custom_timeout_2",
"Verify per device timeouts work as expected _WITH_ topology.", 1)
test.add_cmd('stonith_admin',
'-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node1 node2 node3"')
test.add_cmd('stonith_admin',
'-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=1"')
test.add_cmd('stonith_admin',
'-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=4000"')
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v false2")
test.add_cmd("stonith_admin", "-F node3 -t 2")
# timeout is 2+1+4000 = 4003
test.add_stonith_log_pattern("Total timeout set to 4003")
def build_fence_merge_tests(self):
""" Register tests to verify when fence operations should be merged """
### Simple test that overlapping fencing operations get merged
test = self.new_test("cpg_custom_merge_single",
"Verify overlapping identical fencing operations are merged, no fencing levels used.", 1)
test.add_cmd("stonith_admin", "-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\" ")
test.add_cmd("stonith_admin", "-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd("stonith_admin", "-F node3 -t 10")
### one merger will happen
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
### Test that multiple mergers occur
test = self.new_test("cpg_custom_merge_multiple",
"Verify multiple overlapping identical fencing operations are merged", 1)
test.add_cmd("stonith_admin", "-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"delay=2\" -o \"pcmk_host_list=node3\" ")
test.add_cmd("stonith_admin", "-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd("stonith_admin", "-F node3 -t 10")
### 4 mergers should occur
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
### Test that multiple mergers occur with topologies used
test = self.new_test("cpg_custom_merge_with_topology",
"Verify multiple overlapping identical fencing operations are merged with fencing levels.",
1)
test.add_cmd("stonith_admin", "-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\" ")
test.add_cmd("stonith_admin", "-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true1")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
test.add_cmd("stonith_admin", "-F node3 -t 10")
### 4 mergers should occur
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
### the pattern below signifies that both the original and duplicate operation completed
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
test.add_stonith_log_pattern("Operation off of node3 by")
def build_fence_no_merge_tests(self):
""" Register tests to verify when fence operations should not be merged """
test = self.new_test("cpg_custom_no_merge",
"Verify differing fencing operations are not merged", 1)
test.add_cmd("stonith_admin", "-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3 node2\"")
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3 node2\" ")
test.add_cmd("stonith_admin", "-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3 node2\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true1")
test.add_cmd_no_wait("stonith_admin", "-F node2 -t 10")
test.add_cmd("stonith_admin", "-F node3 -t 10")
test.add_stonith_neg_log_pattern("Merging stonith action off for node node3 originating from client")
def build_standalone_tests(self):
""" Register a grab bag of tests that can be executed in standalone or corosync mode """
test_types = [
{
"prefix" : "standalone",
"use_cpg" : 0,
},
{
"prefix" : "cpg",
"use_cpg" : 1,
},
]
# test what happens when all devices timeout
for test_type in test_types:
test = self.new_test("%s_fence_multi_device_failure" % test_type["prefix"],
"Verify that all devices timeout, a fencing failure is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false3 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
if test_type["use_cpg"] == 1:
test.add_expected_fail_cmd("stonith_admin", "-F node3 -t 2", CrmExit.TIMEOUT)
test.add_stonith_log_pattern("Total timeout set to 6")
else:
test.add_expected_fail_cmd("stonith_admin", "-F node3 -t 2", CrmExit.ERROR)
test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: ")
test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: ")
test.add_stonith_log_pattern("for host 'node3' with device 'false3' returned: ")
# test what happens when multiple devices can fence a node, but the first device fails.
for test_type in test_types:
test = self.new_test("%s_fence_device_failure_rollover" % test_type["prefix"],
"Verify that when one fence device fails for a node, the others are tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-F node3 -t 2")
if test_type["use_cpg"] == 1:
test.add_stonith_log_pattern("Total timeout set to 6")
# simple topology test for one device
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_simple" % test_type["prefix"],
"Verify all fencing devices at a level are used.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true")
test.add_cmd("stonith_admin", "-F node3 -t 2")
test.add_stonith_log_pattern("Total timeout set to 2")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# add topology, delete topology, verify fencing still works
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_add_remove" % test_type["prefix"],
"Verify fencing occurrs after all topology levels are removed",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true")
test.add_cmd("stonith_admin", "-d node3 -i 1")
test.add_cmd("stonith_admin", "-F node3 -t 2")
test.add_stonith_log_pattern("Total timeout set to 2")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# test what happens when the first fencing level has multiple devices.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_device_fails" % test_type["prefix"],
"Verify if one device in a level fails, the other is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R false -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true")
test.add_cmd("stonith_admin", "-F node3 -t 20")
test.add_stonith_log_pattern("Total timeout set to 40")
test.add_stonith_log_pattern("for host 'node3' with device 'false' returned: -201")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# test what happens when the first fencing level fails.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_multi_level_fails" % test_type["prefix"],
"Verify if one level fails, the next leve is tried.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", "-F node3 -t 3")
test.add_stonith_log_pattern("Total timeout set to 18")
test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -201")
test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: -201")
test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0")
test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0")
# test what happens when the first fencing level had devices that no one has registered
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_missing_devices" % test_type["prefix"],
"Verify topology can continue with missing devices.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")
test.add_cmd("stonith_admin", "-F node3 -t 2")
# Test what happens if multiple fencing levels are defined, and then the first one is removed.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_level_removal" % test_type["prefix"],
"Verify level removal works.", test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")
# Now remove level 2, verify none of the devices in level two are hit.
test.add_cmd("stonith_admin", "-d node3 -i 2")
test.add_cmd("stonith_admin", "-F node3 -t 20")
test.add_stonith_log_pattern("Total timeout set to 8")
test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -201")
test.add_stonith_neg_log_pattern("for host 'node3' with device 'false2' returned: ")
test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0")
test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0")
# Test targeting a topology level by node name pattern.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_topology_level_pattern" % test_type["prefix"],
"Verify targeting topology by node name pattern works.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"""-R true -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node1 node2 node3" """)
test.add_cmd("stonith_admin", """-r '@node.*' -i 1 -v true""")
test.add_cmd("stonith_admin", "-F node3 -t 2")
test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")
# test allowing commas and semicolons as delimiters in pcmk_host_list
for test_type in test_types:
test = self.new_test("%s_host_list_delimiters" % test_type["prefix"],
"Verify commas and semicolons can be used as pcmk_host_list delimiters",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"""-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node1,node2,node3" """)
test.add_cmd("stonith_admin",
"""-R true2 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=pcmk1;pcmk2;pcmk3" """)
test.add_cmd("stonith_admin", "stonith_admin -F node2 -t 2")
test.add_cmd("stonith_admin", "stonith_admin -F pcmk3 -t 2")
test.add_stonith_log_pattern("for host 'node2' with device 'true1' returned: 0")
test.add_stonith_log_pattern("for host 'pcmk3' with device 'true2' returned: 0")
# test the stonith builds the correct list of devices that can fence a node.
for test_type in test_types:
test = self.new_test("%s_list_devices" % test_type["prefix"],
"Verify list of devices that can fence a node is correct",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin",
"-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd_check_stdout("stonith_admin", "-l node1 -V", "true2", "true1")
test.add_cmd_check_stdout("stonith_admin", "-l node1 -V", "true3", "true1")
# simple test of device monitor
for test_type in test_types:
test = self.new_test("%s_monitor" % test_type["prefix"],
"Verify device is reachable", test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-Q true1")
test.add_cmd("stonith_admin", "-Q false1")
test.add_expected_fail_cmd("stonith_admin", "-Q true2", CrmExit.ERROR)
# Verify monitor occurs for duration of timeout period on failure
for test_type in test_types:
test = self.new_test("%s_monitor_timeout" % test_type["prefix"],
"Verify monitor uses duration of timeout period given.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
'-R true1 -a fence_dummy -o "mode=fail" -o "monitor_mode=fail" -o "pcmk_host_list=node3"')
test.add_expected_fail_cmd("stonith_admin", "-Q true1 -t 5", CrmExit.ERROR)
test.add_stonith_log_pattern("Attempt 2 to execute")
# Verify monitor occurs for duration of timeout period on failure, but stops at max retries
for test_type in test_types:
test = self.new_test("%s_monitor_timeout_max_retries" % test_type["prefix"],
"Verify monitor retries until max retry value or timeout is hit.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
'-R true1 -a fence_dummy -o "mode=fail" -o "monitor_mode=fail" -o "pcmk_host_list=node3"')
test.add_expected_fail_cmd("stonith_admin", "-Q true1 -t 15", CrmExit.ERROR)
test.add_stonith_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times")
# simple register test
for test_type in test_types:
test = self.new_test("%s_register" % test_type["prefix"],
"Verify devices can be registered and un-registered",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-Q true1")
test.add_cmd("stonith_admin", "-D true1")
test.add_expected_fail_cmd("stonith_admin", "-Q true1", CrmExit.ERROR)
# simple reboot test
for test_type in test_types:
test = self.new_test("%s_reboot" % test_type["prefix"],
"Verify devices can be rebooted",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-B node3 -t 2")
test.add_cmd("stonith_admin", "-D true1")
test.add_expected_fail_cmd("stonith_admin", "-Q true1", CrmExit.ERROR)
# test fencing history.
for test_type in test_types:
if test_type["use_cpg"] == 0:
continue
test = self.new_test("%s_fence_history" % test_type["prefix"],
"Verify last fencing operation is returned.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"")
test.add_cmd("stonith_admin", "-F node3 -t 2 -V")
test.add_cmd_check_stdout("stonith_admin", "-H node3", "was able to turn off node node3", "")
# simple test of dynamic list query
for test_type in test_types:
test = self.new_test("%s_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd_check_stdout("stonith_admin", "-l fake_port_1", "3 devices found")
# fence using dynamic list query
for test_type in test_types:
test = self.new_test("%s_fence_dynamic_list_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1")
test.add_cmd("stonith_admin", "-F fake_port_1 -t 5 -V")
# simple test of query using status action
for test_type in test_types:
test = self.new_test("%s_status_query" % test_type["prefix"],
"Verify dynamic list of fencing devices can be retrieved.",
test_type["use_cpg"])
test.add_cmd("stonith_admin", "-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"")
test.add_cmd("stonith_admin", "-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"")
test.add_cmd("stonith_admin", "-R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"")
test.add_cmd_check_stdout("stonith_admin", "-l fake_port_1", "3 devices found")
# test what happens when no reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_no_reboot_support" % test_type["prefix"],
"Verify reboot action defaults to off when no reboot action is advertised by agent.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy_no_reboot -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-B node1 -t 5 -V")
test.add_stonith_log_pattern("does not advertise support for 'reboot', performing 'off'")
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
# make sure reboot is used when reboot action is advertised
for test_type in test_types:
test = self.new_test("%s_with_reboot_support" % test_type["prefix"],
"Verify reboot action can be used when metadata advertises it.",
test_type["use_cpg"])
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"")
test.add_cmd("stonith_admin", "-B node1 -t 5 -V")
test.add_stonith_neg_log_pattern("does not advertise support for 'reboot', performing 'off'")
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
def build_nodeid_tests(self):
""" Register tests that use a corosync node id """
our_uname = localname()
### verify nodeid is supplied when nodeid is in the metadata parameters
test = self.new_test("cpg_supply_nodeid",
"Verify nodeid is given when fence agent has nodeid as parameter", 1)
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s\"" % (our_uname))
test.add_cmd("stonith_admin", "-F %s -t 3" % (our_uname))
test.add_stonith_log_pattern("For stonith action (off) for victim %s, adding nodeid" % (our_uname))
### verify nodeid is _NOT_ supplied when nodeid is not in the metadata parameters
test = self.new_test("cpg_do_not_supply_nodeid",
"Verify nodeid is _NOT_ given when fence agent does not have nodeid as parameter",
1)
# use a host name that won't be in corosync.conf
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=regr-test\"")
test.add_cmd("stonith_admin", "-F regr-test -t 3")
test.add_stonith_neg_log_pattern("For stonith action (off) for victim regr-test, adding nodeid")
### verify nodeid use doesn't explode standalone mode
test = self.new_test("standalone_do_not_supply_nodeid",
"Verify nodeid in metadata parameter list doesn't kill standalone mode",
0)
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s\"" % (our_uname))
test.add_cmd("stonith_admin", "-F %s -t 3" % (our_uname))
test.add_stonith_neg_log_pattern("For stonith action (off) for victim %s, adding nodeid" % (our_uname))
def build_unfence_tests(self):
""" Register tests that verify unfencing """
our_uname = localname()
### verify unfencing using automatic unfencing
test = self.new_test("cpg_unfence_required_1",
"Verify require unfencing on all devices when automatic=true in agent's metadata",
1)
test.add_cmd('stonith_admin',
'-R true1 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s"' % (our_uname))
test.add_cmd('stonith_admin',
'-R true2 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s"' % (our_uname))
test.add_cmd("stonith_admin", "-U %s -t 3" % (our_uname))
# both devices should be executed
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
test.add_stonith_log_pattern("with device 'true2' returned: 0 (OK)")
### verify unfencing using automatic unfencing fails if any of the required agents fail
test = self.new_test("cpg_unfence_required_2",
"Verify require unfencing on all devices when automatic=true in agent's metadata",
1)
test.add_cmd('stonith_admin',
'-R true1 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s"' % (our_uname))
test.add_cmd('stonith_admin',
'-R true2 -a fence_dummy_auto_unfence -o "mode=fail" -o "pcmk_host_list=%s"' % (our_uname))
test.add_expected_fail_cmd("stonith_admin", "-U %s -t 6" % (our_uname), CrmExit.ERROR)
### verify unfencing using automatic devices with topology
test = self.new_test("cpg_unfence_required_3",
"Verify require unfencing on all devices even when at different topology levels",
1)
test.add_cmd('stonith_admin',
'-R true1 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R true2 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 1 -v true1" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 2 -v true2" % (our_uname))
test.add_cmd("stonith_admin", "-U %s -t 3" % (our_uname))
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
test.add_stonith_log_pattern("with device 'true2' returned: 0 (OK)")
### verify unfencing using automatic devices with topology
test = self.new_test("cpg_unfence_required_4",
"Verify all required devices are executed even with topology levels fail.",
1)
test.add_cmd('stonith_admin',
'-R true1 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R true2 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R true3 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R true4 -a fence_dummy_auto_unfence -o "mode=pass" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R false3 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd('stonith_admin',
'-R false4 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=%s node3"' % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 1 -v true1" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 1 -v false1" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 2 -v false2" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 2 -v true2" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 2 -v false3" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 2 -v true3" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 3 -v false4" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 4 -v true4" % (our_uname))
test.add_cmd("stonith_admin", "-U %s -t 3" % (our_uname))
test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)")
test.add_stonith_log_pattern("with device 'true2' returned: 0 (OK)")
test.add_stonith_log_pattern("with device 'true3' returned: 0 (OK)")
test.add_stonith_log_pattern("with device 'true4' returned: 0 (OK)")
def build_unfence_on_target_tests(self):
""" Register tests that verify unfencing that runs on the target """
our_uname = localname()
### verify unfencing using on_target device
test = self.new_test("cpg_unfence_on_target_1",
"Verify unfencing with on_target = true", 1)
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s\"" % (our_uname))
test.add_cmd("stonith_admin", "-U %s -t 3" % (our_uname))
test.add_stonith_log_pattern("(on) to be executed on the target node")
### verify failure of unfencing using on_target device
test = self.new_test("cpg_unfence_on_target_2",
"Verify failure unfencing with on_target = true",
1)
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s node_fake_1234\"" % (our_uname))
test.add_expected_fail_cmd("stonith_admin", "-U node_fake_1234 -t 3", CrmExit.ERROR)
test.add_stonith_log_pattern("(on) to be executed on the target node")
### verify unfencing using on_target device with topology
test = self.new_test("cpg_unfence_on_target_3",
"Verify unfencing with on_target = true using topology",
1)
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s node3\"" % (our_uname))
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s node3\"" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 1 -v true1" % (our_uname))
test.add_cmd("stonith_admin", "-r %s -i 2 -v true2" % (our_uname))
test.add_cmd("stonith_admin", "-U %s -t 3" % (our_uname))
test.add_stonith_log_pattern("(on) to be executed on the target node")
### verify unfencing using on_target device with topology fails when victim node doesn't exist
test = self.new_test("cpg_unfence_on_target_4",
"Verify unfencing failure with on_target = true using topology",
1)
test.add_cmd("stonith_admin",
"-R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s node_fake\"" % (our_uname))
test.add_cmd("stonith_admin",
"-R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=%s node_fake\"" % (our_uname))
test.add_cmd("stonith_admin", "-r node_fake -i 1 -v true1")
test.add_cmd("stonith_admin", "-r node_fake -i 2 -v true2")
test.add_expected_fail_cmd("stonith_admin", "-U node_fake -t 3", CrmExit.ERROR)
test.add_stonith_log_pattern("(on) to be executed on the target node")
def build_remap_tests(self):
""" Register tests that verify remapping of reboots to off-on """
test = self.new_test("cpg_remap_simple",
"Verify sequential topology reboot is remapped to all-off-then-all-on", 1)
test.add_cmd("stonith_admin",
"""-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """
"""-o "pcmk_off_timeout=1" -o "pcmk_reboot_timeout=10" """)
test.add_cmd("stonith_admin",
"""-R true2 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """
"""-o "pcmk_off_timeout=2" -o "pcmk_reboot_timeout=20" """)
test.add_cmd("stonith_admin", "-r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", "-B node_fake -t 5")
test.add_stonith_log_pattern("Remapping multiple-device reboot of node_fake")
# timeout should be sum of off timeouts (1+2=3), not reboot timeouts (10+20=30)
test.add_stonith_log_pattern("Total timeout set to 3 for peer's fencing of node_fake")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true1'")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true2'")
test.add_stonith_log_pattern("Remapped off of node_fake complete, remapping to on")
# fence_dummy sets "on" as an on_target action
test.add_stonith_log_pattern("Ignoring true1 'on' failure (no capable peers) for node_fake")
test.add_stonith_log_pattern("Ignoring true2 'on' failure (no capable peers) for node_fake")
test.add_stonith_log_pattern("Undoing remap of reboot of node_fake")
test = self.new_test("cpg_remap_automatic",
"Verify remapped topology reboot skips automatic 'on'", 1)
test.add_cmd("stonith_admin",
"""-R true1 -a fence_dummy_auto_unfence """
"""-o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin",
"""-R true2 -a fence_dummy_auto_unfence """
"""-o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", "-r node_fake -i 1 -v true1 -v true2")
test.add_cmd("stonith_admin", "-B node_fake -t 5")
test.add_stonith_log_pattern("Remapping multiple-device reboot of node_fake")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true1'")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true2'")
test.add_stonith_log_pattern("Remapped off of node_fake complete, remapping to on")
test.add_stonith_log_pattern("Undoing remap of reboot of node_fake")
test.add_stonith_neg_log_pattern("perform op 'node_fake on' with")
test.add_stonith_neg_log_pattern("'on' failure")
test = self.new_test("cpg_remap_complex_1",
"Verify remapped topology reboot in second level works if non-remapped first level fails",
1)
test.add_cmd("stonith_admin", """-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", """-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", """-R true2 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", "-r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node_fake -i 2 -v true1 -v true2")
test.add_cmd("stonith_admin", "-B node_fake -t 5")
test.add_stonith_log_pattern("perform op 'node_fake reboot' with 'false1'")
test.add_stonith_log_pattern("Remapping multiple-device reboot of node_fake")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true1'")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true2'")
test.add_stonith_log_pattern("Remapped off of node_fake complete, remapping to on")
test.add_stonith_log_pattern("Ignoring true1 'on' failure (no capable peers) for node_fake")
test.add_stonith_log_pattern("Ignoring true2 'on' failure (no capable peers) for node_fake")
test.add_stonith_log_pattern("Undoing remap of reboot of node_fake")
test = self.new_test("cpg_remap_complex_2",
"Verify remapped topology reboot failure in second level proceeds to third level",
1)
test.add_cmd("stonith_admin", """-R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", """-R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", """-R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", """-R true2 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", """-R true3 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node_fake" """)
test.add_cmd("stonith_admin", "-r node_fake -i 1 -v false1")
test.add_cmd("stonith_admin", "-r node_fake -i 2 -v true1 -v false2 -v true3")
test.add_cmd("stonith_admin", "-r node_fake -i 3 -v true2")
test.add_cmd("stonith_admin", "-B node_fake -t 5")
test.add_stonith_log_pattern("perform op 'node_fake reboot' with 'false1'")
test.add_stonith_log_pattern("Remapping multiple-device reboot of node_fake")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'true1'")
test.add_stonith_log_pattern("perform op 'node_fake off' with 'false2'")
test.add_stonith_log_pattern("Attempted to execute agent fence_dummy (off) the maximum number of times")
test.add_stonith_log_pattern("Undoing remap of reboot of node_fake")
test.add_stonith_log_pattern("perform op 'node_fake reboot' with 'true2'")
test.add_stonith_neg_log_pattern("node_fake with true3")
def setup_environment(self, use_corosync):
""" Prepare the host before executing any tests """
- if self.autogen_corosync_cfg and use_corosync:
- corosync_conf = ("""
-totem {
- version: 2
- crypto_cipher: none
- crypto_hash: none
-
- nodeid: 101
- secauth: off
-
- interface {
- ttl: 1
- ringnumber: 0
- mcastport: 6666
- mcastaddr: 226.94.1.1
- bindnetaddr: 127.0.0.1
- }
-}
-
-logging {
- debug: off
- fileline: off
- to_syslog: no
- to_stderr: no
- syslog_facility: daemon
- timestamp: on
- to_logfile: yes
- logfile: @CRM_LOG_DIR@/corosync.test.log
- logfile_priority: info
-}
-""")
-
- os.system("cat <<-END >>/etc/corosync/corosync.conf\n%s\nEND" % (corosync_conf))
-
if use_corosync:
+ if self.autogen_corosync_cfg:
+ corosync_cfg = io.open("/etc/corosync/corosync.conf", "w")
+ corosync_cfg.write(AUTOGEN_COROSYNC_TEMPLATE % (localname()))
+ corosync_cfg.close()
+
### make sure we are in control ###
killall("corosync")
self.start_corosync()
subprocess.call(["cts-support", "install"])
def cleanup_environment(self, use_corosync):
""" Clean up the host after executing desired tests """
if use_corosync:
killall("corosync")
if self.verbose and os.path.exists('@CRM_LOG_DIR@/corosync.test.log'):
print("Corosync output")
logfile = io.open('@CRM_LOG_DIR@/corosync.test.log', 'rt')
for line in logfile.readlines():
print(line.strip())
os.remove('@CRM_LOG_DIR@/corosync.test.log')
- if self.autogen_corosync_cfg:
- os.system("rm -f /etc/corosync/corosync.conf")
+ if self.autogen_corosync_cfg:
+ os.remove("/etc/corosync/corosync.conf")
subprocess.call(["cts-support", "uninstall"])
class TestOptions(object):
""" Option handler """
def __init__(self):
self.options = {}
self.options['list-tests'] = 0
self.options['run-all'] = 1
self.options['run-only'] = ""
self.options['run-only-pattern'] = ""
self.options['verbose'] = 0
self.options['invalid-arg'] = ""
self.options['cpg-only'] = 0
self.options['no-cpg'] = 0
self.options['show-usage'] = 0
def build_options(self, argv):
""" Set options based on command-line arguments """
args = argv[1:]
skip = 0
for i in range(0, len(args)):
if skip:
skip = 0
continue
elif args[i] == "-h" or args[i] == "--help":
self.options['show-usage'] = 1
elif args[i] == "-l" or args[i] == "--list-tests":
self.options['list-tests'] = 1
elif args[i] == "-V" or args[i] == "--verbose":
self.options['verbose'] = 1
elif args[i] == "-n" or args[i] == "--no-cpg":
self.options['no-cpg'] = 1
elif args[i] == "-c" or args[i] == "--cpg-only":
self.options['cpg-only'] = 1
elif args[i] == "-r" or args[i] == "--run-only":
self.options['run-only'] = args[i+1]
skip = 1
elif args[i] == "-p" or args[i] == "--run-only-pattern":
self.options['run-only-pattern'] = args[i+1]
skip = 1
def show_usage(self):
""" Show command usage """
print("usage: " + sys.argv[0] + " [options]")
print("If no options are provided, all tests will run")
print("Options:")
print("\t [--help | -h] Show usage")
print("\t [--list-tests | -l] Print out all registered tests.")
print("\t [--cpg-only | -c] Only run tests that require corosync.")
print("\t [--no-cpg | -n] Only run tests that do not require corosync")
print("\t [--run-only | -r 'testname'] Run a specific test")
print("\t [--verbose | -V] Verbose output")
print("\t [--run-only-pattern | -p 'string'] Run only tests containing the string value")
print("\n\tExample: Run only the test 'start_stop'")
print("\t\t " + sys.argv[0] + " --run-only start_stop")
print("\n\tExample: Run only the tests with the string 'systemd' present in them")
print("\t\t " + sys.argv[0] + " --run-only-pattern systemd")
def main(argv):
""" Run fencing regression tests as specified by arguments """
update_path()
opts = TestOptions()
opts.build_options(argv)
use_corosync = 1
tests = Tests(opts.options['verbose'])
tests.build_standalone_tests()
tests.build_custom_timeout_tests()
tests.build_api_sanity_tests()
tests.build_fence_merge_tests()
tests.build_fence_no_merge_tests()
tests.build_unfence_tests()
tests.build_unfence_on_target_tests()
tests.build_nodeid_tests()
tests.build_remap_tests()
if opts.options['list-tests']:
tests.print_list()
sys.exit(CrmExit.OK)
elif opts.options['show-usage']:
opts.show_usage()
sys.exit(CrmExit.OK)
print("Starting ...")
if opts.options['no-cpg']:
use_corosync = 0
tests.setup_environment(use_corosync)
if opts.options['run-only-pattern'] != "":
tests.run_tests_matching(opts.options['run-only-pattern'])
tests.print_results()
elif opts.options['run-only'] != "":
tests.run_single(opts.options['run-only'])
tests.print_results()
elif opts.options['no-cpg']:
tests.run_no_cpg()
tests.print_results()
elif opts.options['cpg-only']:
tests.run_cpg_only()
tests.print_results()
else:
tests.run_tests()
tests.print_results()
tests.cleanup_environment(use_corosync)
tests.exit()
if __name__ == "__main__":
main(sys.argv)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jan 25, 11:26 AM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1322358
Default Alt Text
(68 KB)

Event Timeline