diff --git a/cts/corolab.py b/cts/corolab.py index 9bc7bd65..8f1ebb5e 100755 --- a/cts/corolab.py +++ b/cts/corolab.py @@ -1,325 +1,335 @@ #!/usr/bin/python '''CTS: Cluster Testing System: Lab environment module ''' __copyright__=''' Copyright (c) 2010 Red Hat, Inc. ''' # All rights reserved. # # Author: Angus Salkeld # # This software licensed under BSD license, the text of which follows: # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # - Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # - Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # - Neither the name of the MontaVista Software, Inc. nor the names of its # contributors may be used to endorse or promote products derived from this # software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # THE POSSIBILITY OF SUCH DAMAGE. 
import sys
from cts.CTSscenarios import *
from cts.logging import *
from corotests import CoroTestList
from corosync import *

sys.path.append("/usr/share/pacemaker/tests/cts") # So that things work from the source directory

try:
    from CTSlab import *

except ImportError:
    sys.stderr.write("abort: couldn't find CTSLab in [%s]\n" % ' '.join(sys.path))
    sys.stderr.write("(check your install and PYTHONPATH)\n")
    sys.exit(-1)

tests = None
cm = None
old_handler = None
DefaultFacility = "daemon"


def usage(arg):
    '''Print the offending argument and the option summary, then exit.

    arg -- the command-line token (or message) that could not be handled.

    Never returns: the process exits with status 1.
    '''
    help_lines = (
        "Illegal argument " + arg,
        "usage: " + sys.argv[0] +" [options] number-of-iterations",
        "\nCommon options: ",
        "\t [--at-boot (1|0)], does the cluster software start at boot time",
        "\t [--nodes 'node list'], list of cluster nodes separated by whitespace",
        "\t [--limit-nodes max], only use the first 'max' cluster nodes supplied with --nodes",
        "\t [--logfile path], where should the test software look for logs from cluster nodes",
        "\t [--rrp-bindaddr addr], extra interface used for rrp, provide the bindaddr",
        "\t [--outputfile path], optional location for the test software to write logs to",
        "\t [--syslog-facility name], which syslog facility should the test software log to",
        "\t [--choose testcase-name], run only the named test",
        "\t [--list-tests], list the valid tests",
        "\t [--benchmark], add the timing information",
        # --stack is accepted by the argument parser in the main section of
        # this script, so it is listed here as well.
        "\t [--stack name], which cluster stack is being tested",
        "\t ",
        "Additional (less common) options: ",
        "\t [--trunc (truncate logfile before starting)]",
        "\t [--xmit-loss lost-rate(0.0-1.0)]",
        "\t [--recv-loss lost-rate(0.0-1.0)]",
        "\t [--standby (1 | 0 | yes | no)]",
        "\t [--fencing (1 | 0 | yes | no)]",
        "\t [--once], run all valid tests once",
        "\t [--no-loop-tests], dont run looping/time-based tests",
        "\t [--no-unsafe-tests], dont run tests that are unsafe for use with ocfs2/drbd",
        "\t [--valgrind-tests], include tests using valgrind",
        "\t [--experimental-tests], include experimental tests",
        "\t [--oprofile 'node list'], list of cluster nodes to run oprofile on]",
        "\t [--qarsh] Use the QARSH backdoor to access nodes instead of SSH",
        "\t [--seed random_seed]",
        "\t [--set option=value]",
    )
    # Single-argument print(...) behaves the same under Python 2 and 3.
    for line in help_lines:
        print(line)
    sys.exit(1)


class CoroLabEnvironment(CtsLab):
    '''CtsLab environment pre-loaded with the settings used for corosync runs.'''

    def __init__(self):
        '''Initialise the base lab, then install the corosync defaults.'''
        CtsLab.__init__(self)

        # NOTE(review): nothing here seeds the random number generator; the
        # main script later reads self["RandSeed"], so it is presumably set
        # by CtsLab.__init__ -- confirm.
        self["DoStonith"] = 0
        self["DoStandby"] = 0
        self["DoFencing"] = 0
        self["XmitLoss"] = "0.0"
        self["RecvLoss"] = "0.0"
        self["IPBase"] = "127.0.0.10"
        self["ClobberCIB"] = 0
        self["CIBfilename"] = None
        self["CIBResource"] = 0
        self["DoBSC"] = 0
        self["use_logd"] = 0
        self["oprofile"] = []
        self["RrpBindAddr"] = None
        self["warn-inactive"] = 0
        self["ListTests"] = 0
        self["benchmark"] = 0
        self["logrestartcmd"] = "systemctl restart rsyslog.service 2>&1 > /dev/null"
        self["syslogd"] ="rsyslog"
        self["Schema"] = "corosync 2.0"
        self["Stack"] = "corosync"
        self['CMclass'] = corosync_needle
        self["stonith-type"] = "external/ssh"
        self["stonith-params"] = "hostlist=all,livedangerously=yes"
        self["at-boot"] = 0  # Does the cluster software start automatically when the node boot
        self["logger"] = ([StdErrLog(self, 'corosync.log')])
        self["loop-minutes"] = 60
        self["valgrind-prefix"] = None
        self["valgrind-procs"] = "corosync"
        self["valgrind-opts"] = """--leak-check=full --show-reachable=yes --trace-children=no --num-callers=25 --gen-suppressions=all --suppressions="""+CTSvars.CTS_home+"""/cts.supp"""

        self["experimental-tests"] = 0
        self["valgrind-tests"] = 0
        self["unsafe-tests"] = 0
        self["loop-tests"] = 0
        self["all-once"] = 0
        self["LogWatcher"] = "remote"
        self["SyslogFacility"] = DefaultFacility
        self["stats"] = 0

        # The main script calls Environment.log(...); route it through the
        # logger obtained from LogFactory.
        self.log = LogFactory().log

#
# Main entry into the test system.
#
if __name__ == '__main__':

    # signal is not imported explicitly anywhere in this script; import it
    # here instead of relying on one of the star imports to provide it.
    import signal

    Environment = CoroLabEnvironment()

    NumIter = 0
    Version = 1
    LimitNodes = 0
    TestCase = None
    TruncateLog = 0
    ListTests = 0
    HaveSeed = 0
    node_list = ''

    #
    # The values of the rest of the parameters are now properly derived from
    # the configuration files.
    #
    # Set the signal handler
    signal.signal(15, sig_handler)
    signal.signal(10, sig_handler)

    # Process arguments...
    arg_iter = iter(sys.argv[1:])

    def option_value(option):
        '''Return the value following `option`; call usage() if it is missing.'''
        try:
            return next(arg_iter)
        except StopIteration:
            usage(option)

    def yes_no(option, key):
        '''Store 1/0 in Environment[key] from a "1|yes" / "0|no" option value.'''
        value = option_value(option)
        if value in ("1", "yes"):
            Environment[key] = 1
        elif value in ("0", "no"):
            Environment[key] = 0
        else:
            usage(value)

    def loss_rate(option, key):
        '''Store the option value in Environment[key] after checking it parses as a float.'''
        value = option_value(option)
        try:
            float(value)
        except ValueError:
            print("%s parameter should be float" % option)
            usage(value)
        Environment[key] = value

    for arg in arg_iter:
        if arg == "-l" or arg == "--limit-nodes":
            LimitNodes = int(option_value(arg))

        elif arg == "-L" or arg == "--logfile":
            Environment["LogFileName"] = option_value(arg)

        elif arg == "--outputfile":
            Environment["OutputFile"] = option_value(arg)

        elif arg == "--rrp-bindaddr":
            Environment["RrpBindAddr"] = option_value(arg)

        elif arg == "--oprofile":
            Environment["oprofile"] = option_value(arg).split(' ')

        elif arg == "--trunc":
            Environment["TruncateLog"]=1
            # The truncation step below tests the local flag, so set it too.
            TruncateLog = 1

        elif arg == "--list-tests":
            Environment["ListTests"]=1

        elif arg == "--benchmark":
            Environment["benchmark"]=1

        elif arg == "--qarsh":
            Environment.rsh.enable_qarsh()

        elif arg == "--fencing":
            yes_no(arg, "DoFencing")

        elif arg == "--xmit-loss":
            loss_rate(arg, "XmitLoss")

        elif arg == "--recv-loss":
            loss_rate(arg, "RecvLoss")

        elif arg == "--choose":
            TestCase = option_value(arg)

        elif arg == "--nodes":
            node_list = option_value(arg).split(' ')

        elif arg == "--at-boot" or arg == "--cluster-starts-at-boot":
            yes_no(arg, "at-boot")

        elif arg == "--set":
            # Split on the first '=' only so that values may contain '='.
            (name, value) = option_value(arg).split('=', 1)
            Environment[name] = value

        elif arg == "--once":
            # Takes no value: must not consume the following argument.
            Environment["all-once"]=1

        elif arg == "--stack":
            Environment["Stack"] = option_value(arg)

        else:
            try:
                NumIter=int(arg)
            except ValueError:
                usage(arg)

    if Environment["OutputFile"]:
        Environment["logger"].append(FileLog(Environment, Environment["OutputFile"]))

    if len(node_list) < 1:
        print("No nodes specified!")
        sys.exit(1)

    if LimitNodes > 0:
        if len(node_list) > LimitNodes:
            print("Limiting the number of nodes configured=%d (max=%d)"
                  %(len(node_list), LimitNodes))
            # Keep only the first LimitNodes entries.
            del node_list[LimitNodes:]

    Environment["nodes"] = node_list

    # Create the Cluster Manager object
    cm = Environment['CMclass'](Environment)
    Audits = CoroAuditList(cm)

    if Environment["ListTests"] == 1 :
        Tests = CoroTestList(cm, Audits)
        Environment.log("Total %d tests"%len(Tests))
        for test in Tests :
            Environment.log(str(test.name))
        sys.exit(0)

    if TruncateLog:
        # NOTE(review): assumes the file to truncate is the one given with
        # -L/--logfile (Environment["LogFileName"]) -- confirm.
        log_file = Environment["LogFileName"]
        if log_file:
            Environment.log("Truncating %s" % log_file)
            # Opening for writing truncates the file.
            open(log_file, "w").close()

    if TestCase != None:
        # Collect only the named test; Tests must exist before appending.
        Tests = []
        for test in CoroTestList(cm, Audits):
            if test.name == TestCase:
                Tests.append(test)
        if Tests == []:
            usage("--choose: No applicable/valid tests chosen")
    else:
        Tests = CoroTestList(cm, Audits)

    # Scenario selection
    if Environment["DoBSC"]:
        scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests)

    elif Environment["all-once"] or NumIter == 0:
        NumIter = len(Tests)
        scenario = AllOnce(
            cm, [ BootCluster(Environment), TestAgentComponent(Environment), PacketLoss(Environment) ], Audits, Tests)
    else:
        scenario = RandomTests(
            cm, [ BootCluster(Environment), TestAgentComponent(Environment), PacketLoss(Environment) ], Audits, Tests)

    Environment.log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TESTS ")
    Environment.log("Stack: %s" % Environment["Stack"])
    Environment.log("Schema: %s" % Environment["Schema"])
    Environment.log("Scenario: %s" % scenario.__doc__)
    Environment.log("Random Seed: %s" % Environment["RandSeed"])
    Environment.log("System log files: %s" % Environment["LogFileName"])

    Environment.dump()
    rc = Environment.run(scenario, NumIter)
    sys.exit(rc)

# diff --git a/cts/corosync.py b/cts/corosync.py
# index 47276935..21898511 100644
# --- a/cts/corosync.py
# +++ b/cts/corosync.py
# @@ -1,670 +1,673 @@
'''CTS: Cluster Testing System: corosync...
'''

__copyright__='''
Copyright (c) 2010 Red Hat, Inc.
'''

# All rights reserved.
#
# Author: Angus Salkeld
#
# This software licensed under BSD license, the text of which follows:
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# - Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
# - Neither the name of the MontaVista Software, Inc. nor the names of its
#   contributors may be used to endorse or promote products derived from this
#   software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.

import os
import sys
import time
import socket
import shutil
import string

import augeas
from cts.CTS import ClusterManager
from cts.CTSscenarios import ScenarioComponent
from cts.remote import RemoteExec
from cts.remote import RemoteFactory
from cts.logging import *
from cts.CTSvars import CTSvars
from cts.CTSaudits import ClusterAudit
from cts.CTSaudits import LogAudit

###################################################################
class CoroConfig(object):
    '''Editable copy of corosync.conf, manipulated through augeas.

    The example configuration is copied into a scratch root
    (/tmp/aug-root/) and all reads/writes go to that copy.
    '''

    def __init__(self, corobase=None):
        '''Create the scratch root and record the original totem settings.

        corobase -- top of the corosync source tree; defaults to the parent
                    of the current working directory.
        '''
        self.base = "/files/etc/corosync/corosync.conf/"
        self.new_root = "/tmp/aug-root/"
        if corobase is None:
            self.corobase = os.getcwd() + "/.."
        else:
            self.corobase = corobase
        example = self.corobase + "/conf/corosync.conf.example"

        # Start from a clean scratch root every time.
        if os.path.isdir(self.new_root):
            shutil.rmtree (self.new_root)
        os.makedirs (self.new_root + "/etc/corosync")
        shutil.copy (example, self.new_root + "/etc/corosync/corosync.conf")

        self.aug = augeas.Augeas (root=self.new_root,
                loadpath=self.corobase + "/conf/lenses")

        self.original = {}
        # store the original values (of totem), so we can restore them in
        # apply_default_config()
        for pattern in ('/files/etc/corosync/corosync.conf/totem/*',
                        '/files/etc/corosync/corosync.conf/totem/interface/*'):
            for c in self.aug.match(pattern):
                # strip the leading /files/etc/corosync/corosync.conf/
                short_name = c[len(self.base):]
                self.original[short_name] = self.aug.get(c)

    def get (self, name):
        '''Return the value stored at `name` (relative to corosync.conf).'''
        return self.aug.get (self.base + name)

    def set (self, name, value):
        '''Set `name` (relative to corosync.conf) to str(value).'''
        self.aug.set (self.base + name, str(value))

    def save (self):
        '''Write pending changes to the scratch corosync.conf.'''
        self.aug.save()

    def get_filename(self):
        '''Return the path of the scratch corosync.conf.'''
        return self.new_root + "/etc/corosync/corosync.conf"

###################################################################
class corosync_needle(ClusterManager):
    '''
    ClusterManager for corosync: holds the service commands and log
    patterns, the pending/applied configuration and the per-node test agents.
    '''
    def __init__(self, Environment, randseed=None):
        ClusterManager.__init__(self, Environment, randseed)

        self.update({
            "Name"           : "corosync(needle)",
            "StartCmd"       : "service corosync start",
            "StopCmd"        : "service corosync stop",
            "RereadCmd"      : "service corosync reload",
            "StatusCmd"      : "service corosync status",
            "DeadTime"       : 30,
            "StartTime"      : 15,        # Max time to start up
            "StableTime"     : 10,
            "BreakCommCmd"   : "/usr/share/corosync/tests/net_breaker.sh BreakCommCmd %s",
            "FixCommCmd"     : "/usr/share/corosync/tests/net_breaker.sh FixCommCmd %s",
            "QuorumCmd"      : "corosync-quorumtool -s",

            "Pat:We_stopped"   : "%s.*Corosync Cluster Engine exiting.*",
            "Pat:They_stopped" : "%s.*Member left:.*%s.*",
            "Pat:They_dead"    : "corosync:.*Node %s is now: lost",
            "Pat:Local_starting" : "%s.*Initializing transport",
            "Pat:Local_started" :  "%s.*Initializing transport",
            "Pat:Master_started" : "%s.*Completed service synchronization, ready to provide service.",
            "Pat:Slave_started" :  "%s.*Completed service synchronization, ready to provide service.",
            "Pat:ChildKilled"  : "%s corosync.*Child process %s terminated with signal 9",
            "Pat:ChildRespawn" : "%s corosync.*Respawning failed child process: %s",
            "Pat:ChildExit"    : "Child process .* exited",
            "Pat:DC_IDLE"      : ".*A new membership.*was formed.",
            # Bad news Regexes.  Should never occur.
            "BadRegexes"   : (
                r"ERROR:",
                r"CRIT:",
                r"Shutting down\.",
                r"Forcing shutdown\.",
                r"core dump",
                r"Could not bind AF_UNIX",
                r"Too many open files",
                r"Address already in use",
            ),
            "LogFileName"    : Environment["LogFileName"],
            })
        self.start_cpg = True
        self.cpg_agent = {}
        self.sam_agent = {}
        self.votequorum_agent = {}
        self.config = CoroConfig ()
        self.node_to_ip = {}

        self.new_config = {}
        self.applied_config = {}
        # Derive the bind network address from each node's address by
        # zeroing the last octet; the value from the last node wins.
        for n in self.Env["nodes"]:
            ip = socket.gethostbyname(n)
            ips = ip.split('.')
            ips[3] = '0'
            ip_mask = '.'.join(ips)
            self.new_config['totem/interface/bindnetaddr'] = str(ip_mask)

    def apply_default_config(self):
        '''Queue the original value of every changed option; stop the cluster if anything is queued.'''
        for c in self.applied_config:
            if 'bindnetaddr' in c:
                continue
            elif c not in self.config.original:
                # new config option (non default)
                pass
            elif self.applied_config[c] != self.config.original[c]:
                # reset to the original (compare by value, not identity)
                self.new_config[c] = self.config.original[c]

        if len(self.new_config) > 0:
            self.debug('applying default config')
            self.stopall()

    def apply_new_config(self, need_all_up=True):
        '''Stop the cluster if configuration is pending; restart it when need_all_up is true.'''
        if len(self.new_config) > 0:
            self.debug('applying new config')
            self.stopall()
            if need_all_up:
                self.startall()

    def install_all_config(self):
        '''Write all pending options to the scratch config and copy it to every node.'''
        for c in sorted(self.new_config.keys()):
            self.log('configuring: ' + c + ' = '+ str(self.new_config[c]))
            self.config.set (c, self.new_config[c])
            self.applied_config[c] = self.new_config[c]

        # Everything pending has been moved to applied_config.
        self.new_config.clear()

        self.config.save()
        src_file = self.config.get_filename()
        for node in self.Env["nodes"]:
            self.rsh.cp(src_file, "%s:%s" % (node, "/etc/corosync/"))

    def install_config(self, node):
        # install gets new_config and installs it, then moves the
        # config to applied_config
        if len(self.new_config) > 0:
            self.install_all_config()

    def key_for_node(self, node):
        '''Return the (cached) IP address of `node`.'''
        if node not in self.node_to_ip:
            self.node_to_ip[node] = socket.gethostbyname (node)
        return self.node_to_ip[node]

    def StartaCM(self, node, verbose=False):
        '''Start corosync on `node` and (re)start its test agents.

        Returns 1 without doing anything when the node is not expected to be
        down, otherwise the result of ClusterManager.StartaCM().
        '''
        if node not in self.ShouldBeStatus:
            self.ShouldBeStatus[node] = "down"

        if self.ShouldBeStatus[node] != "down":
            return 1

        self.debug('starting corosync on : ' + node)
        ret = ClusterManager.StartaCM(self, node)
        if self.start_cpg:
            if node in self.cpg_agent:
                self.cpg_agent[node].restart()
            else:
                self.cpg_agent[node] = CpgTestAgent(node, self.Env)
                self.cpg_agent[node].start()

        if node in self.sam_agent:
            self.sam_agent[node].restart()

        # votequorum agent started as needed.
        if 'quorum/provider' in self.applied_config:
            # Compare by value: identity of string objects is not guaranteed.
            if self.applied_config['quorum/provider'] == 'corosync_votequorum':
                if node in self.votequorum_agent:
                    self.votequorum_agent[node].restart()
                else:
                    self.votequorum_agent[node] = VoteQuorumTestAgent(node, self.Env)
                    self.votequorum_agent[node].start()

        return ret

    def StopaCM(self, node, verbose=False):
        '''Stop the test agents on `node`, then corosync itself.

        Returns 1 without doing anything when the node is not expected to be up.
        '''
        if self.ShouldBeStatus[node] != "up":
            return 1

        self.debug('stoping corosync on : ' + node)
        if node in self.cpg_agent:
            self.cpg_agent[node].stop()
        if node in self.sam_agent:
            self.sam_agent[node].stop()
        if node in self.votequorum_agent:
            self.votequorum_agent[node].stop()
        return ClusterManager.StopaCM(self, node)

    def test_node_CM(self, node):
        '''Query the service status on `node` and update ShouldBeStatus.

        Returns 2 when corosync is up and 0 when it is down.
        '''
        # 2 - up and stable
        # 1 - unstable
        # 0 - down
        (rc, lines) = self.rsh(node, self["StatusCmd"], stdout=2)
        out = str(lines)

        if 'systemd' in out:
            is_up = 'running' in out
        else:
            is_up = 'dead' not in out and 'stopped' not in out

        if is_up:
            ret = 2
            actual = "up"
        else:
            ret = 0
            actual = "down"

        # Report a mismatch with what we believed; a node we have no
        # expectation for yet is simply recorded.
        expected = self.ShouldBeStatus.get(node)
        if expected in ("up", "down") and expected != actual:
            self.log(
                "Node status for %s is %s but we think it should be %s"
                % (node, actual, expected))

        self.ShouldBeStatus[node] = actual
        return ret

    def StataCM(self, node):
        '''Report the status of corosync on a given node'''
        if self.test_node_CM(node) > 0:
            return 1
        else:
            return None

    def RereadCM(self, node):
        '''Ask corosync on `node` to reload its configuration.'''
        self.log('reloading corosync on : ' + node)
        return ClusterManager.RereadCM(self, node)

    def find_partitions(self):
        '''Return the list of partitions (always empty here).'''
        ccm_partitions = []
        return ccm_partitions

    def prepare(self):
        '''Finish the Initialization process. Prepare to test...'''
        self.partitions_expected = 1
        for node in self.Env["nodes"]:
            self.ShouldBeStatus[node] = ""
            self.unisolate_node(node)
            self.StataCM(node)

    def HasQuorum(self, node_list):
        # If we are auditing a partition, then one side will
        #   have quorum and the other not.
        # So the caller needs to tell us which we are checking
        # If no value for node_list is specified... assume all nodes
        if not node_list:
            node_list = self.Env["nodes"]

        for node in node_list:
            if self.ShouldBeStatus[node] == "up":
                (quorum, qout) = self.rsh(node, self["QuorumCmd"], stdout=2)
                if quorum == 1:
                    return 1
                elif quorum == 0:
                    return 0
                else:
                    self.log("WARN: Unexpected quorum test result from %s : %d" % (node, quorum))

        return 0

    def Components(self):
        return None

class ShmLeakAudit(ClusterAudit):
    '''Audit that runs shm_leak_audit.sh on every node and logs its output.'''

    def __init__(self, cm):
        self.CM = cm

    def name(self):
        return "ShmLeakAudit"

    def is_applicable(self):
        return 1

    def __call__(self):
        '''Return 1 when no node produced output from the audit script, else 0.'''
        rc = 1

        for node in self.CM.Env["nodes"]:
            (res, lines) = self.CM.rsh(node, "/usr/share/corosync/tests/shm_leak_audit.sh", None)
            for line in lines:
                self.CM.log("%s leaked %s" % (node, line))
                rc = 0

        return rc


###################################################################
class TestAgentComponent(ScenarioComponent):
    '''Scenario component that starts and stops the per-node test agents.'''

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''Return TRUE if the current ScenarioComponent is applicable
        in the given LabEnvironment given to the constructor.
        '''
        return True

    def SetUp(self, CM):
        '''Set up the given ScenarioComponent'''
        self.CM = CM

        for node in self.Env["nodes"]:
            if not CM.StataCM(node):
                raise RuntimeError ("corosync not up")

            if self.CM.start_cpg:
                if node in self.CM.cpg_agent:
                    self.CM.cpg_agent[node].clean_start()
                else:
                    self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
                    self.CM.cpg_agent[node].start()
            if node in self.CM.sam_agent:
                self.CM.sam_agent[node].clean_start()
            else:
                self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
                self.CM.sam_agent[node].start()
            # votequorum agent started as needed.
            if 'quorum/provider' in self.CM.applied_config:
                # Compare by value: identity of string objects is not guaranteed.
                if CM.applied_config['quorum/provider'] == 'corosync_votequorum':
                    self.CM.votequorum_agent[node] = VoteQuorumTestAgent(node, CM.Env)
                    self.CM.votequorum_agent[node].start()
        return 1

    def TearDown(self, CM):
        '''Tear down (undo) the given ScenarioComponent'''
        self.CM = CM
        for node in self.Env["nodes"]:
            if node in self.CM.cpg_agent:
                self.CM.cpg_agent[node].stop()
            # Guarded like the other agents so a node without a sam agent
            # does not abort the tear down with a KeyError.
            if node in self.CM.sam_agent:
                self.CM.sam_agent[node].stop()
            if node in self.CM.votequorum_agent:
                self.CM.votequorum_agent[node].stop()

###################################################################
class TestAgent(object):
    '''Client for a test agent binary running on a cluster node.

    The agent is launched remotely and spoken to over a TCP socket.
    Attribute names that do not exist on the object are turned into remote
    calls (see __getattribute__).
    '''

    def __init__(self, binary, node, port, Env=None):
        self.node = node
        self.node_address = None
        self.port = port
        self.sock = None
        self.binary = binary
        self.started = False
        resh = RemoteFactory.rsh
        self.rsh = RemoteExec(resh)
        self.func_name = None
        self.used = False
        self.env = Env
        # When true, dynamic calls also wait for and return the reply.
        self.send_recv = False

    def restart(self):
        '''Stop the agent, then start it again.'''
        LogFactory().debug('%s:%s restarting' % (self.node, self.binary))
        self.stop()
        self.start()

    def clean_start(self):
        '''Restart the agent if it has been used or does not respond.'''
        if self.used or not self.status():
            LogFactory().debug('%s:%s cleaning' % (self.node, self.binary))
            self.stop()
            self.start()

    def status(self):
        '''Return True when the agent was started and answers a ping.'''
        if not self.started:
            return False

        try:
            self.send_internal(["are_you_ok_dude"])
            self.read()
            self.started = True
            return True
        except RuntimeError:
            self.started = False
            return False

    def start(self):
        '''Launch the agent on the node and connect to it.

        Raises RuntimeError when the connection still fails after more than
        30 attempts (one second apart).
        '''
        if self.status():
            return
        LogFactory().debug('%s:%s starting ' % (self.node, self.binary))

        self.rsh(self.node, self.binary, synchronous=False)
        self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
        ip = socket.gethostbyname(self.node)
        is_connected = False
        retries = 0
        while not is_connected:
            try:
                retries = retries + 1
                self.sock.connect ((ip, self.port))
                is_connected = True
            except socket.error as err:
                if retries > 10:
                    LogFactory().log("%s:%s Tried connecting %d times. %s" % (self.node, self.binary, retries, str(err)))
                if retries > 30:
                    raise RuntimeError("%s:%s can't connect" % (self.node, self.binary))
                time.sleep(1)
        self.started = True
        self.used = False

    def stop(self):
        '''Kill the agent, close the socket and wait until the process is gone.'''
        LogFactory().debug('%s:%s stopping' % (self.binary, self.node))
        self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
        if self.sock:
            self.sock.close ()
            self.sock = None
        while self.getpid() != '':
            time.sleep(1)
        self.started = False

    def kill(self):
        '''Kill the agent with SIGKILL and mark it as not started.'''
        LogFactory().log('%s:%s killing' % (self.node, self.binary))
        self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
        self.started = False

    def getpid(self):
        '''Return the result of running pidof for the agent binary on the node.'''
        return self.rsh(self.node, 'pidof ' + self.binary, 1)

    def send_internal(self, args):
        '''Encode `args` and send them; return the byte count, or 0 on socket error.

        Wire format: "<count>:<len>:<arg>...;".
        '''
        real_msg = str (len (args))
        for a in args:
            a_str = str(a)
            real_msg += ":" + str (len (a_str)) + ":" + a_str
        real_msg += ";"
        try:
            return self.sock.send (real_msg)
        except socket.error as err:
            LogFactory().log("send(%s): %s; error: %s" % (self.node, real_msg, err))
            return 0

    def send (self, args):
        '''Send `args`, starting the agent first if needed.

        Raises RuntimeError when nothing could be sent.
        '''
        if not self.started:
            self.start()

        sent = self.send_internal(args)

        if sent == 0:
            raise RuntimeError ("socket connection broken")
        self.used = True

    def __getattribute__(self,name):
        '''Resolve normally; unknown names become remote calls.

        For a name that is not a real attribute, remember it in func_name and
        return send_recv_dynamic (if send_recv is set) or send_dynamic.
        '''
        try:
            return object.__getattribute__(self, name)
        except AttributeError:
            self.func_name = name
            if self.send_recv:
                return self.send_recv_dynamic
            else:
                return self.send_dynamic

    def send_recv_dynamic (self, *args):
        '''Send the pending dynamic call and return the reply (None on error).'''
        # NOTE(review): `args` is forwarded as ONE tuple argument, so the
        # agent receives str(args) as a single parameter. This looks
        # unintended, but changing it alters the wire format -- confirm
        # against the agent before touching it.
        self.send_dynamic (args)

        try:
            res = self.read ()
        except RuntimeError as err:
            res = None
            LogFactory().log("send_recv_dynamic: %s(); error: %s" % (self.func_name, err))

        return res

    def send_dynamic (self, *args):
        '''Send func_name plus `args` to the agent.

        Raises RuntimeError when the agent is not started or nothing was sent.
        '''
        if not self.started:
            raise RuntimeError ("agent not started")

        # number of args+func
        real_msg = str (len (args) + 1) + ":" + str(len(self.func_name)) + ":" + self.func_name
        for a in args:
            a_str = str(a)
            real_msg += ":" + str (len (a_str)) + ":" + a_str
        real_msg += ";"
        sent = 0
        try:
            sent = self.sock.send (real_msg)
        except socket.error as err:
            LogFactory().log("send_dynamic(%s): %s; error: %s" % (self.node, real_msg, err))

        if sent == 0:
            raise RuntimeError ("socket connection broken")
        self.used = True

    def read (self):
        '''Return up to 4096 bytes from the agent.

        Raises RuntimeError on a socket error or when the peer closed the
        connection (empty read).
        '''
        try:
            msg = self.sock.recv (4096)
        except socket.error as err:
            raise RuntimeError(err)

        if msg == '':
            raise RuntimeError("socket connection broken")
        return msg


class CpgConfigEvent:
    '''A CPG membership change parsed from "group,nodeid,pid,joined|left".'''

    def __init__(self, msg):
        info = msg.split(',')
        self.group_name = info[0]
        self.node_id = info[1]
        self.node = None
        self.pid = info[2]
        # Anything that does not mention "left" counts as a join.
        self.is_member = "left" not in info[3]

    def __str__ (self):
        prefix = self.group_name + "," + self.node_id + "," + self.pid + ","
        if self.is_member:
            return prefix + "joined"
        else:
            return prefix + "left"

###################################################################
class CpgTestAgent(TestAgent):
    '''Client for cpg_test_agent (port 9034).'''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, "cpg_test_agent", node, 9034, Env)
        self.nodeid = None

    def start(self):
        '''Start the agent if it is not responding, then initialise cpg.'''
        if not self.status():
            TestAgent.start(self)
            # Not a real attribute: dispatched to the agent as a remote call.
            self.cpg_initialize()
            self.used = False

    def cpg_local_get(self):
        '''Return the local node id, asking the agent only the first time.'''
        if self.nodeid is None:
            self.send (["cpg_local_get"])
            self.nodeid = self.read ()
        return self.nodeid

    def record_config_events(self, truncate=True):
        '''Ask the agent to record config events, truncating or appending.'''
        if truncate:
            self.send (["record_config_events", "truncate"])
        else:
            self.send (["record_config_events", "append"])
        return self.read ()

    def read_config_event(self):
        '''Return the next CpgConfigEvent, or None when the reply contains "None".'''
        self.send (["read_config_event"])
        msg = self.read ()

        if "None" in msg:
            return None
        else:
            return CpgConfigEvent(msg)

    def read_messages(self, atmost):
        '''Return up to `atmost` messages, or None when the reply contains "None".'''
        self.send (["read_messages", atmost])
        msg = self.read ()

        if "None" in msg:
            return None
        else:
            return msg

    def context_test(self):
        '''Run the agent's context test and return its reply.'''
        self.send (["context_test"])
        return self.read ()

###################################################################
class SamTestAgent(TestAgent):
    '''Client for sam_test_agent (port 9036); dynamic calls return the reply.'''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, "sam_test_agent", node, 9036, Env)
        self.nodeid = None
        self.send_recv = True

###################################################################
class VoteQuorumTestAgent(TestAgent):
    '''Client for votequorum_test_agent (port 9037); dynamic calls return the reply.'''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, "votequorum_test_agent", node, 9037, Env)
        self.nodeid = None
        self.send_recv = True

AllAuditClasses = []
AllAuditClasses.append(LogAudit)
AllAuditClasses.append(ShmLeakAudit)

def CoroAuditList(cm):
    '''Instantiate every audit class for `cm` and return the applicable ones.'''
    result = []
    for auditclass in AllAuditClasses:
        a = auditclass(cm)
        if a.is_applicable():
            result.append(a)
    return result