Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/python/pacemaker/_cts/CTS.py b/python/pacemaker/_cts/CTS.py
index 3dc36a2253..bc46525b5f 100644
--- a/python/pacemaker/_cts/CTS.py
+++ b/python/pacemaker/_cts/CTS.py
@@ -1,230 +1,230 @@
"""Main classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["CtsLab", "NodeStatus", "Process"]
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import sys
import time
import traceback
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.input import should_continue
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
class CtsLab:
"""
A class that defines the Lab Environment for the Cluster Test System.
It defines those things which are expected to change from test
environment to test environment for the same cluster manager.
This is where you define the set of nodes that are in your test lab,
what kind of reset mechanism you use, etc. All this data is stored
as key/value pairs in an Environment instance constructed from arguments
passed to this class.
The CTS code ignores names it doesn't know about or need. Individual
tests have access to this information, and it is perfectly acceptable
to provide hints, tweaks, fine-tuning directions, or other information
to the tests through this mechanism.
"""
def __init__(self, args=None):
"""
Create a new CtsLab instance.
This class can be treated kind of like a dictionary due to the presence
of typical dict functions like __contains__, __getitem__, and __setitem__.
However, it is not a dictionary so do not rely on standard dictionary
behavior.
Arguments:
args -- A list of command line parameters, minus the program name.
"""
self._env = EnvFactory().getInstance(args)
self._logger = LogFactory()
def dump(self):
"""Print the current environment."""
self._env.dump()
def __contains__(self, key):
"""Return True if the given environment key exists."""
# pylint gets confused because of EnvFactory here.
# pylint: disable=unsupported-membership-test
return key in self._env
def __getitem__(self, key):
"""Return the given environment key, or raise KeyError if it does not exist."""
# Throughout this file, pylint has trouble understanding that EnvFactory
# and RemoteFactory are singleton instances that can be treated as callable
# and subscriptable objects. Various warnings are disabled because of this.
# See also a comment about self._rsh in environment.py.
# pylint: disable=unsubscriptable-object
return self._env[key]
def __setitem__(self, key, value):
"""Set the given environment key to the given value, overriding any previous value."""
# pylint: disable=unsupported-assignment-operation
self._env[key] = value
def run(self, scenario, iterations):
"""
Run the given scenario the given number of times.
Returns ExitStatus.OK on success, or ExitStatus.ERROR on error.
"""
if not scenario:
self._logger.log("No scenario was defined")
return ExitStatus.ERROR
self._logger.log("Cluster nodes: ")
# pylint: disable=unsubscriptable-object
for node in self._env["nodes"]:
self._logger.log(" * %s" % (node))
if not scenario.setup():
return ExitStatus.ERROR
# We want to alert on any exceptions caused by running a scenario, so
# here it's okay to disable the pylint warning.
# pylint: disable=bare-except
try:
scenario.run(iterations)
- except:
+ except: # noqa: E722
self._logger.log("Exception by %s" % sys.exc_info()[0])
self._logger.traceback(traceback)
scenario.summarize()
scenario.teardown()
return ExitStatus.ERROR
scenario.teardown()
scenario.summarize()
if scenario.stats["failure"] > 0:
return ExitStatus.ERROR
if scenario.stats["success"] != iterations:
self._logger.log("No failure count but success != requested iterations")
return ExitStatus.ERROR
return ExitStatus.OK
class NodeStatus:
"""
A class for querying the status of cluster nodes.
Are nodes up? Do they respond to SSH connections?
"""
def __init__(self, env):
"""
Create a new NodeStatus instance.
Arguments:
env -- An Environment instance
"""
self._env = env
def _node_booted(self, node):
"""Return True if the given node is booted (responds to pings)."""
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, verbose=0)
return rc == 0
def _sshd_up(self, node):
"""Return true if sshd responds on the given node."""
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()(node, "true", verbose=0)
return rc == 0
def wait_for_node(self, node, timeout=300):
"""
Wait for a node to become available.
Should the timeout be reached, the user will be given a choice whether
to continue or not. If not, ValueError will be raised.
Returns True when the node is available, or False if the timeout is
reached.
"""
initial_timeout = timeout
anytimeouts = False
while timeout > 0:
if self._node_booted(node) and self._sshd_up(node):
if anytimeouts:
# Fudge to wait for the system to finish coming up
time.sleep(30)
LogFactory().debug("Node %s now up" % node)
return True
time.sleep(30)
if not anytimeouts:
LogFactory().debug("Waiting for node %s to come up" % node)
anytimeouts = True
timeout -= 1
LogFactory().log("%s did not come up within %d tries" % (node, initial_timeout))
if not should_continue(self._env["continue"]):
raise ValueError("%s did not come up within %d tries" % (node, initial_timeout))
return False
def wait_for_all_nodes(self, nodes, timeout=300):
"""Return True when all nodes come up, or False if the timeout is reached."""
for node in nodes:
if not self.wait_for_node(node, timeout):
return False
return True
class Process:
"""A class for managing a Pacemaker daemon."""
# pylint: disable=invalid-name
def __init__(self, cm, name, dc_only=False, pats=None, dc_pats=None,
badnews_ignore=None):
"""
Create a new Process instance.
Arguments:
cm -- A ClusterManager instance
name -- The command being run
dc_only -- Should this daemon be killed only on the DC?
pats -- Regexes we expect to find in log files
dc_pats -- Additional DC-specific regexes we expect to find
in log files
badnews_ignore -- Regexes for lines in the log that can be ignored
"""
self._cm = cm
self.badnews_ignore = badnews_ignore
self.dc_only = dc_only
self.dc_pats = dc_pats
self.name = name
self.pats = pats
if self.badnews_ignore is None:
self.badnews_ignore = []
if self.dc_pats is None:
self.dc_pats = []
if self.pats is None:
self.pats = []
def kill(self, node):
"""Kill the instance of this process running on the given node."""
(rc, _) = self._cm.rsh(node, "killall -9 %s" % self.name)
if rc != 0:
self._cm.log("ERROR: Kill %s failed on node %s" % (self.name, node))
diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
index dba7938f4f..4ce1359417 100644
--- a/python/pacemaker/_cts/environment.py
+++ b/python/pacemaker/_cts/environment.py
@@ -1,638 +1,638 @@
"""Test environment classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["EnvFactory"]
__copyright__ = "Copyright 2014-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
import os
import random
import socket
import sys
import time
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogKind
class Environment:
"""
A class for managing the CTS environment.
This consists largely of processing and storing command line parameters.
"""
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory). It's possible we could fix this with type annotations,
# but those were introduced with python 3.5 and we only support python 3.4.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
def __init__(self, args):
"""
Create a new Environment instance.
This class can be treated kind of like a dictionary due to the presence
of typical dict functions like __contains__, __getitem__, and __setitem__.
However, it is not a dictionary so do not rely on standard dictionary
behavior.
Arguments:
args -- A list of command line parameters, minus the program name.
If None, sys.argv will be used.
"""
self.data = {}
self._nodes = []
# Set some defaults before processing command line arguments. These are
# either not set by any command line parameter, or they need a default
# that can't be set in add_argument.
self["DeadTime"] = 300
self["StartTime"] = 300
self["StableTime"] = 30
self["tests"] = []
self["IPagent"] = "IPaddr2"
self["DoFencing"] = True
self["ClobberCIB"] = False
self["CIBfilename"] = None
self["CIBResource"] = False
self["LogWatcher"] = LogKind.ANY
self["node-limit"] = 0
self["scenario"] = "random"
self.random_gen = random.Random()
self._logger = LogFactory()
self._rsh = RemoteFactory().getInstance()
self._target = "localhost"
self._seed_random()
self._parse_args(args)
if not self["ListTests"]:
self._validate()
self._discover()
def _seed_random(self, seed=None):
"""
Initialize the random number generator.
Arguments:
seed -- Use this to see the random number generator, or use the
current time if None.
"""
if not seed:
seed = int(time.time())
self["RandSeed"] = seed
self.random_gen.seed(str(seed))
def dump(self):
"""Print the current environment."""
keys = []
for key in list(self.data.keys()):
keys.append(key)
keys.sort()
for key in keys:
s = "Environment[%s]" % key
self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key])))
def keys(self):
"""Return a list of all environment keys stored in this instance."""
return list(self.data.keys())
def __contains__(self, key):
"""Return True if the given key exists in the environment."""
if key == "nodes":
return True
return key in self.data
def __getitem__(self, key):
"""Return the given environment key, or None if it does not exist."""
if str(key) == "0":
raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead")
if key == "nodes":
return self._nodes
if key == "Name":
return self._get_stack_short()
return self.data.get(key)
def __setitem__(self, key, value):
"""Set the given environment key to the given value, overriding any previous value."""
if key == "Stack":
self._set_stack(value)
elif key == "node-limit":
self.data[key] = value
self._filter_nodes()
elif key == "nodes":
self._nodes = []
for node in value:
# I don't think I need the IP address, etc. but this validates
# the node name against /etc/hosts and/or DNS, so it's a
# GoodThing(tm).
try:
n = node.strip()
socket.gethostbyname_ex(n)
self._nodes.append(n)
- except:
+ except socket.herror:
self._logger.log("%s not found in DNS... aborting" % node)
raise
self._filter_nodes()
else:
self.data[key] = value
def random_node(self):
"""Choose a random node from the cluster."""
return self.random_gen.choice(self["nodes"])
def get(self, key, default=None):
"""Return the value for key if key is in the environment, else default."""
if key == "nodes":
return self._nodes
return self.data.get(key, default)
def _set_stack(self, name):
"""Normalize the given cluster stack name."""
if name in ["corosync", "cs", "mcp"]:
self.data["Stack"] = "corosync 2+"
else:
raise ValueError("Unknown stack: %s" % name)
def _get_stack_short(self):
"""Return the short name for the currently set cluster stack."""
if "Stack" not in self.data:
return "unknown"
if self.data["Stack"] == "corosync 2+":
return "crm-corosync"
LogFactory().log("Unknown stack: %s" % self["stack"])
raise ValueError("Unknown stack: %s" % self["stack"])
def _detect_systemd(self):
"""Detect whether systemd is in use on the target node."""
if "have_systemd" not in self.data:
(rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0)
self["have_systemd"] = rc == 0
def _detect_syslog(self):
"""Detect the syslog variant in use on the target node."""
if "syslogd" not in self.data:
if self["have_systemd"]:
# Systemd
(_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1)
self["syslogd"] = lines[0].strip()
else:
# SYS-V
(_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1)
self["syslogd"] = lines[0].strip()
if "syslogd" not in self.data or not self["syslogd"]:
# default
self["syslogd"] = "rsyslog"
def disable_service(self, node, service):
"""Disable the given service on the given node."""
if self["have_systemd"]:
# Systemd
(rc, _) = self._rsh(node, "systemctl disable %s" % service)
return rc
# SYS-V
(rc, _) = self._rsh(node, "chkconfig %s off" % service)
return rc
def enable_service(self, node, service):
"""Enable the given service on the given node."""
if self["have_systemd"]:
# Systemd
(rc, _) = self._rsh(node, "systemctl enable %s" % service)
return rc
# SYS-V
(rc, _) = self._rsh(node, "chkconfig %s on" % service)
return rc
def service_is_enabled(self, node, service):
"""Return True if the given service is enabled on the given node."""
if self["have_systemd"]:
# Systemd
# With "systemctl is-enabled", we should check if the service is
# explicitly "enabled" instead of the return code. For example it returns
# 0 if the service is "static" or "indirect", but they don't really count
# as "enabled".
(rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service)
return rc == 0
# SYS-V
(rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service)
return rc == 0
def _detect_at_boot(self):
"""Detect if the cluster starts at boot."""
if "at-boot" not in self.data:
self["at-boot"] = self.service_is_enabled(self._target, "corosync") \
or self.service_is_enabled(self._target, "pacemaker")
def _detect_ip_offset(self):
"""Detect the offset for IPaddr resources."""
if self["CIBResource"] and "IPBase" not in self.data:
(_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
network = lines[0].strip()
(_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)
try:
self["IPBase"] = lines[0].strip()
except (IndexError, TypeError):
self["IPBase"] = None
if not self["IPBase"]:
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.")
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
return
# pylint thinks self["IPBase"] is a list, not a string, which causes it
# to error out because a list doesn't have split().
# pylint: disable=no-member
if int(self["IPBase"].split('.')[3]) >= 240:
self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
% (self["IPBase"], self["IPBase"].split('.')[3]))
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
def _filter_nodes(self):
"""
Filter the list of cluster nodes.
If --limit-nodes is given, keep that many nodes from the front of the
list of cluster nodes and drop the rest.
"""
if self["node-limit"] > 0:
if len(self["nodes"]) > self["node-limit"]:
# pylint thinks self["node-limit"] is a list even though we initialize
# it as an int in __init__ and treat it as an int everywhere.
# pylint: disable=bad-string-format-type
self._logger.log("Limiting the number of nodes configured=%d (max=%d)"
% (len(self["nodes"]), self["node-limit"]))
while len(self["nodes"]) > self["node-limit"]:
self["nodes"].pop(len(self["nodes"]) - 1)
def _validate(self):
"""Check that we were given all required command line parameters."""
if not self["nodes"]:
raise ValueError("No nodes specified!")
def _discover(self):
"""Probe cluster nodes to figure out how to log and manage services."""
self._target = random.Random().choice(self["nodes"])
exerciser = socket.gethostname()
# Use the IP where possible to avoid name lookup failures
for ip in socket.gethostbyname_ex(exerciser)[2]:
if ip != "127.0.0.1":
exerciser = ip
break
self["cts-exerciser"] = exerciser
self._detect_systemd()
self._detect_syslog()
self._detect_at_boot()
self._detect_ip_offset()
def _parse_args(self, argv):
"""
Parse and validate command line parameters.
Set the appropriate values in the environment dictionary. If argv is
None, use sys.argv instead.
"""
if not argv:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0])
grp1 = parser.add_argument_group("Common options")
grp1.add_argument("-g", "--dsh-group", "--group",
metavar="GROUP", dest="group",
help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)")
grp1.add_argument("-l", "--limit-nodes",
type=int, default=0,
metavar="MAX",
help="Only use the first MAX cluster nodes supplied with --nodes")
grp1.add_argument("--benchmark",
action="store_true",
help="Add timing information")
grp1.add_argument("--list", "--list-tests",
action="store_true", dest="list_tests",
help="List the valid tests")
grp1.add_argument("--nodes",
metavar="NODES",
help="List of cluster nodes separated by whitespace")
grp1.add_argument("--stack",
default="corosync",
metavar="STACK",
help="Which cluster stack is installed")
grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
grp2.add_argument("-L", "--logfile",
metavar="PATH",
help="Where to look for logs from cluster nodes")
grp2.add_argument("--at-boot", "--cluster-starts-at-boot",
choices=["1", "0", "yes", "no"],
help="Does the cluster software start at boot time?")
grp2.add_argument("--facility", "--syslog-facility",
default="daemon",
metavar="NAME",
help="Which syslog facility to log to")
grp2.add_argument("--ip", "--test-ip-base",
metavar="IP",
help="Offset for generated IP address resources")
grp3 = parser.add_argument_group("Options for release testing")
grp3.add_argument("-r", "--populate-resources",
action="store_true",
help="Generate a sample configuration")
grp3.add_argument("--choose",
metavar="NAME",
help="Run only the named test")
grp3.add_argument("--fencing", "--stonith",
choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
default="1",
help="What fencing agent to use")
grp3.add_argument("--once",
action="store_true",
help="Run all valid tests once")
grp4 = parser.add_argument_group("Additional (less common) options")
grp4.add_argument("-c", "--clobber-cib",
action="store_true",
help="Erase any existing configuration")
grp4.add_argument("-y", "--yes",
action="store_true", dest="always_continue",
help="Continue to run whenever prompted")
grp4.add_argument("--boot",
action="store_true",
help="")
grp4.add_argument("--cib-filename",
metavar="PATH",
help="Install the given CIB file to the cluster")
grp4.add_argument("--experimental-tests",
action="store_true",
help="Include experimental tests")
grp4.add_argument("--loop-minutes",
type=int, default=60,
help="")
grp4.add_argument("--no-loop-tests",
action="store_true",
help="Don't run looping/time-based tests")
grp4.add_argument("--no-unsafe-tests",
action="store_true",
help="Don't run tests that are unsafe for use with ocfs2/drbd")
grp4.add_argument("--notification-agent",
metavar="PATH",
default="/var/lib/pacemaker/notify.sh",
help="Script to configure for Pacemaker alerts")
grp4.add_argument("--notification-recipient",
metavar="R",
default="/var/lib/pacemaker/notify.log",
help="Recipient to pass to alert script")
grp4.add_argument("--oprofile",
metavar="NODES",
help="List of cluster nodes to run oprofile on")
grp4.add_argument("--outputfile",
metavar="PATH",
help="Location to write logs to")
grp4.add_argument("--qarsh",
action="store_true",
help="Use QARSH to access nodes instead of SSH")
grp4.add_argument("--schema",
metavar="SCHEMA",
default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION,
help="Create a CIB conforming to the given schema")
grp4.add_argument("--seed",
metavar="SEED",
help="Use the given string as the random number seed")
grp4.add_argument("--set",
action="append",
metavar="ARG",
default=[],
help="Set key=value pairs (can be specified multiple times)")
grp4.add_argument("--stonith-args",
metavar="ARGS",
default="hostlist=all,livedangerously=yes",
help="")
grp4.add_argument("--stonith-type",
metavar="TYPE",
default="external/ssh",
help="")
grp4.add_argument("--trunc",
action="store_true", dest="truncate",
help="Truncate log file before starting")
grp4.add_argument("--valgrind-procs",
metavar="PROCS",
default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd",
help="Run valgrind against the given space-separated list of processes")
grp4.add_argument("--valgrind-tests",
action="store_true",
help="Include tests using valgrind")
grp4.add_argument("--warn-inactive",
action="store_true",
help="Warn if a resource is assigned to an inactive node")
parser.add_argument("iterations",
nargs='?',
type=int, default=1,
help="Number of tests to run")
args = parser.parse_args(args=argv)
# Set values on this object based on what happened with command line
# processing. This has to be done in several blocks.
# These values can always be set. They get a default from the add_argument
# calls, only do one thing, and they do not have any side effects.
self["ClobberCIB"] = args.clobber_cib
self["ListTests"] = args.list_tests
self["Schema"] = args.schema
self["Stack"] = args.stack
self["SyslogFacility"] = args.facility
self["TruncateLog"] = args.truncate
self["at-boot"] = args.at_boot in ["1", "yes"]
self["benchmark"] = args.benchmark
self["continue"] = args.always_continue
self["experimental-tests"] = args.experimental_tests
self["iterations"] = args.iterations
self["loop-minutes"] = args.loop_minutes
self["loop-tests"] = not args.no_loop_tests
self["notification-agent"] = args.notification_agent
self["notification-recipient"] = args.notification_recipient
self["node-limit"] = args.limit_nodes
self["stonith-params"] = args.stonith_args
self["stonith-type"] = args.stonith_type
self["unsafe-tests"] = not args.no_unsafe_tests
self["valgrind-procs"] = args.valgrind_procs
self["valgrind-tests"] = args.valgrind_tests
self["warn-inactive"] = args.warn_inactive
# Nodes and groups are mutually exclusive, so their defaults cannot be
# set in their add_argument calls. Additionally, groups does more than
# just set a value. Here, set nodes first and then if a group is
# specified, override the previous nodes value.
if args.nodes:
self["nodes"] = args.nodes.split(" ")
else:
self["nodes"] = []
if args.group:
self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group)
LogFactory().add_file(self["OutputFile"], "CTS")
dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.dsh_group)
if os.path.isfile(dsh_file):
self["nodes"] = []
with open(dsh_file, "r", encoding="utf-8") as f:
for line in f:
stripped = line.strip()
if not stripped.startswith('#'):
self["nodes"].append(stripped)
else:
print("Unknown DSH group: %s" % args.dsh_group)
# Everything else either can't have a default set in an add_argument
# call (likely because we don't want to always have a value set for it)
# or it does something fancier than just set a single value. However,
# order does not matter for these as long as the user doesn't provide
# conflicting arguments on the command line. So just do Everything
# alphabetically.
if args.boot:
self["scenario"] = "boot"
if args.cib_filename:
self["CIBfilename"] = args.cib_filename
else:
self["CIBfilename"] = None
if args.choose:
self["scenario"] = "sequence"
self["tests"].append(args.choose)
if args.fencing:
if args.fencing in ["0", "no"]:
self["DoFencing"] = False
else:
self["DoFencing"] = True
if args.fencing in ["rhcs", "virt", "xvm"]:
self["stonith-type"] = "fence_xvm"
elif args.fencing == "scsi":
self["stonith-type"] = "fence_scsi"
elif args.fencing in ["lha", "ssh"]:
self["stonith-params"] = "hostlist=all,livedangerously=yes"
self["stonith-type"] = "external/ssh"
elif args.fencing == "openstack":
self["stonith-type"] = "fence_openstack"
print("Obtaining OpenStack credentials from the current environment")
self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % (
os.environ['OS_REGION_NAME'],
os.environ['OS_TENANT_NAME'],
os.environ['OS_AUTH_URL'],
os.environ['OS_USERNAME'],
os.environ['OS_PASSWORD']
)
elif args.fencing == "rhevm":
self["stonith-type"] = "fence_rhevm"
print("Obtaining RHEV-M credentials from the current environment")
self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
os.environ['RHEVM_USERNAME'],
os.environ['RHEVM_PASSWORD'],
os.environ['RHEVM_SERVER'],
os.environ['RHEVM_PORT'],
)
if args.ip:
self["CIBResource"] = True
self["ClobberCIB"] = True
self["IPBase"] = args.ip
if args.logfile:
self["LogAuditDisabled"] = True
self["LogFileName"] = args.logfile
self["LogWatcher"] = LogKind.REMOTE_FILE
else:
# We can't set this as the default on the parser.add_argument call
# for this option because then args.logfile will be set, which means
# the above branch will be taken and those other values will also be
# set.
self["LogFileName"] = "/var/log/messages"
if args.once:
self["scenario"] = "all-once"
if args.oprofile:
self["oprofile"] = args.oprofile.split(" ")
else:
self["oprofile"] = []
if args.outputfile:
self["OutputFile"] = args.outputfile
LogFactory().add_file(self["OutputFile"])
if args.populate_resources:
self["CIBResource"] = True
self["ClobberCIB"] = True
if args.qarsh:
self._rsh.enable_qarsh()
for kv in args.set:
(name, value) = kv.split("=")
self[name] = value
print("Setting %s = %s" % (name, value))
class EnvFactory:
"""A class for constructing a singleton instance of an Environment object."""
instance = None
# pylint: disable=invalid-name
def getInstance(self, args=None):
"""
Return the previously created instance of Environment.
If no instance exists, create a new instance and return that.
"""
if not EnvFactory.instance:
EnvFactory.instance = Environment(args)
return EnvFactory.instance
diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py
index 7de8c0e37d..24de347fb3 100644
--- a/python/pacemaker/_cts/scenarios.py
+++ b/python/pacemaker/_cts/scenarios.py
@@ -1,424 +1,424 @@
"""Test scenario classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = [
"AllOnce",
"Boot",
"BootCluster",
"LeaveBooted",
"RandomTests",
"Sequence",
]
__copyright__ = "Copyright 2000-2024 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
from pacemaker._cts.audits import ClusterAudit
from pacemaker._cts.input import should_continue
from pacemaker._cts.tests.ctstest import CTSTest
from pacemaker._cts.watcher import LogWatcher
class ScenarioComponent:
"""
The base class for all scenario components.
A scenario component is one single step in a scenario. Each component is
basically just a setup and teardown method.
"""
def __init__(self, cm, env):
"""
Create a new ScenarioComponent instance.
Arguments:
cm -- A ClusterManager instance
env -- An Environment instance
"""
# pylint: disable=invalid-name
self._cm = cm
self._env = env
def is_applicable(self):
"""
Return True if this component is applicable in the given Environment.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def setup(self):
"""
Set up the component, returning True on success.
This method must be provided by all subclasses.
"""
raise NotImplementedError
def teardown(self):
"""
Tear down the given component.
This method must be provided by all subclasses.
"""
raise NotImplementedError
class Scenario:
"""
The base class for scenarios.
A scenario is an ordered list of ScenarioComponent objects. A scenario
proceeds by setting up all its components in sequence, running a list of
tests and audits, and then tearing down its components in reverse.
"""
def __init__(self, cm, components, audits, tests):
"""
Create a new Scenario instance.
Arguments:
cm -- A ClusterManager instance
components -- A list of ScenarioComponents comprising this Scenario
audits -- A list of ClusterAudits that will be performed as
part of this Scenario
tests -- A list of CTSTests that will be run
"""
# pylint: disable=invalid-name
self.stats = {
"success": 0,
"failure": 0,
"BadNews": 0,
"skipped": 0
}
self.tests = tests
self._audits = audits
self._bad_news = None
self._cm = cm
self._components = components
for comp in components:
if not issubclass(comp.__class__, ScenarioComponent):
raise ValueError("Init value must be subclass of ScenarioComponent")
for audit in audits:
if not issubclass(audit.__class__, ClusterAudit):
raise ValueError("Init value must be subclass of ClusterAudit")
for test in tests:
if not issubclass(test.__class__, CTSTest):
raise ValueError("Init value must be a subclass of CTSTest")
def is_applicable(self):
"""Return True if all ScenarioComponents are applicable."""
for comp in self._components:
if not comp.is_applicable():
return False
return True
def setup(self):
"""
Set up the scenario, returning True on success.
If setup fails at some point, tear down those components that did
successfully set up.
"""
self._cm.prepare()
self.audit() # Also detects remote/local log config
self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
self.audit()
self._cm.install_support()
self._bad_news = LogWatcher(self._cm.env["LogFileName"],
self._cm.templates.get_patterns("BadNews"),
self._cm.env["nodes"],
self._cm.env["LogWatcher"],
"BadNews", 0)
self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
j = 0
while j < len(self._components):
if not self._components[j].setup():
# OOPS! We failed. Tear partial setups down.
self.audit()
self._cm.log("Tearing down partial setup")
self.teardown(j)
return False
j += 1
self.audit()
return True
def teardown(self, n_components=None):
"""
Tear down the scenario in the reverse order it was set up.
If n_components is not None, only tear down that many components.
"""
if not n_components:
n_components = len(self._components) - 1
j = n_components
while j >= 0:
self._components[j].teardown()
j -= 1
self.audit()
self._cm.install_support("uninstall")
def incr(self, name):
"""Increment the given stats key."""
if name not in self.stats:
self.stats[name] = 0
self.stats[name] += 1
def run(self, iterations):
"""Run all tests in the scenario the given number of times."""
self._cm.oprofile_start()
try:
self._run_loop(iterations)
self._cm.oprofile_stop()
- except:
+ except: # noqa: E722
self._cm.oprofile_stop()
raise
def _run_loop(self, iterations):
"""Run all the tests the given number of times."""
raise NotImplementedError
def run_test(self, test, testcount):
"""
Run the given test.
testcount is the number of tests (including this one) that have been
run across all iterations.
"""
nodechoice = self._cm.env.random_node()
ret = True
did_run = False
self._cm.clear_instance_errors_to_ignore()
choice = "(%s)" % nodechoice
self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount))
starttime = test.set_timer()
if not test.setup(nodechoice):
self._cm.log("Setup failed")
ret = False
else:
did_run = True
ret = test(nodechoice)
if not test.teardown(nodechoice):
self._cm.log("Teardown failed")
if not should_continue(self._cm.env):
raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
ret = False
stoptime = time.time()
self._cm.oprofile_save(testcount)
elapsed_time = stoptime - starttime
test_time = stoptime - test.get_timer()
if "min_time" not in test.stats:
test.stats["elapsed_time"] = elapsed_time
test.stats["min_time"] = test_time
test.stats["max_time"] = test_time
else:
test.stats["elapsed_time"] += elapsed_time
if test_time < test.stats["min_time"]:
test.stats["min_time"] = test_time
if test_time > test.stats["max_time"]:
test.stats["max_time"] = test_time
if ret:
self.incr("success")
test.log_timer()
else:
self.incr("failure")
self._cm.statall()
did_run = True # Force the test count to be incremented anyway so test extraction works
self.audit(test.errors_to_ignore)
return did_run
def summarize(self):
"""Output scenario results."""
self._cm.log("****************")
self._cm.log("Overall Results:%r" % self.stats)
self._cm.log("****************")
stat_filter = {
"calls": 0,
"failure": 0,
"skipped": 0,
"auditfail": 0,
}
self._cm.log("Test Summary")
for test in self.tests:
for key in stat_filter:
stat_filter[key] = test.stats[key]
name = "Test %s:" % test.name
self._cm.log("{:<25} {!r}".format(name, stat_filter))
self._cm.debug("Detailed Results")
for test in self.tests:
name = "Test %s:" % test.name
self._cm.debug("{:<25} {!r}".format(name, stat_filter))
self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
def audit(self, local_ignore=None):
"""
Perform all scenario audits and log results.
If there are too many failures, prompt the user to confirm that the
scenario should continue running.
"""
errcount = 0
ignorelist = ["CTS:"]
if local_ignore:
ignorelist.extend(local_ignore)
ignorelist.extend(self._cm.errors_to_ignore)
ignorelist.extend(self._cm.instance_errors_to_ignore)
# This makes sure everything is stabilized before starting...
failed = 0
for audit in self._audits:
if not audit():
self._cm.log("Audit %s FAILED." % audit.name)
failed += 1
else:
self._cm.debug("Audit %s passed." % audit.name)
while errcount < 1000:
match = None
if self._bad_news:
match = self._bad_news.look(0)
if match:
add_err = True
for ignore in ignorelist:
if add_err and re.search(ignore, match):
add_err = False
if add_err:
self._cm.log("BadNews: %s" % match)
self.incr("BadNews")
errcount += 1
else:
break
else:
print("Big problems")
if not should_continue(self._cm.env):
self._cm.log("Shutting down.")
self.summarize()
self.teardown()
raise ValueError("Looks like we hit a BadNews jackpot!")
if self._bad_news:
self._bad_news.end()
return failed
class AllOnce(Scenario):
"""Every Test Once."""
def _run_loop(self, iterations):
testcount = 1
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class RandomTests(Scenario):
"""Random Test Execution."""
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
test = self._cm.env.random_gen.choice(self.tests)
self.run_test(test, testcount)
testcount += 1
class Sequence(Scenario):
"""Named Tests in Sequence."""
def _run_loop(self, iterations):
testcount = 1
while testcount <= iterations:
for test in self.tests:
self.run_test(test, testcount)
testcount += 1
class Boot(Scenario):
"""Start the Cluster."""
def _run_loop(self, iterations):
return
class BootCluster(ScenarioComponent):
"""
Start the cluster manager on all nodes.
Wait for each to come up before starting in order to account for the
possibility that a given node might have been rebooted or crashed
beforehand.
"""
def is_applicable(self):
"""Return whether this scenario is applicable."""
return True
def setup(self):
"""Set up the component, returning True on success."""
self._cm.prepare()
# Clear out the cobwebs ;-)
self._cm.stopall(verbose=True, force=True)
# Now start the Cluster Manager on all the nodes.
self._cm.log("Starting Cluster Manager on all nodes.")
return self._cm.startall(verbose=True, quick=True)
def teardown(self):
"""Tear down the component."""
self._cm.log("Stopping Cluster Manager on all nodes")
self._cm.stopall(verbose=True, force=False)
class LeaveBooted(BootCluster):
"""Leave all nodes up when the scenario is complete."""
def teardown(self):
"""Tear down the component."""
self._cm.log("Leaving Cluster running on all nodes")

File Metadata

Mime Type
text/x-diff
Expires
Thu, Oct 16, 3:29 PM (7 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2536601
Default Alt Text
(47 KB)

Event Timeline