diff --git a/cts/support/fence_dummy.in b/cts/support/fence_dummy.in index 38f4b66d44..6404000dd5 100644 --- a/cts/support/fence_dummy.in +++ b/cts/support/fence_dummy.in @@ -1,516 +1,517 @@ #!@PYTHON@ """Dummy fence agent for testing.""" -__copyright__ = "Copyright 2012-2023 the Pacemaker project contributors" +__copyright__ = "Copyright 2012-2024 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import io import os import re import sys import time import random import atexit import getopt import contextlib AGENT_VERSION = "4.1.0" OCF_VERSION = "1.0" SHORT_DESC = "Dummy fence agent" LONG_DESC = """fence_dummy is a fake fencing agent which reports success based on its mode (pass|fail|random) without doing anything.""" # Short options used: difhmnoqsvBDHMRUV ALL_OPT = { "quiet": { "getopt": "q", "help": "", "order": 50 }, "verbose": { "getopt": "v", "longopt": "verbose", "help": "-v, --verbose Verbose mode", "required": "0", "shortdesc": "Verbose mode", "order": 51 }, "debug": { "getopt": "D:", "longopt": "debug-file", "help": "-D, --debug-file=[debugfile] Debugging to output file", "required": "0", "shortdesc": "Write debug information to given file", "order": 52 }, "version": { "getopt": "V", "longopt": "version", "help": "-V, --version Display version information and exit", "required": "0", "shortdesc": "Display version information and exit", "order": 53 }, "help": { "getopt": "h", "longopt": "help", "help": "-h, --help Display this help and exit", "required": "0", "shortdesc": "Display help and exit", "order": 54 }, "action": { "getopt": "o:", "longopt": "action", "help": "-o, --action=[action] Action: validate-all, status, list, reboot (default), off or on", "required": "1", "shortdesc": "Fencing Action", "default": "reboot", "order": 1 }, "nodename": { "getopt": "N:", "longopt": "nodename", "help": "-N, --nodename Node name of fence target (ignored)", "required": "0", "shortdesc": "The node name of fence target (ignored)", "order": 2 }, "mode": { "getopt": "M:", "longopt": "mode", "required": "0", "help": "-M, --mode=(pass|fail|random) Exit status to return for non-monitor operations", "shortdesc": "Whether fence operations should always pass, always fail, or fail at random", "order": 3 }, "monitor_mode": { "getopt": "m:", "longopt": "monitor_mode", "help": "-m, --monitor_mode=(pass|fail|random) Exit status to return for monitor operations", "required": "0", "shortdesc": "Whether monitor operations should always pass, always fail, or fail at random", "order": 3 }, "random_sleep_range": { "getopt": "R:", "required": "0", "longopt": "random_sleep_range", "help": "-R, --random_sleep_range=[seconds] Sleep between 1 and [seconds] before returning", "shortdesc": "Wait randomly between 1 and [seconds]", "order": 3 }, "mock_dynamic_hosts": { "getopt": "H:", "longopt": "mock_dynamic_hosts", "help": "-H, --mock_dynamic_hosts=[list] What to return when dynamically queried for possible targets", "required": "0", "shortdesc": "A list of hosts we can fence", "order": 3 }, "delay": { "getopt": "f:", "longopt": "delay", "help": "-f, --delay [seconds] Wait X seconds before fencing is started", "required": "0", "shortdesc": "Wait X seconds before fencing is started", "default": "0", "order": 3 }, "monitor_delay": { "getopt": "d:", "longopt": "monitor_delay", "help": "-d, --monitor_delay [seconds] Wait X seconds before monitor completes", "required": "0", "shortdesc": "Wait X seconds before monitor completes", "default": "0", "order": 3 }, "off_delay": { "getopt": "F:", "longopt": "off_delay", "help": "-F, --off_delay [seconds] Wait additional X seconds before off action", "required": "0", "shortdesc": "Wait additional X seconds before off action", "default": "0", "order": 3 }, "plug": { "getopt": "n:", "longopt": "plug", "help": "-n, --plug=[id] Physical plug number on device (ignored)", "required": "1", "shortdesc": "Ignored", "order": 4 }, "port": { "getopt": "n:", "longopt": "plug", "help": "-n, --plug=[id] Physical plug number on device (ignored)", "required": "1", "shortdesc": "Ignored", "order": 4 }, "switch": { "getopt": "s:", "longopt": "switch", "help": "-s, --switch=[id] Physical switch number on device (ignored)", "required": "0", "shortdesc": "Ignored", "order": 4 }, "nodeid": { "getopt": "i:", "longopt": "nodeid", "help": "-i, --nodeid Corosync id of fence target (ignored)", "required": "0", "shortdesc": "Ignored", "order": 4 }, "uuid": { "getopt": "U:", "longopt": "uuid", "help": "-U, --uuid UUID of the VM to fence (ignored)", "required": "0", "shortdesc": "Ignored", "order": 4 } } def agent(): """Return name this file was run as.""" return os.path.basename(sys.argv[0]) def fail_usage(message): """Print a usage message and exit.""" sys.exit("%s\nPlease use '-h' for usage" % message) def show_docs(options, auto_unfence, no_reboot, no_on): """Handle informational options (display info and exit).""" device_opt = options["device_opt"] if "-h" in options: usage(device_opt) sys.exit(0) if "-o" in options and options["-o"].lower() == "metadata": f = "%s.fail" % __file__ if not os.path.exists(f): metadata(device_opt, options, auto_unfence, no_reboot, no_on) else: os.remove(f) sys.exit(0) if "-V" in options: print(AGENT_VERSION) sys.exit(0) def sorted_options(avail_opt): """Return a list of all options, in their internally specified order.""" sorted_list = [(key, ALL_OPT[key]) for key in avail_opt] sorted_list.sort(key=lambda x: x[1]["order"]) return sorted_list def usage(avail_opt): """Print a usage message.""" print("Usage:") print("\t%s [options]" % agent()) print("Options:") for (_, value) in sorted_options(avail_opt): if len(value["help"]) != 0: print(" %s" % value["help"]) def metadata(avail_opt, options, auto_unfence, no_reboot, no_on): """Print agent metadata.""" # This log is just for testing handling of stderr output print("asked for fence_dummy metadata", file=sys.stderr) print(""" %s %s """ % (agent(), SHORT_DESC, AGENT_VERSION, OCF_VERSION, LONG_DESC)) for (option, _) in sorted_options(avail_opt): if "shortdesc" not in ALL_OPT[option]: continue print(' ' % (option, ALL_OPT[option]["required"])) default = "" default_name_arg = "-%s" % ALL_OPT[option]["getopt"][:-1] default_name_no_arg = "-%s" % ALL_OPT[option]["getopt"] if "default" in ALL_OPT[option]: default = 'default="%s"' % ALL_OPT[option]["default"] elif options.get(default_name_arg) is not None: try: default = 'default="%s"' % options[default_name_arg] except TypeError: # @todo/@note: Currently there is no clean way how to handle lists # we can create a string from it but we can't set it on command line default = 'default="%s"' % str(options[default_name_arg]) elif default_name_no_arg in options: default = 'default="true"' mixed = ALL_OPT[option]["help"] # split it between option and help text res = re.compile(r"^(.*--\S+)\s+", re.IGNORECASE | re.S).search(mixed) if res is not None: mixed = res.group(1) mixed = mixed.replace("<", "<").replace(">", ">") print(' ' % mixed) if ALL_OPT[option]["getopt"].count(":") > 0: print(' ' % default) else: print(' ' % default) print(' %s' % ALL_OPT[option]["shortdesc"]) print(' ') print(' \n ') if not no_on: if auto_unfence: attr_name = 'automatic' else: attr_name = 'on_target' print(' ' % attr_name) print(' ') if not no_reboot: print(' ') print(' ') print(' ') print(' ') print(' ') print(' ') print('') def option_longopt(option): """Return the getopt-compatible long-option name of the given option.""" if ALL_OPT[option]["getopt"].endswith(":"): return ALL_OPT[option]["longopt"] + "=" return ALL_OPT[option]["longopt"] def opts_from_command_line(argv, avail_opt): """Read options from command-line arguments.""" # Prepare list of options for getopt getopt_string = "" longopt_list = [] for k in avail_opt: if k in ALL_OPT: getopt_string += ALL_OPT[k]["getopt"] else: fail_usage("Parse error: unknown option '%s'" % k) if k in ALL_OPT and "longopt" in ALL_OPT[k]: longopt_list.append(option_longopt(k)) try: (opt, _) = getopt.gnu_getopt(argv, getopt_string, longopt_list) except getopt.GetoptError as error: fail_usage("Parse error: %s" % error.msg) # Transform longopt to short one which are used in fencing agents old_opt = opt opt = {} for old_option in dict(old_opt): if old_option.startswith("--"): for rec in ALL_OPT.values(): if rec.get("longopt") is None: continue long = "--%s" % rec["longopt"] if long == old_option: short = "-%s" % rec["getopt"][0] opt[short] = dict(old_opt)[old_option] else: opt[old_option] = dict(old_opt)[old_option] # Compatibility Layer (with what? probably not needed for fence_dummy) new_opt = dict(opt) if "-T" in new_opt: new_opt["-o"] = "status" if "-n" in new_opt: new_opt["-m"] = new_opt["-n"] opt = new_opt return opt def opts_from_stdin(avail_opt): """Read options from standard input.""" opt = {} name = "" for line in sys.stdin.readlines(): line = line.strip() if line.startswith("#") or (len(line) == 0): continue (name, value) = (line + "=").split("=", 1) value = value[:-1] # Compatibility Layer (with what? probably not needed for fence_dummy) if name == "option": name = "action" if name not in avail_opt: print("Parse error: Ignoring unknown option '%s'" % line, file=sys.stderr) continue if ALL_OPT[name]["getopt"].endswith(":"): short = "-%s" % ALL_OPT[name]["getopt"][0] opt[short] = value elif value.lower() in ["1", "yes", "on", "true"]: short = "-%s" % ALL_OPT[name]["getopt"] opt[short] = "1" return opt def process_input(avail_opt): """Set standard environment variables, and parse all options.""" # Set standard environment os.putenv("LANG", "C") os.putenv("LC_ALL", "C") # Read options from command line or standard input if len(sys.argv) > 1: return opts_from_command_line(sys.argv[1:], avail_opt) return opts_from_stdin(avail_opt) def atexit_handler(): """Close stdout on exit.""" try: sys.stdout.close() os.close(1) except IOError: sys.exit("%s failed to close standard output" % agent()) def success_mode(options, option, default_value): """Return exit code specified by option.""" if option in options: test_value = options[option] else: test_value = default_value if test_value == "pass": exitcode = 0 elif test_value == "fail": exitcode = 1 else: exitcode = random.randint(0, 1) return exitcode def write_options(options): """Write out all options to debug file.""" with contextlib.suppress(IOError): with io.open(options["-D"], "at", encoding="utf-8") as debugfile: debugfile.write("### %s ###\n" % time.strftime("%Y-%m-%d %H:%M:%S")) for option in sorted(options): debugfile.write("%s=%s\n" % (option, options[option])) debugfile.write("###\n") def main(): + """Run the dummy fencing agent.""" auto_unfence = False no_reboot = False no_on = False # Meta-data can't take parameters, so we simulate different meta-data # behavior based on the executable name (which can be a symbolic link). if sys.argv[0].endswith("_auto_unfence"): auto_unfence = True elif sys.argv[0].endswith("_no_reboot"): no_reboot = True elif sys.argv[0].endswith("_no_on"): no_on = True device_opt = ALL_OPT.keys() # Defaults for fence agent atexit.register(atexit_handler) options = process_input(device_opt) options["device_opt"] = device_opt show_docs(options, auto_unfence, no_reboot, no_on) action = options.get("-o", "reboot") # dump input to file if "-D" in options and action != "validate-all": write_options(options) if "-f" in options and action != "validate-all": val = int(options["-f"]) print("delay sleep for %d seconds" % val, file=sys.stderr) time.sleep(val) # random sleep for testing if "-R" in options and action != "validate-all": val = int(options["-R"]) ran = random.randint(1, val) print("random sleep for %d seconds" % ran, file=sys.stderr) time.sleep(ran) if action == "monitor": if "-d" in options: time.sleep(int(options["-d"])) exitcode = success_mode(options, "-m", "pass") elif action == "list": print("fence_dummy action (list) called", file=sys.stderr) if "-H" in options: print(options["-H"]) exitcode = 0 else: print("dynamic hostlist requires mock_dynamic_hosts to be set", file=sys.stderr) exitcode = 1 elif action == "validate-all": if "-f" in options: val = int(options["-f"]) if val > 10: exitcode = 1 else: exitcode = 0 else: exitcode = 1 elif action == "off": if "-F" in options: time.sleep(int(options["-F"])) exitcode = success_mode(options, "-M", "random") else: exitcode = success_mode(options, "-M", "random") # Ensure we generate some error output on failure exit. if exitcode == 1: print("simulated %s failure" % action, file=sys.stderr) sys.exit(exitcode) if __name__ == "__main__": main()