diff --git a/cts/benchmark/clubench.in b/cts/benchmark/clubench.in
index 60e9ab5fc6..5e24ccbb63 100644
--- a/cts/benchmark/clubench.in
+++ b/cts/benchmark/clubench.in
@@ -1,194 +1,193 @@
#!/bin/sh
#
# Copyright 2010-2025 the Pacemaker project contributors
#
# The version control history for this file may have further details.
#
# This source code is licensed under the GNU General Public License version 2
# or later (GPLv2+) WITHOUT ANY WARRANTY.
SSHOPTS="-l root -o PasswordAuthentication=no -o ConnectTimeout=5"
# Emit a diagnostic message on standard error, keeping stdout clean
# for the CSV/statistics output this script produces.
msg() {
    printf '%s\n' "$*" >&2
}
# Print usage information on stdout and exit successfully.
usage() {
    cat <<EOF
usage: $0

 dir: working directory (with the control file)
EOF
    exit 0
}
[ $# -eq 0 ] && usage
WORKDIR=$1
test -d "$WORKDIR" || usage
CTSCTRL=~/.cts
CTRL=$WORKDIR/control
CSV=$WORKDIR/bench.csv
STATS=$WORKDIR/bench.stats
test -f $CTRL && . $CTRL
@datadir@/@PACKAGE@/tests/cts/cluster_test 500 || {
msg "cluster_test failed"
exit 1
}
test -f $CTSCTRL || {
msg no CTS control file $CTSCTRL
exit 1
}
. $CTSCTRL
-: ${CTS_logfacility:=local7}
: ${CTS_logfile:="@CRM_LOG_DIR@/ha-log-bench"}
: ${CTS_adv:="--schema pacemaker-1.2 --clobber-cib -r"}
: ${RUNS:=3}
: ${CTSTESTS:="--benchmark"}
: ${CTSDIR:="@datadir@/@PACKAGE@/tests/cts"}
: ${CTS_node_list:=""}
: ${CTS_stonith:=""}
: ${CTS_stonith_args:=""}
[ -n "$CTS_node_list" ] || {
msg no node list specified
exit 1
}
-CTSOPTS="$CTS_adv --facility $CTS_logfacility --logfile $CTS_logfile"
+CTSOPTS="$CTS_adv --logfile $CTS_logfile"
if [ "x$CTS_stonith" != "x" ]; then
CTSOPTS="$CTSOPTS --stonith-type $CTS_stonith"
[ "x$CTS_stonith_args" != "x" ] &&
CTSOPTS="$CTSOPTS --stonith-params \"$CTS_stonith_args\""
else
CTSOPTS="$CTSOPTS --stonith 0"
fi
CTSOPTS="$CTSOPTS $CTSTESTS"
# fibonacci <limit>
# Print the cluster sizes to benchmark, one per line: the Fibonacci
# numbers starting at 2 (2 3 5 8 ...) up to <limit>, appending <limit>
# itself when it is not already the last number emitted, so the full
# cluster is always tested.
#
# Fix: a limit below 2 (e.g. a one-node CTS_node_list) used to produce
# an empty series, silently benchmarking nothing; now the limit itself
# is printed.
fibonacci() {
    F_LIMIT=$1

    # Degenerate case: limit smaller than the first series element.
    if [ "$F_LIMIT" -lt 2 ]; then
        echo "$F_LIMIT"
        return 0
    fi

    F_N=2
    F_N_PREV=1
    while [ "$F_N" -le "$F_LIMIT" ]; do
        echo "$F_N"
        F_N_TMP=$F_N
        F_N=$((F_N+F_N_PREV))
        F_N_PREV=$F_N_TMP
    done

    # Append the limit when the series did not end exactly on it.
    [ "$F_N_PREV" -ne "$F_LIMIT" ] && echo "$F_LIMIT"
}
[ "$SERIES" ] ||
SERIES=$(fibonacci "$(echo $CTS_node_list | wc -w)")
get_nodes() {
GN_C_NODES=$(echo $CTS_node_list | awk -v n="$1" '
{ for( i=1; i<=NF; i++ ) node[cnt++]=$i }
END{for( i=0; i "$RC_ODIR/ctsrun.out" 2>&1 &
ctspid=$!
tail -f "$RC_ODIR/ctsrun.out" &
tailpid=$!
wait $ctspid
kill $tailpid >/dev/null 2>&1
}
bench_re='CTS:.*runtime:'
# diginfo <cts-dir> <seed> <filter>
# Extract benchmark values from the CTS report tarballs in <cts-dir>
# and print them as one comma-separated line, starting with <seed>.
# <filter> is an awk command string (run via eval) that selects the
# wanted field from each log line matching the global $bench_re
# ('CTS:.*runtime:').  Each tarball is unpacked, scanned, and removed.
diginfo() {
    DI_CTS_DIR="$1"
    DI_S="$2"
    filter="$3"
    # Subshell so the cd (and any 'return' on cd failure) does not
    # affect the caller's working directory.
    (
    cd "$DI_CTS_DIR" || return
    # Report tarballs are named <run-number>.tar.bz2.
    for r in [0-9]*.tar.bz2; do
        tar xjf $r
        DI_D=$(basename "$r" .tar.bz2)
        for DI_V in $(grep "$bench_re" "$DI_D/ha-log.txt" | eval "$filter"); do
            DI_S="$DI_S,$DI_V"
        done
        # Clean up the unpacked report directory.
        rm -r "$DI_D"
    done
    echo $DI_S
    )
}
# printheader <cts-dir>
# Emit the CSV header row: the benchmark test names, taken from the
# third-to-last field of each matching log line in the first run's
# report tarballs.  "$1" is now quoted so a directory path containing
# whitespace is passed to diginfo intact.
printheader() {
    diginfo "$1" "" "awk '{print \$(NF-2)}'"
}
# printstats <cts-dir>
# Emit one CSV data row: the global $clusize followed by the last
# field (the runtime) of each matching log line in the run's report
# tarballs.  "$1" is now quoted so a directory path containing
# whitespace is passed to diginfo intact.
printstats() {
    diginfo "$1" "$clusize" "awk '{print \$(NF)}'"
}
# printmedians <csv-file>
# For each data column (2..N) of the comma-separated <csv-file> (one
# row per run), print the median value, prefixed with the global
# $clusize.  The median row index is RUNS/2 + 1, i.e. the middle row
# for an odd number of runs.
printmedians() {
    PM_F="$1"
    PM_S="$clusize"
    PM_MIDDLE=$((RUNS/2 + 1))
    # Count the columns of the first row.  (The previous version used
    # 'set' for this, clobbering the function's positional parameters.)
    PM_COLS=$(head -1 "$PM_F" | awk -F, '{print NF}')
    for PM_I in $(seq 2 "$PM_COLS"); do
        # Median of column PM_I: sort numerically, take the middle row.
        PM_V=$(awk -v i="$PM_I" -F, '{print $i}' < "$PM_F" | sort -n | head -"$PM_MIDDLE" | tail -1)
        PM_S="$PM_S,$PM_V"
    done
    echo $PM_S
}
rm -f $CSV
tmpf=`mktemp`
test -f "$tmpf" || {
msg "can't create temporary file"
exit 1
}
trap "rm -f $tmpf" 0
for clusize in $SERIES; do
nodes=`get_nodes $clusize`
outdir=$WORKDIR/$clusize
rm -rf $outdir
mkdir -p $outdir
rm -f $tmpf
node_cleanup
for i in `seq $RUNS`; do
true > $CTS_logfile
mkdir -p $outdir/$i
runcts $outdir/$i
mkreports $outdir/$i
printstats $outdir/$i >> $tmpf
done
[ -f "$CSV" ] || printheader $outdir/1 > $CSV
printmedians $tmpf >> $CSV
cat $tmpf >> $STATS
msg "Statistics for $clusize-node cluster saved"
done
msg "Tests done for series $SERIES, output in $CSV and $STATS"
# vim: set filetype=sh:
diff --git a/cts/cluster_test.in b/cts/cluster_test.in
index cd42ae7be2..a898eff311 100755
--- a/cts/cluster_test.in
+++ b/cts/cluster_test.in
@@ -1,155 +1,148 @@
#!@BASH_PATH@
#
# Copyright 2008-2025 the Pacemaker project contributors
#
# The version control history for this file may have further details.
#
# This source code is licensed under the GNU General Public License version 2
# or later (GPLv2+) WITHOUT ANY WARRANTY.
#
if [ -e ~/.cts ]; then
. ~/.cts
fi
anyAsked=0
[ $# -lt 1 ] || CTS_numtests=$1
# Print an error message on stdout and abort the script with status 1.
die() { printf '%s\n' "$*"; exit 1; }
if [ -z "$CTS_asked_once" ]; then
anyAsked=1
echo "This script should only be executed on the test exerciser."
echo "The test exerciser will remotely execute the actions required by the"
echo "tests and should not be part of the cluster itself."
read -p "Is this host intended to be the test exerciser? (yN) " doUnderstand
[ "$doUnderstand" = "y" ] \
|| die "This script must be executed on the test exerciser"
fi
if [ -z "$CTS_node_list" ]; then
anyAsked=1
read -p "Please list your cluster nodes (eg. node1 node2 node3): " CTS_node_list
else
echo "Beginning test of cluster: $CTS_node_list"
fi
[ "${CTS_node_list}" = "${CTS_node_list/$HOSTNAME/}" ] \
|| die "This script must be executed on the test exerciser, and the test exerciser cannot be part of the cluster"
printf "+ Bootstrapping ssh... "
if [ -z "$SSH_AUTH_SOCK" ]; then
printf "\n + Initializing SSH "
eval "$(ssh-agent)"
echo " + Adding identities..."
ssh-add
rc=$?
if [ $rc -ne 0 ]; then
echo " -- No identities added"
printf "\nThe ability to open key-based 'ssh' connections (as the user 'root') is required to use CTS.\n"
read -p " - Do you want this program to help you create one? (yN) " auto_fix
if [ "$auto_fix" = "y" ]; then
ssh-keygen -t dsa
ssh-add
else
die "Please run 'ssh-keygen -t dsa' to create a new key"
fi
fi
else
echo "OK"
fi
test_ok=1
printf "+ Testing ssh configuration... "
for n in $CTS_node_list; do
ssh -l root -o PasswordAuthentication=no -o ConnectTimeout=5 "$n" /bin/true
rc=$?
if [ $rc -ne 0 ]; then
echo " - connection to $n failed"
test_ok=0
fi
done
if [ $test_ok -eq 0 ]; then
printf "\nThe ability to open key-based 'ssh' connections (as the user 'root') is required to use CTS.\n"
read -p " - Do you want this program to help you with such a setup? (yN) " auto_fix
if [ "$auto_fix" = "y" ]; then
# XXX are we picking the most suitable identity?
privKey=$(ssh-add -L | head -n1 | cut -d" " -f3)
sshCopyIdOpts="-o User=root"
[ -z "$privKey" ] || sshCopyIdOpts+=" -i \"${privKey}.pub\""
for n in $CTS_node_list; do
eval "ssh-copy-id $sshCopyIdOpts \"${n}\"" \
|| die "Attempt to 'ssh-copy-id $sshCopyIdOpts \"$n\"' failed"
done
else
die "Please install one of your SSH public keys to root's account on all cluster nodes"
fi
fi
echo "OK"
if [ -z "$CTS_logfile" ]; then
anyAsked=1
read -p " + Where does/should syslog store logs from remote hosts? (/var/log/messages) " CTS_logfile
[ -n "$CTS_logfile" ] || CTS_logfile=/var/log/messages
fi
[ -e "$CTS_logfile" ] || die "$CTS_logfile doesn't exist"
-if [ -z "$CTS_logfacility" ]; then
- anyAsked=1
- read -p " + Which log facility does the cluster use? (daemon) " CTS_logfacility
- [ -n "$CTS_logfacility" ] || CTS_logfacility=daemon
-fi
-
if [ -z "$CTS_numtests" ]; then
read -p "+ How many test iterations should be performed? (500) " CTS_numtests
[ -n "$CTS_numtests" ] || CTS_numtests=500
fi
if [ -z "$CTS_asked_once" ]; then
anyAsked=1
read -p "+ What type of STONITH agent do you use? (none) " CTS_stonith
[ -z "$CTS_stonith" ] \
|| read -p "+ List any STONITH agent parameters (eq. device_host=switch.power.com): " CTS_stonith_args
[ -n "$CTS_adv" ] \
|| read -p "+ (Advanced) Any extra CTS parameters? (none) " CTS_adv
fi
[ $anyAsked -eq 0 ] \
|| read -p "+ Save values to ~/.cts for next time? (yN) " doSave
if [ "$doSave" = "y" ]; then
cat > ~/.cts <<-EOF
# CTS Test data
CTS_node_list="$CTS_node_list"
CTS_logfile="$CTS_logfile"
CTS_logport="$CTS_logport"
- CTS_logfacility="$CTS_logfacility"
CTS_asked_once=1
CTS_adv="$CTS_adv"
CTS_stonith="$CTS_stonith"
CTS_stonith_args="$CTS_stonith_args"
EOF
fi
cts_extra=""
if [ -n "$CTS_stonith" ]; then
cts_extra="$cts_extra --stonith-type $CTS_stonith"
[ -z "$CTS_stonith_args" ] \
|| cts_extra="$cts_extra --stonith-params \"$CTS_stonith_args\""
else
cts_extra="$cts_extra --stonith 0"
echo " - Testing a cluster without STONITH is like a blunt pencil... pointless"
fi
printf "\nAll set to go for %d iterations!\n" "$CTS_numtests"
[ $anyAsked -ne 0 ] \
|| echo "+ To use a different configuration, remove ~/.cts and re-run cts (or edit it manually)."
echo Now paste the following command into this shell:
-echo "@PYTHON@ `dirname "$0"`/cts-lab -L \"$CTS_logfile\" --syslog-facility \"$CTS_logfacility\" --no-unsafe-tests $CTS_adv $cts_extra \"$CTS_numtests\" --nodes \"$CTS_node_list\""
+echo "@PYTHON@ `dirname "$0"`/cts-lab -L \"$CTS_logfile\" --no-unsafe-tests $CTS_adv $cts_extra \"$CTS_numtests\" --nodes \"$CTS_node_list\""
# vim: set filetype=sh:
diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
index 18fdac7e9a..63a82f5612 100644
--- a/python/pacemaker/_cts/audits.py
+++ b/python/pacemaker/_cts/audits.py
@@ -1,1052 +1,1052 @@
"""Auditing classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import re
import time
import uuid
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.input import should_continue
from pacemaker._cts.watcher import LogKind, LogWatcher
class ClusterAudit:
    """
    Abstract base class for cluster auditors.

    Concrete audits subclass this and implement __call__ (run the audit,
    returning True if it passes and False if it fails) and is_applicable
    (whether the audit makes sense for the current test configuration).
    """

    def __init__(self, cm):
        """
        Create a new ClusterAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        # pylint: disable=invalid-name
        self._cm = cm
        self.name = None

    def __call__(self):
        """Perform the audit action."""
        raise NotImplementedError

    def is_applicable(self):
        """
        Return True if this audit is applicable in the current test configuration.

        This method must be implemented by all subclasses.
        """
        raise NotImplementedError

    def log(self, args):
        """Log a message through the cluster manager, tagged as audit output."""
        self._cm.log("audit: %s" % args)

    def debug(self, args):
        """Log a debug message through the cluster manager, tagged as audit output."""
        self._cm.debug("audit: %s" % args)
class LogAudit(ClusterAudit):
    """
    Audit each cluster node to verify that some logging system is usable.

    This is done by logging a unique test message and then verifying that we
    can read back that test message using logging tools.
    """

    def __init__(self, cm):
        """
        Create a new LogAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        self.name = "LogAudit"

    def _restart_cluster_logging(self, nodes=None):
        """Restart logging on the given nodes, or all if none are given."""
        if not nodes:
            nodes = self._cm.env["nodes"]

        self._cm.debug(f"Restarting logging on: {nodes!r}")

        for node in nodes:
            if self._cm.env["have_systemd"]:
                # Stop the socket first so journald is not immediately
                # socket-reactivated before the explicit restart below.
                (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
                if rc != 0:
                    self._cm.log(f"ERROR: Cannot stop 'systemd-journald' on {node}")

                (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
                if rc != 0:
                    self._cm.log(f"ERROR: Cannot start 'systemd-journald' on {node}")

            if "syslogd" in self._cm.env:
                (rc, _) = self._cm.rsh(node, f"service {self._cm.env['syslogd']} restart")
                if rc != 0:
                    self._cm.log(f"""ERROR: Cannot restart '{self._cm.env["syslogd"]}' on {node}""")

    def _create_watcher(self, patterns, kind):
        """Create a new LogWatcher instance for the given patterns."""
        watch = LogWatcher(self._cm.env["LogFileName"], patterns,
                           self._cm.env["nodes"], kind, "LogAudit", 5,
                           silent=True)
        watch.set_watch()
        return watch

    def _test_logging(self):
        """
        Log a unique test message on every node and try to read it back.

        Returns True if the message shows up in at least one log kind,
        caching the working kind in env["log_kind"] for later audits.
        """
        patterns = []
        prefix = "Test message from"
        suffix = str(uuid.uuid4())
        watch = {}

        for node in self._cm.env["nodes"]:
            # Look for the node name in two places to make sure
            # that syslog is logging with the correct hostname
            m = re.search("^([^.]+).*", node)
            if m:
                simple = m.group(1)
            else:
                simple = node

            patterns.append(f"{simple}.*{prefix} {node} {suffix}")

        watch_pref = self._cm.env["log_kind"]
        if watch_pref is None:
            # No known-good log kind yet: watch every candidate at once.
            kinds = [LogKind.LOCAL_FILE]
            if self._cm.env["have_systemd"]:
                kinds.append(LogKind.JOURNAL)

            kinds.append(LogKind.REMOTE_FILE)
            for k in kinds:
                watch[k] = self._create_watcher(patterns, k)

            self._cm.log(f"Logging test message with identifier {suffix}")
        else:
            watch[watch_pref] = self._create_watcher(patterns, watch_pref)

        for node in self._cm.env["nodes"]:
            # Leftover merge conflict resolved here: keep the renamed
            # "syslog_facility" environment key (the old "SyslogFacility"
            # variant was removed by this patch series).
            cmd = f"logger -p {self._cm.env['syslog_facility']}.info {prefix} {node} {suffix}"

            (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
            if rc != 0:
                self._cm.log(f"ERROR: Cannot execute remote command [{cmd}] on {node}")

        for k, w in watch.items():
            if watch_pref is None:
                self._cm.log(f"Checking for test message in {k} logs")

            w.look_for_all(silent=True)
            if not w.unmatched:
                if watch_pref is None:
                    self._cm.log(f"Found test message in {k} logs")
                    # Cache the working log kind so later audits only
                    # need to watch a single source.
                    self._cm.env["log_kind"] = k

                return True

            for regex in w.unmatched:
                self._cm.log(f"Test message [{regex}] not found in {w.kind} logs")

        return False

    def __call__(self):
        """Perform the audit action, retrying with log restarts and backoff."""
        max_attempts = 3
        attempt = 0
        passed = True

        self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
        while attempt <= max_attempts and not self._test_logging():
            attempt += 1
            self._restart_cluster_logging()
            # Linear backoff: give the logging daemons time to settle.
            time.sleep(60 * attempt)

        if attempt > max_attempts:
            self._cm.log("ERROR: Cluster logging unrecoverable.")
            passed = False

        return passed

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        if self._cm.env["LogAuditDisabled"]:
            return False

        return True
class DiskAudit(ClusterAudit):
    """
    Audit disk usage on cluster nodes.

    Verify that there is enough free space left on whichever mounted file
    system holds the logs.

    Warn on: less than 100 MB or 10% of free space
    Error on: less than 10 MB or 5% of free space
    """

    def __init__(self, cm):
        """
        Create a new DiskAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        self.name = "DiskspaceAudit"

    def __call__(self):
        """Perform the audit action."""
        passed = True

        # @TODO Use directory of PCMK_logfile if set on host
        # Output: "<used-percent> <free-MB>" with M and % stripped.
        dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR

        self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
        for node in self._cm.env["nodes"]:
            (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
            if not dfout:
                self._cm.log(f"ERROR: Cannot execute remote df command [{dfcmd}] on {node}")
                continue

            dfout = dfout[0].strip()

            try:
                (used, remain) = dfout.split()
                used_percent = int(used)
                remaining_mb = int(remain)
            except (ValueError, TypeError):
                # Bug fix: the old message interpolated 'used' and
                # 'remain', which are unbound when split() itself raises
                # (e.g. wrong field count), turning the warning into a
                # NameError.  Report only the raw output, which is
                # always available.
                self._cm.log(f"Warning: df output '{dfout}' from {node} was invalid")
            else:
                if remaining_mb < 10 or used_percent > 95:
                    self._cm.log(f"CRIT: Out of log disk space on {node} ({used_percent}% / {remaining_mb}MB)")
                    passed = False

                    # Give the operator a chance to abort the whole run.
                    if not should_continue(self._cm.env):
                        raise ValueError(f"Disk full on {node}")

                elif remaining_mb < 100 or used_percent > 90:
                    self._cm.log(f"WARN: Low on log disk space ({remaining_mb}MB) on {node}")

        return passed

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        return True
class FileAudit(ClusterAudit):
    """
    Audit the filesystem looking for various failure conditions.

    Check for:

    * The presence of core dumps from corosync or Pacemaker daemons
    * Stale IPC files
    """

    def __init__(self, cm):
        """
        Create a new FileAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        # Core-dump lines already reported by this audit instance; each
        # one is only warned about once (the list is never cleared).
        self.known = []
        self.name = "FileAudit"

    def _output_has_core(self, output, node):
        """Check output for any lines that would indicate the presence of a core dump."""
        found = False

        for line in output:
            line = line.strip()
            # Skip lines we have already warned about.
            if line in self.known:
                continue

            found = True
            self.known.append(line)
            self._cm.log(f"Warning: core file on {node}: {line}")

        return found

    def _find_core_with_coredumpctl(self, node):
        """Use coredumpctl to find core dumps on the given node."""
        (_, lsout) = self._cm.rsh(node, "coredumpctl --no-legend --no-pager")
        return self._output_has_core(lsout, node)

    def _find_core_on_fs(self, node, paths):
        """Check for core dumps on the given node, under any of the given paths."""
        (_, lsout) = self._cm.rsh(node, f"ls -al {' '.join(paths)} | grep core.[0-9]",
                                  verbose=1)
        return self._output_has_core(lsout, node)

    def __call__(self):
        """Perform the audit action.  Returns False if cores or stale IPC files were found."""
        passed = True

        self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
        for node in self._cm.env["nodes"]:
            found = False

            # If systemd is present, first see if coredumpctl logged any core dumps.
            if self._cm.env["have_systemd"]:
                found = self._find_core_with_coredumpctl(node)
                if found:
                    passed = False

            # If we didn't find any core dumps, it's for one of three reasons:
            # (1) Nothing crashed
            # (2) systemd is not present
            # (3) systemd is present but coredumpctl is not enabled
            #
            # To handle the last two cases, check the other filesystem locations.
            if not found:
                found = self._find_core_on_fs(node, ["/var/lib/pacemaker/cores/*",
                                                     "/var/lib/corosync"])
                if found:
                    passed = False

            if self._cm.expected_status.get(node) == "down":
                clean = False
                # qb-* entries in /dev/shm on a node expected to be down
                # are stale IPC files left by an unclean shutdown.
                (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)

                for line in lsout:
                    passed = False
                    clean = True
                    self._cm.log(f"Warning: Stale IPC file on {node}: {line}")

                if clean:
                    # Record what is still running for diagnosis, then
                    # remove the stale files.
                    (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)

                    for line in lsout:
                        self._cm.debug(f"ps[{node}]: {line}")

                    self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
            else:
                self._cm.debug(f"Skipping {node}")

        return passed

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        return True
class AuditResource:
    """Hold the parsed fields of one resource line from `crm_resource --list-cts`."""

    def __init__(self, cm, line):
        """
        Create a new AuditResource instance.

        Arguments:
        cm -- A ClusterManager instance
        line -- One line of output from `crm_resource` describing a single
                resource
        """
        # pylint: disable=invalid-name
        parts = line.split()

        self._cm = cm
        self.line = line

        # Positional fields, in the order crm_resource emits them.
        self.type = parts[1]
        self.id = parts[2]
        self.clone_id = parts[3]
        self.parent = parts[4]
        self.rprovider = parts[5]
        self.rclass = parts[6]
        self.rtype = parts[7]
        self.host = parts[8]
        self.needs_quorum = parts[9]
        self.flags = int(parts[10])
        self.flags_s = parts[11]

        # "NA" is crm_resource's marker for "no parent".
        if self.parent == "NA":
            self.parent = None

    @property
    def unique(self):
        """Return a truthy value if this resource is unique (flag 0x20)."""
        return self.flags & 0x20

    @property
    def orphan(self):
        """Return a truthy value if this resource is an orphan (flag 0x01)."""
        return self.flags & 0x01

    @property
    def managed(self):
        """Return a truthy value if this resource is managed by the cluster (flag 0x02)."""
        return self.flags & 0x02
class AuditConstraint:
    """Hold the parsed fields of one constraint line from `crm_resource --list-cts`."""

    def __init__(self, cm, line):
        """
        Create a new AuditConstraint instance.

        Arguments:
        cm -- A ClusterManager instance
        line -- One line of output from `crm_resource` describing a single
                constraint
        """
        # pylint: disable=invalid-name
        parts = line.split()

        self._cm = cm
        self.line = line

        # Positional fields, in the order crm_resource emits them.
        self.type = parts[1]
        self.id = parts[2]
        self.rsc = parts[3]
        self.target = parts[4]
        self.score = parts[5]
        self.rsc_role = parts[6]
        self.target_role = parts[7]

        # "NA" marks an unspecified role.
        if self.rsc_role == "NA":
            self.rsc_role = None

        if self.target_role == "NA":
            self.target_role = None
class PrimitiveAudit(ClusterAudit):
    """
    Audit primitive resources to verify a variety of conditions.

    Check that:

    * Resources are active and managed only when expected
    * Resources are active on the expected cluster node
    * Resources are not orphaned
    """

    def __init__(self, cm):
        """
        Create a new PrimitiveAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        self.name = "PrimitiveAudit"

        self._active_nodes = []
        self._constraints = []
        self._inactive_nodes = []
        self._resources = []
        # Node on which crm_resource queries are run; set by _setup().
        self._target = None

    def _audit_resource(self, resource, quorum):
        """Perform the audit of a single resource.  Returns False on failure."""
        rc = True
        active = self._cm.resource_location(resource.id)

        if len(active) == 1:
            # Active exactly once: only a problem if running without
            # quorum when the resource requires it.
            if quorum:
                self.debug(f"Resource {resource.id} active on {active!r}")
            elif resource.needs_quorum == 1:
                self._cm.log(f"Resource {resource.id} active without quorum: {active!r}")
                rc = False
        elif not resource.managed:
            self._cm.log(f"Resource {resource.id} not managed. Active on {active!r}")
        elif not resource.unique:
            # TODO: Figure out a clever way to actually audit these resource types
            if len(active) > 1:
                self.debug(f"Non-unique resource {resource.id} is active on: {active!r}")
            else:
                self.debug(f"Non-unique resource {resource.id} is not active")
        elif len(active) > 1:
            self._cm.log(f"Resource {resource.id} is active multiple times: {active!r}")
            rc = False
        elif resource.orphan:
            self.debug(f"Resource {resource.id} is an inactive orphan")
        elif not self._inactive_nodes:
            # All nodes are up, yet the resource runs nowhere.
            self._cm.log(f"WARN: Resource {resource.id} not served anywhere")
            rc = False
        elif self._cm.env["warn-inactive"]:
            if quorum or not resource.needs_quorum:
                self._cm.log(f"WARN: Resource {resource.id} not served anywhere "
                             f"(Inactive nodes: {self._inactive_nodes!r})")
            else:
                self.debug(f"Resource {resource.id} not served anywhere "
                           f"(Inactive nodes: {self._inactive_nodes!r})")
        elif quorum or not resource.needs_quorum:
            self.debug(f"Resource {resource.id} not served anywhere "
                       f"(Inactive nodes: {self._inactive_nodes!r})")

        return rc

    def _setup(self):
        """
        Verify cluster nodes are active.

        Collect resource and colocation information used for performing the audit.
        """
        # Partition nodes by expected status.
        for node in self._cm.env["nodes"]:
            if self._cm.expected_status[node] == "up":
                self._active_nodes.append(node)
            else:
                self._inactive_nodes.append(node)

        # Pick the first node expected to be up as the query target.
        for node in self._cm.env["nodes"]:
            if self._target is None and self._cm.expected_status[node] == "up":
                self._target = node

        if not self._target:
            # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
            # with CIB_file=/path/to/cib.xml even when the cluster isn't running
            self.debug(f"No nodes active - skipping {self.name}")
            return False

        (_, lines) = self._cm.rsh(self._target, "crm_resource --list-cts",
                                  verbose=1)

        for line in lines:
            if re.search("^Resource", line):
                self._resources.append(AuditResource(self._cm, line))
            elif re.search("^Constraint", line):
                self._constraints.append(AuditConstraint(self._cm, line))
            else:
                self._cm.log(f"Unknown entry: {line}")

        return True

    def __call__(self):
        """Perform the audit action."""
        passed = True

        # Nothing to audit if no nodes are up; count that as a pass.
        if not self._setup():
            return passed

        quorum = self._cm.has_quorum(None)

        for resource in self._resources:
            if resource.type == "primitive" and not self._audit_resource(resource, quorum):
                passed = False

        return passed

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        # if self._cm.name == "crm-corosync":
        #     return True
        return False
class GroupAudit(PrimitiveAudit):
    """
    Audit group resources.

    Check that:

    * Each of its child primitive resources is active on the expected cluster node
    """

    def __init__(self, cm):
        """
        Create a new GroupAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        PrimitiveAudit.__init__(self, cm)
        self.name = "GroupAudit"

    def __call__(self):
        """Perform the audit action: verify all of each group's children are colocated."""
        passed = True

        if not self._setup():
            return passed

        for group in self._resources:
            if group.type != "group":
                continue

            first_match = True
            group_location = None

            for child in self._resources:
                if child.parent != group.id:
                    continue

                nodes = self._cm.resource_location(child.id)

                # Remember where the first active child runs; all later
                # children must run on the same node.
                if first_match and len(nodes) > 0:
                    group_location = nodes[0]
                    first_match = False

                if len(nodes) > 1:
                    passed = False
                    self._cm.log(f"Child {child.id} of {group.id} is active more than once: {nodes!r}")
                elif not nodes:
                    # Groups are allowed to be partially active
                    # However we do need to make sure later children aren't running
                    group_location = None
                    self.debug(f"Child {child.id} of {group.id} is stopped")
                elif nodes[0] != group_location:
                    passed = False
                    self._cm.log(f"Child {child.id} of {group.id} is active on the wrong "
                                 f"node ({nodes[0]}) expected {group_location}")
                else:
                    self.debug(f"Child {child.id} of {group.id} is active on {nodes[0]}")

        return passed
class CloneAudit(PrimitiveAudit):
    """
    Audit clone resources.

    NOTE: Currently, this class does not perform any actual audit functions.
    """

    def __init__(self, cm):
        """
        Create a new CloneAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        PrimitiveAudit.__init__(self, cm)
        self.name = "CloneAudit"

    def __call__(self):
        """Walk the primitive children of every clone (placeholder audit)."""
        passed = True

        if not self._setup():
            return passed

        clones = (rsc for rsc in self._resources if rsc.type == "clone")
        for clone in clones:
            children = (rsc for rsc in self._resources
                        if rsc.parent == clone.id and rsc.type == "primitive")
            for child in children:
                self.debug(f"Checking child {child.id} of {clone.id}...")
                # Check max and node_max
                # Obtain with:
                #     crm_resource -g clone_max --meta -r child.id
                #     crm_resource -g clone_node_max --meta -r child.id

        return passed
class ColocationAudit(PrimitiveAudit):
    """
    Audit cluster resources.

    Check that:

    * Resources are colocated with the expected resource
    """

    def __init__(self, cm):
        """
        Create a new ColocationAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        PrimitiveAudit.__init__(self, cm)
        self.name = "ColocationAudit"

    def _crm_location(self, resource):
        """Return a list of cluster nodes where a given resource is running."""
        (rc, lines) = self._cm.rsh(self._target,
                                   f"crm_resource --locate -r {resource} -Q",
                                   verbose=1)
        hosts = []

        # Each output line's first field is a node name; output is
        # ignored entirely when crm_resource failed.
        if rc == 0:
            for line in lines:
                fields = line.split()
                hosts.append(fields[0])

        return hosts

    def __call__(self):
        """Perform the audit action: verify every colocation constraint is honored."""
        passed = True

        if not self._setup():
            return passed

        for coloc in self._constraints:
            if coloc.type != "rsc_colocation":
                continue

            source = self._crm_location(coloc.rsc)
            target = self._crm_location(coloc.target)

            if not source:
                self.debug(f"Colocation audit ({coloc.id}): {coloc.rsc} not running")
            else:
                # Every node running the dependent resource must also be
                # running the resource it is colocated with.
                for node in source:
                    if node not in target:
                        passed = False
                        self._cm.log(f"Colocation audit ({coloc.id}): {coloc.rsc} running "
                                     f"on {node} (not in {target!r})")
                    else:
                        self.debug(f"Colocation audit ({coloc.id}): {coloc.rsc} running "
                                   f"on {node} (in {target!r})")

        return passed
class ControllerStateAudit(ClusterAudit):
    """Verify that each node's controller state matches its expected status."""

    def __init__(self, cm):
        """
        Create a new ControllerStateAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        self.name = "ControllerStateAudit"

    def __call__(self):
        """Compare every node's controller status against its expected status."""
        passed = True
        n_up_expected_down = 0
        n_down_expected_up = 0
        unstable = []

        for node in self._cm.env["nodes"]:
            expected = self._cm.expected_status[node]
            status = self._cm.test_node_cm(node)

            if status > 0:
                if expected == "down":
                    n_up_expected_down += 1

                # Status 1 means the node is up but not yet stable.
                if status == 1:
                    unstable.append(node)

            elif expected == "up":
                n_down_expected_up += 1

        if len(unstable) > 0:
            passed = False
            self._cm.log(f"Cluster is not stable: {len(unstable)} (of "
                         f"{self._cm.upcount()}): {unstable!r}")

        if n_down_expected_up > 0:
            passed = False
            self._cm.log(f"{n_down_expected_up} (of {len(self._cm.env['nodes'])}) nodes "
                         "expected to be up were down.")

        if n_up_expected_down > 0:
            passed = False
            self._cm.log(f"{n_up_expected_down} (of {len(self._cm.env['nodes'])}) nodes "
                         "expected to be down were up.")

        return passed

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        # if self._cm.name == "crm-corosync":
        #     return True
        return False
class CIBAudit(ClusterAudit):
    """Audit the CIB by verifying that it is identical across cluster nodes."""

    def __init__(self, cm):
        """
        Create a new CIBAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        self.name = "CibAudit"

    def __call__(self):
        """Perform the audit action: compare the CIB within each cluster partition."""
        passed = True
        ccm_partitions = self._cm.find_partitions()

        if not ccm_partitions:
            self.debug("\tNo partitions to audit")
            return passed

        for partition in ccm_partitions:
            self.debug(f"\tAuditing CIB consistency for: {partition}")

            if self._audit_cib_contents(partition) == 0:
                passed = False

        return passed

    def _audit_cib_contents(self, hostlist):
        """Perform the CIB audit on the given hosts."""
        passed = True
        node0 = None
        node0_xml = None

        partition_hosts = hostlist.split()
        for node in partition_hosts:
            node_xml = self._store_remote_cib(node, node0)

            if node_xml is None:
                self._cm.log(f"Could not perform audit: No configuration from {node}")
                passed = False
            elif node0 is None:
                # The first node with a retrievable CIB becomes the
                # reference everything else is diffed against.
                node0 = node
                node0_xml = node_xml
            elif node0_xml is None:
                self._cm.log(f"Could not perform audit: No configuration from {node0}")
                passed = False
            else:
                (rc, result) = self._cm.rsh(
                    node0, f"crm_diff -VV -cf --new {node_xml} --original {node0_xml}", verbose=1)

                if rc != 0:
                    self._cm.log(f"Diff between {node0_xml} and {node_xml} failed: {rc}")
                    passed = False

                for line in result:
                    # NOTE(review): the search pattern was lost in this
                    # copy of the file; "<diff" (the root element of
                    # crm_diff's XML output) is the value used upstream
                    # -- confirm against crm_diff output.
                    if not re.search("<diff", line):
                        passed = False
                        self.debug(f"CibDiff[{node0}-{node}]: {line}")
                    else:
                        self.debug(f"CibDiff[{node0}-{node}] Ignoring: {line}")

        return passed

    def _store_remote_cib(self, node, target):
        """
        Store a copy of the given node's CIB on the given target node.

        If no target is given, store the CIB on the given node.
        """
        filename = f"/tmp/ctsaudit.{node}.xml"

        if not target:
            target = node

        (rc, lines) = self._cm.rsh(node, self._cm.templates["CibQuery"], verbose=1)
        if rc != 0:
            self._cm.log("Could not retrieve configuration")
            return None

        # Rebuild the CIB locally, one line at a time, then push it to
        # the target node.  NOTE(review): the destination path literals
        # were replaced by "(unknown)" in this copy; {filename} is the
        # value used upstream -- confirm against the original file.
        self._cm.rsh("localhost", f"rm -f {filename}")
        for line in lines:
            self._cm.rsh("localhost", f"echo \'{line[:-1]}\' >> {filename}", verbose=0)

        if self._cm.rsh.copy(filename, f"root@{target}:{filename}", silent=True) != 0:
            self._cm.log("Could not store configuration")
            return None

        return filename

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        # if self._cm.name == "crm-corosync":
        #     return True
        return False
class PartitionAudit(ClusterAudit):
    """
    Audit each partition in a cluster to verify a variety of conditions.

    Check that:

    * The number of partitions and the nodes in each is as expected
    * Each node is active when it should be active and inactive when it
      should be inactive
    * The status and epoch of each node is as expected
    * A partition has quorum
    * A partition has a DC when expected
    """

    def __init__(self, cm):
        """
        Create a new PartitionAudit instance.

        Arguments:
        cm -- A ClusterManager instance
        """
        ClusterAudit.__init__(self, cm)
        self.name = "PartitionAudit"
        # Per-node caches of the last observed state, epoch, and quorum,
        # refreshed on each call to _audit_partition()
        self._node_epoch = {}
        self._node_state = {}
        self._node_quorum = {}

    def __call__(self):
        """Perform the audit.  Return True if it passed, False otherwise."""
        passed = True
        ccm_partitions = self._cm.find_partitions()

        if not ccm_partitions:
            # No partitions found -- nothing to audit
            return passed

        self._cm.cluster_stable(double_check=True)

        if len(ccm_partitions) != self._cm.partitions_expected:
            self._cm.log(f"ERROR: {len(ccm_partitions)} cluster partitions detected:")
            passed = False

            for partition in ccm_partitions:
                self._cm.log(f"\t {partition}")

        # Any single failing partition fails the whole audit
        for partition in ccm_partitions:
            if self._audit_partition(partition) == 0:
                passed = False

        return passed

    def _trim_string(self, avalue):
        """Remove the last character from a multi-character string."""
        # Used to strip the trailing newline from remote command output
        if not avalue:
            return None

        if len(avalue) > 1:
            return avalue[:-1]

        return avalue

    def _trim2int(self, avalue):
        """Remove the last character from a multi-character string and convert the result to an int."""
        # NOTE(review): int() raises ValueError if the trimmed text is not
        # numeric -- callers assume the remote epoch command prints a number
        trimmed = self._trim_string(avalue)
        if trimmed:
            return int(trimmed)

        return None

    def _audit_partition(self, partition):
        """Perform the audit of a single partition.  Return True if it passed."""
        passed = True
        dc_found = []
        # NOTE(review): dc_allowed_list is never populated, so the
        # "DC not found" message below always reports 0 allowed nodes
        dc_allowed_list = []
        lowest_epoch = None
        node_list = partition.split()

        self.debug(f"Auditing partition: {partition}")

        for node in node_list:
            if self._cm.expected_status[node] != "up":
                self._cm.log(f"Warn: Node {node} appeared out of nowhere")
                self._cm.expected_status[node] = "up"
                # not in itself a reason to fail the audit (not what we're
                # checking for in this audit)

            # Ask each node for its own view of status, epoch, and quorum
            (_, out) = self._cm.rsh(node, self._cm.templates["StatusCmd"] % node, verbose=1)
            self._node_state[node] = out[0].strip()

            (_, out) = self._cm.rsh(node, self._cm.templates["EpochCmd"], verbose=1)
            self._node_epoch[node] = out[0].strip()

            (_, out) = self._cm.rsh(node, self._cm.templates["QuorumCmd"], verbose=1)
            self._node_quorum[node] = out[0].strip()

            self.debug(f"Node {node}: {self._node_state[node]} - {self._node_epoch[node]} - {self._node_quorum[node]}.")

            self._node_state[node] = self._trim_string(self._node_state[node])
            self._node_epoch[node] = self._trim2int(self._node_epoch[node])
            self._node_quorum[node] = self._trim_string(self._node_quorum[node])

            if not self._node_epoch[node]:
                self._cm.log(f"Warn: Node {node} disappeared: can't determine epoch")
                self._cm.expected_status[node] = "down"
                # not in itself a reason to fail the audit (not what we're
                # checking for in this audit)
            elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
                lowest_epoch = self._node_epoch[node]

        if not lowest_epoch:
            self._cm.log(f"Lowest epoch not determined in {partition}")
            passed = False

        for node in node_list:
            if self._cm.expected_status[node] != "up":
                continue

            if self._cm.is_node_dc(node, self._node_state[node]):
                dc_found.append(node)

                # The DC is expected to be the oldest (lowest-epoch) node
                if self._node_epoch[node] == lowest_epoch:
                    self.debug(f"{node}: OK")
                elif not self._node_epoch[node]:
                    self.debug(f"Check on {node} ignored: no node epoch")
                elif not lowest_epoch:
                    self.debug(f"Check on {node} ignored: no lowest epoch")
                else:
                    self._cm.log(f"DC {node} is not the oldest node "
                                 f"({self._node_epoch[node]} vs. {lowest_epoch})")
                    passed = False

        if not dc_found:
            # NOTE(review): a missing DC is only logged -- it does not set
            # passed = False, unlike the multiple-DC case below; confirm
            # whether that is intentional
            self._cm.log(f"DC not found on any of the {len(dc_allowed_list)} allowed "
                         f"nodes: {dc_allowed_list} (of {node_list})")
        elif len(dc_found) > 1:
            self._cm.log(f"{len(dc_found)} DCs ({dc_found}) found in cluster partition: {node_list}")
            passed = False

        if not passed:
            # Dump the per-node observations to aid debugging the failure
            for node in node_list:
                if self._cm.expected_status[node] == "up":
                    self._cm.log(f"epoch {self._node_epoch[node]} : {self._node_state[node]}")

        return passed

    def is_applicable(self):
        """Return True if this audit is applicable in the current test configuration."""
        # @TODO Due to long-ago refactoring, this name test would never match,
        # so this audit (and those derived from it) would never run.
        # Uncommenting the next lines fixes the name test, but that then
        # exposes pre-existing bugs that need to be fixed.
        # if self._cm.name == "crm-corosync":
        #     return True

        # Effectively disabled pending the @TODO above
        return False
# pylint: disable=invalid-name
def audit_list(cm):
    """Return a list of instances of applicable audits that can be performed."""
    candidates = (auditclass(cm)
                  for auditclass in (DiskAudit, FileAudit, LogAudit,
                                     ControllerStateAudit, PartitionAudit,
                                     PrimitiveAudit, GroupAudit, CloneAudit,
                                     ColocationAudit, CIBAudit))
    return [audit for audit in candidates if audit.is_applicable()]
diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
index 151ae4c662..14ca157651 100644
--- a/python/pacemaker/_cts/environment.py
+++ b/python/pacemaker/_cts/environment.py
@@ -1,500 +1,496 @@
"""Test environment classes for Pacemaker's Cluster Test Suite (CTS)."""
__all__ = ["EnvFactory", "set_cts_path"]
__copyright__ = "Copyright 2014-2025 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
import argparse
from contextlib import suppress
from glob import glob
import os
import random
import shlex
import socket
import sys
from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogKind
class Environment:
"""
A class for managing the CTS environment.
This consists largely of processing and storing command line parameters.
"""
# pylint doesn't understand that self._rsh is callable (it stores the
# singleton instance of RemoteExec, as returned by the getInstance method
# of RemoteFactory).
# @TODO See if type annotations fix this.
# I think we could also fix this by getting rid of the getInstance methods,
# but that's a project for another day. For now, just disable the warning.
# pylint: disable=not-callable
def __init__(self, args):
    """
    Create a new Environment instance.

    This class can be treated kind of like a dictionary due to the presence
    of typical dict functions like __contains__, __getitem__, and __setitem__.
    However, it is not a dictionary so do not rely on standard dictionary
    behavior.

    Arguments:
    args -- A list of command line parameters, minus the program name.
            If None, sys.argv will be used.
    """
    self.data = {}

    # Set some defaults before processing command line arguments.  These are
    # either not set by any command line parameter, or they need a default
    # that can't be set in add_argument.
    self["DeadTime"] = 300
    self["StartTime"] = 300
    self["StableTime"] = 30
    self["tests"] = []
    self["DoFencing"] = True
    self["CIBResource"] = False
    self["log_kind"] = None
    self["scenario"] = "random"
    # NOTE(review): the leading "+" below is a stray unified-diff addition
    # marker -- this chunk appears to be patch text rather than plain source
    + self["syslog_facility"] = "daemon"

    # Hard-coded since there is only one supported cluster manager/stack
    self["Name"] = "crm-corosync"
    self["Stack"] = "corosync 2+"

    self.random_gen = random.Random()

    # Shared logging and remote-execution singletons
    self._logger = LogFactory()
    self._rsh = RemoteFactory().getInstance()

    self._parse_args(args)

    # When only listing tests, skip node validation and remote discovery
    if not self["ListTests"]:
        self._validate()
        self._discover()
def dump(self):
    """Log every environment key/value pair, sorted by key, at debug level."""
    for name in sorted(self.data):
        self._logger.debug(f"{f'Environment[{name}]':35}: {str(self[name])}")
def __contains__(self, key):
    """Return True if key is present in the environment, False otherwise."""
    return key in self.data
def __getitem__(self, key):
    """Return the value stored for key, or None when the key is absent."""
    return self.data.get(key, None)
def __setitem__(self, key, value):
    """
    Set the given environment key to the given value, overriding any previous value.

    For the special key "nodes", value is an iterable of node names; each
    name is stripped and validated by DNS/hosts lookup before being kept.
    A failed lookup is logged and the exception is re-raised.
    """
    if key == "nodes":
        self.data["nodes"] = []
        for node in value:
            node = node.strip()

            # I don't think I need the IP address, etc. but this validates
            # the node name against /etc/hosts and/or DNS, so it's a
            # GoodThing(tm).
            try:
                # @TODO This only handles IPv4, use getaddrinfo() instead
                # (here and in _discover())
                socket.gethostbyname_ex(node)
                self.data["nodes"].append(node)
            except (socket.gaierror, socket.herror):
                # gethostbyname_ex() raises gaierror on resolution failure;
                # catching only herror (as before) meant this message was
                # never logged and the error escaped silently
                self._logger.log(f"{node} not found in DNS... aborting")
                raise
    else:
        self.data[key] = value
def random_node(self):
    """Return one cluster node chosen at random."""
    candidates = self["nodes"]
    return self.random_gen.choice(candidates)
def _detect_systemd(self, node):
    """Detect whether systemd is in use on the target node."""
    # Only probe once; the answer is cached in the environment
    if "have_systemd" in self.data:
        return

    (rc, _) = self._rsh(node, "systemctl list-units", verbose=0)
    self["have_systemd"] = rc == 0
def _detect_syslog(self, node):
    """Detect the syslog variant in use on the target node (if any)."""
    if "syslogd" in self.data:
        return

    if self["have_systemd"]:
        # Systemd-managed syslog service
        probe = r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'"
    else:
        # SYS-V init
        probe = "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1"

    (_, lines) = self._rsh(node, probe, verbose=1)

    # No output means no syslog daemon was found; leave "syslogd" unset
    with suppress(IndexError):
        self["syslogd"] = lines[0].strip()
def disable_service(self, node, service):
    """Disable the given service on the given node, returning the command's exit status."""
    if self["have_systemd"]:
        # Systemd
        (rc, _) = self._rsh(node, f"systemctl disable {service}")
    else:
        # SYS-V
        (rc, _) = self._rsh(node, f"chkconfig {service} off")

    return rc
def enable_service(self, node, service):
    """Enable the given service on the given node, returning the command's exit status."""
    if self["have_systemd"]:
        # Systemd
        (rc, _) = self._rsh(node, f"systemctl enable {service}")
    else:
        # SYS-V
        (rc, _) = self._rsh(node, f"chkconfig {service} on")

    return rc
def service_is_enabled(self, node, service):
    """Return True if the given service is enabled on the given node."""
    if self["have_systemd"]:
        # With "systemctl is-enabled", we should check if the service is
        # explicitly "enabled" instead of the return code.  For example it
        # returns 0 if the service is "static" or "indirect", but they don't
        # really count as "enabled".
        (rc, _) = self._rsh(node, f"systemctl is-enabled {service} | grep enabled")
    else:
        # SYS-V
        (rc, _) = self._rsh(node, f"chkconfig --list | grep -e {service}.*on")

    return rc == 0
def _detect_at_boot(self, node):
    """Detect if the cluster starts at boot."""
    # Stop probing as soon as one service is found enabled, matching the
    # short-circuit behavior of any() over a generator
    self["at-boot"] = False
    for svc in ("pacemaker", "corosync"):
        if self.service_is_enabled(node, svc):
            self["at-boot"] = True
            break
def _detect_ip_offset(self, node):
    """Detect the offset for IPaddr resources."""
    # Only needed when CIB resources will be created and no base was given
    if self["CIBResource"] and "IPBase" not in self.data:
        # Find the node's IPv4 network, then scan it with nmap and take the
        # highest in-use address as the base for generated IP resources
        (_, lines) = self._rsh(node, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
        network = lines[0].strip()

        (_, lines) = self._rsh(node, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)

        try:
            self["IPBase"] = lines[0].strip()
        except (IndexError, TypeError):
            # nmap produced no output (possibly not installed)
            self["IPBase"] = None

        if not self["IPBase"]:
            # Fall back to a link-local IPv6 base (note the deliberate
            # leading space, preserved from the original)
            self["IPBase"] = " fe80::1234:56:7890:1000"
            self._logger.log("Could not determine an offset for IPaddr resources. Perhaps nmap is not installed on the nodes.")
            self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""")
            return

        # NOTE(review): assumes IPBase is an IPv4 dotted quad here; a
        # malformed nmap result would raise IndexError/ValueError -- confirm
        last_part = self["IPBase"].split('.')[3]
        if int(last_part) >= 240:
            # Too close to the top of the subnet to generate offsets safely
            self._logger.log(f"Could not determine an offset for IPaddr resources. Upper bound is too high: {self['IPBase']} {last_part}")
            self["IPBase"] = " fe80::1234:56:7890:1000"
            self._logger.log(f"""Defaulting to '{self["IPBase"]}', use --test-ip-base to override""")
def _validate(self):
    """Check that we were given all required command line parameters."""
    if self["nodes"]:
        return

    raise ValueError("No nodes specified!")
def _discover(self):
    """Probe cluster nodes to figure out how to log and manage services."""
    exerciser = socket.gethostname()

    # Use the IP where possible to avoid name lookup failures
    for ip in socket.gethostbyname_ex(exerciser)[2]:
        if ip != "127.0.0.1":
            exerciser = ip
            break

    self["cts-exerciser"] = exerciser

    # Probe the first node only; results are cached in the environment
    node = self["nodes"][0]
    for probe in (self._detect_systemd, self._detect_syslog,
                  self._detect_at_boot, self._detect_ip_offset):
        probe(node)
def _parse_args(self, argv):
    """
    Parse and validate command line parameters.

    Set the appropriate values in the environment dictionary.  If argv is
    None, use sys.argv instead.
    """
    if not argv:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(epilog=f"{sys.argv[0]} -g virt1 -r --stonith ssh --schema pacemaker-2.0 500")

    grp1 = parser.add_argument_group("Common options")
    grp1.add_argument("--benchmark",
                      action="store_true",
                      help="Add timing information")
    grp1.add_argument("--list", "--list-tests",
                      action="store_true", dest="list_tests",
                      help="List the valid tests")
    grp1.add_argument("--nodes",
                      default="",
                      metavar="NODES",
                      help="List of cluster nodes separated by whitespace")

    grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
    grp2.add_argument("-L", "--logfile",
                      metavar="PATH",
                      help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)")
    # NOTE(review): the "-" prefixes below are unified-diff removal markers
    # left over from patch text (the --facility option is being dropped)
    - grp2.add_argument("--facility", "--syslog-facility",
    - default="daemon",
    - metavar="NAME",
    - help="Which syslog facility to log to")
    grp2.add_argument("--ip", "--test-ip-base",
                      metavar="IP",
                      help="Offset for generated IP address resources")

    grp3 = parser.add_argument_group("Options for release testing")
    grp3.add_argument("-r", "--populate-resources",
                      action="store_true",
                      help="Generate a sample configuration")
    grp3.add_argument("--choose",
                      metavar="NAME",
                      help="Run only the named tests, separated by whitespace")
    grp3.add_argument("--fencing", "--stonith",
                      choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
                      default="1",
                      help="What fencing agent to use")
    grp3.add_argument("--once",
                      action="store_true",
                      help="Run all valid tests once")

    grp4 = parser.add_argument_group("Additional (less common) options")
    grp4.add_argument("-c", "--clobber-cib",
                      action="store_true",
                      help="Erase any existing configuration")
    grp4.add_argument("-y", "--yes",
                      action="store_true", dest="always_continue",
                      help="Continue to run whenever prompted")
    grp4.add_argument("--boot",
                      action="store_true",
                      help="")
    grp4.add_argument("--cib-filename",
                      metavar="PATH",
                      help="Install the given CIB file to the cluster")
    grp4.add_argument("--experimental-tests",
                      action="store_true",
                      help="Include experimental tests")
    grp4.add_argument("--no-unsafe-tests",
                      action="store_true",
                      help="Don't run tests that are unsafe for use with ocfs2/drbd")
    grp4.add_argument("--notification-agent",
                      metavar="PATH",
                      default="/var/lib/pacemaker/notify.sh",
                      help="Script to configure for Pacemaker alerts")
    grp4.add_argument("--notification-recipient",
                      metavar="R",
                      default="/var/lib/pacemaker/notify.log",
                      help="Recipient to pass to alert script")
    grp4.add_argument("--oprofile",
                      default="",
                      metavar="NODES",
                      help="List of cluster nodes to run oprofile on")
    grp4.add_argument("--outputfile",
                      metavar="PATH",
                      help="Location to write logs to")
    grp4.add_argument("--schema",
                      metavar="SCHEMA",
                      default=f"pacemaker-{BuildOptions.CIB_SCHEMA_VERSION}",
                      help="Create a CIB conforming to the given schema")
    grp4.add_argument("--seed",
                      metavar="SEED",
                      help="Use the given string as the random number seed")
    grp4.add_argument("--stonith-args",
                      metavar="ARGS",
                      default="hostlist=all,livedangerously=yes",
                      help="")
    grp4.add_argument("--stonith-type",
                      metavar="TYPE",
                      default="external/ssh",
                      help="")
    grp4.add_argument("--trunc",
                      action="store_true", dest="truncate",
                      help="Truncate log file before starting")
    grp4.add_argument("--warn-inactive",
                      action="store_true",
                      help="Warn if a resource is assigned to an inactive node")

    parser.add_argument("iterations",
                        nargs='?',
                        type=int, default=1,
                        help="Number of tests to run")

    args = parser.parse_args(args=argv)

    # Set values on this object based on what happened with command line
    # processing.  This has to be done in several blocks.

    # These values can always be set.  Most get a default from the add_argument
    # calls, they only do one thing, and they do not have any side effects.
    self["CIBfilename"] = args.cib_filename if args.cib_filename else None
    self["ClobberCIB"] = args.clobber_cib
    self["ListTests"] = args.list_tests
    self["Schema"] = args.schema
    # NOTE(review): "-" below is a stray diff removal marker (SyslogFacility
    # is being dropped along with the --facility option above)
    - self["SyslogFacility"] = args.facility
    self["TruncateLog"] = args.truncate
    self["benchmark"] = args.benchmark
    self["continue"] = args.always_continue
    self["experimental-tests"] = args.experimental_tests
    self["iterations"] = args.iterations
    self["nodes"] = shlex.split(args.nodes)
    self["notification-agent"] = args.notification_agent
    self["notification-recipient"] = args.notification_recipient
    self["oprofile"] = shlex.split(args.oprofile)
    self["stonith-params"] = args.stonith_args
    self["stonith-type"] = args.stonith_type
    self["unsafe-tests"] = not args.no_unsafe_tests
    self["warn-inactive"] = args.warn_inactive

    # Everything else either can't have a default set in an add_argument
    # call (likely because we don't want to always have a value set for it)
    # or it does something fancier than just set a single value.  However,
    # order does not matter for these as long as the user doesn't provide
    # conflicting arguments on the command line.  So just do everything
    # alphabetically.
    if args.boot:
        self["scenario"] = "boot"

    if args.choose:
        self["scenario"] = "sequence"
        self["tests"].extend(shlex.split(args.choose))
        self["iterations"] = len(self["tests"])

    if args.fencing in ["0", "no"]:
        self["DoFencing"] = False
    elif args.fencing in ["rhcs", "virt", "xvm"]:
        self["stonith-type"] = "fence_xvm"
    elif args.fencing == "scsi":
        self["stonith-type"] = "fence_scsi"
    elif args.fencing in ["lha", "ssh"]:
        self["stonith-params"] = "hostlist=all,livedangerously=yes"
        self["stonith-type"] = "external/ssh"
    elif args.fencing == "openstack":
        self["stonith-type"] = "fence_openstack"

        # Raises KeyError if any of these are missing from the environment
        print("Obtaining OpenStack credentials from the current environment")
        region = os.environ['OS_REGION_NAME']
        tenant = os.environ['OS_TENANT_NAME']
        auth = os.environ['OS_AUTH_URL']
        user = os.environ['OS_USERNAME']
        password = os.environ['OS_PASSWORD']

        self["stonith-params"] = f"region={region},tenant={tenant},auth={auth},user={user},password={password}"
    elif args.fencing == "rhevm":
        self["stonith-type"] = "fence_rhevm"

        # Raises KeyError if any of these are missing from the environment
        print("Obtaining RHEV-M credentials from the current environment")
        user = os.environ['RHEVM_USERNAME']
        password = os.environ['RHEVM_PASSWORD']
        server = os.environ['RHEVM_SERVER']
        port = os.environ['RHEVM_PORT']

        self["stonith-params"] = f"login={user},passwd={password},ipaddr={server},ipport={port},ssl=1,shell_timeout=10"

    if args.ip:
        self["CIBResource"] = True
        self["ClobberCIB"] = True
        self["IPBase"] = args.ip

    if args.logfile == "journal":
        self["LogAuditDisabled"] = True
        self["log_kind"] = LogKind.JOURNAL
    elif args.logfile:
        self["LogAuditDisabled"] = True
        self["LogFileName"] = args.logfile
        self["log_kind"] = LogKind.REMOTE_FILE
    else:
        # We can't set this as the default on the parser.add_argument call
        # for this option because then args.logfile will be set, which means
        # the above branch will be taken and those other values will also be
        # set.
        self["LogFileName"] = "/var/log/messages"

    if args.once:
        self["scenario"] = "all-once"

    if args.outputfile:
        self["OutputFile"] = args.outputfile
        LogFactory().add_file(self["OutputFile"])

    if args.populate_resources:
        self["CIBResource"] = True
        self["ClobberCIB"] = True

    self.random_gen.seed(args.seed)
class EnvFactory:
    """A class for constructing a singleton instance of an Environment object."""

    instance = None

    # pylint: disable=invalid-name
    def getInstance(self, args=None):
        """
        Return the previously created instance of Environment.

        If no instance exists, create a new instance and return that.
        """
        env = EnvFactory.instance
        if env is None:
            env = Environment(args)
            EnvFactory.instance = env

        return env
def set_cts_path(extra=None):
    """Set the PATH environment variable appropriately for the tests."""
    path = os.environ['PATH']

    # Add any search paths given on the command line
    for entry in (extra or []):
        path = f"{entry}:{path}"

    cwd = os.getcwd()

    if os.path.exists(f"{cwd}/cts/cts-attrd.in"):
        # Source tree: prefer the freshly built daemons, tools, and schemas
        # pylint: disable=protected-access
        build_dir = BuildOptions._BUILD_DIR
        print(f"Running tests from the source tree: {build_dir}")

        for daemon_dir in glob(f"{build_dir}/daemons/*/"):
            path = f"{daemon_dir}:{path}"

        path = f"{build_dir}/tools:{path}"
        path = f"{build_dir}/cts/support:{path}"

        print(f"Using local schemas from: {cwd}/xml")
        os.environ["PCMK_schema_directory"] = f"{cwd}/xml"
    else:
        print(f"Running tests from the install tree: {BuildOptions.DAEMON_DIR} (not {cwd})")
        path = f"{BuildOptions.DAEMON_DIR}:{path}"
        os.environ["PCMK_schema_directory"] = BuildOptions.SCHEMA_DIR

    print(f'Using PATH="{path}"')
    os.environ['PATH'] = path