Page MenuHomeClusterLabs Projects

corosync.py
No OneTemporary

corosync.py

'''CTS: Cluster Testing System: corosync...
'''
__copyright__='''
Copyright (c) 2010 Red Hat, Inc.
'''
# All rights reserved.
#
# Author: Angus Salkeld <asalkeld@redhat.com>
#
# This software licensed under BSD license, the text of which follows:
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# - Neither the name of the MontaVista Software, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import time
import socket
import shutil
import string
import augeas
from cts.CTS import ClusterManager
from cts.CTSscenarios import ScenarioComponent
from cts.CTS import RemoteExec
from cts.CTSvars import CTSvars
###################################################################
class CoroConfig(object):
    '''
    Scratch copy of the example corosync.conf, edited through augeas.

    On construction the example configuration is copied below
    self.new_root and opened with augeas.  The values found under
    totem/ and totem/interface/ at that moment are remembered in
    self.original (keyed by the path relative to corosync.conf) so that
    callers can restore them later.
    '''

    def __init__(self, corobase=None):
        '''
        corobase -- top of the corosync source tree; it must contain
                    conf/corosync.conf.example and conf/lenses.
                    Defaults to the parent of the current directory.
        '''
        self.base = "/files/etc/corosync/corosync.conf/"
        self.new_root = "/tmp/aug-root/"
        if corobase is None:
            self.corobase = os.getcwd() + "/.."
        else:
            self.corobase = corobase
        example = self.corobase + "/conf/corosync.conf.example"

        # Always start from a pristine tree: any previous scratch root is
        # removed before the example file is copied in again.
        if os.path.isdir(self.new_root):
            shutil.rmtree(self.new_root)
        os.makedirs(self.new_root + "/etc/corosync")
        shutil.copy(example, self.new_root + "/etc/corosync/corosync.conf")

        self.aug = augeas.Augeas(root=self.new_root,
                                 loadpath=self.corobase + "/conf/lenses")

        # Store the original values (of totem and its interface section),
        # so they can be restored by apply_default_config().
        self.original = {}
        for pattern in ('totem/*', 'totem/interface/*'):
            for path in self.aug.match(self.base + pattern):
                # strip the leading /files/etc/corosync/corosync.conf/
                short_name = path[len(self.base):]
                self.original[short_name] = self.aug.get(path)

    def get(self, name):
        '''Return the value stored at `name` (a path relative to corosync.conf).'''
        return self.aug.get(self.base + name)

    def set(self, name, value):
        '''Set `name` (a path relative to corosync.conf) to str(value).'''
        self.aug.set(self.base + name, str(value))

    def save(self):
        '''Write pending augeas changes back to the scratch corosync.conf.'''
        self.aug.save()

    def get_filename(self):
        '''Return the path of the scratch corosync.conf.'''
        return self.new_root + "/etc/corosync/corosync.conf"
###################################################################
class corosync_flatiron(ClusterManager):
    '''
    CTS ClusterManager for corosync (flatiron).

    Registers the corosync init-script commands and log patterns with the
    base ClusterManager, keeps one test agent per node for cpg, confdb,
    sam and votequorum, and distributes corosync.conf to the nodes.

    Configuration changes are queued in self.new_config, keyed by the
    path below corosync.conf (e.g. 'totem/interface/bindnetaddr').
    install_all_config() writes them out and moves them to
    self.applied_config.
    '''

    def __init__(self, Environment, randseed=None):
        ClusterManager.__init__(self, Environment, randseed)

        self.update({
            "Name"           : "corosync(flatiron)",
            "StartCmd"       : CTSvars.INITDIR+"/corosync start",
            "StopCmd"        : CTSvars.INITDIR+"/corosync stop",
            "RereadCmd"      : CTSvars.INITDIR+"/corosync reload",
            "StatusCmd"      : CTSvars.INITDIR+"/corosync status %s",
            "DeadTime"       : 30,
            "StartTime"      : 15,        # Max time to start up
            "StableTime"     : 10,
            "BreakCommCmd"   : "/usr/share/corosync/tests/net_breaker.sh BreakCommCmd %s",
            "FixCommCmd"     : "/usr/share/corosync/tests/net_breaker.sh FixCommCmd %s",

            "Pat:We_stopped"   : "%s.*Corosync Cluster Engine exiting with status.*",
            "Pat:They_stopped" : "%s.*Member left:.*%s.*",
            "Pat:They_dead"    : "corosync:.*Node %s is now: lost",
            "Pat:Local_starting" : "%s.*started and ready to provide service.",
            "Pat:Local_started" :  "%s.*started and ready to provide service.",
            "Pat:Master_started" : "%s.*Completed service synchronization, ready to provide service.",
            "Pat:Slave_started" :  "%s.*Completed service synchronization, ready to provide service.",
            "Pat:ChildKilled"  : "%s corosync.*Child process %s terminated with signal 9",
            "Pat:ChildRespawn" : "%s corosync.*Respawning failed child process: %s",
            "Pat:ChildExit"    : "Child process .* exited",
            "Pat:DC_IDLE"      : ".*A processor joined or left the membership and a new membership was formed.",
            # Bad news Regexes.  Should never occur.
            "BadRegexes"   : (
                r"ERROR:",
                r"CRIT:",
                r"Shutting down\.",
                r"Forcing shutdown\.",
                r"core dump",
                r"Could not bind AF_UNIX",
            ),
            "LogFileName"    : Environment["LogFileName"],
        })

        self.start_cpg = True
        # per-node test agents, keyed by node name
        self.cpg_agent = {}
        self.confdb_agent = {}
        self.sam_agent = {}
        self.votequorum_agent = {}
        self.config = CoroConfig()
        self.node_to_ip = {}

        # settings waiting to be written / settings already written
        self.new_config = {}
        self.new_config['service[1]/name'] = 'corosync_tst_sv2'
        self.new_config['service[1]/ver'] = '0'
        self.applied_config = {}

        # Derive bindnetaddr from a node address by zeroing the last octet.
        # NOTE(review): the source this was recovered from had lost its
        # indentation; the `return` is taken to sit inside the loop, i.e.
        # only the first node is resolved.  Confirm against the original.
        for n in self.Env["nodes"]:
            ip = socket.gethostbyname(n)
            ips = ip.split('.')
            ips[3] = '0'
            ip_mask = '.'.join(ips)
            self.new_config['totem/interface/bindnetaddr'] = str(ip_mask)
            return

    def apply_default_config(self):
        '''
        Queue the original value for every applied option that differs
        from it, and stop all nodes if anything is queued.

        bindnetaddr is never reset, and options that have no original
        value (new, non-default options) are left alone.
        '''
        for c in self.applied_config:
            if 'bindnetaddr' in c:
                continue
            if c not in self.config.original:
                # new config option (non default)
                continue
            # Compare by value: identity (`is not`) on strings depends on
            # interning and would queue resets for equal values.
            if self.applied_config[c] != self.config.original[c]:
                # reset to the original
                self.new_config[c] = self.config.original[c]

        if self.new_config:
            self.debug('applying default config')
            self.stopall()

    def apply_new_config(self):
        '''Restart every node if there are queued configuration changes.'''
        if self.new_config:
            self.debug('applying new config')
            self.stopall()
            self.startall()

    def install_all_config(self):
        '''
        Write every queued option into the scratch corosync.conf, move it
        from new_config to applied_config, save the file and copy it to
        /etc/corosync/ on every node.
        '''
        for c in list(self.new_config):
            value = self.new_config[c]
            self.log('configuring: ' + c + ' = '+ str(value))
            self.config.set(c, value)
            self.applied_config[c] = value
        # Emptied only after every option was set, so a failure above
        # leaves the queue intact.
        self.new_config.clear()

        self.config.save()
        src_file = self.config.get_filename()
        for node in self.Env["nodes"]:
            self.rsh.cp(src_file, "%s:%s" % (node, "/etc/corosync/"))

    def install_config(self, node):
        '''
        Install the queued configuration, if any.  The file is copied to
        all nodes, not only to `node`.
        '''
        # install gets new_config and installs it, then moves the
        # config to applied_config
        if self.new_config:
            self.install_all_config()

    def key_for_node(self, node):
        '''Return the (cached) IP address that `node` resolves to.'''
        if node not in self.node_to_ip:
            self.node_to_ip[node] = socket.gethostbyname(node)
        return self.node_to_ip[node]

    def StartaCM(self, node):
        '''
        Start corosync on `node` and (re)start its test agents.

        Returns 1 without doing anything when the node is not expected to
        be down, otherwise the result of ClusterManager.StartaCM().
        '''
        if node not in self.ShouldBeStatus:
            self.ShouldBeStatus[node] = "down"

        if self.ShouldBeStatus[node] != "down":
            return 1

        self.debug('starting corosync on : ' + node)
        ret = ClusterManager.StartaCM(self, node)

        if self.start_cpg:
            if node in self.cpg_agent:
                self.cpg_agent[node].restart()
            else:
                self.cpg_agent[node] = CpgTestAgent(node, self.Env)
                self.cpg_agent[node].start()

        # confdb and sam agents are only restarted here, never created
        if node in self.confdb_agent:
            self.confdb_agent[node].restart()
        if node in self.sam_agent:
            self.sam_agent[node].restart()

        # votequorum agent started as needed.
        # (value comparison; `is` against a string literal is unreliable)
        if self.applied_config.get('quorum/provider') == 'corosync_votequorum':
            if node in self.votequorum_agent:
                self.votequorum_agent[node].restart()
            else:
                self.votequorum_agent[node] = VoteQuorumTestAgent(node, self.Env)
                self.votequorum_agent[node].start()

        return ret

    def StopaCM(self, node):
        '''
        Stop the cpg, sam and votequorum agents on `node`, then corosync.

        Returns 1 without doing anything when the node is not expected to
        be up, otherwise the result of ClusterManager.StopaCM().
        '''
        if self.ShouldBeStatus[node] != "up":
            return 1

        self.debug('stoping corosync on : ' + node)
        # NOTE(review): the confdb agent is not stopped here although
        # StartaCM restarts it -- confirm this is intended.
        for agents in (self.cpg_agent, self.sam_agent, self.votequorum_agent):
            if node in agents:
                agents[node].stop()

        return ClusterManager.StopaCM(self, node)

    def test_node_CM(self, node):
        '''
        Query the init script for the state of corosync on `node`,
        log a mismatch with the expected state and record the observed
        state in ShouldBeStatus.

        Returns 2 when the status output mentions neither "dead" nor
        "stopped", otherwise 0.
        '''
        # 2 - up and stable
        # 1 - unstable
        # 0 - down
        # NOTE(review): StatusCmd contains a '%s' placeholder that is sent
        # unformatted here -- confirm whether it should be filled in.
        out = self.rsh(node, self["StatusCmd"], 1)
        running = out.find('dead') == -1 and out.find('stopped') == -1
        ret = 2 if running else 0

        # A node without a recorded expectation is simply not reported.
        try:
            expected = self.ShouldBeStatus[node]
        except KeyError:
            expected = None

        if running and expected == "down":
            self.log(
                "Node status for %s is %s but we think it should be %s"
                % (node, "up", expected))
        elif not running and expected == "up":
            self.log(
                "Node status for %s is %s but we think it should be %s"
                % (node, "down", expected))

        if running:
            self.ShouldBeStatus[node] = "up"
        else:
            self.ShouldBeStatus[node] = "down"
        return ret

    def StataCM(self, node):
        '''Report the status of corosync on a given node'''
        if self.test_node_CM(node) > 0:
            return 1
        return None

    def RereadCM(self, node):
        '''Log the reload and delegate to ClusterManager.RereadCM().'''
        self.log('reloading corosync on : ' + node)
        return ClusterManager.RereadCM(self, node)

    def find_partitions(self):
        '''Partition detection is not implemented; always returns [].'''
        ccm_partitions = []
        return ccm_partitions

    def prepare(self):
        '''Finish the Initialization process. Prepare to test...'''
        self.partitions_expected = 1
        for node in self.Env["nodes"]:
            self.ShouldBeStatus[node] = ""
            self.unisolate_node(node)
            self.StataCM(node)

    def HasQuorum(self, node_list):
        '''
        Return 1 or 0 according to the first "up" node in node_list whose
        QuorumCmd output contains "1" or "0"; return 0 if no node gives
        such an answer.
        '''
        # If we are auditing a partition, then one side will
        #   have quorum and the other not.
        # So the caller needs to tell us which we are checking
        # If no value for node_list is specified... assume all nodes
        if not node_list:
            node_list = self.Env["nodes"]

        for node in node_list:
            if self.ShouldBeStatus[node] != "up":
                continue
            quorum = self.rsh(node, self["QuorumCmd"], 1)
            if quorum.find("1") != -1:
                return 1
            if quorum.find("0") != -1:
                return 0
            self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum)

        return 0

    def Components(self):
        '''No components are reported for corosync.'''
        return None
###################################################################
class TestAgentComponent(ScenarioComponent):
    '''
    Scenario component that starts the corosync test agents on every
    node in SetUp() and stops them again in TearDown().
    '''

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''Return TRUE if the current ScenarioComponent is applicable
        in the given LabEnvironment given to the constructor.
        '''
        return True

    def SetUp(self, CM):
        '''
        Create and start the test agents on every node.

        cpg agents are only created when CM.start_cpg is set, and
        votequorum agents only when the applied quorum/provider is
        corosync_votequorum.  Raises RuntimeError("corosync not up") for
        the first node on which corosync is not running.  Returns 1.
        '''
        self.CM = CM

        for node in self.Env["nodes"]:
            if not CM.StataCM(node):
                raise RuntimeError("corosync not up")

            if CM.start_cpg:
                CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
                CM.cpg_agent[node].start()
            CM.confdb_agent[node] = ConfdbTestAgent(node, CM.Env)
            CM.confdb_agent[node].start()
            CM.sam_agent[node] = SamTestAgent(node, CM.Env)
            CM.sam_agent[node].start()

            # votequorum agent started as needed.
            # (value comparison; `is` against a string literal is unreliable)
            if CM.applied_config.get('quorum/provider') == 'corosync_votequorum':
                CM.votequorum_agent[node] = VoteQuorumTestAgent(node, CM.Env)
                CM.votequorum_agent[node].start()
        return 1

    def TearDown(self, CM):
        '''
        Stop every test agent that exists for each node.

        Agents that were never created (for instance because SetUp()
        raised part-way through) are skipped instead of raising KeyError.
        '''
        self.CM = CM
        for node in self.Env["nodes"]:
            for agents in (CM.cpg_agent, CM.confdb_agent,
                           CM.sam_agent, CM.votequorum_agent):
                if node in agents:
                    agents[node].stop()
###################################################################
class TestAgent(object):
    '''
    Client side of a remote test-agent binary.

    The binary is launched on the node through RemoteExec and then
    driven over a TCP connection.  A request is framed as

        <field count>:<len>:<text>[:<len>:<text>...];

    Any attribute that does not exist on the object is treated as a
    remote function: agent.some_call(a, b) sends "some_call" with its
    arguments, and also reads the reply when self.send_recv is set.
    '''

    def __init__(self, binary, node, port, env=None):
        self.node = node
        self.node_address = None
        self.port = port
        self.sock = None
        self.binary = binary
        self.started = False
        self.rsh = RemoteExec(Env=env)
        self.func_name = None
        self.used = False
        self.env = env
        # must be assigned last: __getattribute__ reads it on a miss
        self.send_recv = False

    def restart(self):
        '''Stop the agent and start it again.'''
        self.stop()
        self.start()

    def clean_start(self):
        '''Restart the agent if it has been used or does not answer.'''
        if self.used or not self.status():
            self.env.debug('test agent: cleaning %s on node %s' % (self.binary, self.node))
            self.stop()
            self.start()

    def status(self):
        '''
        Return True if the agent is started and answers a ping.

        A RuntimeError from the exchange marks the agent as not started
        and yields False.
        '''
        if not self.started:
            return False

        try:
            self.send(["are_you_ok_dude"])
            self.read()
        except RuntimeError:
            self.started = False
            return False
        self.started = True
        return True

    def start(self):
        '''Launch the agent binary on the node and connect to it.'''
        self.env.debug('test agent: starting %s on node %s' % (self.binary, self.node))
        if self.sock is not None:
            # do not leak the descriptor of a previous connection
            self.sock.close()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ip = socket.gethostbyname(self.node)
        self.rsh(self.node, self.binary, blocking=0)

        # NOTE(review): this retries once a second without an upper bound;
        # after five failures every further failure is logged.
        retries = 0
        while True:
            retries += 1
            try:
                self.sock.connect((ip, self.port))
                break
            except socket.error as msg:
                if retries > 5:
                    self.env.log("Retried " + str(retries) + " times. Error: " + str(msg))
                time.sleep(1)
        self.started = True
        self.used = False

    def stop(self):
        '''Close the connection and killall the agent binary on the node.'''
        self.env.debug('test agent: stopping %s on node %s' % (self.binary, self.node))
        if self.sock is not None:
            # sock stays None until the first start()
            self.sock.close()
        self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
        self.started = False

    def kill(self):
        '''killall -9 the agent binary on the node; the socket is left as is.'''
        self.env.debug('test agent: killing %s on node %s' % (self.binary, self.node))
        self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
        self.started = False

    def getpid(self):
        '''Return the output of `pidof <binary>` run on the node.'''
        return self.rsh(self.node, 'pidof ' + self.binary, 1)

    def _transmit(self, fields, caller):
        '''
        Frame `fields` and send them, starting the agent first if needed.

        `caller` only labels the debug message.  Raises
        RuntimeError("socket connection broken") when the send fails.
        '''
        if not self.started:
            self.start()

        parts = [str(len(fields))]
        for field in fields:
            text = str(field)
            parts.append(str(len(text)))
            parts.append(text)
        real_msg = ":".join(parts) + ";"

        # Sockets carry bytes; a Python 2 str already is bytes and is
        # passed through untouched.
        payload = real_msg
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')

        try:
            # sendall() so that a short write cannot truncate the frame
            self.sock.sendall(payload)
        except socket.error as msg:
            self.env.debug("%s(%s): %s; error: %s" % (caller, self.node, real_msg, msg))
            raise RuntimeError("socket connection broken")
        self.used = True

    def send(self, args):
        '''Send the list `args` as one framed request.'''
        self._transmit(list(args), "send")

    def __getattribute__(self, name):
        '''
        Resolve `name` normally; on a miss remember it in self.func_name
        and return the dynamic sender, so that the following call is
        forwarded to the agent as a request for that function.
        '''
        try:
            return object.__getattribute__(self, name)
        except AttributeError:
            # Only a genuine lookup miss becomes a remote call; any other
            # exception raised during the lookup now propagates.
            self.func_name = name
            if self.send_recv:
                return self.send_recv_dynamic
            return self.send_dynamic

    def send_recv_dynamic(self, *args):
        '''
        Send the pending dynamic call and return the reply.

        Returns None (after logging) when reading the reply raises
        RuntimeError.
        '''
        # NOTE(review): the argument tuple is handed over as ONE argument,
        # so its str() form goes on the wire.  Kept as is because the
        # remote agents are not visible from here -- confirm.
        self.send_dynamic(args)

        try:
            return self.read()
        except RuntimeError as msg:
            self.env.log("send_recv_dynamic: %s(); error: %s" % (self.func_name, msg))
            return None

    def send_dynamic(self, *args):
        '''Send self.func_name followed by `args` as one framed request.'''
        # number of args+func
        self._transmit([self.func_name] + list(args), "send_dynamic")

    def read(self):
        '''
        Return up to 4096 bytes received from the agent, as str.

        Raises RuntimeError("socket connection broken") when the peer has
        closed the connection.
        '''
        msg = self.sock.recv(4096)
        if not msg:
            raise RuntimeError("socket connection broken")
        if not isinstance(msg, str):
            # Python 3 hands back bytes; callers do substring tests on str
            msg = msg.decode('utf-8')
        return msg
class CpgConfigEvent(object):
    '''
    One cpg configuration-change event, parsed from the agent's reply
    "<group name>,<node id>,<pid>,<joined|left>".
    '''

    def __init__(self, msg):
        '''
        msg -- comma separated event text with at least four fields;
               fewer fields raise IndexError.
        '''
        info = msg.split(',')
        self.group_name = info[0]
        self.node_id = info[1]
        self.node = None
        self.pid = info[2]
        # anything that does not mention "left" counts as a join
        self.is_member = "left" not in info[3]

    def __str__(self):
        '''Return the event in the same comma separated form it was parsed from.'''
        state = "joined" if self.is_member else "left"
        return self.group_name + "," + self.node_id + "," + self.pid + "," + state
###################################################################
class CpgTestAgent(TestAgent):
    '''Client for the "cpg_test_agent" binary (port 9034).'''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, "cpg_test_agent", node, 9034, env=Env)
        self.initialized = False
        self.nodeid = None

    def start(self):
        '''Start the agent and send cpg_initialize, unless already started.'''
        if not self.started:
            TestAgent.start(self)
            self.cpg_initialize()
            # the initialize call itself does not count as use
            self.used = False

    def stop(self):
        '''Send cpg_finalize if the agent is started, then stop it.'''
        try:
            if self.started:
                self.cpg_finalize()
        except RuntimeError as msg:
            # if cpg_agent is down, we are not going to stress
            self.env.debug("CpgTestAgent::cpg_finalize() - %s" % msg)

        TestAgent.stop(self)

    def _query(self, request):
        '''Send `request`, read the reply and map a "None" reply to None.'''
        self.send(request)
        msg = self.read()
        if "None" in msg:
            return None
        return msg

    def cpg_local_get(self):
        '''Return the agent's reply to cpg_local_get; asked once, then cached.'''
        if self.nodeid is None:
            self.send(["cpg_local_get"])
            self.nodeid = self.read()
        return self.nodeid

    def record_config_events(self, truncate=True):
        '''
        Send record_config_events with "truncate" or "append" according
        to `truncate`, and return the agent's reply.
        '''
        mode = "truncate" if truncate else "append"
        self.send(["record_config_events", mode])
        return self.read()

    def read_config_event(self):
        '''Return the next CpgConfigEvent, or None if the reply contains "None".'''
        msg = self._query(["read_config_event"])
        if msg is None:
            return None
        return CpgConfigEvent(msg)

    def read_messages(self, atmost):
        '''
        Send read_messages with `atmost` and return the reply, or None if
        the reply contains "None".
        '''
        return self._query(["read_messages", atmost])

    def context_test(self):
        '''Send context_test and return the agent's reply.'''
        self.send(["context_test"])
        return self.read()
###################################################################
class ConfdbTestAgent(TestAgent):
    '''
    Client for the "confdb_test_agent" binary (port 9035).

    send_recv is enabled, so dynamic calls also read the reply.
    '''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, binary="confdb_test_agent", node=node,
                           port=9035, env=Env)
        self.send_recv = True
        self.nodeid = None
        self.initialized = False
###################################################################
class SamTestAgent(TestAgent):
    '''
    Client for the "sam_test_agent" binary (port 9036).

    send_recv is enabled, so dynamic calls also read the reply.
    '''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, binary="sam_test_agent", node=node,
                           port=9036, env=Env)
        self.send_recv = True
        self.nodeid = None
        self.initialized = False
###################################################################
class VoteQuorumTestAgent(TestAgent):
    '''
    Client for the "votequorum_test_agent" binary (port 9037).

    send_recv is enabled, so dynamic calls also read the reply.
    '''

    def __init__(self, node, Env=None):
        TestAgent.__init__(self, binary="votequorum_test_agent", node=node,
                           port=9037, env=Env)
        self.send_recv = True
        self.nodeid = None
        self.initialized = False

    def start(self):
        '''Start the agent and send init, unless it is already started.'''
        if self.started:
            return
        TestAgent.start(self)
        self.init()
        # the init call itself does not count as use
        self.used = False

File Metadata

Mime Type
text/x-script.python
Expires
Sat, Jan 25, 6:51 AM (1 d, 8 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1312251
Default Alt Text
corosync.py (21 KB)

Event Timeline