diff --git a/python/pacemaker/_cts/remote.py b/python/pacemaker/_cts/remote.py index 9605a139c1..2a917e8dc3 100644 --- a/python/pacemaker/_cts/remote.py +++ b/python/pacemaker/_cts/remote.py @@ -1,282 +1,282 @@ """Remote command runner for Pacemaker's Cluster Test Suite (CTS).""" __all__ = ["RemoteExec", "RemoteFactory"] __copyright__ = "Copyright 2014-2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import re import os from subprocess import Popen, PIPE from threading import Thread from pacemaker._cts.logging import LogFactory def convert2string(lines): """ Convert byte strings to UTF-8 strings. Lists of byte strings are converted to a list of UTF-8 strings. All other text formats are passed through. """ if isinstance(lines, bytes): return lines.decode("utf-8") if isinstance(lines, list): lst = [] for line in lines: if isinstance(line, bytes): line = line.decode("utf-8") lst.append(line) return lst return lines class AsyncCmd(Thread): """A class for doing the hard work of running a command on another machine.""" def __init__(self, node, command, proc=None, delegate=None): """ Create a new AsyncCmd instance. Arguments: node -- The remote machine to run on command -- The ssh command string to use for remote execution proc -- If not None, a process object previously created with Popen. Instead of spawning a new process, we will then wait on this process to finish and handle its output. delegate -- When the command completes, call the async_complete method on this object """ self._command = command self._delegate = delegate self._logger = LogFactory() self._node = node self._proc = proc Thread.__init__(self) def run(self): """Run the previously instantiated AsyncCmd object.""" out = None err = None if not self._proc: # pylint: disable=consider-using-with self._proc = Popen(self._command, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True) self._logger.debug(f"cmd: async: target={self._node}, pid={self._proc.pid}: {self._command}") self._proc.wait() if self._delegate: self._logger.debug(f"cmd: pid {self._proc.pid} returned {self._proc.returncode} to {self._delegate!r}") else: self._logger.debug(f"cmd: pid {self._proc.pid} returned {self._proc.returncode}") if self._proc.stderr: err = self._proc.stderr.readlines() self._proc.stderr.close() for line in err: self._logger.debug(f"cmd: stderr[{self._proc.pid}]: {line}") err = convert2string(err) if self._proc.stdout: out = self._proc.stdout.readlines() self._proc.stdout.close() out = convert2string(out) if self._delegate: self._delegate.async_complete(self._proc.pid, self._proc.returncode, out, err) class RemoteExec: """ An abstract class for remote execution. It runs a command on another machine using ssh and scp. """ def __init__(self, command, cp_command, silent=False): """ Create a new RemoteExec instance. Arguments: command -- The ssh command string to use for remote execution cp_command -- The scp command string to use for copying files silent -- Should we log command status? """ - self._command = command - self._cp_command = cp_command + self.command = command + self.cp_command = cp_command self._logger = LogFactory() self._silent = silent self._our_node = os.uname()[1].lower() def _fixcmd(self, cmd): """Perform shell escapes on certain characters in the input cmd string.""" return re.sub("\'", "'\\''", cmd) def _cmd(self, args): """Given a list of arguments, return the string that will be run on the remote system.""" sysname = args[0] command = args[1] if sysname is None or sysname.lower() in [self._our_node, "localhost"]: ret = command else: - ret = f"{self._command} {sysname} '{self._fixcmd(command)}'" + ret = f"{self.command} {sysname} '{self._fixcmd(command)}'" return ret def _log(self, args): """Log a message.""" if not self._silent: self._logger.log(args) def _debug(self, args): """Log a message at the debug level.""" if not self._silent: self._logger.debug(args) def call_async(self, node, command, delegate=None): """ Run the given command on the given remote system and do not wait for it to complete. Arguments: node -- The remote machine to run on command -- The command to run, as a string delegate -- When the command completes, call the async_complete method on this object Returns the running process object. """ aproc = AsyncCmd(node, self._cmd([node, command]), delegate=delegate) aproc.start() return aproc def __call__(self, node, command, synchronous=True, verbose=2): """ Run the given command on the given remote system. If you call this class like a function, this is what gets called. It's approximately the same as a system() call on the remote machine. Arguments: node -- The remote machine to run on command -- The command to run, as a string synchronous -- Should we wait for the command to complete? verbose -- If 0, do not log anything. If 1, log the command and its return code but not its output. If 2, additionally log command output. Returns a tuple of (return code, command output). """ rc = 0 result = None # pylint: disable=consider-using-with proc = Popen(self._cmd([node, command]), stdout=PIPE, stderr=PIPE, close_fds=True, shell=True) if not synchronous and proc.pid > 0 and not self._silent: aproc = AsyncCmd(node, command, proc=proc) aproc.start() return (rc, result) if proc.stdout: result = proc.stdout.readlines() proc.stdout.close() else: self._log("No stdout stream") rc = proc.wait() if verbose > 0: self._debug(f"cmd: target={node}, rc={rc}: {command}") result = convert2string(result) if proc.stderr: errors = proc.stderr.readlines() proc.stderr.close() for err in errors: self._debug(f"cmd: stderr: {err}") if verbose == 2: for line in result: self._debug(f"cmd: stdout: {line}") return (rc, result) def copy(self, source, target, silent=False): """ Perform a copy of the source file to the remote target. This function uses the cp_command provided when the RemoteExec object was created. Returns the return code of the cp_command. """ # @TODO Use subprocess module with argument array instead - # (self._cp_command should be an array too) - cmd = f"{self._cp_command} '{source}' '{target}'" + # (self.cp_command should be an array too) + cmd = f"{self.cp_command} '{source}' '{target}'" rc = os.system(cmd) if not silent: self._debug(f"cmd: rc={rc}: {cmd}") return rc def exists_on_all(self, filename, hosts): """Return True if specified file exists on all specified hosts.""" for host in hosts: (rc, _) = self(host, f"test -r {filename}") if rc != 0: return False return True def exists_on_none(self, filename, hosts): """Return True if specified file does not exist on any specified host.""" for host in hosts: (rc, _) = self(host, f"test -r {filename}") if rc == 0: return False return True class RemoteFactory: """A class for constructing a singleton instance of a RemoteExec object.""" # Class variables # -n: no stdin, -x: no X11, # -o ServerAliveInterval=5: disconnect after 3*5s if the server # stops responding command = ("ssh -l root -n -x -o ServerAliveInterval=5 " "-o ConnectTimeout=10 -o TCPKeepAlive=yes " "-o ServerAliveCountMax=3 ") # -B: batch mode, -q: no stats (quiet) cp_command = "scp -B -q" instance = None # pylint: disable=invalid-name def getInstance(self): """ Return the previously created instance of RemoteExec. If no instance exists, create one and then return that. """ if not RemoteFactory.instance: RemoteFactory.instance = RemoteExec(RemoteFactory.command, RemoteFactory.cp_command, False) return RemoteFactory.instance diff --git a/python/pacemaker/_cts/tests/cibsecret.py b/python/pacemaker/_cts/tests/cibsecret.py index 20bc5564f9..9abf97a9a0 100644 --- a/python/pacemaker/_cts/tests/cibsecret.py +++ b/python/pacemaker/_cts/tests/cibsecret.py @@ -1,231 +1,251 @@ """Test managing secrets with cibsecret.""" __all__ = ["CibsecretTest"] __copyright__ = "Copyright 2025 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" +from pacemaker.exitstatus import ExitStatus from pacemaker._cts.tests.ctstest import CTSTest from pacemaker._cts.tests.simulstartlite import SimulStartLite from pacemaker._cts.timer import Timer # Disable various pylint warnings that occur in so many places throughout this # file it's easiest to just take care of them globally. This does introduce the # possibility that we'll miss some other cause of the same warning, but we'll # just have to be careful. # pylint doesn't understand that self._env is subscriptable. # pylint: disable=unsubscriptable-object # pylint doesn't understand that self._rsh is callable. # pylint: disable=not-callable # This comes from include/config.h as private API, assuming pacemaker is built # with cibsecrets support. I don't want to expose this value publically, at # least not until we default to including cibsecrets, so it's just set here # for now. SECRETS_DIR = "/var/lib/pacemaker/lrm/secrets" class CibsecretTest(CTSTest): """Test managing secrets with cibsecret.""" def __init__(self, cm): """ Create a new CibsecretTest instance. Arguments: cm -- A ClusterManager instance """ CTSTest.__init__(self, cm) self.name = "Cibsecret" self._secret = "passwd" self._secret_val = "SecreT_PASS" self._rid = "secretDummy" self._startall = SimulStartLite(cm) def _insert_dummy(self, node): """Create a dummy resource on the given node.""" pats = [ f"{node}.*" + (self._cm.templates["Pat:RscOpOK"] % ("start", self._rid)) ] watch = self.create_watch(pats, 60) watch.set_watch() self._cm.add_dummy_rsc(node, self._rid) with Timer(self._logger, self.name, "addDummy"): watch.look_for_all() if watch.unmatched: self.debug("Failed to find patterns when adding dummy resource") return repr(watch.unmatched) return "" def _check_cib_value(self, node, expected): """Check that the secret has the expected value.""" (rc, lines) = self._rsh(node, f"crm_resource -r {self._rid} -g {self._secret}", verbose=1) s = " ".join(lines).strip() if rc != 0 or s != expected: return self.failure(f"Secret set to '{s}' in CIB, not '{expected}'") # This is self.success, except without incrementing the success counter return True def _test_check(self, node): """Test the 'cibsecret check' subcommand.""" (rc, _) = self._rsh(node, f"cibsecret check {self._rid} {self._secret}", verbose=1) if rc != 0: return self.failure("Failed to check secret") # This is self.success, except without incrementing the success counter return True def _test_delete(self, node): """Test the 'cibsecret delete' subcommand.""" (rc, _) = self._rsh(node, f"cibsecret delete {self._rid} {self._secret}", verbose=1) if rc != 0: return self.failure("Failed to delete secret") # This is self.success, except without incrementing the success counter return True def _test_get(self, node, expected): """Test the 'cibsecret get' subcommand.""" (rc, lines) = self._rsh(node, f"cibsecret get {self._rid} {self._secret}", verbose=1) s = " ".join(lines).strip() if rc != 0 or s != expected: return self.failure(f"Secret set to '{s}' in local file, not '{expected}'") # This is self.success, except without incrementing the success counter return True def _test_set(self, node): """Test the 'cibsecret set' subcommand.""" (rc, _) = self._rsh(node, f"cibsecret set {self._rid} {self._secret} {self._secret_val}", verbose=1) if rc != 0: return self.failure("Failed to set secret") # This is self.success, except without incrementing the success counter return True def _test_stash(self, node): """Test the 'cibsecret stash' subcommand.""" (rc, _) = self._rsh(node, f"cibsecret stash {self._rid} {self._secret}", verbose=1) if rc != 0: return self.failure(f"Failed to stash secret {self._secret}") # This is self.success, except without incrementing the success counter return True def _test_sync(self, node): """Test the 'cibsecret sync' subcommand.""" (rc, _) = self._rsh(node, "cibsecret sync", verbose=1) if rc != 0: return self.failure("Failed to sync secrets") # This is self.success, except without incrementing the success counter return True def _test_unstash(self, node): """Test the 'cibsecret unstash' subcommand.""" (rc, _) = self._rsh(node, f"cibsecret unstash {self._rid} {self._secret}", verbose=1) if rc != 0: return self.failure(f"Failed to unstash secret {self._secret}") # This is self.success, except without incrementing the success counter return True def _test_secrets_removed(self): """Verify that the secret and its checksum file has been removed.""" f = f"{SECRETS_DIR}/{self._rid}/{self._secret}" if not self._rsh.exists_on_none(f, self._env["nodes"]): return self.failure(f"{f} not deleted from all hosts") f = f"{SECRETS_DIR}/{self._rid}/{self._secret}.sign" if not self._rsh.exists_on_none(f, self._env["nodes"]): return self.failure(f"{f} not deleted from all hosts") return True # @TODO: Two improvements that could be made to this test: # # (1) Add a test for the 'cibsecret sync' command. This requires modifying # the test so it brings down one node before creating secrets, then # bringing the node back up, running 'cibsecret sync', and verifying the # secrets are copied over. All of this is possible with ctslab, it's # just kind of a lot of code. # # (2) Add some tests for failure cases like trying to stash a value that's # already secret, etc. def __call__(self, node): """Perform this test.""" self.incr("calls") ret = self._startall(None) if not ret: return self.failure("Start all nodes failed") ret = self._insert_dummy(node) if ret != "": return self.failure(ret) # Test setting a new secret, verifying its value in both the CIB and # the local store on each node. if not self._test_set(node): return False if not self._check_cib_value(node, "lrm://"): return False for n in self._env["nodes"]: if not self._test_get(n, self._secret_val): return False # Test checking the secret on each node. for n in self._env["nodes"]: if not self._test_check(n): return False # Test moving the secret into the CIB, but now we can only verify that # its value in the CIB is correct since it's no longer a secret. We # can also verify that it's been removed from the local store everywhere. if not self._test_unstash(node): return False if not self._check_cib_value(node, self._secret_val): return False self._test_secrets_removed() # Test moving the secret back out of the CIB, again verifying its # value in both places. if not self._test_stash(node): return False if not self._check_cib_value(node, "lrm://"): return False for n in self._env["nodes"]: if not self._test_get(n, self._secret_val): return False # Delete the secret if not self._test_delete(node): return False self._test_secrets_removed() return self.success() @property def errors_to_ignore(self): return [r"Reloading .* \(agent\)"] + + def is_applicable(self): + # This test requires that the node it runs on can ssh into the other + # nodes without a password. Testing every combination is probably + # overkill (and will slow down `cts-lab --list-tests`), so here we're + # just going to test that the first node can ssh into the others. + if len(self._cm.env["nodes"]) < 2: + return False + + node = self._cm.env["nodes"][0] + other = self._cm.env["nodes"][1:] + + for o in other: + (rc, _) = self._cm.rsh(node, f"{self._cm.rsh.command} {o} exit", + verbose=0) + if rc != ExitStatus.OK: + return False + + return True