Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3152214
cman.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
11 KB
Referenced Files
None
Subscribers
None
cman.c
View Options
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "gd_internal.h"
#include "libcman.h"
/* FIXME: should be in libcman.h */
#define BARRIER_SETATTR_TIMEOUT 6
/* Daemon-wide state defined outside this file.  gd_nodes, gd_barriers and
   the two counters are (re)initialized in setup_member_message(). */
extern struct list_head gd_nodes;
extern int gd_node_count;
extern int gd_member_count;
/* set from cman_is_quorate() each time process_member() runs */
extern int gd_quorate;
/* our own node id, read from cman in setup_member_message() */
extern int gd_nodeid;
/* value passed as the BARRIER_SETATTR_TIMEOUT attribute in do_barrier() */
extern int gd_barrier_time;
extern struct list_head gd_barriers;
/* cman connection handle, obtained from cman_init() in
   setup_member_message() and used by every cman_* call in this file */
static cman_handle_t ch;
/* latest node list fetched by cman_get_nodes() in process_member();
   cluster_count is the number of entries reported for it */
static cman_node_t cluster_nodes[MAX_NODES];
static int cluster_count;
/* ci_generation of the cluster info that matched cluster_nodes */
static int cluster_generation;
/* member_cb/member_reason are set by member_callback() and consumed by
   process_member_message() */
static int member_cb;
static int member_reason;
/* message_* hold the single most recent message stored by
   message_callback(); message_cb flags that it is waiting to be processed */
static int message_cb;
static int message_nodeid;
static int message_len;
static char message_buf[MAX_MSGLEN];
/* A barrier whose first cman_barrier_wait() in do_barrier() did not
   complete (ETIMEDOUT/ESRCH).  Entries are queued on gd_barriers and
   retried, completed and freed by process_barriers(). */
struct barrier_wait {
struct list_head list; /* link in gd_barriers */
group_t *group; /* group the barrier was started for */
char name[MAX_BARRIERLEN]; /* barrier name handed to the cman_barrier_* calls */
int type; /* GD_BARRIER_* value; selects the completion handler */
};
/*
 * Look up an entry of the global gd_nodes list by node id.
 *
 * Returns the matching node_t, or NULL when no entry carries that id.
 */
node_t *find_node(int nodeid)
{
	node_t *cur;
	node_t *match = NULL;

	list_for_each_entry(cur, &gd_nodes, list) {
		if (cur->id != nodeid)
			continue;
		match = cur;
		break;
	}

	return match;
}
static int wait_for_groupd(int nodeid)
{
cman_node_t cn;
int rv;
while (1) {
if (cman_is_listening(ch, nodeid, GROUPD_PORT)) {
rv = 0;
break;
}
rv = cman_get_node(ch, nodeid, &cn);
if (rv < 0) {
log_print("no status for new node %d", nodeid);
break;
}
if (!cn.cn_member) {
log_print("new member %d failed", nodeid);
rv = -1;
break;
}
log_print("waiting for groupd on new member %d", nodeid);
sleep(1);
}
return rv;
}
static cman_node_t *find_cluster_node(int nodeid)
{
int i;
for (i = 0; i < cluster_count; i++) {
if (cluster_nodes[i].cn_nodeid == nodeid)
return &cluster_nodes[i];
}
return NULL;
}
/*
 * Reconcile the gd_nodes list with the snapshot in cluster_nodes[].
 *
 * Pass 1 walks gd_nodes: nodes that came back are re-flagged as members,
 * nodes that vanished (or whose incarnation changed, meaning they left and
 * rejoined between checks) are flagged NFL_NEED_RECOVERY.  Every snapshot
 * entry that matched a known node has cn_member cleared so that pass 2
 * only sees nodes we have never tracked.
 *
 * Pass 2 adds those new members to gd_nodes once groupd is reachable on
 * them (see wait_for_groupd(), which can block).
 *
 * Returns the number of nodes that were lost or bounced; additions are
 * not counted.
 */
static int process_cluster_nodes(void)
{
	node_t *node;
	cman_node_t *cn;
	int i, sub = 0;

	/* find who's gone */
	list_for_each_entry(node, &gd_nodes, list) {
		cn = find_cluster_node(node->id);

		if (cn && cn->cn_member) {
			if (!test_bit(NFL_CLUSTER_MEMBER, &node->flags)) {
				/* former member is back */
				set_bit(NFL_CLUSTER_MEMBER, &node->flags);
				node->incarnation = cn->cn_incarnation;
				gd_member_count++;
				log_debug("member re-added %d", node->id);
			} else if (node->incarnation != cn->cn_incarnation) {
				/* current member is still alive, but a
				   different incarnation number means it died
				   and returned between checks */
				set_bit(NFL_NEED_RECOVERY, &node->flags);
				node->incarnation = cn->cn_incarnation;
				sub++;
				log_debug("member in/out %d", node->id);
			}
		} else if (test_bit(NFL_CLUSTER_MEMBER, &node->flags)) {
			/* current member has died */
			clear_bit(NFL_CLUSTER_MEMBER, &node->flags);
			set_bit(NFL_NEED_RECOVERY, &node->flags);
			sub++;
			gd_member_count--;
			log_debug("member removed %d", node->id);
		}

		/* make it easier to find who's new next */
		if (cn)
			cn->cn_member = 0;
	}

	/* find who's new */
	for (i = 0; i < cluster_count; i++) {
		if (!cluster_nodes[i].cn_member)
			continue;

		/* this is a bit lame, but for now we require a new
		   member to start up groupd right away because we
		   don't have a good way of dealing with cluster
		   members who aren't running groupd. */
		if (wait_for_groupd(cluster_nodes[i].cn_nodeid))
			continue;

		node = new_node(cluster_nodes[i].cn_nodeid);
		node->incarnation = cluster_nodes[i].cn_incarnation;
		set_bit(NFL_CLUSTER_MEMBER, &node->flags);

		list_add_tail(&node->list, &gd_nodes);
		gd_node_count++;
		gd_member_count++;
		log_debug("member added %d", node->id);
	}

	return sub;
}
/*
 * Refresh our view of cluster membership after a cman member callback.
 *
 * The node list is only trusted when the cluster generation read before
 * and after cman_get_nodes() is identical; otherwise we sleep one second
 * and take the snapshot again.  Any cman query error aborts the update.
 *
 * On success the quorum state, generation and node count are recorded,
 * gd_nodes is reconciled via process_cluster_nodes(), and if any node was
 * lost process_nodechange() is run.  NFL_NEED_RECOVERY is cleared on every
 * node before returning.
 */
static void process_member(void)
{
	cman_cluster_t before, after;
	node_t *node;
	int rv, quorate, count, gone;

	/* FIXME: PORTCLOSED indicates the failure of a remote groupd.
	   We should treat this like the complete failure of that node */

	for (;;) {
		rv = cman_get_cluster(ch, &before);
		if (rv < 0) {
			log_print("cman_get_cluster error %d %d", rv, errno);
			return;
		}

		quorate = cman_is_quorate(ch);

		count = 0;
		memset(&cluster_nodes, 0, sizeof(cluster_nodes));

		rv = cman_get_nodes(ch, MAX_NODES, &count, cluster_nodes);
		if (rv < 0) {
			log_print("cman_get_nodes error %d %d", rv, errno);
			return;
		}

		rv = cman_get_cluster(ch, &after);
		if (rv < 0) {
			log_print("cman_get_cluster error %d %d", rv, errno);
			return;
		}

		/* snapshot is consistent only if nothing changed meanwhile */
		if (before.ci_generation == after.ci_generation)
			break;

		log_print("generation mismatch %d %d",
			  before.ci_generation, after.ci_generation);
		sleep(1);
	}

	cluster_generation = before.ci_generation;
	gd_quorate = quorate;
	cluster_count = count;

	log_debug("member reason %d quorate %d generation %d nodes %d",
		  member_reason, quorate, cluster_generation, count);

	/* Update our own gd_nodes list and determine if any nodes are gone.
	   If so, process_nodechange() applies these changes to groups */

	gone = process_cluster_nodes();
	if (gone > 0)
		process_nodechange();

	list_for_each_entry(node, &gd_nodes, list)
		clear_bit(NFL_NEED_RECOVERY, &node->flags);
}
/*
 * cman notification callback (registered in setup_member_message()).
 *
 * Does no work itself: it records the reason and raises member_cb so that
 * process_member_message() runs process_member() after the dispatch.
 */
static void member_callback(cman_handle_t h, void *private, int reason, int arg)
{
	log_in("member callback reason %d", reason);

	member_reason = reason;
	member_cb = 1;
}
/*
 * cman data callback for GROUPD_PORT (registered in setup_member_message()).
 *
 * Copies the payload into the single-slot message_buf and raises message_cb
 * so that process_member_message() hands it to process_message().
 *
 * A message whose length is negative or larger than message_buf is logged
 * and dropped: copying it would write past the end of the static buffer.
 */
static void message_callback(cman_handle_t h, void *private, char *buf,
			     int len, uint8_t port, int nodeid)
{
	log_in("message callback nodeid %d len %d", nodeid, len);

	if (len < 0 || (size_t) len > sizeof(message_buf)) {
		log_print("message callback nodeid %d bad len %d, dropped",
			  nodeid, len);
		return;
	}

	memcpy(message_buf, buf, len);
	message_len = len;
	message_nodeid = nodeid;
	message_cb = 1;
}
/*
 * Drain pending cman events.
 *
 * Dispatches one cman event at a time and handles whatever the callback
 * flagged: a membership change (process_member()) takes precedence over a
 * received message (process_message()).  Stops after the first dispatch
 * that raises neither flag.  Always returns 0.
 */
int process_member_message(void)
{
	int handled;

	do {
		cman_dispatch(ch, CMAN_DISPATCH_ONE);

		handled = 1;
		if (member_cb) {
			member_cb = 0;
			process_member();
		} else if (message_cb) {
			message_cb = 0;
			process_message(message_buf, message_len,
					message_nodeid);
		} else {
			handled = 0;
		}
	} while (handled);

	return 0;
}
int setup_member_message(void)
{
cman_node_t node;
int rv, fd;
INIT_LIST_HEAD(&gd_barriers);
INIT_LIST_HEAD(&gd_nodes);
gd_node_count = 0;
gd_member_count = 0;
ch = cman_init(NULL);
if (!ch) {
log_print("cman_init error %d %d", (int) ch, errno);
return -ENOTCONN;
}
rv = cman_start_notification(ch, member_callback);
if (rv < 0) {
log_print("cman_start_notification error %d %d", rv, errno);
cman_finish(ch);
return rv;
}
rv = cman_start_recv_data(ch, message_callback, GROUPD_PORT);
if (rv < 0) {
log_print("cman_start_recv_data error %d %d", rv, errno);
cman_stop_notification(ch);
cman_finish(ch);
return rv;
}
fd = cman_get_fd(ch);
/* FIXME: wait here for us to be a member of the cluster */
rv = cman_get_node(ch, CMAN_NODEID_US, &node);
if (rv < 0) {
log_print("cman_get_node us error %d %d", rv, errno);
cman_end_recv_data(ch);
cman_stop_notification(ch);
cman_finish(ch);
fd = rv;
goto out;
}
gd_nodeid = node.cn_nodeid;
log_in("member our nodeid %d", gd_nodeid);
/* this will just initialize gd_nodes, etc */
member_reason = CMAN_REASON_STATECHANGE;
process_member();
out:
return fd;
}
int send_nodeid_message(char *buf, int len, int nodeid)
{
msg_t *msg = (msg_t *) buf;
int error = 0;
msg->ms_to_nodeid = nodeid;
if (nodeid == gd_nodeid) {
process_message(buf, len, nodeid);
goto out;
}
error = cman_send_data(ch, buf, len, 0, GROUPD_PORT, nodeid);
if (error < 0)
log_print("send_nodeid_message error %d to %u", error, nodeid);
else
error = 0;
out:
return error;
}
/*
 * Send a message to all nodes (cman destination node id 0) on GROUPD_PORT
 * and then process it locally as if received from ourselves.
 *
 * The local process_message() call is made even when the send failed.
 *
 * Returns 0 on success or the negative cman_send_data() result.
 */
int send_broadcast_message(char *buf, int len)
{
	int error;

	error = cman_send_data(ch, buf, len, 0, GROUPD_PORT, 0);
	if (error < 0)
		log_print("send_broadcast_message error %d", error);
	else
		error = 0;

	process_message(buf, len, gd_nodeid);

	return error;
}
/*
 * Send a message to every node on the group's member list (g->memb).
 *
 * All members are attempted even if some sends fail.
 *
 * Returns 0 if every send succeeded, otherwise the error of the most
 * recent failed send.  A failure is reported even when a later send
 * succeeds, so callers never see success after a partial delivery.
 */
int send_members_message(group_t *g, char *buf, int len)
{
	node_t *node;
	int error, failed = 0;

	list_for_each_entry(node, &g->memb, list) {
		error = send_nodeid_message(buf, len, node->id);
		if (error < 0) {
			log_group(g, "send to %d error %d", node->id, error);
			failed = error;
		}
	}

	return failed;
}
/*
 * Send a message to all members of a group on behalf of an event.
 *
 * Resets ev->reply_count before sending so replies are counted from zero.
 * Returns the result of send_members_message().
 *
 * NOTE: restricting accepted reply types (set_allowed_msgtype() before the
 * send, clear_allowed_msgtype() on failure) was only ever present here as
 * commented-out code and is not implemented.
 */
int send_members_message_ev(group_t *g, char *buf, int len, event_t *ev)
{
	ev->reply_count = 0;

	return send_members_message(g, buf, len);
}
/*
 * Broadcast a message on behalf of an event.
 *
 * Resets ev->reply_count before sending so replies are counted from zero.
 * Returns the result of send_broadcast_message().
 *
 * NOTE: restricting accepted reply types (set_allowed_msgtype() before the
 * send, clear_allowed_msgtype() on failure) was only ever present here as
 * commented-out code and is not implemented.
 */
int send_broadcast_message_ev(char *buf, int len, event_t *ev)
{
	ev->reply_count = 0;

	return send_broadcast_message(buf, len);
}
/*
 * Register a cman barrier and make one attempt to wait on it.
 *
 * g     - group the barrier belongs to (used for logging and completion)
 * name  - NUL-terminated barrier name
 * count - node count passed to cman_barrier_register()
 * type  - GD_BARRIER_* value remembered for process_barriers()
 *
 * Returns:
 *   0        the barrier completed; it has been deleted
 *   1        the wait failed with ETIMEDOUT or ESRCH; the barrier is queued
 *            on gd_barriers and will be retried by process_barriers()
 *   -ENOMEM  the queue entry could not be allocated; the barrier is deleted
 *   other    the failing cman call's result; the barrier is deleted if it
 *            had been registered
 */
int do_barrier(group_t *g, char *name, int count, int type)
{
	struct barrier_wait *bw;
	int error, wait_errno;

	error = cman_barrier_register(ch, name, 0, count);
	if (error < 0)
		return error;

	cman_barrier_change(ch, name, BARRIER_SETATTR_TIMEOUT, gd_barrier_time);

	log_group(g, "do_barrier count %d type %d: %s", count, type, name);

	error = cman_barrier_wait(ch, name);
	/* capture errno now: the logging below may overwrite it */
	wait_errno = errno;

	log_group(g, "do_barrier error %d errno %d", error, wait_errno);

	if (!error) {
		cman_barrier_delete(ch, name);
		return 0;
	}

	if (error == -1 && (wait_errno == ETIMEDOUT || wait_errno == ESRCH)) {
		bw = malloc(sizeof(*bw));
		if (!bw) {
			cman_barrier_delete(ch, name);
			return -ENOMEM;
		}
		bw->group = g;
		/* bounded string copy: never reads past name's terminator
		   and always leaves bw->name terminated */
		snprintf(bw->name, sizeof(bw->name), "%s", name);
		bw->type = type;
		list_add(&bw->list, &gd_barriers);
		return 1;
	}

	log_error(g, "cman_barrier_wait errno %d", wait_errno);
	cman_barrier_delete(ch, name);
	return error;
}
/* Delete the cman barrier named by g->recover_barrier.  The result of
   cman_barrier_delete() is not checked. */
void cancel_recover_barrier(group_t *g)
{
cman_barrier_delete(ch, g->recover_barrier);
}
/*
 * Cancel the barrier associated with the group's current update.
 *
 * Clears UFL_ALLOW_BARRIER on the update, rebuilds the barrier name from
 * the group id, the update's node id and remote seid, and the member
 * count, then deletes that barrier from cman.
 */
void cancel_update_barrier(group_t *g)
{
	char bname[MAX_BARRIERLEN];
	update_t *up = g->update;

	clear_bit(UFL_ALLOW_BARRIER, &up->flags);

	memset(bname, 0, sizeof(bname));
	snprintf(bname, sizeof(bname), "sm.%u.%u.%u.%u",
		 g->global_id, up->nodeid, up->remote_seid, g->memb_count);

	cman_barrier_delete(ch, bname);
}
/*
 * Record completion of a start-done barrier on the group's event.
 *
 * Only acts while EFL_ALLOW_BARRIER is set on the event; the flag is
 * cleared, the status stored and the event moved to EST_BARRIER_DONE.
 * Otherwise the completion is logged and ignored.
 */
static void complete_startdone_barrier_new(group_t *g, int status)
{
	event_t *ev = g->event;

	if (test_bit(EFL_ALLOW_BARRIER, &ev->flags)) {
		clear_bit(EFL_ALLOW_BARRIER, &ev->flags);
		ev->barrier_status = status;
		ev->state = EST_BARRIER_DONE;
		return;
	}

	log_group(g, "ignore barrier complete status %d", status);
}
/*
 * Record completion of a start-done barrier on the group's update.
 *
 * Only acts while UFL_ALLOW_BARRIER is set on the update; the flag is
 * cleared, the status stored and the update moved to UST_BARRIER_DONE.
 * Otherwise the completion is logged and ignored.
 */
static void complete_startdone_barrier(group_t *g, int status)
{
	update_t *up = g->update;

	if (test_bit(UFL_ALLOW_BARRIER, &up->flags)) {
		clear_bit(UFL_ALLOW_BARRIER, &up->flags);
		up->barrier_status = status;
		up->state = UST_BARRIER_DONE;
		return;
	}

	log_group(g, "ignore barrier complete status %d", status);
}
/*
 * Advance a group's recovery state once its recovery barrier completes.
 *
 * A non-zero status, or a group that is not in GST_RECOVER with
 * recover_state RECOVER_BARRIERWAIT, is logged as an error and leaves the
 * group untouched.  Otherwise recover_state becomes RECOVER_BARRIERDONE
 * when g->recover_stop is set and RECOVER_STOP when it is not.
 */
static void complete_recovery_barrier(group_t *g, int status)
{
	int waiting;

	if (status) {
		log_error(g, "complete_recovery_barrier status=%d", status);
		return;
	}

	waiting = (g->state == GST_RECOVER &&
		   g->recover_state == RECOVER_BARRIERWAIT);
	if (!waiting) {
		log_error(g, "complete_recovery_barrier state %d recover %d",
			  g->state, g->recover_state);
		return;
	}

	g->recover_state = g->recover_stop ? RECOVER_BARRIERDONE : RECOVER_STOP;
}
/*
 * Retry every barrier queued on gd_barriers by do_barrier().
 *
 * For each entry cman_barrier_wait() is called again.  When it succeeds
 * the entry is unlinked, the completion handler matching its type is run
 * with status 0, the cman barrier is deleted and the entry freed.  When it
 * fails the entry stays queued; errors other than ETIMEDOUT and ESRCH are
 * logged.
 *
 * Returns the number of barriers that completed in this pass.
 */
int process_barriers(void)
{
	struct barrier_wait *bw, *safe;
	int error, wait_errno, rv = 0;

	list_for_each_entry_safe(bw, safe, &gd_barriers, list) {
		log_group(bw->group, "barrier_wait: %s", bw->name);

		error = cman_barrier_wait(ch, bw->name);
		/* capture errno now: the logging below may overwrite it */
		wait_errno = errno;

		log_group(bw->group, "barrier_wait error %d errno %d",
			  error, wait_errno);

		if (error) {
			if (wait_errno != ETIMEDOUT && wait_errno != ESRCH)
				log_error(bw->group, "barrier errno %d",
					  wait_errno);
			continue;
		}

		list_del(&bw->list);

		switch (bw->type) {
		case GD_BARRIER_STARTDONE:
			complete_startdone_barrier(bw->group, 0);
			break;
		case GD_BARRIER_STARTDONE_NEW:
			complete_startdone_barrier_new(bw->group, 0);
			break;
		case GD_BARRIER_RECOVERY:
			complete_recovery_barrier(bw->group, 0);
			break;
		default:
			/* unknown type: nothing to complete, still cleaned up */
			break;
		}

		cman_barrier_delete(ch, bw->name);
		free(bw);
		rv++;
	}

	return rv;
}
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Mon, Feb 24, 5:52 PM (3 h, 34 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464384
Default Alt Text
cman.c (11 KB)
Attached To
Mode
rF Fence Agents
Attached
Detach File
Event Timeline
Log In to Comment