diff --git a/cts/CTS.py.in b/cts/CTS.py.in index c4183183e6..091bb1f8c1 100644 --- a/cts/CTS.py.in +++ b/cts/CTS.py.in @@ -1,939 +1,939 @@ """ Main classes for Pacemaker's Cluster Test Suite (CTS) """ # Pacemaker targets compatibility with Python 2.7 and 3.2+ from __future__ import print_function, unicode_literals, absolute_import, division __copyright__ = "Copyright 2000-2018 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import os import re import sys import time import traceback if sys.version_info > (3,): from collections import UserDict else: from UserDict import UserDict from cts.CTSvars import * from cts.logging import LogFactory from cts.watcher import LogWatcher from cts.remote import RemoteFactory, input_wrapper from cts.environment import EnvFactory from cts.patterns import PatternSelector has_log_stats = {} log_stats_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_stats.sh" log_stats = """ #!@BASH_PATH@ # Tool for generating system load reports while CTS runs trap "" 1 f=$1; shift action=$1; shift base=`basename $0` if [ ! -e $f ]; then echo "Time, Load 1, Load 5, Load 15, Test Marker" > $f fi function killpid() { if [ -e $f.pid ]; then kill -9 `cat $f.pid` rm -f $f.pid fi } function status() { if [ -e $f.pid ]; then kill -0 `cat $f.pid` return $? else return 1 fi } function start() { # Is it already running? if status then return fi echo Active as $$ echo $$ > $f.pid while [ 1 = 1 ]; do uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f #top -b -c -n1 | grep -e usr/libexec/pacemaker | grep -v -e grep -e python | head -n 1 | sed s@/usr/libexec/pacemaker/@@ | awk '{print " 0, "$9", "$10", "$12}' | tr '\\n' ',' >> $f echo 0 >> $f sleep 5 done } case $action in start) start ;; start-bg|bg) # Use c --ssh -- ./stats.sh file start-bg nohup $0 $f start >/dev/null 2>&1 > $f echo " $*" >> $f start ;; *) echo "Unknown action: $action." ;; esac """ class CtsLab(object): '''This class defines the Lab Environment for the Cluster Test System. It defines those things which are expected to change from test environment to test environment for the same cluster manager. It is where you define the set of nodes that are in your test lab what kind of reset mechanism you use, etc. This class is derived from a UserDict because we hold many different parameters of different kinds, and this provides a uniform and extensible interface useful for any kind of communication between the user/administrator/tester and CTS. At this point in time, it is the intent of this class to model static configuration and/or environmental data about the environment which doesn't change as the tests proceed. Well-known names (keys) are an important concept in this class. The HasMinimalKeys member function knows the minimal set of well-known names for the class. The following names are standard (well-known) at this time: nodes An array of the nodes in the cluster reset A ResetMechanism object logger An array of objects that log strings... CMclass The type of ClusterManager we are running (This is a class object, not a class instance) RandSeed Random seed. It is a triple of bytes. (optional) The CTS code ignores names it doesn't know about/need. The individual tests have access to this information, and it is perfectly acceptable to provide hints, tweaks, fine-tuning directions or other information to the tests through this mechanism. ''' def __init__(self, args=None): self.Env = EnvFactory().getInstance(args) self.Scenario = None self.logger = LogFactory() self.rsh = RemoteFactory().getInstance() def dump(self): self.Env.dump() def has_key(self, key): return key in list(self.Env.keys()) def __getitem__(self, key): return self.Env[key] def __setitem__(self, key, value): self.Env[key] = value def run(self, Scenario, Iterations): if not Scenario: self.logger.log("No scenario was defined") return 1 self.logger.log("Cluster nodes: ") for node in self.Env["nodes"]: self.logger.log(" * %s" % (node)) if not Scenario.SetUp(): return 1 try : Scenario.run(Iterations) except : self.logger.log("Exception by %s" % sys.exc_info()[0]) self.logger.traceback(traceback) Scenario.summarize() Scenario.TearDown() return 1 #ClusterManager.oprofileSave(Iterations) Scenario.TearDown() Scenario.summarize() if Scenario.Stats["failure"] > 0: return Scenario.Stats["failure"] elif Scenario.Stats["success"] != Iterations: self.logger.log("No failure count but success != requested iterations") return 1 return 0 def __CheckNode(self, node): "Raise a ValueError if the given node isn't valid" if not self.IsValidNode(node): raise ValueError("Invalid node [%s] in CheckNode" % node) class NodeStatus(object): def __init__(self, env): self.Env = env def IsNodeBooted(self, node): '''Return TRUE if the given node is booted (responds to pings)''' if self.Env["docker"]: return RemoteFactory().getInstance()("localhost", "docker inspect --format {{.State.Running}} %s | grep -q true" % node, silent=True) == 0 return RemoteFactory().getInstance()("localhost", "ping -nq -c1 -w1 %s" % node, silent=True) == 0 def IsSshdUp(self, node): rc = RemoteFactory().getInstance()(node, "true", silent=True) return rc == 0 def WaitForNodeToComeUp(self, node, Timeout=300): '''Return TRUE when given node comes up, or None/FALSE if timeout''' timeout = Timeout anytimeouts = 0 while timeout > 0: if self.IsNodeBooted(node) and self.IsSshdUp(node): if anytimeouts: # Fudge to wait for the system to finish coming up time.sleep(30) LogFactory().debug("Node %s now up" % node) return 1 time.sleep(30) if (not anytimeouts): LogFactory().debug("Waiting for node %s to come up" % node) anytimeouts = 1 timeout = timeout - 1 LogFactory().log("%s did not come up within %d tries" % (node, Timeout)) if self.Env["continue"] == 1: answer = "Y" else: try: answer = input_wrapper('Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": raise ValueError("%s did not come up within %d tries" % (node, Timeout)) def WaitForAllNodesToComeUp(self, nodes, timeout=300): '''Return TRUE when all nodes come up, or FALSE if timeout''' for node in nodes: if not self.WaitForNodeToComeUp(node, timeout): return None return 1 class ClusterManager(UserDict): '''The Cluster Manager class. This is an subclass of the Python dictionary class. (this is because it contains lots of {name,value} pairs, not because it's behavior is that terribly similar to a dictionary in other ways.) This is an abstract class which class implements high-level operations on the cluster and/or its cluster managers. Actual cluster managers classes are subclassed from this type. One of the things we do is track the state we think every node should be in. ''' def __InitialConditions(self): #if os.geteuid() != 0: # raise ValueError("Must Be Root!") None def _finalConditions(self): for key in list(self.keys()): if self[key] == None: raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") def __init__(self, Environment, randseed=None): self.Env = EnvFactory().getInstance() self.templates = PatternSelector(self.Env["Name"]) self.__InitialConditions() self.logger = LogFactory() self.TestLoggingLevel=0 self.data = {} self.name = self.Env["Name"] self.rsh = RemoteFactory().getInstance() self.ShouldBeStatus={} self.ns = NodeStatus(self.Env) self.OurNode = os.uname()[1].lower() self.__instance_errorstoignore = [] def __getitem__(self, key): if key == "Name": return self.name print("FIXME: Getting %s from %s" % (key, repr(self))) if key in self.data: return self.data[key] return self.templates.get_patterns(self.Env["Name"], key) def __setitem__(self, key, value): print("FIXME: Setting %s=%s on %s" % (key, value, repr(self))) self.data[key] = value def key_for_node(self, node): return node def instance_errorstoignore_clear(self): '''Allows the test scenario to reset instance errors to ignore on each iteration.''' self.__instance_errorstoignore = [] def instance_errorstoignore(self): '''Return list of errors which are 'normal' for a specific test instance''' return self.__instance_errorstoignore def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] def log(self, args): self.logger.log(args) def debug(self, args): self.logger.debug(args) def prepare(self): '''Finish the Initialization process. Prepare to test...''' print(repr(self)+"prepare") for node in self.Env["nodes"]: if self.StataCM(node): self.ShouldBeStatus[node] = "up" else: self.ShouldBeStatus[node] = "down" if self.Env["experimental-tests"]: self.unisolate_node(node) def upcount(self): '''How many nodes are up?''' count = 0 for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": count = count + 1 return count def install_support(self, command="install"): for node in self.Env["nodes"]: self.rsh(node, CTSvars.CRM_DAEMON_DIR + "/cts-support " + command) def install_config(self, node): return None def prepare_fencing_watcher(self, name): # If we don't have quorum now but get it as a result of starting this node, # then a bunch of nodes might get fenced upnode = None if self.HasQuorum(None): self.debug("Have quorum") return None if not self.templates["Pat:Fencing_start"]: print("No start pattern") return None if not self.templates["Pat:Fencing_ok"]: print("No ok pattern") return None stonith = None stonithPats = [] for peer in self.Env["nodes"]: if self.ShouldBeStatus[peer] != "up": stonithPats.append(self.templates["Pat:Fencing_ok"] % peer) stonithPats.append(self.templates["Pat:Fencing_start"] % peer) stonith = LogWatcher(self.Env["LogFileName"], stonithPats, "StartupFencing", 0, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"]) stonith.setwatch() return stonith def fencing_cleanup(self, node, stonith): peer_list = [] peer_state = {} self.debug("Looking for nodes that were fenced as a result of %s starting" % node) # If we just started a node, we may now have quorum (and permission to fence) if not stonith: self.debug("Nothing to do") return peer_list q = self.HasQuorum(None) if not q and len(self.Env["nodes"]) > 2: # We didn't gain quorum - we shouldn't have shot anyone self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"]))) return peer_list for n in self.Env["nodes"]: peer_state[n] = "unknown" # Now see if any states need to be updated self.debug("looking for: " + repr(stonith.regexes)) shot = stonith.look(0) while shot: line = repr(shot) self.debug("Found: " + line) del stonith.regexes[stonith.whichmatch] # Extract node name for n in self.Env["nodes"]: if re.search(self.templates["Pat:Fencing_ok"] % n, shot): peer = n peer_state[peer] = "complete" self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer) elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): # TODO: Correctly detect multiple fencing operations for the same host peer = n peer_state[peer] = "in-progress" self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer) if not peer: self.logger.log("ERROR: Unknown stonith match: %s" % line) elif not peer in peer_list: self.debug("Found peer: " + peer) peer_list.append(peer) # Get the next one shot = stonith.look(60) for peer in peer_list: self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) if self.Env["at-boot"]: self.ShouldBeStatus[peer] = "up" else: self.ShouldBeStatus[peer] = "down" if peer_state[peer] == "in-progress": # Wait for any in-progress operations to complete shot = stonith.look(60) while len(stonith.regexes) and shot: line = repr(shot) self.debug("Found: " + line) del stonith.regexes[stonith.whichmatch] shot = stonith.look(60) # Now make sure the node is alive too self.ns.WaitForNodeToComeUp(peer, self.Env["DeadTime"]) # Poll until it comes up if self.Env["at-boot"]: if not self.StataCM(peer): time.sleep(self.Env["StartTime"]) if not self.StataCM(peer): self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) return None return peer_list def StartaCM(self, node, verbose=False): '''Start up the cluster manager on a given node''' if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node)) else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) ret = 1 if not node in self.ShouldBeStatus: self.ShouldBeStatus[node] = "down" if self.ShouldBeStatus[node] != "down": return 1 patterns = [] # Technically we should always be able to notice ourselves starting patterns.append(self.templates["Pat:Local_started"] % node) if self.upcount() == 0: patterns.append(self.templates["Pat:DC_started"] % node) else: patterns.append(self.templates["Pat:NonDC_started"] % node) watch = LogWatcher( self.Env["LogFileName"], patterns, "StartaCM", self.Env["StartTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"]) self.install_config(node) self.ShouldBeStatus[node] = "any" if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): self.logger.log ("%s was already started" % (node)) return 1 stonith = self.prepare_fencing_watcher(node) watch.setwatch() if self.rsh(node, self.templates["StartCmd"]) != 0: self.logger.log ("Warn: Start command failed on node %s" % (node)) self.fencing_cleanup(node, stonith) return None self.ShouldBeStatus[node] = "up" watch_result = watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) if watch_result and self.cluster_stable(self.Env["DeadTime"]): #self.debug("Found match: "+ repr(watch_result)) self.fencing_cleanup(node, stonith) return 1 elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): self.fencing_cleanup(node, stonith) return 1 self.logger.log ("Warn: Start failed for node %s" % (node)) return None def StartaCMnoBlock(self, node, verbose=False): '''Start up the cluster manager on a given node with none-block mode''' if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node)) else: self.debug("Starting %s on node %s" % (self["Name"], node)) self.install_config(node) self.rsh(node, self.templates["StartCmd"], synchronous=0) self.ShouldBeStatus[node] = "up" return 1 def StopaCM(self, node, verbose=False, force=False): '''Stop the cluster manager on a given node''' if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node)) else: self.debug("Stopping %s on node %s" % (self["Name"], node)) if self.ShouldBeStatus[node] != "up" and force == False: return 1 if self.rsh(node, self.templates["StopCmd"]) == 0: # Make sure we can continue even if corosync leaks # fdata-* is the old name - #self.rsh(node, "rm -f /dev/shm/qb-* /dev/shm/fdata-*") + #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*") self.ShouldBeStatus[node] = "down" self.cluster_stable(self.Env["DeadTime"]) return 1 else: self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) return None def StopaCMnoBlock(self, node): '''Stop the cluster manager on a given node with none-block mode''' self.debug("Stopping %s on node %s" % (self["Name"], node)) self.rsh(node, self.templates["StopCmd"], synchronous=0) self.ShouldBeStatus[node] = "down" return 1 def cluster_stable(self, timeout = None): time.sleep(self.Env["StableTime"]) return 1 def node_stable(self, node): return 1 def RereadCM(self, node): '''Force the cluster manager on a given node to reread its config This may be a no-op on certain cluster managers. ''' rc=self.rsh(node, self.templates["RereadCmd"]) if rc == 0: return 1 else: self.logger.log ("Could not force %s on node %s to reread its config" % (self["Name"], node)) return None def StataCM(self, node): '''Report the status of the cluster manager on a given node''' out=self.rsh(node, self.templates["StatusCmd"] % node, 1) ret= (str.find(out, 'stopped') == -1) try: if ret: if self.ShouldBeStatus[node] == "down": self.logger.log( "Node status for %s is %s but we think it should be %s" % (node, "up", self.ShouldBeStatus[node])) else: if self.ShouldBeStatus[node] == "up": self.logger.log( "Node status for %s is %s but we think it should be %s" % (node, "down", self.ShouldBeStatus[node])) except KeyError: pass if ret: self.ShouldBeStatus[node] = "up" else: self.ShouldBeStatus[node] = "down" return ret def startall(self, nodelist=None, verbose=False, quick=False): '''Start the cluster manager on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.ShouldBeStatus[node] == "down": self.ns.WaitForAllNodesToComeUp(nodelist, 300) if not quick: # This is used for "basic sanity checks", so only start one node ... if not self.StartaCM(node, verbose=verbose): return 0 return 1 # Approximation of SimulStartList for --boot watchpats = [ ] watchpats.append(self.templates["Pat:DC_IDLE"]) for node in nodelist: watchpats.append(self.templates["Pat:Local_started"] % node) watchpats.append(self.templates["Pat:InfraUp"] % node) watchpats.append(self.templates["Pat:PacemakerUp"] % node) # Start all the nodes - at about the same time... watch = LogWatcher(self.Env["LogFileName"], watchpats, "fast-start", self.Env["DeadTime"]+10, hosts=self.Env["nodes"], kind=self.Env["LogWatcher"]) watch.setwatch() if not self.StartaCM(nodelist[0], verbose=verbose): return 0 for node in nodelist: self.StartaCMnoBlock(node, verbose=verbose) watch.lookforall() if watch.unmatched: for regex in watch.unmatched: self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) if not self.cluster_stable(): self.logger.log("Cluster did not stabilize") return 0 return 1 def stopall(self, nodelist=None, verbose=False, force=False): '''Stop the cluster managers on every node in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' ret = 1 map = {} if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up" or force == True: if not self.StopaCM(node, verbose=verbose, force=force): ret = 0 return ret def rereadall(self, nodelist=None): '''Force the cluster managers on every node in the cluster to reread their config files. We can do it on a subset of the cluster if nodelist is not None. ''' map = {} if not nodelist: nodelist = self.Env["nodes"] for node in self.Env["nodes"]: if self.ShouldBeStatus[node] == "up": self.RereadCM(node) def statall(self, nodelist=None): '''Return the status of the cluster managers in the cluster. We can do it on a subset of the cluster if nodelist is not None. ''' result = {} if not nodelist: nodelist = self.Env["nodes"] for node in nodelist: if self.StataCM(node): result[node] = "up" else: result[node] = "down" return result def isolate_node(self, target, nodes=None): '''isolate the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node != target: rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node)) if rc != 0: self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) return None else: self.debug("Communication cut between %s and %s" % (target, node)) return 1 def unisolate_node(self, target, nodes=None): '''fix the communication between the nodes''' if not nodes: nodes = self.Env["nodes"] for node in nodes: if node != target: restored = 0 # Limit the amount of time we have asynchronous connectivity for # Restore both sides as simultaneously as possible self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=0) self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=0) self.debug("Communication restored between %s and %s" % (target, node)) def reducecomm_node(self,node): '''reduce the communication between the nodes''' rc = self.rsh(node, self.templates["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"])) if rc == 0: return 1 else: self.logger.log("Could not reduce the communication between the nodes from node: %s" % node) return None def restorecomm_node(self,node): '''restore the saved communication between the nodes''' rc = 0 if float(self.Env["XmitLoss"]) != 0 or float(self.Env["RecvLoss"]) != 0 : rc = self.rsh(node, self.templates["RestoreCommCmd"]); if rc == 0: return 1 else: self.logger.log("Could not restore the communication between the nodes from node: %s" % node) return None def HasQuorum(self, node_list): "Return TRUE if the cluster currently has quorum" # If we are auditing a partition, then one side will # have quorum and the other not. # So the caller needs to tell us which we are checking # If no value for node_list is specified... assume all nodes raise ValueError("Abstract Class member (HasQuorum)") def Components(self): raise ValueError("Abstract Class member (Components)") def oprofileStart(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileStart(n) elif node in self.Env["oprofile"]: self.debug("Enabling oprofile on %s" % node) self.rsh(node, "opcontrol --init") self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") self.rsh(node, "opcontrol --start") self.rsh(node, "opcontrol --reset") def oprofileSave(self, test, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileSave(test, n) elif node in self.Env["oprofile"]: self.rsh(node, "opcontrol --dump") self.rsh(node, "opcontrol --save=cts.%d" % test) # Read back with: opreport -l session:cts.0 image:/c* if None: self.rsh(node, "opcontrol --reset") else: self.oprofileStop(node) self.oprofileStart(node) def oprofileStop(self, node=None): if not node: for n in self.Env["oprofile"]: self.oprofileStop(n) elif node in self.Env["oprofile"]: self.debug("Stopping oprofile on %s" % node) self.rsh(node, "opcontrol --reset") self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") def StatsExtract(self): if not self.Env["stats"]: return for host in self.Env["nodes"]: log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR if host in has_log_stats: self.rsh(host, '''bash %s %s stop''' % (log_stats_bin, log_stats_file)) (rc, lines) = self.rsh(host, '''cat %s''' % log_stats_file, stdout=2) self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file)) fname = "cts-stats-%d-nodes-%s.csv" % (len(self.Env["nodes"]), host) print("Extracted stats: %s" % fname) fd = open(fname, "a") fd.writelines(lines) fd.close() def StatsMark(self, testnum): '''Mark the test number in the stats log''' global has_log_stats if not self.Env["stats"]: return for host in self.Env["nodes"]: log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR if not host in has_log_stats: global log_stats global log_stats_bin script=log_stats #script = re.sub("\\\\", "\\\\", script) script = re.sub('\"', '\\\"', script) script = re.sub("'", "\'", script) script = re.sub("`", "\`", script) script = re.sub("\$", "\\\$", script) self.debug("Installing %s on %s" % (log_stats_bin, host)) self.rsh(host, '''echo "%s" > %s''' % (script, log_stats_bin), silent=True) self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file)) has_log_stats[host] = 1 # Now mark it self.rsh(host, '''bash %s %s mark %s''' % (log_stats_bin, log_stats_file, testnum), synchronous=0) class Resource(object): ''' This is an HA resource (not a resource group). A resource group is just an ordered list of Resource objects. ''' def __init__(self, cm, rsctype=None, instance=None): self.CM = cm self.ResourceType = rsctype self.Instance = instance self.needs_quorum = 1 def Type(self): return self.ResourceType def Instance(self, nodename): return self.Instance def IsRunningOn(self, nodename): ''' This member function returns true if our resource is running on the given node in the cluster. It is analagous to the "status" operation on SystemV init scripts and heartbeat scripts. FailSafe calls it the "exclusive" operation. ''' raise ValueError("Abstract Class member (IsRunningOn)") return None def IsWorkingCorrectly(self, nodename): ''' This member function returns true if our resource is operating correctly on the given node in the cluster. OCF does not require this operation, but it might be called the Monitor operation, which is what FailSafe calls it. For remotely monitorable resources (like IP addresses), they *should* be monitored remotely for testing. ''' raise ValueError("Abstract Class member (IsWorkingCorrectly)") return None def Start(self, nodename): ''' This member function starts or activates the resource. ''' raise ValueError("Abstract Class member (Start)") return None def Stop(self, nodename): ''' This member function stops or deactivates the resource. ''' raise ValueError("Abstract Class member (Stop)") return None def __repr__(self): if (self.Instance and len(self.Instance) > 1): return "{" + self.ResourceType + "::" + self.Instance + "}" else: return "{" + self.ResourceType + "}" class Component(object): def kill(self, node): None class Process(Component): def __init__(self, cm, name, process=None, dc_only=0, pats=[], dc_pats=[], badnews_ignore=[], common_ignore=[], triggersreboot=0): self.name = str(name) self.dc_only = dc_only self.pats = pats self.dc_pats = dc_pats self.CM = cm self.badnews_ignore = badnews_ignore self.badnews_ignore.extend(common_ignore) self.triggersreboot = triggersreboot if process: self.proc = str(process) else: self.proc = str(name) self.KillCmd = "killall -9 " + self.proc def kill(self, node): if self.CM.rsh(node, self.KillCmd) != 0: self.CM.log ("ERROR: Kill %s failed on node %s" % (self.name,node)) return None return 1 diff --git a/cts/CTSaudits.py b/cts/CTSaudits.py index b7e082729b..cc82171081 100755 --- a/cts/CTSaudits.py +++ b/cts/CTSaudits.py @@ -1,865 +1,865 @@ """ Auditing classes for Pacemaker's Cluster Test Suite (CTS) """ # Pacemaker targets compatibility with Python 2.7 and 3.2+ from __future__ import print_function, unicode_literals, absolute_import, division __copyright__ = "Copyright 2000-2018 the Pacemaker project contributors" __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" import time, re, uuid from cts.watcher import LogWatcher from cts.remote import input_wrapper from cts.CTSvars import * class ClusterAudit(object): def __init__(self, cm): self.CM = cm def __call__(self): raise ValueError("Abstract Class member (__call__)") def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def log(self, args): self.CM.log("audit: %s" % args) def debug(self, args): self.CM.debug("audit: %s" % args) def name(self): raise ValueError("Abstract Class member (name)") AllAuditClasses = [ ] class LogAudit(ClusterAudit): def name(self): return "LogAudit" def __init__(self, cm): self.CM = cm self.kinds = [ "combined syslog", "journal", "remote" ] def RestartClusterLogging(self, nodes=None): if not nodes: nodes = self.CM.Env["nodes"] self.CM.debug("Restarting logging on: %s" % repr(nodes)) for node in nodes: if self.CM.Env["have_systemd"]: if self.CM.rsh(node, "systemctl stop systemd-journald.socket") != 0: self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) if self.CM.rsh(node, "systemctl start systemd-journald.service") != 0: self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) if self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"]) != 0: self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node)) def TestLogging(self): patterns = [] prefix = "Test message from" suffix = str(uuid.uuid4()) watch = {} for node in self.CM.Env["nodes"]: # Look for the node name in two places to make sure # that syslog is logging with the correct hostname m = re.search("^([^.]+).*", node) if m: simple = m.group(1) else: simple = node patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) watch_pref = self.CM.Env["LogWatcher"] if watch_pref == "any": for k in self.kinds: watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, "LogAudit", 5, silent=True, hosts=self.CM.Env["nodes"], kind=k) watch[k].setwatch() else: k = watch_pref watch[k] = LogWatcher(self.CM.Env["LogFileName"], patterns, "LogAudit", 5, silent=True, hosts=self.CM.Env["nodes"], kind=k) watch[k].setwatch() if watch_pref == "any": self.CM.log("Writing log with key: %s" % (suffix)) for node in self.CM.Env["nodes"]: cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix) if self.CM.rsh(node, cmd, synchronous=0, silent=True) != 0: self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) for k in self.kinds: if k in watch: w = watch[k] if watch_pref == "any": self.CM.log("Testing for %s logs" % (k)) w.lookforall(silent=True) if not w.unmatched: if watch_pref == "any": self.CM.log ("Continuing with %s-based log reader" % (w.kind)) self.CM.Env["LogWatcher"] = w.kind return 1 for k in list(watch.keys()): w = watch[k] if w.unmatched: for regex in w.unmatched: self.CM.log ("Test message [%s] not found in %s logs." % (regex, w.kind)) return 0 def __call__(self): max = 3 attempt = 0 self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) while attempt <= max and self.TestLogging() == 0: attempt = attempt + 1 self.RestartClusterLogging() time.sleep(60*attempt) if attempt > max: self.CM.log("ERROR: Cluster logging unrecoverable.") return 0 return 1 def is_applicable(self): if self.CM.Env["DoBSC"]: return 0 if self.CM.Env["LogAuditDisabled"]: return 0 return 1 class DiskAudit(ClusterAudit): def name(self): return "DiskspaceAudit" def __init__(self, cm): self.CM = cm def __call__(self): result = 1 # @TODO Use directory of PCMK_logfile if set on host dfcmd = "df -BM " + CTSvars.CRM_LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'" self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: dfout = self.CM.rsh(node, dfcmd, 1) if not dfout: self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) else: try: (used, remain) = dfout.split() used_percent = int(used) remaining_mb = int(remain) except (ValueError, TypeError): self.CM.log("Warning: df output '%s' from %s was invalid [%s, %s]" % (dfout, node, used, remain)) else: if remaining_mb < 10 or used_percent > 95: self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" % (node, used_percent, remaining_mb)) result = None if self.CM.Env["continue"] == 1: answer = "Y" else: try: answer = input_wrapper('Continue? [nY]') except EOFError as e: answer = "n" if answer and answer == "n": raise ValueError("Disk full on %s" % (node)) elif remaining_mb < 100 or used_percent > 90: self.CM.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) return result def is_applicable(self): if self.CM.Env["DoBSC"]: return 0 return 1 class FileAudit(ClusterAudit): def name(self): return "FileAudit" def __init__(self, cm): self.CM = cm self.known = [] def __call__(self): result = 1 self.CM.ns.WaitForAllNodesToComeUp(self.CM.Env["nodes"]) for node in self.CM.Env["nodes"]: (rc, lsout) = self.CM.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", None) for line in lsout: line = line.strip() if line not in self.known: result = 0 self.known.append(line) self.CM.log("Warning: Pacemaker core file on %s: %s" % (node, line)) (rc, lsout) = self.CM.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", None) for line in lsout: line = line.strip() if line not in self.known: result = 0 self.known.append(line) self.CM.log("Warning: Corosync core file on %s: %s" % (node, line)) if node in self.CM.ShouldBeStatus and self.CM.ShouldBeStatus[node] == "down": clean = 0 (rc, lsout) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", None) for line in lsout: result = 0 clean = 1 self.CM.log("Warning: Stale IPC file on %s: %s" % (node, line)) if clean: (rc, lsout) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", None) for line in lsout: self.CM.debug("ps[%s]: %s" % (node, line)) - self.CM.rsh(node, "rm -f /dev/shm/qb-*") + self.CM.rsh(node, "rm -rf /dev/shm/qb-*") else: self.CM.debug("Skipping %s" % node) return result def is_applicable(self): return 1 class AuditResource(object): def __init__(self, cm, line): fields = line.split() self.CM = cm self.line = line self.type = fields[1] self.id = fields[2] self.clone_id = fields[3] self.parent = fields[4] self.rprovider = fields[5] self.rclass = fields[6] self.rtype = fields[7] self.host = fields[8] self.needs_quorum = fields[9] self.flags = int(fields[10]) self.flags_s = fields[11] if self.parent == "NA": self.parent = None def unique(self): if self.flags & int("0x00000020", 16): return 1 return 0 def orphan(self): if self.flags & int("0x00000001", 16): return 1 return 0 def managed(self): if self.flags & int("0x00000002", 16): return 1 return 0 class AuditConstraint(object): def __init__(self, cm, line): fields = line.split() self.CM = cm self.line = line self.type = fields[1] self.id = fields[2] self.rsc = fields[3] self.target = fields[4] self.score = fields[5] self.rsc_role = fields[6] self.target_role = fields[7] if self.rsc_role == "NA": self.rsc_role = None if self.target_role == "NA": self.target_role = None class PrimitiveAudit(ClusterAudit): def name(self): return "PrimitiveAudit" def __init__(self, cm): self.CM = cm def doResourceAudit(self, resource, quorum): rc = 1 active = self.CM.ResourceLocation(resource.id) if len(active) == 1: if quorum: self.debug("Resource %s active on %s" % (resource.id, repr(active))) elif resource.needs_quorum == 1: self.CM.log("Resource %s active without quorum: %s" % (resource.id, repr(active))) rc = 0 elif not resource.managed(): self.CM.log("Resource %s not managed. Active on %s" % (resource.id, repr(active))) elif not resource.unique(): # TODO: Figure out a clever way to actually audit these resource types if len(active) > 1: self.debug("Non-unique resource %s is active on: %s" % (resource.id, repr(active))) else: self.debug("Non-unique resource %s is not active" % resource.id) elif len(active) > 1: self.CM.log("Resource %s is active multiple times: %s" % (resource.id, repr(active))) rc = 0 elif resource.orphan(): self.debug("Resource %s is an inactive orphan" % resource.id) elif len(self.inactive_nodes) == 0: self.CM.log("WARN: Resource %s not served anywhere" % resource.id) rc = 0 elif self.CM.Env["warn-inactive"] == 1: if quorum or not resource.needs_quorum: self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) else: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) elif quorum or not resource.needs_quorum: self.debug("Resource %s not served anywhere (Inactive nodes: %s)" % (resource.id, repr(self.inactive_nodes))) return rc def setup(self): self.target = None self.resources = [] self.constraints = [] self.active_nodes = [] self.inactive_nodes = [] for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == "up": self.active_nodes.append(node) else: self.inactive_nodes.append(node) for node in self.CM.Env["nodes"]: if self.target == None and self.CM.ShouldBeStatus[node] == "up": self.target = node if not self.target: # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource # with CIB_file=/path/to/cib.xml even when the cluster isn't running self.debug("No nodes active - skipping %s" % self.name()) return 0 (rc, lines) = self.CM.rsh(self.target, "crm_resource -c", None) for line in lines: if re.search("^Resource", line): self.resources.append(AuditResource(self.CM, line)) elif re.search("^Constraint", line): self.constraints.append(AuditConstraint(self.CM, line)) else: self.CM.log("Unknown entry: %s" % line); return 1 def __call__(self): rc = 1 if not self.setup(): return 1 quorum = self.CM.HasQuorum(None) for resource in self.resources: if resource.type == "primitive": if self.doResourceAudit(resource, quorum) == 0: rc = 0 return rc def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 class GroupAudit(PrimitiveAudit): def name(self): return "GroupAudit" def __call__(self): rc = 1 if not self.setup(): return 1 for group in self.resources: if group.type == "group": first_match = 1 group_location = None for child in self.resources: if child.parent == group.id: nodes = self.CM.ResourceLocation(child.id) if first_match and len(nodes) > 0: group_location = nodes[0] first_match = 0 if len(nodes) > 1: rc = 0 self.CM.log("Child %s of %s is active more than once: %s" % (child.id, group.id, repr(nodes))) elif len(nodes) == 0: # Groups are allowed to be partially active # However we do need to make sure later children aren't running group_location = None self.debug("Child %s of %s is stopped" % (child.id, group.id)) elif nodes[0] != group_location: rc = 0 self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s" % (child.id, group.id, nodes[0], group_location)) else: self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) return rc class CloneAudit(PrimitiveAudit): def name(self): return "CloneAudit" def __call__(self): rc = 1 if not self.setup(): return 1 for clone in self.resources: if clone.type == "clone": for child in self.resources: if child.parent == clone.id and child.type == "primitive": self.debug("Checking child %s of %s..." % (child.id, clone.id)) # Check max and node_max # Obtain with: # crm_resource -g clone_max --meta -r child.id # crm_resource -g clone_node_max --meta -r child.id return rc class ColocationAudit(PrimitiveAudit): def name(self): return "ColocationAudit" def crm_location(self, resource): (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q"%resource, None) hosts = [] if rc == 0: for line in lines: fields = line.split() hosts.append(fields[0]) return hosts def __call__(self): rc = 1 if not self.setup(): return 1 for coloc in self.constraints: if coloc.type == "rsc_colocation": source = self.crm_location(coloc.rsc) target = self.crm_location(coloc.target) if len(source) == 0: self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) else: for node in source: if not node in target: rc = 0 self.CM.log("Colocation audit (%s): %s running on %s (not in %s)" % (coloc.id, coloc.rsc, node, repr(target))) else: self.debug("Colocation audit (%s): %s running on %s (in %s)" % (coloc.id, coloc.rsc, node, repr(target))) return rc class ControllerStateAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 up_are_down = 0 down_are_up = 0 unstable_list = [] for node in self.CM.Env["nodes"]: should_be = self.CM.ShouldBeStatus[node] rc = self.CM.test_node_CM(node) if rc > 0: if should_be == "down": down_are_up = down_are_up + 1 if rc == 1: unstable_list.append(node) elif should_be == "up": up_are_down = up_are_down + 1 if len(unstable_list) > 0: passed = 0 self.CM.log("Cluster is not stable: %d (of %d): %s" % (len(unstable_list), self.CM.upcount(), repr(unstable_list))) if up_are_down > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be up were down." % (up_are_down, len(self.CM.Env["nodes"]))) if down_are_up > 0: passed = 0 self.CM.log("%d (of %d) nodes expected to be down were up." % (down_are_up, len(self.CM.Env["nodes"]))) return passed def name(self): return "ControllerStateAudit" def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 class CIBAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 ccm_partitions = self.CM.find_partitions() if len(ccm_partitions) == 0: self.debug("\tNo partitions to audit") return 1 for partition in ccm_partitions: self.debug("\tAuditing CIB consistency for: %s" % partition) partition_passed = 0 if self.audit_cib_contents(partition) == 0: passed = 0 return passed def audit_cib_contents(self, hostlist): passed = 1 node0 = None node0_xml = None partition_hosts = hostlist.split() for node in partition_hosts: node_xml = self.store_remote_cib(node, node0) if node_xml == None: self.CM.log("Could not perform audit: No configuration from %s" % node) passed = 0 elif node0 == None: node0 = node node0_xml = node_xml elif node0_xml == None: self.CM.log("Could not perform audit: No configuration from %s" % node0) passed = 0 else: (rc, result) = self.CM.rsh( node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), None) if rc != 0: self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) passed = 0 for line in result: if not re.search("", line): passed = 0 self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) else: self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) # self.CM.rsh(node0, "rm -f %s" % node_xml) # self.CM.rsh(node0, "rm -f %s" % node0_xml) return passed def store_remote_cib(self, node, target): combined = "" filename = "/tmp/ctsaudit.%s.xml" % node if not target: target = node (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], None) if rc != 0: self.CM.log("Could not retrieve configuration") return None self.CM.rsh("localhost", "rm -f %s" % filename) for line in lines: self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), silent=True) if self.CM.rsh.cp(filename, "root@%s:%s" % (target, filename), silent=True) != 0: self.CM.log("Could not store configuration") return None return filename def name(self): return "CibAudit" def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 class PartitionAudit(ClusterAudit): def __init__(self, cm): self.CM = cm self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} self.NodeEpoch = {} self.NodeState = {} self.NodeQuorum = {} def has_key(self, key): return key in self.Stats def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not name in self.Stats: self.Stats[name] = 0 self.Stats[name] = self.Stats[name]+1 def __call__(self): passed = 1 ccm_partitions = self.CM.find_partitions() if ccm_partitions == None or len(ccm_partitions) == 0: return 1 self.CM.cluster_stable(double_check=True) if len(ccm_partitions) != self.CM.partitions_expected: self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) passed = 0 for partition in ccm_partitions: self.CM.log("\t %s" % partition) for partition in ccm_partitions: partition_passed = 0 if self.audit_partition(partition) == 0: passed = 0 return passed def trim_string(self, avalue): if not avalue: return None if len(avalue) > 1: return avalue[:-1] def trim2int(self, avalue): if not avalue: return None if len(avalue) > 1: return int(avalue[:-1]) def audit_partition(self, partition): passed = 1 dc_found = [] dc_allowed_list = [] lowest_epoch = None node_list = partition.split() self.debug("Auditing partition: %s" % (partition)) for node in node_list: if self.CM.ShouldBeStatus[node] != "up": self.CM.log("Warn: Node %s appeared out of nowhere" % (node)) self.CM.ShouldBeStatus[node] = "up" # not in itself a reason to fail the audit (not what we're # checking for in this audit) self.NodeState[node] = self.CM.rsh(node, self.CM["StatusCmd"] % node, 1) self.NodeEpoch[node] = self.CM.rsh(node, self.CM["EpochCmd"], 1) self.NodeQuorum[node] = self.CM.rsh(node, self.CM["QuorumCmd"], 1) self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoch[node], self.NodeQuorum[node])) self.NodeState[node] = self.trim_string(self.NodeState[node]) self.NodeEpoch[node] = self.trim2int(self.NodeEpoch[node]) self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node]) if not self.NodeEpoch[node]: self.CM.log("Warn: Node %s dissappeared: cant determin epoch" % (node)) self.CM.ShouldBeStatus[node] = "down" # not in itself a reason to fail the audit (not what we're # checking for in this audit) elif lowest_epoch == None or self.NodeEpoch[node] < lowest_epoch: lowest_epoch = self.NodeEpoch[node] if not lowest_epoch: self.CM.log("Lowest epoch not determined in %s" % (partition)) passed = 0 for node in node_list: if self.CM.ShouldBeStatus[node] == "up": if self.CM.is_node_dc(node, self.NodeState[node]): dc_found.append(node) if self.NodeEpoch[node] == lowest_epoch: self.debug("%s: OK" % node) elif not self.NodeEpoch[node]: self.debug("Check on %s ignored: no node epoch" % node) elif not lowest_epoch: self.debug("Check on %s ignored: no lowest epoch" % node) else: self.CM.log("DC %s is not the oldest node (%d vs. %d)" % (node, self.NodeEpoch[node], lowest_epoch)) passed = 0 if len(dc_found) == 0: self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)" % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) elif len(dc_found) > 1: self.CM.log("%d DCs (%s) found in cluster partition: %s" % (len(dc_found), str(dc_found), str(node_list))) passed = 0 if passed == 0: for node in node_list: if self.CM.ShouldBeStatus[node] == "up": self.CM.log("epoch %s : %s" % (self.NodeEpoch[node], self.NodeState[node])) return passed def name(self): return "PartitionAudit" def is_applicable(self): # @TODO Due to long-ago refactoring, this name test would never match, # so this audit (and those derived from it) would never run. # Uncommenting the next lines fixes the name test, but that then # exposes pre-existing bugs that need to be fixed. #if self.CM["Name"] == "crm-corosync": # return 1 return 0 AllAuditClasses.append(DiskAudit) AllAuditClasses.append(FileAudit) AllAuditClasses.append(LogAudit) AllAuditClasses.append(ControllerStateAudit) AllAuditClasses.append(PartitionAudit) AllAuditClasses.append(PrimitiveAudit) AllAuditClasses.append(GroupAudit) AllAuditClasses.append(CloneAudit) AllAuditClasses.append(ColocationAudit) AllAuditClasses.append(CIBAudit) def AuditList(cm): result = [] for auditclass in AllAuditClasses: a = auditclass(cm) if a.is_applicable(): result.append(a) return result