diff --git a/group/dlm_controld/cpg.c b/group/dlm_controld/cpg.c
index a62520a2f..8e5b0b037 100644
--- a/group/dlm_controld/cpg.c
+++ b/group/dlm_controld/cpg.c
@@ -1,1730 +1,1727 @@
#include "dlm_daemon.h"
#include "config.h"
uint32_t cpgname_to_crc(const char *data, int len);
int message_flow_control_on;
static cpg_handle_t daemon_handle;
static unsigned int protocol_active[3] = {1, 0, 0};
struct member {
struct list_head list;
int nodeid;
int start; /* 1 if we received a start message for this change */
int added; /* 1 if added by this change */
int failed; /* 1 if failed in this change */
int disallowed;
uint32_t start_flags;
};
struct node {
struct list_head list;
int nodeid;
int check_fencing;
int check_quorum;
int check_fs;
int fs_notified;
uint64_t add_time;
uint32_t added_seq; /* for queries */
uint32_t removed_seq; /* for queries */
int failed_reason; /* for queries */
};
/* One of these change structs is created for every confchg a cpg gets. */
#define CGST_WAIT_CONDITIONS 1
#define CGST_WAIT_MESSAGES 2
struct change {
struct list_head list;
struct list_head members;
struct list_head removed; /* nodes removed by this change */
int member_count;
int joined_count;
int remove_count;
int failed_count;
int state;
int we_joined;
uint32_t seq; /* used as a reference for debugging, and for queries */
uint32_t combined_seq; /* for queries */
};
struct ls_info {
uint32_t ls_info_size;
uint32_t id_info_size;
uint32_t id_info_count;
uint32_t started_count;
int member_count;
int joined_count;
int remove_count;
int failed_count;
};
struct id_info {
int nodeid;
};
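/* Sketch of the wire layout of a start or plocks_stored message, as
assembled in send_info() below (all fields little endian on the wire):
| dlm_header | ls_info | id_info[0] ... id_info[id_info_count - 1] |
ls_info_in() and ids_in() below convert a received copy back to host
byte order. */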
static void ls_info_in(struct ls_info *li)
{
li->ls_info_size = le32_to_cpu(li->ls_info_size);
li->id_info_size = le32_to_cpu(li->id_info_size);
li->id_info_count = le32_to_cpu(li->id_info_count);
li->started_count = le32_to_cpu(li->started_count);
li->member_count = le32_to_cpu(li->member_count);
li->joined_count = le32_to_cpu(li->joined_count);
li->remove_count = le32_to_cpu(li->remove_count);
li->failed_count = le32_to_cpu(li->failed_count);
}
static void id_info_in(struct id_info *id)
{
id->nodeid = le32_to_cpu(id->nodeid);
}
static void ids_in(struct ls_info *li, struct id_info *ids)
{
struct id_info *id;
int i;
id = ids;
for (i = 0; i < li->id_info_count; i++) {
id_info_in(id);
id = (struct id_info *)((char *)id + li->id_info_size);
}
}
char *msg_name(int type)
{
switch (type) {
case DLM_MSG_START:
return "start";
case DLM_MSG_PLOCK:
return "plock";
case DLM_MSG_PLOCK_OWN:
return "plock_own";
case DLM_MSG_PLOCK_DROP:
return "plock_drop";
case DLM_MSG_PLOCK_SYNC_LOCK:
return "plock_sync_lock";
case DLM_MSG_PLOCK_SYNC_WAITER:
return "plock_sync_waiter";
case DLM_MSG_PLOCKS_STORED:
return "plocks_stored";
case DLM_MSG_DEADLK_CYCLE_START:
return "deadlk_cycle_start";
case DLM_MSG_DEADLK_CYCLE_END:
return "deadlk_cycle_end";
case DLM_MSG_DEADLK_CHECKPOINT_READY:
return "deadlk_checkpoint_ready";
case DLM_MSG_DEADLK_CANCEL_LOCK:
return "deadlk_cancel_lock";
default:
return "unknown";
}
}
static int _send_message(cpg_handle_t h, void *buf, int len, int type)
{
struct iovec iov;
cpg_error_t error;
int retries = 0;
iov.iov_base = buf;
iov.iov_len = len;
retry:
error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
if (error == CPG_ERR_TRY_AGAIN) {
retries++;
usleep(1000);
if (!(retries % 100))
log_error("cpg_mcast_joined retry %d %s",
retries, msg_name(type));
goto retry;
}
if (error != CPG_OK) {
log_error("cpg_mcast_joined error %d handle %llx %s",
error, (unsigned long long)h, msg_name(type));
return -1;
}
if (retries)
log_debug("cpg_mcast_joined retried %d %s",
retries, msg_name(type));
return 0;
}
/* header fields caller needs to set: type, to_nodeid, flags, msgdata */
void dlm_send_message(struct lockspace *ls, char *buf, int len)
{
struct dlm_header *hd = (struct dlm_header *) buf;
int type = hd->type;
hd->version[0] = cpu_to_le16(protocol_active[0]);
hd->version[1] = cpu_to_le16(protocol_active[1]);
hd->version[2] = cpu_to_le16(protocol_active[2]);
hd->type = cpu_to_le16(hd->type);
hd->nodeid = cpu_to_le32(our_nodeid);
hd->to_nodeid = cpu_to_le32(hd->to_nodeid);
hd->global_id = cpu_to_le32(ls->global_id);
hd->flags = cpu_to_le32(hd->flags);
hd->msgdata = cpu_to_le32(hd->msgdata);
_send_message(ls->cpg_handle, buf, len, type);
}
static struct member *find_memb(struct change *cg, int nodeid)
{
struct member *memb;
list_for_each_entry(memb, &cg->members, list) {
if (memb->nodeid == nodeid)
return memb;
}
return NULL;
}
static struct lockspace *find_ls_handle(cpg_handle_t h)
{
struct lockspace *ls;
list_for_each_entry(ls, &lockspaces, list) {
if (ls->cpg_handle == h)
return ls;
}
return NULL;
}
static struct lockspace *find_ls_ci(int ci)
{
struct lockspace *ls;
list_for_each_entry(ls, &lockspaces, list) {
if (ls->cpg_client == ci)
return ls;
}
return NULL;
}
static void free_cg(struct change *cg)
{
struct member *memb, *safe;
list_for_each_entry_safe(memb, safe, &cg->members, list) {
list_del(&memb->list);
free(memb);
}
list_for_each_entry_safe(memb, safe, &cg->removed, list) {
list_del(&memb->list);
free(memb);
}
free(cg);
}
static void free_ls(struct lockspace *ls)
{
struct change *cg, *cg_safe;
struct node *node, *node_safe;
list_for_each_entry_safe(cg, cg_safe, &ls->changes, list) {
list_del(&cg->list);
free_cg(cg);
}
if (ls->started_change)
free_cg(ls->started_change);
list_for_each_entry_safe(node, node_safe, &ls->node_history, list) {
list_del(&node->list);
free(node);
}
free(ls);
}
/* Problem scenario:
nodes A,B,C are in fence domain
node C has gfs foo mounted
node C fails
nodes A,B begin fencing C (slow, not completed)
node B mounts gfs foo
We may end up having gfs foo mounted and being used on B before
C has been fenced. C could wake up and corrupt the fs.
So, we need to prevent any new gfs mounts while there are any
outstanding, incomplete fencing operations.
We also need to check that the specific failed nodes we know about have
been fenced (since fenced may not even have been notified that the node
has failed yet).
So, check that:
1. has fenced fenced the node after it joined this lockspace?
2. fenced has no outstanding fencing ops
For 1:
- record the time of the first good start message we see from node X
- node X fails
- wait for X to be removed from all dlm cpg's (probably not necessary)
- check that the fencing time is later than the recorded time above
Tracking fencing state when there are spurious partitions/merges...
from a spurious leave/join of node X, a lockspace will see:
- node X is a lockspace member
- node X fails, may be waiting for all cpgs to see failure or for fencing to
complete
- node X joins the lockspace - we want to process the change as usual, but
don't want to disrupt the code waiting for the fencing, and we want to
continue running properly once the remerged node is properly reset
ls->node_history
when we see a node not in this list, add entry for it with zero add_time
record the time we get a good start message from the node, add_time
clear add_time if the node leaves
if node fails with non-zero add_time, set check_fencing
when a node is fenced, clear add_time and clear check_fencing
if a node remerges after this, no good start message, no new add_time set
if a node fails with zero add_time, it doesn't need fencing
if a node remerges before it's been fenced, no good start message, no new
add_time set
*/
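/* A minimal sketch (hypothetical helper, not called anywhere) of the
decision the scheme above reduces to: a failed node needs fencing only
if it had a nonzero add_time, and fencing is complete once fenced
reports a fence time later than that add_time. */
static int needs_fencing_sketch(struct node *node, uint64_t last_fenced_time)
{
if (!node->add_time)
return 0; /* failed before a good start message */
return last_fenced_time <= node->add_time;
}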
static struct node *get_node_history(struct lockspace *ls, int nodeid)
{
struct node *node;
list_for_each_entry(node, &ls->node_history, list) {
if (node->nodeid == nodeid)
return node;
}
return NULL;
}
static void node_history_init(struct lockspace *ls, int nodeid,
struct change *cg)
{
struct node *node;
node = get_node_history(ls, nodeid);
if (node)
goto out;
node = malloc(sizeof(struct node));
if (!node)
return;
memset(node, 0, sizeof(struct node));
node->nodeid = nodeid;
node->add_time = 0;
list_add_tail(&node->list, &ls->node_history);
out:
node->added_seq = cg->seq; /* for queries */
}
static void node_history_start(struct lockspace *ls, int nodeid)
{
struct node *node;
node = get_node_history(ls, nodeid);
if (!node) {
log_error("node_history_start no nodeid %d", nodeid);
return;
}
node->add_time = time(NULL);
}
static void node_history_left(struct lockspace *ls, int nodeid,
struct change *cg)
{
struct node *node;
node = get_node_history(ls, nodeid);
if (!node) {
log_error("node_history_left no nodeid %d", nodeid);
return;
}
node->add_time = 0;
node->removed_seq = cg->seq; /* for queries */
}
static void node_history_fail(struct lockspace *ls, int nodeid,
struct change *cg, int reason)
{
struct node *node;
node = get_node_history(ls, nodeid);
if (!node) {
log_error("node_history_fail no nodeid %d", nodeid);
return;
}
if (cfgd_enable_fencing && !node->add_time)
node->check_fencing = 1;
/* fenced will take care of making sure the quorum value
is adjusted for all the failures */
if (cfgd_enable_quorum && !cfgd_enable_fencing)
node->check_quorum = 1;
node->check_fs = 1;
node->removed_seq = cg->seq; /* for queries */
node->failed_reason = reason; /* for queries */
}
static int check_fencing_done(struct lockspace *ls)
{
struct node *node;
uint64_t last_fenced_time;
int in_progress, wait_count = 0;
int rv;
if (!cfgd_enable_fencing)
return 1;
list_for_each_entry(node, &ls->node_history, list) {
if (!node->check_fencing)
continue;
/* check with fenced to see if the node has been
fenced since node->add_time */
rv = fence_node_time(node->nodeid, &last_fenced_time);
if (rv < 0)
log_error("fenced_node_info error %d", rv);
if (last_fenced_time > node->add_time) {
node->check_fencing = 0;
node->add_time = 0;
} else {
log_group(ls, "check_fencing %d needs fencing",
node->nodeid);
wait_count++;
}
}
if (wait_count)
return 0;
/* now check if there are any outstanding fencing ops (for nodes
we may not have seen in any lockspace), and return 0 if there
are any */
rv = fence_in_progress(&in_progress);
if (rv < 0) {
log_error("fenced_domain_info error %d", rv);
return 0;
}
if (in_progress)
return 0;
return 1;
}
static int check_quorum_done(struct lockspace *ls)
{
struct node *node;
int wait_count = 0;
if (!cfgd_enable_quorum)
return 1;
/* wait for quorum system (cman) to see all the same nodes failed, so
we know that cluster_quorate is adjusted for the same failures we've
seen (see comment in fenced about the assumption here) */
list_for_each_entry(node, &ls->node_history, list) {
if (!node->check_quorum)
continue;
if (!is_cluster_member(node->nodeid)) {
node->check_quorum = 0;
} else {
- log_group(ls, "check_quorum %d is_cluster_member",
+ log_group(ls, "check_quorum nodeid %d is_cluster_member",
node->nodeid);
wait_count++;
}
}
if (wait_count)
return 0;
if (!cluster_quorate) {
log_group(ls, "check_quorum not quorate");
return 0;
}
log_group(ls, "check_quorum done");
return 1;
}
/* wait for local fs_controld to ack each failed node */
static int check_fs_done(struct lockspace *ls)
{
struct node *node;
int wait_count = 0;
/* no corresponding fs for this lockspace */
if (!ls->fs_registered)
return 1;
list_for_each_entry(node, &ls->node_history, list) {
if (!node->check_fs)
continue;
if (node->fs_notified) {
node->check_fs = 0;
} else {
- log_group(ls, "check_fs %d needs fs notify",
+ log_group(ls, "check_fs nodeid %d needs fs notify",
node->nodeid);
wait_count++;
}
}
if (wait_count)
return 0;
log_group(ls, "check_fs done");
return 1;
}
static int member_ids[MAX_NODES];
static int member_count;
static int renew_ids[MAX_NODES];
static int renew_count;
static void format_member_ids(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct member *memb;
memset(member_ids, 0, sizeof(member_ids));
member_count = 0;
list_for_each_entry(memb, &cg->members, list)
member_ids[member_count++] = memb->nodeid;
}
/* list of nodeids that have left and rejoined since last start_kernel;
is any member of startcg in the left list of any other cg's?
(if it is, then it presumably must be flagged added in another) */
static void format_renew_ids(struct lockspace *ls)
{
struct change *cg, *startcg;
struct member *memb, *leftmemb;
startcg = list_first_entry(&ls->changes, struct change, list);
memset(renew_ids, 0, sizeof(renew_ids));
renew_count = 0;
list_for_each_entry(memb, &startcg->members, list) {
list_for_each_entry(cg, &ls->changes, list) {
if (cg == startcg)
continue;
list_for_each_entry(leftmemb, &cg->removed, list) {
if (memb->nodeid == leftmemb->nodeid) {
renew_ids[renew_count++] = memb->nodeid;
}
}
}
}
}
static void start_kernel(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
if (!ls->kernel_stopped) {
- log_error("start_kernel %u not stopped", cg->seq);
+ log_error("start_kernel cg %u not stopped", cg->seq);
return;
}
- log_group(ls, "start_kernel %u member_count %d",
+ log_group(ls, "start_kernel cg %u member_count %d",
cg->seq, cg->member_count);
/* needs to happen before setting control which starts recovery */
if (ls->joining)
set_sysfs_id(ls->name, ls->global_id);
format_member_ids(ls);
format_renew_ids(ls);
set_configfs_members(ls->name, member_count, member_ids,
renew_count, renew_ids);
set_sysfs_control(ls->name, 1);
ls->kernel_stopped = 0;
if (ls->joining) {
set_sysfs_event_done(ls->name, 0);
ls->joining = 0;
}
}
static void stop_kernel(struct lockspace *ls, uint32_t seq)
{
if (!ls->kernel_stopped) {
- log_group(ls, "stop_kernel %u", seq);
+ log_group(ls, "stop_kernel cg %u", seq);
set_sysfs_control(ls->name, 0);
ls->kernel_stopped = 1;
}
}
/* the first condition is that the local lockspace is stopped, which we
don't need to check for because stop_kernel(), which is synchronous,
was done when the change was created */
static int wait_conditions_done(struct lockspace *ls)
{
/* the fencing/quorum/fs conditions need to account for all the changes
that have occurred since the last change applied to dlm-kernel, not
just the latest change */
if (!check_fencing_done(ls)) {
poll_fencing = 1;
return 0;
}
poll_fencing = 0;
/* even though fencing also waits for quorum, checking fencing isn't
sufficient because we don't want to start new lockspaces in an
inquorate cluster */
if (!check_quorum_done(ls)) {
poll_quorum = 1;
return 0;
}
poll_quorum = 0;
if (!check_fs_done(ls)) {
poll_fs = 1;
return 0;
}
poll_fs = 0;
return 1;
}
static int wait_messages_done(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct member *memb;
int need = 0, total = 0;
list_for_each_entry(memb, &cg->members, list) {
if (!memb->start)
need++;
total++;
}
if (need) {
log_group(ls, "wait_messages_done need %d of %d", need, total);
return 0;
}
log_group(ls, "wait_messages_done got all %d", total);
return 1;
}
static void cleanup_changes(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct change *safe;
list_del(&cg->list);
if (ls->started_change)
free_cg(ls->started_change);
ls->started_change = cg;
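/* zero started_count means "never started" */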
ls->started_count++;
if (!ls->started_count)
ls->started_count++;
cg->combined_seq = cg->seq; /* for queries */
list_for_each_entry_safe(cg, safe, &ls->changes, list) {
ls->started_change->combined_seq = cg->seq; /* for queries */
list_del(&cg->list);
free_cg(cg);
}
}
/* There's a stream of confchgs and messages. At one point in this
stream, the low node needs to store plocks and new nodes need to begin
saving plock messages. A second message is needed to say that the
plocks are ready to be read.
When the last start message is recvd for a change, the low node
stores plocks and the new nodes begin saving messages. When the
store is done, low node sends plocks_stored message. When
new nodes recv this, they read the plocks and their saved messages.
plocks_stored message should identify a specific change, like start
messages do; if it doesn't match ls->started_change, then it's ignored.
If a confchg adding a new node arrives after plocks are stored but
before plocks_stored msg recvd, then the message is ignored. The low
node will send another plocks_stored message for the latest change
(although it may be able to reuse the ckpt if no plock state has changed).
*/
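/* A hedged example of the sequence above (nodeids hypothetical):
1. A (low node with plock state) and B are members; confchg1 adds C
2. the last start for confchg1 arrives; A stores plocks in the ckpt,
C begins saving plock messages
3. A sends plocks_stored for confchg1
4. C matches it to ls->started_change, reads the ckpt, then processes
its saved messages
If confchg2 arrives between 2 and 4, C ignores the plocks_stored for
confchg1 and waits for one matching confchg2. */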
static void set_plock_ckpt_node(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct member *memb;
int low = 0;
list_for_each_entry(memb, &cg->members, list) {
if (!(memb->start_flags & DLM_MFLG_HAVEPLOCK))
continue;
if (!low || memb->nodeid < low)
low = memb->nodeid;
}
log_group(ls, "set_plock_ckpt_node from %d to %d",
ls->plock_ckpt_node, low);
if (ls->plock_ckpt_node == our_nodeid && low != our_nodeid) {
/* Close ckpt so it will go away when the new ckpt_node
unlinks it prior to creating a new one; if we fail
our open ckpts are automatically closed. At this point
the ckpt has not been unlinked, but won't be held open by
anyone. We use the max "retentionDuration" to stop the
system from cleaning up ckpts that are open by no one. */
close_plock_checkpoint(ls);
}
ls->plock_ckpt_node = low;
}
static struct id_info *get_id_struct(struct id_info *ids, int count, int size,
int nodeid)
{
struct id_info *id = ids;
int i;
for (i = 0; i < count; i++) {
if (id->nodeid == nodeid)
return id;
id = (struct id_info *)((char *)id + size);
}
return NULL;
}
/* do the change details in the message match the details of the given change */
static int match_change(struct lockspace *ls, struct change *cg,
struct dlm_header *hd, struct ls_info *li,
struct id_info *ids)
{
struct id_info *id;
struct member *memb;
uint32_t seq = hd->msgdata;
int i, members_mismatch;
/* We can ignore messages if we're not in the list of members.
The one known time this will happen is after we've joined
the cpg, we can get messages for changes prior to the change
in which we're added. */
id = get_id_struct(ids, li->id_info_count, li->id_info_size,our_nodeid);
if (!id) {
- log_debug("match_change fail %d:%u we are not in members",
- hd->nodeid, seq);
+ log_group(ls, "match_change %d:%u skip %u we are not in members",
+ hd->nodeid, seq, cg->seq);
return 0;
}
memb = find_memb(cg, hd->nodeid);
if (!memb) {
- log_group(ls, "match_change fail %d:%u sender not member",
- hd->nodeid, seq);
+ log_group(ls, "match_change %d:%u skip %u sender not member",
+ hd->nodeid, seq, cg->seq);
return 0;
}
/* verify this is the right change by matching the counts
and the nodeids of the current members */
if (li->member_count != cg->member_count ||
li->joined_count != cg->joined_count ||
li->remove_count != cg->remove_count ||
li->failed_count != cg->failed_count) {
- log_group(ls, "match_change fail %d:%u expect counts "
- "%d %d %d %d", hd->nodeid, seq,
+ log_group(ls, "match_change %d:%u skip %u expect counts "
+ "%d %d %d %d", hd->nodeid, seq, cg->seq,
cg->member_count, cg->joined_count,
cg->remove_count, cg->failed_count);
return 0;
}
members_mismatch = 0;
id = ids;
for (i = 0; i < li->id_info_count; i++) {
memb = find_memb(cg, id->nodeid);
if (!memb) {
- log_group(ls, "match_change fail %d:%u no memb %d",
- hd->nodeid, seq, id->nodeid);
+ log_group(ls, "match_change %d:%u skip %u no memb %d",
+ hd->nodeid, seq, cg->seq, id->nodeid);
members_mismatch = 1;
break;
}
id = (struct id_info *)((char *)id + li->id_info_size);
}
if (members_mismatch)
return 0;
- log_group(ls, "match_change done %d:%u", hd->nodeid, seq);
+ log_group(ls, "match_change %d:%u matches cg %u", hd->nodeid, seq,
+ cg->seq);
return 1;
}
/* Unfortunately, there's no really simple way to match a message with the
specific change that it was sent for. We hope that by passing all the
details of the change in the message, we will be able to uniquely match
it to the correct change. */
/* A start message will usually be for the first (current) change on our list.
In some cases it will be for a non-current change, and we can ignore it:
1. A,B,C get confchg1 adding C
2. C sends start for confchg1
3. A,B,C get confchg2 adding D
4. A,B,C,D recv start from C for confchg1 - ignored
5. C,D send start for confchg2
6. A,B send start for confchg2
7. A,B,C,D recv all start messages for confchg2, and start kernel
In step 4, how do the nodes know whether the start message from C is
for confchg1 or confchg2? Hopefully by comparing the counts and members. */
static struct change *find_change(struct lockspace *ls, struct dlm_header *hd,
struct ls_info *li, struct id_info *ids)
{
struct change *cg;
list_for_each_entry_reverse(cg, &ls->changes, list) {
if (!match_change(ls, cg, hd, li, ids))
continue;
return cg;
}
log_group(ls, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
return NULL;
}
static int is_added(struct lockspace *ls, int nodeid)
{
struct change *cg;
struct member *memb;
list_for_each_entry(cg, &ls->changes, list) {
memb = find_memb(cg, nodeid);
if (memb && memb->added)
return 1;
}
return 0;
}
static void receive_start(struct lockspace *ls, struct dlm_header *hd, int len)
{
struct change *cg;
struct member *memb;
struct ls_info *li;
struct id_info *ids;
uint32_t seq = hd->msgdata;
int added;
log_group(ls, "receive_start %d:%u len %d", hd->nodeid, seq, len);
li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
ls_info_in(li);
ids_in(li, ids);
cg = find_change(ls, hd, li, ids);
if (!cg)
return;
memb = find_memb(cg, hd->nodeid);
if (!memb) {
/* this should never happen since match_change checks it */
log_error("receive_start no member %d", hd->nodeid);
return;
}
memb->start_flags = hd->flags;
added = is_added(ls, hd->nodeid);
if (added && li->started_count) {
log_error("receive_start %d:%u add node with started_count %u",
hd->nodeid, seq, li->started_count);
- /* observe this scheme working before using it; I'm not sure
- that a joining node won't ever see an existing node as added
- under normal circumstances */
- /*
+ /* see comment in fence/fenced/cpg.c */
memb->disallowed = 1;
return;
- */
}
node_history_start(ls, hd->nodeid);
memb->start = 1;
}
static void receive_plocks_stored(struct lockspace *ls, struct dlm_header *hd,
int len)
{
struct ls_info *li;
struct id_info *ids;
log_group(ls, "receive_plocks_stored %d:%u need_plocks %d",
hd->nodeid, hd->msgdata, ls->need_plocks);
if (!ls->need_plocks)
return;
/* a confchg arrived between the last start and the plocks_stored msg,
so we ignore this plocks_stored msg and wait to read the ckpt until
the next plocks_stored msg following the current start */
if (!list_empty(&ls->changes) || !ls->started_change) {
log_group(ls, "receive_plocks_stored %d:%u ignore",
hd->nodeid, hd->msgdata);
return;
}
li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
ls_info_in(li);
ids_in(li, ids);
if (!match_change(ls, ls->started_change, hd, li, ids)) {
log_group(ls, "receive_plocks_stored %d:%u ignore no match",
hd->nodeid, hd->msgdata);
return;
}
retrieve_plocks(ls);
process_saved_plocks(ls);
ls->need_plocks = 0;
ls->save_plocks = 0;
}
static void send_info(struct lockspace *ls, int type)
{
struct change *cg;
struct dlm_header *hd;
struct ls_info *li;
struct id_info *id;
struct member *memb;
char *buf;
int len, id_count;
cg = list_first_entry(&ls->changes, struct change, list);
id_count = cg->member_count;
len = sizeof(struct dlm_header) + sizeof(struct ls_info) +
id_count * sizeof(struct id_info);
buf = malloc(len);
if (!buf) {
log_error("send_info len %d no mem", len);
return;
}
memset(buf, 0, len);
hd = (struct dlm_header *)buf;
li = (struct ls_info *)(buf + sizeof(*hd));
id = (struct id_info *)(buf + sizeof(*hd) + sizeof(*li));
/* fill in header (dlm_send_message handles part of header) */
hd->type = type;
hd->msgdata = cg->seq;
if (ls->joining)
hd->flags |= DLM_MFLG_JOINING;
if (!ls->need_plocks)
hd->flags |= DLM_MFLG_HAVEPLOCK;
/* fill in ls_info */
li->ls_info_size = cpu_to_le32(sizeof(struct ls_info));
li->id_info_size = cpu_to_le32(sizeof(struct id_info));
li->id_info_count = cpu_to_le32(id_count);
li->started_count = cpu_to_le32(ls->started_count);
li->member_count = cpu_to_le32(cg->member_count);
li->joined_count = cpu_to_le32(cg->joined_count);
li->remove_count = cpu_to_le32(cg->remove_count);
li->failed_count = cpu_to_le32(cg->failed_count);
/* fill in id_info entries */
list_for_each_entry(memb, &cg->members, list) {
id->nodeid = cpu_to_le32(memb->nodeid);
id++;
}
- log_group(ls, "send_%s %u flags %x counts %u %d %d %d %d",
+ log_group(ls, "send_%s cg %u flags %x counts %u %d %d %d %d",
type == DLM_MSG_START ? "start" : "plocks_stored",
cg->seq, hd->flags, ls->started_count, cg->member_count,
cg->joined_count, cg->remove_count, cg->failed_count);
dlm_send_message(ls, buf, len);
free(buf);
}
static void send_start(struct lockspace *ls)
{
send_info(ls, DLM_MSG_START);
}
static void send_plocks_stored(struct lockspace *ls)
{
send_info(ls, DLM_MSG_PLOCKS_STORED);
}
static int nodes_added(struct lockspace *ls)
{
struct change *cg;
list_for_each_entry(cg, &ls->changes, list) {
if (cg->joined_count)
return 1;
}
return 0;
}
static void prepare_plocks(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct member *memb;
if (!cfgd_enable_plock)
return;
/* if we're the only node in the lockspace, then we are the ckpt_node
and we don't need plocks */
if (cg->member_count == 1) {
list_for_each_entry(memb, &cg->members, list) {
if (memb->nodeid != our_nodeid) {
log_error("prepare_plocks other member %d",
memb->nodeid);
}
}
ls->plock_ckpt_node = our_nodeid;
ls->need_plocks = 0;
return;
}
/* the low node that indicated it had plock state in its last
start message is the ckpt_node */
set_plock_ckpt_node(ls);
/* We save all plock messages from the time that the low node saves
existing plock state in the ckpt to the time that we read that state
from the ckpt. */
if (ls->need_plocks) {
ls->save_plocks = 1;
return;
}
if (ls->plock_ckpt_node != our_nodeid)
return;
/* At each start, a ckpt is written if there have been nodes added
since the last start/ckpt. If no nodes have been added, no one
does anything with ckpts. If the node that wrote the last ckpt
is no longer the ckpt_node, the new ckpt_node will unlink and
write a new one. If the node that wrote the last ckpt is still
the ckpt_node and no plock state has changed since the last ckpt,
it will just leave the old ckpt and not write a new one.
A new ckpt_node will send a stored message even if it doesn't
write a ckpt because new nodes in the previous start may be
waiting to read the ckpt from the previous ckpt_node after ignoring
the previous stored message. They will read the ckpt from the
previous ckpt_node upon receiving the stored message from us. */
if (nodes_added(ls))
store_plocks(ls);
send_plocks_stored(ls);
}
static void apply_changes(struct lockspace *ls)
{
struct change *cg;
if (list_empty(&ls->changes))
return;
cg = list_first_entry(&ls->changes, struct change, list);
switch (cg->state) {
case CGST_WAIT_CONDITIONS:
if (wait_conditions_done(ls)) {
send_start(ls);
cg->state = CGST_WAIT_MESSAGES;
}
break;
case CGST_WAIT_MESSAGES:
if (wait_messages_done(ls)) {
start_kernel(ls);
prepare_plocks(ls);
cleanup_changes(ls);
}
break;
default:
log_error("apply_changes invalid state %d", cg->state);
}
}
void process_lockspace_changes(void)
{
struct lockspace *ls, *safe;
list_for_each_entry_safe(ls, safe, &lockspaces, list) {
if (!list_empty(&ls->changes))
apply_changes(ls);
}
}
static int add_change(struct lockspace *ls,
struct cpg_address *member_list, int member_list_entries,
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries,
struct change **cg_out)
{
struct change *cg;
struct member *memb;
int i, error;
cg = malloc(sizeof(struct change));
if (!cg)
goto fail_nomem;
memset(cg, 0, sizeof(struct change));
INIT_LIST_HEAD(&cg->members);
INIT_LIST_HEAD(&cg->removed);
cg->state = CGST_WAIT_CONDITIONS;
cg->seq = ++ls->change_seq;
if (!cg->seq)
cg->seq = ++ls->change_seq;
cg->member_count = member_list_entries;
cg->joined_count = joined_list_entries;
cg->remove_count = left_list_entries;
for (i = 0; i < member_list_entries; i++) {
memb = malloc(sizeof(struct member));
if (!memb)
goto fail_nomem;
memset(memb, 0, sizeof(struct member));
memb->nodeid = member_list[i].nodeid;
list_add_tail(&memb->list, &cg->members);
}
for (i = 0; i < left_list_entries; i++) {
memb = malloc(sizeof(struct member));
if (!memb)
goto fail_nomem;
memset(memb, 0, sizeof(struct member));
memb->nodeid = left_list[i].nodeid;
if (left_list[i].reason == CPG_REASON_NODEDOWN ||
left_list[i].reason == CPG_REASON_PROCDOWN) {
memb->failed = 1;
cg->failed_count++;
}
list_add_tail(&memb->list, &cg->removed);
if (memb->failed)
node_history_fail(ls, memb->nodeid, cg,
left_list[i].reason);
else
node_history_left(ls, memb->nodeid, cg);
- log_group(ls, "add_change %u nodeid %d remove reason %d",
+ log_group(ls, "add_change cg %u remove nodeid %d reason %d",
cg->seq, memb->nodeid, left_list[i].reason);
if (left_list[i].reason == CPG_REASON_PROCDOWN)
kick_node_from_cluster(memb->nodeid);
}
for (i = 0; i < joined_list_entries; i++) {
memb = find_memb(cg, joined_list[i].nodeid);
if (!memb) {
log_error("no member %d", joined_list[i].nodeid);
error = -ENOENT;
goto fail;
}
memb->added = 1;
if (memb->nodeid == our_nodeid)
cg->we_joined = 1;
else
node_history_init(ls, memb->nodeid, cg);
- log_group(ls, "add_change %u nodeid %d joined", cg->seq,
+ log_group(ls, "add_change cg %u joined nodeid %d", cg->seq,
memb->nodeid);
}
if (cg->we_joined) {
- log_group(ls, "add_change %u we joined", cg->seq);
+ log_group(ls, "add_change cg %u we joined", cg->seq);
list_for_each_entry(memb, &cg->members, list)
node_history_init(ls, memb->nodeid, cg);
}
- log_group(ls, "add_change %u member %d joined %d remove %d failed %d",
- cg->seq, cg->member_count, cg->joined_count, cg->remove_count,
- cg->failed_count);
+ log_group(ls, "add_change cg %u counts member %d joined %d remove %d "
+ "failed %d", cg->seq, cg->member_count, cg->joined_count,
+ cg->remove_count, cg->failed_count);
list_add(&cg->list, &ls->changes);
*cg_out = cg;
return 0;
fail_nomem:
log_error("no memory");
error = -ENOMEM;
fail:
free_cg(cg);
return error;
}
static int we_left(struct cpg_address *left_list, int left_list_entries)
{
int i;
for (i = 0; i < left_list_entries; i++) {
if (left_list[i].nodeid == our_nodeid)
return 1;
}
return 0;
}
static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
struct cpg_address *member_list, int member_list_entries,
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries)
{
struct lockspace *ls;
struct change *cg;
struct member *memb;
int rv;
ls = find_ls_handle(handle);
if (!ls) {
log_error("confchg_cb no lockspace for cpg %s",
group_name->value);
return;
}
if (ls->leaving && we_left(left_list, left_list_entries)) {
/* we called cpg_leave(), and this should be the final
cpg callback we receive */
log_group(ls, "confchg for our leave");
stop_kernel(ls, 0);
set_configfs_members(ls->name, 0, NULL, 0, NULL);
set_sysfs_event_done(ls->name, 0);
cpg_finalize(ls->cpg_handle);
client_dead(ls->cpg_client);
purge_plocks(ls, our_nodeid, 1);
list_del(&ls->list);
free_ls(ls);
return;
}
rv = add_change(ls, member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries, &cg);
if (rv)
return;
stop_kernel(ls, cg->seq);
list_for_each_entry(memb, &cg->removed, list)
purge_plocks(ls, memb->nodeid, 0);
#if 0
/* deadlock code needs to adjust per a confchg, is this the right
way/place for this? */
deadlk_confchg(ls, member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries);
#endif
}
static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *data, int len)
{
struct lockspace *ls;
struct dlm_header *hd;
ls = find_ls_handle(handle);
if (!ls) {
log_error("deliver_cb no ls for cpg %s", group_name->value);
return;
}
hd = (struct dlm_header *)data;
hd->version[0] = le16_to_cpu(hd->version[0]);
hd->version[1] = le16_to_cpu(hd->version[1]);
hd->version[2] = le16_to_cpu(hd->version[2]);
hd->type = le16_to_cpu(hd->type);
hd->nodeid = le32_to_cpu(hd->nodeid);
hd->to_nodeid = le32_to_cpu(hd->to_nodeid);
hd->global_id = le32_to_cpu(hd->global_id);
hd->flags = le32_to_cpu(hd->flags);
hd->msgdata = le32_to_cpu(hd->msgdata);
if (hd->version[0] != protocol_active[0]) {
log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
nodeid, hd->version[0], hd->version[1],
hd->version[2], protocol_active[0],
protocol_active[1], protocol_active[2]);
return;
}
if (hd->nodeid != nodeid) {
log_error("bad msg nodeid %d %d", hd->nodeid, nodeid);
return;
}
switch (hd->type) {
case DLM_MSG_START:
receive_start(ls, hd, len);
break;
case DLM_MSG_PLOCK:
receive_plock(ls, hd, len);
break;
case DLM_MSG_PLOCK_OWN:
receive_own(ls, hd, len);
break;
case DLM_MSG_PLOCK_DROP:
receive_drop(ls, hd, len);
break;
case DLM_MSG_PLOCK_SYNC_LOCK:
case DLM_MSG_PLOCK_SYNC_WAITER:
receive_sync(ls, hd, len);
break;
case DLM_MSG_PLOCKS_STORED:
receive_plocks_stored(ls, hd, len);
break;
case DLM_MSG_DEADLK_CYCLE_START:
receive_cycle_start(ls, hd, len);
break;
case DLM_MSG_DEADLK_CYCLE_END:
receive_cycle_end(ls, hd, len);
break;
case DLM_MSG_DEADLK_CHECKPOINT_READY:
receive_checkpoint_ready(ls, hd, len);
break;
case DLM_MSG_DEADLK_CANCEL_LOCK:
receive_cancel_lock(ls, hd, len);
break;
default:
log_error("unknown msg type %d", hd->type);
}
}
static cpg_callbacks_t cpg_callbacks = {
.cpg_deliver_fn = deliver_cb,
.cpg_confchg_fn = confchg_cb,
};
void update_flow_control_status(void)
{
cpg_flow_control_state_t flow_control_state;
cpg_error_t error;
error = cpg_flow_control_state_get(daemon_handle, &flow_control_state);
if (error != CPG_OK) {
log_error("cpg_flow_control_state_get %d", error);
return;
}
if (flow_control_state == CPG_FLOW_CONTROL_ENABLED) {
if (message_flow_control_on == 0) {
log_debug("flow control on");
}
message_flow_control_on = 1;
} else {
if (message_flow_control_on) {
log_debug("flow control off");
}
message_flow_control_on = 0;
}
}
static void process_lockspace_cpg(int ci)
{
struct lockspace *ls;
cpg_error_t error;
ls = find_ls_ci(ci);
if (!ls) {
log_error("process_lockspace_cpg no lockspace for ci %d", ci);
return;
}
error = cpg_dispatch(ls->cpg_handle, CPG_DISPATCH_ALL);
if (error != CPG_OK) {
log_error("cpg_dispatch error %d", error);
return;
}
apply_changes(ls);
update_flow_control_status();
}
/* received an "online" uevent from dlm-kernel */
int dlm_join_lockspace(struct lockspace *ls)
{
cpg_error_t error;
cpg_handle_t h;
struct cpg_name name;
int i = 0, fd, ci;
error = cpg_initialize(&h, &cpg_callbacks);
if (error != CPG_OK) {
log_error("cpg_initialize error %d", error);
goto fail_free;
}
cpg_fd_get(h, &fd);
ci = client_add(fd, process_lockspace_cpg, NULL);
list_add(&ls->list, &lockspaces);
ls->cpg_handle = h;
ls->cpg_client = ci;
ls->cpg_fd = fd;
ls->kernel_stopped = 1;
ls->need_plocks = 1;
ls->joining = 1;
memset(&name, 0, sizeof(name));
sprintf(name.value, "dlm:%s", ls->name);
name.length = strlen(name.value) + 1;
/* TODO: allow global_id to be set in cluster.conf? */
ls->global_id = cpgname_to_crc(name.value, name.length);
retry:
error = cpg_join(h, &name);
if (error == CPG_ERR_TRY_AGAIN) {
sleep(1);
if (!(++i % 10))
log_error("cpg_join error retrying");
goto retry;
}
if (error != CPG_OK) {
log_error("cpg_join error %d", error);
cpg_finalize(h);
goto fail;
}
return 0;
fail:
list_del(&ls->list);
client_dead(ci);
cpg_finalize(h);
fail_free:
free_ls(ls);
return error;
}
/* received an "offline" uevent from dlm-kernel */
int dlm_leave_lockspace(struct lockspace *ls)
{
cpg_error_t error;
struct cpg_name name;
int i = 0;
ls->leaving = 1;
memset(&name, 0, sizeof(name));
sprintf(name.value, "dlm:%s", ls->name);
name.length = strlen(name.value) + 1;
retry:
error = cpg_leave(ls->cpg_handle, &name);
if (error == CPG_ERR_TRY_AGAIN) {
sleep(1);
if (!(++i % 10))
log_error("cpg_leave error retrying");
goto retry;
}
if (error != CPG_OK)
log_error("cpg_leave error %d", error);
return 0;
}
int setup_cpg(void)
{
cpg_error_t error;
error = cpg_initialize(&daemon_handle, &cpg_callbacks);
if (error != CPG_OK) {
log_error("setup_cpg cpg_initialize error %d", error);
return -1;
}
/* join "dlm_controld" cpg to interact with other daemons in
the cluster before we start processing uevents? Could this
also help in handling transient partitions? */
return 0;
}
/* fs_controld has seen nodedown for nodeid; it's now ok for dlm to do
recovery for the failed node */
int set_fs_notified(struct lockspace *ls, int nodeid)
{
struct node *node;
/* this shouldn't happen */
node = get_node_history(ls, nodeid);
if (!node) {
log_error("set_fs_notified no nodeid %d", nodeid);
return -ESRCH;
}
/* this can happen, we haven't seen a nodedown for this node yet,
but we should soon */
if (!node->check_fs) {
log_group(ls, "set_fs_notified %d zero check_fs", nodeid);
return -EAGAIN;
}
node->fs_notified = 1;
return 0;
}
int set_lockspace_info(struct lockspace *ls, struct dlmc_lockspace *lockspace)
{
struct change *cg, *last = NULL;
strncpy(lockspace->name, ls->name, DLM_LOCKSPACE_LEN);
lockspace->global_id = ls->global_id;
if (ls->joining)
lockspace->flags |= DLMC_LF_JOINING;
if (ls->leaving)
lockspace->flags |= DLMC_LF_LEAVING;
if (ls->kernel_stopped)
lockspace->flags |= DLMC_LF_KERNEL_STOPPED;
if (ls->fs_registered)
lockspace->flags |= DLMC_LF_FS_REGISTERED;
if (ls->need_plocks)
lockspace->flags |= DLMC_LF_NEED_PLOCKS;
if (ls->save_plocks)
lockspace->flags |= DLMC_LF_SAVE_PLOCKS;
if (!ls->started_change)
goto next;
cg = ls->started_change;
lockspace->cg_prev.member_count = cg->member_count;
lockspace->cg_prev.joined_count = cg->joined_count;
lockspace->cg_prev.remove_count = cg->remove_count;
lockspace->cg_prev.failed_count = cg->failed_count;
lockspace->cg_prev.combined_seq = cg->combined_seq;
lockspace->cg_prev.seq = cg->seq;
next:
if (list_empty(&ls->changes))
goto out;
list_for_each_entry(cg, &ls->changes, list)
last = cg;
cg = list_first_entry(&ls->changes, struct change, list);
lockspace->cg_next.member_count = cg->member_count;
lockspace->cg_next.joined_count = cg->joined_count;
lockspace->cg_next.remove_count = cg->remove_count;
lockspace->cg_next.failed_count = cg->failed_count;
lockspace->cg_next.combined_seq = last->seq;
lockspace->cg_next.seq = cg->seq;
if (cg->state == CGST_WAIT_CONDITIONS)
lockspace->cg_next.wait_condition = 4;
if (poll_fencing)
lockspace->cg_next.wait_condition = 1;
else if (poll_quorum)
lockspace->cg_next.wait_condition = 2;
else if (poll_fs)
lockspace->cg_next.wait_condition = 3;
if (cg->state == CGST_WAIT_MESSAGES)
lockspace->cg_next.wait_messages = 1;
out:
return 0;
}
static int _set_node_info(struct lockspace *ls, struct change *cg, int nodeid,
struct dlmc_node *node)
{
struct member *m = NULL;
struct node *n;
node->nodeid = nodeid;
if (cg)
m = find_memb(cg, nodeid);
if (!m)
goto history;
node->flags |= DLMC_NF_MEMBER;
if (m->start)
node->flags |= DLMC_NF_START;
if (m->disallowed)
node->flags |= DLMC_NF_DISALLOWED;
history:
n = get_node_history(ls, nodeid);
if (!n)
goto out;
if (n->check_fencing)
node->flags |= DLMC_NF_CHECK_FENCING;
if (n->check_quorum)
node->flags |= DLMC_NF_CHECK_QUORUM;
if (n->check_fs)
node->flags |= DLMC_NF_CHECK_FS;
if (n->fs_notified)
node->flags |= DLMC_NF_FS_NOTIFIED;
node->added_seq = n->added_seq;
node->removed_seq = n->removed_seq;
node->failed_reason = n->failed_reason;
out:
return 0;
}
int set_node_info(struct lockspace *ls, int nodeid, struct dlmc_node *node)
{
struct change *cg;
if (!list_empty(&ls->changes)) {
cg = list_first_entry(&ls->changes, struct change, list);
return _set_node_info(ls, cg, nodeid, node);
}
return _set_node_info(ls, ls->started_change, nodeid, node);
}
int set_lockspaces(int *count, struct dlmc_lockspace **lss_out)
{
struct lockspace *ls;
struct dlmc_lockspace *lss, *lsp;
int ls_count = 0;
list_for_each_entry(ls, &lockspaces, list)
ls_count++;
lss = malloc(ls_count * sizeof(struct dlmc_lockspace));
if (!lss)
return -ENOMEM;
memset(lss, 0, ls_count * sizeof(struct dlmc_lockspace));
lsp = lss;
list_for_each_entry(ls, &lockspaces, list) {
set_lockspace_info(ls, lsp++);
}
*count = ls_count;
*lss_out = lss;
return 0;
}
int set_lockspace_nodes(struct lockspace *ls, int option, int *node_count,
struct dlmc_node **nodes_out)
{
struct change *cg;
struct node *n;
struct dlmc_node *nodes = NULL, *nodep;
struct member *memb;
int count = 0;
if (option == DLMC_NODES_ALL) {
if (!list_empty(&ls->changes))
cg = list_first_entry(&ls->changes, struct change,list);
else
cg = ls->started_change;
list_for_each_entry(n, &ls->node_history, list)
count++;
} else if (option == DLMC_NODES_MEMBERS) {
if (!ls->started_change)
goto out;
cg = ls->started_change;
count = cg->member_count;
} else if (option == DLMC_NODES_NEXT) {
if (list_empty(&ls->changes))
goto out;
cg = list_first_entry(&ls->changes, struct change, list);
count = cg->member_count;
} else
goto out;
nodes = malloc(count * sizeof(struct dlmc_node));
if (!nodes)
return -ENOMEM;
memset(nodes, 0, count * sizeof(struct dlmc_node));
nodep = nodes;
if (option == DLMC_NODES_ALL) {
list_for_each_entry(n, &ls->node_history, list)
_set_node_info(ls, cg, n->nodeid, nodep++);
} else {
list_for_each_entry(memb, &cg->members, list)
_set_node_info(ls, cg, memb->nodeid, nodep++);
}
out:
*node_count = count;
*nodes_out = nodes;
return 0;
}
diff --git a/group/gfs_controld/cpg-new.c b/group/gfs_controld/cpg-new.c
index 579de1e58..fc9a6c237 100644
--- a/group/gfs_controld/cpg-new.c
+++ b/group/gfs_controld/cpg-new.c
@@ -1,2636 +1,2634 @@
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2008 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "gfs_daemon.h"
#include "config.h"
#include "libdlmcontrol.h"
#define MAX_JOURNALS 256
uint32_t cpgname_to_crc(const char *data, int len);
static unsigned int protocol_active[3] = {1, 0, 0};
static int dlmcontrol_fd;
/* gfs_header types */
enum {
GFS_MSG_START = 1,
GFS_MSG_MOUNT_DONE = 2,
GFS_MSG_FIRST_RECOVERY_DONE = 3,
GFS_MSG_RECOVERY_RESULT = 4,
};
/* gfs_header flags */
#define GFS_MFLG_JOINING 1 /* accompanies start, we are joining */
struct gfs_header {
uint16_t version[3];
uint16_t type; /* GFS_MSG_ */
uint32_t nodeid; /* sender */
uint32_t to_nodeid; /* recipient, 0 for all */
uint32_t global_id; /* global unique id for this lockspace */
uint32_t flags; /* GFS_MFLG_ */
uint32_t msgdata; /* in-header payload depends on MSG type */
uint32_t pad1;
uint64_t pad2;
};
/* mg_info and id_info: for syncing state in start message */
struct mg_info {
uint32_t mg_info_size;
uint32_t id_info_size;
uint32_t id_info_count;
uint32_t started_count;
int member_count;
int joined_count;
int remove_count;
int failed_count;
int first_recovery_needed;
int first_recovery_master;
};
#define IDI_NODEID_IS_MEMBER 0x00000001
#define IDI_JID_NEEDS_RECOVERY 0x00000002
#define IDI_MOUNT_DONE 0x00000008
#define IDI_MOUNT_ERROR 0x00000010
#define IDI_MOUNT_RO 0x00000020
#define IDI_MOUNT_SPECTATOR 0x00000040
struct id_info {
int nodeid;
int jid;
uint32_t flags;
};
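/* Sketch of the start message layout walked by get_id_list() below
(all fields little endian on the wire):
| gfs_header | mg_info | id_info[0] ... id_info[id_info_count - 1] |
*/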
#define JID_NONE -1
struct journal {
struct list_head list;
int jid;
int nodeid;
int failed_nodeid;
int needs_recovery;
int local_recovery_busy;
int local_recovery_done;
int local_recovery_result;
int failed_recovery_count;
};
struct node {
struct list_head list;
int nodeid;
int jid;
int ro;
int spectator;
int kernel_mount_done;
int kernel_mount_error;
int check_dlm;
int dlm_notify_callback;
int dlm_notify_result;
int failed_reason;
uint32_t added_seq;
uint32_t removed_seq;
uint64_t add_time;
};
struct member {
struct list_head list;
int nodeid;
int start; /* 1 if we received a start message for this change */
int added; /* 1 if added by this change */
int failed; /* 1 if failed in this change */
int disallowed;
char *start_msg; /* full copy of the start message from this node */
struct mg_info *mg_info; /* shortcut into started_msg */
};
/* One of these change structs is created for every confchg a cpg gets. */
#define CGST_WAIT_CONDITIONS 1
#define CGST_WAIT_MESSAGES 2
struct change {
struct list_head list;
struct list_head members;
struct list_head removed; /* nodes removed by this change */
struct list_head saved_messages; /* saved messages */
int member_count;
int joined_count;
int remove_count;
int failed_count;
int state;
int we_joined;
uint32_t seq; /* used as a reference for debugging, and for queries */
uint32_t combined_seq; /* for queries */
};
struct save_msg {
struct list_head list;
int len;
char buf[0];
};
/*
cpg confchg's arrive telling us that mountgroup members have
joined/left/failed. A "change" struct is created for each confchg,
and added to the mg->changes list.
apply_changes()
---------------
<a new node won't know whether first_recovery_needed or not, but it also
won't have any conditions to wait for, so a new node will go directly to
sending out start message regardless>
if first_recovery_needed,
(or new, where new is not having completed a start barrier yet)
all nodes: skip wait conditions
all nodes: send start message
else !first_recovery_needed,
all nodes: if failures in changes, wait for conditions:
local mount to complete if in progress, stop_kernel, dlm_notified
all nodes: send start message
<new changes that arrive result in going back to beginning; start messages
from this aborted start cycle will be ignored>
all nodes: wait for all start messages
<once all start messages are received, new changes will be handled in a
new batch after all current changes are cleared at end of sync_state>
if start cycle / start barrier completes (start messages received from
all nodes without being interrupted by a change), go on to sync_state
which puts all members (as defined by the most recent change) in sync.
"old nodes" are nodes that have completed a start cycle before (have
a non-zero started_count), and "new nodes" are nodes that have not
completed a start cycle before (they are being added by one of the
changes in this start cycle)
sync_state()
------------
if old nodes have first_recovery_needed, or all nodes are new
all nodes: mg->first_recovery_needed = 1
all nodes: mg->first_recovery_master = prev or new low nodeid
new nodes: instantiate existing state to match old nodes
old nodes: update state per the changes in the completed start cycle
all nodes: assign jids to new members
all nodes: clear all change structs
else !first_recovery_needed,
new nodes: instantiate existing state to match old nodes
old nodes: update state per the changes in the completed start cycle
all nodes: assign jids to new members
all nodes: clear all change structs
<new changes that arrive from here on result in going back to the top>
recover_and_start()
-------------------
if first_recovery_needed,
master: tells mount to run with first=1 (if not already)
all nodes: wait for first_recovery_done message
master: sends first_recovery_done message when mount is done
all nodes: mg->first_recovery_needed = 0
all nodes: start kernel / tell mount.gfs to mount(2) (master already did)
all nodes: send message with result of kernel mount
else !first_recovery_needed,
all nodes: if there are no journals to recover, goto start kernel
old nodes: tell kernel to recover jids, send message with each result
all nodes: wait for all recoveries to be done
all nodes: start kernel
new nodes: tell mount.gfs to mount(2)
new nodes: send message with result of kernel mount
[If no one can recover some journal(s), all will be left waiting, unstarted.
A new change from a new mount will result in things going back to the top,
and hopefully the new node will be successful at doing the journal
recoveries when it comes through the recover_and_start() section, which
would let everyone start again.]
*/
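/* Condensed, illustrative summary of the flow above:
confchg -> new change struct -> apply_changes():
CGST_WAIT_CONDITIONS: wait_conditions_done() -> send start message
CGST_WAIT_MESSAGES: wait_messages_done() -> sync_state() ->
recover_and_start() -> cleanup_changes()
a new confchg at any point restarts the cycle from the top. */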
static void process_mountgroup(struct mountgroup *mg);
static char *msg_name(int type)
{
switch (type) {
case GFS_MSG_START:
return "start";
case GFS_MSG_MOUNT_DONE:
return "mount_done";
case GFS_MSG_FIRST_RECOVERY_DONE:
return "first_recovery_done";
case GFS_MSG_RECOVERY_RESULT:
return "recovery_result";
default:
return "unknown";
}
}
static int _send_message(cpg_handle_t h, void *buf, int len, int type)
{
struct iovec iov;
cpg_error_t error;
int retries = 0;
iov.iov_base = buf;
iov.iov_len = len;
retry:
error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
if (error == CPG_ERR_TRY_AGAIN) {
retries++;
usleep(1000);
if (!(retries % 100))
log_error("cpg_mcast_joined retry %d %s",
retries, msg_name(type));
goto retry;
}
if (error != CPG_OK) {
log_error("cpg_mcast_joined error %d handle %llx %s",
error, (unsigned long long)h, msg_name(type));
return -1;
}
if (retries)
log_debug("cpg_mcast_joined retried %d %s",
retries, msg_name(type));
return 0;
}
/* header fields caller needs to set: type, to_nodeid, flags, msgdata */
static void gfs_send_message(struct mountgroup *mg, char *buf, int len)
{
struct gfs_header *hd = (struct gfs_header *) buf;
int type = hd->type;
hd->version[0] = cpu_to_le16(protocol_active[0]);
hd->version[1] = cpu_to_le16(protocol_active[1]);
hd->version[2] = cpu_to_le16(protocol_active[2]);
hd->type = cpu_to_le16(hd->type);
hd->nodeid = cpu_to_le32(our_nodeid);
hd->to_nodeid = cpu_to_le32(hd->to_nodeid);
hd->global_id = cpu_to_le32(mg->id);
hd->flags = cpu_to_le32(hd->flags);
hd->msgdata = cpu_to_le32(hd->msgdata);
_send_message(mg->cpg_handle, buf, len, type);
}
static struct member *find_memb(struct change *cg, int nodeid)
{
struct member *memb;
list_for_each_entry(memb, &cg->members, list) {
if (memb->nodeid == nodeid)
return memb;
}
return NULL;
}
static struct mountgroup *find_mg_handle(cpg_handle_t h)
{
struct mountgroup *mg;
list_for_each_entry(mg, &mountgroups, list) {
if (mg->cpg_handle == h)
return mg;
}
return NULL;
}
static struct mountgroup *find_mg_ci(int ci)
{
struct mountgroup *mg;
list_for_each_entry(mg, &mountgroups, list) {
if (mg->cpg_client == ci)
return mg;
}
return NULL;
}
static struct journal *find_journal(struct mountgroup *mg, int jid)
{
struct journal *j;
list_for_each_entry(j, &mg->journals, list) {
if (j->jid == jid)
return j;
}
return NULL;
}
static struct journal *find_journal_by_nodeid(struct mountgroup *mg, int nodeid)
{
struct journal *j;
list_for_each_entry(j, &mg->journals, list) {
if (j->nodeid == nodeid)
return j;
}
return NULL;
}
static void free_cg(struct change *cg)
{
struct member *memb, *safe;
struct save_msg *sm, *sm2;
list_for_each_entry_safe(memb, safe, &cg->members, list) {
list_del(&memb->list);
if (memb->start_msg)
free(memb->start_msg);
free(memb);
}
list_for_each_entry_safe(memb, safe, &cg->removed, list) {
list_del(&memb->list);
if (memb->start_msg)
free(memb->start_msg);
free(memb);
}
list_for_each_entry_safe(sm, sm2, &cg->saved_messages, list) {
list_del(&sm->list);
free(sm);
}
free(cg);
}
static void free_mg(struct mountgroup *mg)
{
struct change *cg, *cg_safe;
struct node *node, *node_safe;
list_for_each_entry_safe(cg, cg_safe, &mg->changes, list) {
list_del(&cg->list);
free_cg(cg);
}
if (mg->started_change)
free_cg(mg->started_change);
list_for_each_entry_safe(node, node_safe, &mg->node_history, list) {
list_del(&node->list);
free(node);
}
free(mg);
}
static struct node *get_node_history(struct mountgroup *mg, int nodeid)
{
struct node *node;
list_for_each_entry(node, &mg->node_history, list) {
if (node->nodeid == nodeid)
return node;
}
return NULL;
}
static void node_history_init(struct mountgroup *mg, int nodeid,
struct change *cg)
{
struct node *node;
node = get_node_history(mg, nodeid);
if (node)
goto out;
node = malloc(sizeof(struct node));
if (!node)
return;
memset(node, 0, sizeof(struct node));
node->nodeid = nodeid;
node->add_time = 0;
list_add_tail(&node->list, &mg->node_history);
out:
node->added_seq = cg->seq; /* for queries */
}
static void node_history_start(struct mountgroup *mg, int nodeid)
{
struct node *node;
node = get_node_history(mg, nodeid);
if (!node) {
log_error("node_history_start no nodeid %d", nodeid);
return;
}
node->add_time = time(NULL);
}
static void node_history_left(struct mountgroup *mg, int nodeid,
struct change *cg)
{
struct node *node;
node = get_node_history(mg, nodeid);
if (!node) {
log_error("node_history_left no nodeid %d", nodeid);
return;
}
node->add_time = 0;
node->removed_seq = cg->seq; /* for queries */
}
static void node_history_fail(struct mountgroup *mg, int nodeid,
struct change *cg, int reason)
{
struct node *node;
node = get_node_history(mg, nodeid);
if (!node) {
log_error("node_history_fail no nodeid %d", nodeid);
return;
}
node->check_dlm = 1;
node->removed_seq = cg->seq; /* for queries */
node->failed_reason = reason; /* for queries */
}
static int is_added(struct mountgroup *mg, int nodeid)
{
struct change *cg;
struct member *memb;
list_for_each_entry(cg, &mg->changes, list) {
memb = find_memb(cg, nodeid);
if (memb && memb->added)
return 1;
}
return 0;
}
static int nodes_failed(struct mountgroup *mg)
{
struct change *cg;
list_for_each_entry(cg, &mg->changes, list) {
if (cg->failed_count)
return 1;
}
return 0;
}
/* find a start message from an old node to use; it doesn't matter which old
node we take the start message from, they should all be the same */
static int get_id_list(struct mountgroup *mg, struct id_info **ids,
int *count, int *size)
{
struct change *cg;
struct member *memb;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (!memb->mg_info->started_count)
continue;
*count = memb->mg_info->id_info_count;
*size = memb->mg_info->id_info_size;
*ids = (struct id_info *)(memb->start_msg +
sizeof(struct gfs_header) +
memb->mg_info->mg_info_size);
return 0;
}
return -1;
}
static struct id_info *get_id_struct(struct id_info *ids, int count, int size,
int nodeid)
{
struct id_info *id = ids;
int i;
for (i = 0; i < count; i++) {
if (id->nodeid == nodeid)
return id;
id = (struct id_info *)((char *)id + size);
}
return NULL;
}
static void start_kernel(struct mountgroup *mg)
{
struct change *cg = mg->started_change;
if (!mg->kernel_stopped) {
- log_error("start_kernel %u not stopped", cg->seq);
+ log_error("start_kernel cg %u not stopped", cg->seq);
return;
}
- log_group(mg, "start_kernel %u member_count %d",
+ log_group(mg, "start_kernel cg %u member_count %d",
cg->seq, cg->member_count);
set_sysfs(mg, "block", 0);
mg->kernel_stopped = 0;
if (mg->joining) {
client_reply_join_full(mg, 0);
mg->joining = 0;
mg->mount_client_notified = 1;
}
}
static void stop_kernel(struct mountgroup *mg)
{
if (!mg->kernel_stopped) {
log_group(mg, "stop_kernel");
set_sysfs(mg, "block", 1);
mg->kernel_stopped = 1;
}
}
void process_dlmcontrol(int ci)
{
struct mountgroup *mg;
struct node *node;
char name[GFS_MOUNTGROUP_LEN+1];
int rv, type, nodeid, result;
memset(name, 0, sizeof(name));
rv = dlmc_fs_result(dlmcontrol_fd, name, &type, &nodeid, &result);
if (rv) {
log_error("process_dlmcontrol dlmc_fs_result %d", rv);
return;
}
mg = find_mg(name);
if (!mg) {
log_error("process_dlmcontrol no mg %s", name);
return;
}
if (type == DLMC_RESULT_NOTIFIED) {
log_group(mg, "process_dlmcontrol notified nodeid %d result %d",
nodeid, result);
node = get_node_history(mg, nodeid);
if (!node) {
/* shouldn't happen */
log_error("process_dlmcontrol no nodeid %d", nodeid);
return;
}
if (mg->dlm_notify_nodeid != nodeid) {
/* shouldn't happen */
log_error("process_dlmcontrol node %d expected %d",
nodeid, mg->dlm_notify_nodeid);
return;
}
mg->dlm_notify_nodeid = 0;
node->dlm_notify_callback = 1;
node->dlm_notify_result = result;
} else if (type == DLMC_RESULT_REGISTER) {
log_group(mg, "process_dlmcontrol register nodeid %d result %d",
nodeid, result);
} else {
log_group(mg, "process_dlmcontrol unknown type %d", type);
}
poll_dlm = 0;
process_mountgroup(mg);
}
static int check_dlm_notify_done(struct mountgroup *mg)
{
struct node *node;
int rv;
/* we're waiting for a notify result from the dlm (could we fire off
all dlmc_fs_notified() calls at once instead of serially?) */
if (mg->dlm_notify_nodeid)
return 0;
list_for_each_entry(node, &mg->node_history, list) {
/* check_dlm is set when we see a node fail, and is cleared
below when we find that the dlm has also seen it fail */
if (!node->check_dlm)
continue;
/* we're in sync with the dlm for this nodeid, i.e. we've
both seen this node fail */
if (node->dlm_notify_callback && !node->dlm_notify_result) {
node->dlm_notify_callback = 0;
node->check_dlm = 0;
continue;
}
/* we're not in sync with the dlm for this nodeid, i.e.
the dlm hasn't seen this node fail yet; try calling
dlmc_fs_notified() again in a bit */
if (node->dlm_notify_callback && node->dlm_notify_result) {
log_group(mg, "check_dlm_notify result %d will retry nodeid %d",
node->dlm_notify_result, node->nodeid);
node->dlm_notify_callback = 0;
poll_dlm = 1;
return 0;
}
/* check if the dlm has seen this nodeid fail; we get the
answer asynchronously in process_dlmcontrol */
- log_group(mg, "check_dlm_notify %d begin", node->nodeid);
+ log_group(mg, "check_dlm_notify nodeid %d begin", node->nodeid);
rv = dlmc_fs_notified(dlmcontrol_fd, mg->name, node->nodeid);
if (rv) {
log_error("dlmc_fs_notified error %d", rv);
return 0;
}
mg->dlm_notify_nodeid = node->nodeid;
return 0;
}
log_group(mg, "check_dlm_notify done");
return 1;
}
static int wait_conditions_done(struct mountgroup *mg)
{
if (mg->first_recovery_needed) {
log_group(mg, "wait_conditions skip for first_recovery_needed");
return 1;
}
if (!mg->started_count) {
log_group(mg, "wait_conditions skip for zero started_count");
return 1;
}
if (!nodes_failed(mg)) {
log_group(mg, "wait_conditions skip for zero nodes_failed");
return 1;
}
if (!mg->mount_client_notified) {
log_group(mg, "wait_conditions skip mount client not notified");
return 1;
}
if (mg->kernel_mount_done && mg->kernel_mount_error) {
log_group(mg, "wait_conditions skip for kernel_mount_error");
return 1;
}
if (!mg->kernel_mount_done) {
log_group(mg, "wait_conditions need mount_done");
return 0;
}
stop_kernel(mg);
if (!check_dlm_notify_done(mg))
return 0;
return 1;
}
static int wait_messages_done(struct mountgroup *mg)
{
struct change *cg = list_first_entry(&mg->changes, struct change, list);
struct member *memb;
int need = 0, total = 0;
list_for_each_entry(memb, &cg->members, list) {
if (!memb->start)
need++;
total++;
}
if (need) {
log_group(mg, "wait_messages_done need %d of %d", need, total);
return 0;
}
log_group(mg, "wait_messages_done got all %d", total);
return 1;
}
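/* the completed change becomes started_change; any older changes it
superseded are freed, with their seqs recorded in combined_seq */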
static void cleanup_changes(struct mountgroup *mg)
{
struct change *cg = list_first_entry(&mg->changes, struct change, list);
struct change *safe;
list_del(&cg->list);
if (mg->started_change)
free_cg(mg->started_change);
mg->started_change = cg;
/* zero started_count means "never started" */
mg->started_count++;
if (!mg->started_count)
mg->started_count++;
cg->combined_seq = cg->seq; /* for queries */
list_for_each_entry_safe(cg, safe, &mg->changes, list) {
mg->started_change->combined_seq = cg->seq; /* for queries */
list_del(&cg->list);
free_cg(cg);
}
}
/* do the change details in the message match the details of the given change? */
static int match_change(struct mountgroup *mg, struct change *cg,
struct gfs_header *hd, struct mg_info *mi,
struct id_info *ids)
{
struct id_info *id;
struct member *memb;
uint32_t seq = hd->msgdata;
int i, members_mismatch;
/* We can ignore messages if we're not in the list of members.
The one known time this will happen is after we've joined
the cpg: we can get messages for changes prior to the change
in which we're added. */
id = get_id_struct(ids, mi->id_info_count, mi->id_info_size, our_nodeid);
if (!id || !(id->flags & IDI_NODEID_IS_MEMBER)) {
- log_group(mg, "match_change fail %d:%u we are not in members",
- hd->nodeid, seq);
+ log_group(mg, "match_change %d:%u skip cg %u we are not in members",
+ hd->nodeid, seq, cg->seq);
return 0;
}
memb = find_memb(cg, hd->nodeid);
if (!memb) {
- log_group(mg, "match_change fail %d:%u sender not member",
- hd->nodeid, seq);
+ log_group(mg, "match_change %d:%u skip cg %u sender not member",
+ hd->nodeid, seq, cg->seq);
return 0;
}
/* verify this is the right change by matching the counts
and the nodeids of the current members */
if (mi->member_count != cg->member_count ||
mi->joined_count != cg->joined_count ||
mi->remove_count != cg->remove_count ||
mi->failed_count != cg->failed_count) {
- log_group(mg, "match_change fail %d:%u expect counts "
- "%d %d %d %d", hd->nodeid, seq,
+ log_group(mg, "match_change %d:%u skip cg %u expect counts "
+ "%d %d %d %d", hd->nodeid, seq, cg->seq,
cg->member_count, cg->joined_count,
cg->remove_count, cg->failed_count);
return 0;
}
members_mismatch = 0;
id = ids;
for (i = 0; i < mi->id_info_count; i++) {
if (id->flags & IDI_NODEID_IS_MEMBER) {
memb = find_memb(cg, id->nodeid);
if (!memb) {
- log_group(mg, "match_change fail %d:%u memb %d",
- hd->nodeid, seq, id->nodeid);
+ log_group(mg, "match_change %d:%u skip cg %u "
+ "no memb %d", hd->nodeid, seq,
+ cg->seq, id->nodeid);
members_mismatch = 1;
break;
}
}
id = (struct id_info *)((char *)id + mi->id_info_size);
}
if (members_mismatch)
return 0;
- log_group(mg, "match_change done %d:%u", hd->nodeid, seq);
+ log_group(mg, "match_change %d:%u matches cg %u", hd->nodeid, seq,
+ cg->seq);
return 1;
}
/* Unfortunately, there's no really simple way to match a message with the
specific change that it was sent for. We hope that by passing all the
details of the change in the message, we will be able to uniquely match
it to the correct change. */
/* A start message will usually be for the first (current) change on our list.
In some cases it will be for a non-current change, and we can ignore it:
1. A,B,C get confchg1 adding C
2. C sends start for confchg1
3. A,B,C get confchg2 adding D
4. A,B,C,D recv start from C for confchg1 - ignored
5. C,D send start for confchg2
6. A,B send start for confchg2
7. A,B,C,D recv all start messages for confchg2; start barrier/cycle done
In step 4, how do the nodes know whether the start message from C is
for confchg1 or confchg2? Hopefully by comparing the counts and members. */
static struct change *find_change(struct mountgroup *mg, struct gfs_header *hd,
struct mg_info *mi, struct id_info *ids)
{
struct change *cg;
list_for_each_entry_reverse(cg, &mg->changes, list) {
if (!match_change(mg, cg, hd, mi, ids))
continue;
return cg;
}
log_group(mg, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
return NULL;
}
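/* convert mg_info/id_info fields from little-endian wire format to
host byte order */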
static void mg_info_in(struct mg_info *mi)
{
mi->mg_info_size = le32_to_cpu(mi->mg_info_size);
mi->id_info_size = le32_to_cpu(mi->id_info_size);
mi->id_info_count = le32_to_cpu(mi->id_info_count);
mi->started_count = le32_to_cpu(mi->started_count);
mi->member_count = le32_to_cpu(mi->member_count);
mi->joined_count = le32_to_cpu(mi->joined_count);
mi->remove_count = le32_to_cpu(mi->remove_count);
mi->failed_count = le32_to_cpu(mi->failed_count);
mi->first_recovery_needed = le32_to_cpu(mi->first_recovery_needed);
mi->first_recovery_master = le32_to_cpu(mi->first_recovery_master);
}
static void id_info_in(struct id_info *id)
{
id->nodeid = le32_to_cpu(id->nodeid);
id->jid = le32_to_cpu(id->jid);
id->flags = le32_to_cpu(id->flags);
}
static void ids_in(struct mg_info *mi, struct id_info *ids)
{
struct id_info *id;
int i;
id = ids;
for (i = 0; i < mi->id_info_count; i++) {
id_info_in(id);
id = (struct id_info *)((char *)id + mi->id_info_size);
}
}
static void receive_start(struct mountgroup *mg, struct gfs_header *hd, int len)
{
struct change *cg;
struct member *memb;
struct mg_info *mi;
struct id_info *ids;
uint32_t seq = hd->msgdata;
int added;
log_group(mg, "receive_start %d:%u len %d", hd->nodeid, seq, len);
mi = (struct mg_info *)((char *)hd + sizeof(struct gfs_header));
ids = (struct id_info *)((char *)mi + sizeof(struct mg_info));
mg_info_in(mi);
ids_in(mi, ids);
cg = find_change(mg, hd, mi, ids);
if (!cg)
return;
memb = find_memb(cg, hd->nodeid);
if (!memb) {
/* this should never happen since match_change checks it */
log_error("receive_start no member %d", hd->nodeid);
return;
}
added = is_added(mg, hd->nodeid);
if (added && mi->started_count) {
log_error("receive_start %d:%u add node with started_count %u",
hd->nodeid, seq, mi->started_count);
- /* observe this scheme working before using it; I'm not sure
- that a joining node won't ever see an existing node as added
- under normal circumstances */
- /*
+ /* see comment in fence/fenced/cpg.c */
memb->disallowed = 1;
return;
- */
}
node_history_start(mg, hd->nodeid);
memb->start = 1;
if (memb->start_msg) {
/* shouldn't happen */
log_error("receive_start %d:%u dup start msg", hd->nodeid, seq);
return;
}
/* save a copy of each start message */
memb->start_msg = malloc(len);
if (!memb->start_msg) {
log_error("receive_start len %d no mem", len);
return;
}
memcpy(memb->start_msg, hd, len);
/* a shortcut to the saved mg_info */
memb->mg_info = (struct mg_info *)(memb->start_msg +
sizeof(struct gfs_header));
}
/* start messages are associated with a specific change and use the
find_change/match_change routines to make sure all start messages
are matched with the same change on all nodes. The current set of
changes is cleared after a completed start cycle. Other messages
happen outside the context of changes. An "incomplete" start cycle
is when a confchg arrives (adding a new change struct) before all
start messages have been received for the current change. In this
case, all members send a new start message for the latest change,
and any start messages received for the previous change(s) are ignored.
To sync state with start messages, we need to include:
- the state before applying any of the current set of queued changes
(new nodes will initialize with this)
- the essential info from changes in the set that's being started,
so nodes added by one of the queued changes can apply the same changes
to the init state that the existing nodes do. */
/* recovery_result and mount_done messages may arrive between the time
that an old node sends start and the time a new node receives it.
Two old nodes may also send start before/after a recovery_result or
mount_done message, creating inconsistent data in their start messages.
Solution: a new node saves recovery_result/mount_done messages between
the last confchg and the final start. The new node knows that a start
message from an old node may or may not include the effects of rr/md
messages since the last confchg, but *will* include all effects from
prior to the last confchg. The saved rr/md messages can be applied on
top of the state from an old node's start message; applying them a
second time should not change anything, producing the same result. */
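/* count the id_info entries for our start message: one per current
member, one per journal still needing recovery from earlier cycles,
and one per journal of a node removed by the current changes */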
static int count_ids(struct mountgroup *mg)
{
struct change *cg;
struct member *memb;
struct journal *j;
int count = 0;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list)
count++;
list_for_each_entry(j, &mg->journals, list)
if (j->needs_recovery)
count++;
list_for_each_entry(cg, &mg->changes, list) {
list_for_each_entry(memb, &cg->removed, list) {
j = find_journal_by_nodeid(mg, memb->nodeid);
if (j)
count++;
}
}
return count;
}
/* old member: current member that has completed a start cycle
new member: current member that has not yet completed a start cycle */
static void send_start(struct mountgroup *mg)
{
struct change *cg, *c;
struct gfs_header *hd;
struct mg_info *mi;
struct id_info *id;
struct member *memb;
struct node *node;
struct journal *j;
char *buf;
uint32_t flags;
int len, id_count, jid;
int old_memb = 0, new_memb = 0, old_journal = 0, new_journal = 0;
cg = list_first_entry(&mg->changes, struct change, list);
id_count = count_ids(mg);
/* sanity check */
if (!mg->started_count && id_count != cg->member_count) {
log_error("send_start bad counts id_count %d member_count %d",
id_count, cg->member_count);
return;
}
len = sizeof(struct gfs_header) + sizeof(struct mg_info) +
id_count * sizeof(struct id_info);
buf = malloc(len);
if (!buf) {
log_error("send_start len %d no mem", len);
return;
}
memset(buf, 0, len);
hd = (struct gfs_header *)buf;
mi = (struct mg_info *)(buf + sizeof(*hd));
id = (struct id_info *)(buf + sizeof(*hd) + sizeof(*mi));
/* fill in header (gfs_send_message handles part of header) */
hd->type = GFS_MSG_START;
hd->msgdata = cg->seq;
hd->flags |= mg->joining ? GFS_MFLG_JOINING : 0;
/* fill in mg_info */
mi->mg_info_size = cpu_to_le32(sizeof(struct mg_info));
mi->id_info_size = cpu_to_le32(sizeof(struct id_info));
mi->id_info_count = cpu_to_le32(id_count);
mi->started_count = cpu_to_le32(mg->started_count);
mi->member_count = cpu_to_le32(cg->member_count);
mi->joined_count = cpu_to_le32(cg->joined_count);
mi->remove_count = cpu_to_le32(cg->remove_count);
mi->failed_count = cpu_to_le32(cg->failed_count);
mi->first_recovery_needed = cpu_to_le32(mg->first_recovery_needed);
mi->first_recovery_master = cpu_to_le32(mg->first_recovery_master);
/* fill in id_info entries */
/* New members send info about themselves, and empty id_info slots for
all other members. Old members send full info about all old
members, and empty id_info slots about new members. The union of
start messages from a single old node and all new nodes gives a
complete picture of state for all members. In sync_state, all nodes
(old and new) make this union, and then assign jid's to new nodes. */
list_for_each_entry(memb, &cg->members, list) {
if (!mg->started_count || is_added(mg, memb->nodeid)) {
/* send empty slot for new member */
jid = JID_NONE;
flags = IDI_NODEID_IS_MEMBER;
/* include our own info which no one knows yet */
if (!mg->started_count && memb->nodeid == our_nodeid) {
flags |= mg->ro ? IDI_MOUNT_RO : 0;
flags |= mg->spectator ? IDI_MOUNT_SPECTATOR : 0;
}
new_memb++;
} else {
/* send full info for old member */
node = get_node_history(mg, memb->nodeid);
if (!node) {
- log_error("send_start no node %d",memb->nodeid);
+ log_error("send_start no nodeid %d", memb->nodeid);
continue;
}
jid = node->jid;
flags = IDI_NODEID_IS_MEMBER;
flags |= node->ro ? IDI_MOUNT_RO : 0;
flags |= node->spectator ? IDI_MOUNT_SPECTATOR : 0;
flags |= node->kernel_mount_done ? IDI_MOUNT_DONE : 0;
flags |= node->kernel_mount_error ? IDI_MOUNT_ERROR : 0;
old_memb++;
}
id->nodeid = cpu_to_le32(memb->nodeid);
id->jid = cpu_to_le32(jid);
id->flags = cpu_to_le32(flags);
id++;
}
/* journals needing recovery from previous start cycles */
list_for_each_entry(j, &mg->journals, list) {
if (j->needs_recovery) {
id->jid = cpu_to_le32(j->jid);
id->flags = cpu_to_le32(IDI_JID_NEEDS_RECOVERY);
id++;
old_journal++;
}
}
/* journals needing recovery from the current start cycle */
list_for_each_entry(c, &mg->changes, list) {
list_for_each_entry(memb, &c->removed, list) {
j = find_journal_by_nodeid(mg, memb->nodeid);
if (j) {
id->jid = cpu_to_le32(j->jid);
id->flags = cpu_to_le32(IDI_JID_NEEDS_RECOVERY);
id++;
new_journal++;
}
}
}
/* sanity check */
if (!mg->started_count && (old_memb || old_journal || new_journal)) {
- log_error("send_start %u bad counts om %d nm %d oj %d nj %d",
+ log_error("send_start cg %u bad counts om %d nm %d oj %d nj %d",
cg->seq, old_memb, new_memb, old_journal, new_journal);
return;
}
- log_group(mg, "send_start %u id_count %d om %d nm %d oj %d nj %d",
+ log_group(mg, "send_start cg %u id_count %d om %d nm %d oj %d nj %d",
cg->seq, id_count, old_memb, new_memb, old_journal,
new_journal);
gfs_send_message(mg, buf, len);
free(buf);
}
static void send_mount_done(struct mountgroup *mg, int result)
{
struct gfs_header h;
memset(&h, 0, sizeof(h));
h.type = GFS_MSG_MOUNT_DONE;
h.msgdata = result;
gfs_send_message(mg, (char *)&h, sizeof(h));
}
static void send_first_recovery_done(struct mountgroup *mg)
{
struct gfs_header h;
memset(&h, 0, sizeof(h));
h.type = GFS_MSG_FIRST_RECOVERY_DONE;
gfs_send_message(mg, (char *)&h, sizeof(h));
}
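/* the payload is two le32 ints following the header: the jid and the
LM_RD_* recovery result */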
static void send_recovery_result(struct mountgroup *mg, int jid, int result)
{
struct gfs_header *hd;
char *buf;
int len, *p;
len = sizeof(struct gfs_header) + 2 * sizeof(int);
buf = malloc(len);
if (!buf) {
log_error("send_recovery_result len %d no mem", len);
return;
}
memset(buf, 0, len);
hd = (struct gfs_header *)buf;
hd->type = GFS_MSG_RECOVERY_RESULT;
p = (int *)(buf + sizeof(struct gfs_header));
p[0] = cpu_to_le32(jid);
p[1] = cpu_to_le32(result);
gfs_send_message(mg, buf, len);
free(buf);
}
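/* queue a message that arrived before we completed our first start
cycle; it's replayed by apply_saved_messages() in sync_state() */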
static void save_message(struct mountgroup *mg, struct gfs_header *hd, int len)
{
struct change *cg;
struct save_msg *sm;
cg = list_first_entry(&mg->changes, struct change, list);
sm = malloc(sizeof(struct save_msg) + len);
if (!sm) {
log_error("save_message len %d no mem", len);
return;
}
sm->len = len;
memcpy(sm->buf, hd, len);
list_add_tail(&sm->list, &cg->saved_messages);
}
void gfs_mount_done(struct mountgroup *mg)
{
send_mount_done(mg, mg->kernel_mount_error);
}
static void receive_mount_done(struct mountgroup *mg, struct gfs_header *hd,
int len)
{
struct node *node;
- log_group(mg, "receive_mount_done %d result %d",
+ log_group(mg, "receive_mount_done from %d result %d",
hd->nodeid, hd->msgdata);
node = get_node_history(mg, hd->nodeid);
if (!node) {
log_error("receive_mount_done no nodeid %d", hd->nodeid);
return;
}
node->kernel_mount_done = 1;
node->kernel_mount_error = hd->msgdata;
}
static void receive_recovery_result(struct mountgroup *mg,
struct gfs_header *hd, int len)
{
struct journal *j;
int jid, result, *p;
p = (int *)((char *)hd + sizeof(struct gfs_header));
jid = le32_to_cpu(p[0]);
result = le32_to_cpu(p[1]);
- log_group(mg, "receive_recovery_result %d jid %d result %d",
+ log_group(mg, "receive_recovery_result from %d jid %d result %d",
hd->nodeid, jid, result);
j = find_journal(mg, jid);
if (!j) {
- log_error("receive_recovery_result %d no jid %d",
+ log_error("receive_recovery_result from %d no jid %d",
hd->nodeid, jid);
return;
}
if (!j->needs_recovery)
return;
if (result == LM_RD_SUCCESS)
j->needs_recovery = 0;
else {
j->failed_recovery_count++;
log_group(mg, "jid %d failed_recovery_count %d", jid,
j->failed_recovery_count);
}
}
static void receive_first_recovery_done(struct mountgroup *mg,
struct gfs_header *hd, int len)
{
int master = mg->first_recovery_master;
- log_group(mg, "receive_first_recovery_done %d master %d "
+ log_group(mg, "receive_first_recovery_done from %d master %d "
"mount_client_notified %d",
hd->nodeid, master, mg->mount_client_notified);
if (master != hd->nodeid)
- log_error("receive_first_recovery_done %d master %d",
+ log_error("receive_first_recovery_done from %d master %d",
hd->nodeid, master);
if (list_empty(&mg->changes)) {
/* everything is idle, no changes in progress */
mg->first_recovery_needed = 0;
mg->first_recovery_master = 0;
mg->first_recovery_msg = 1;
if (master != our_nodeid)
start_kernel(mg);
} else {
/* Everyone will receive this message in the same sequence
wrt other start messages and confchgs:
- If a new confchg arrives after this message (and before
the final start message in the current start cycle),
a new start cycle will begin. All nodes before the
confchg will have frn=0 due to receiving this message,
and nodes added by the confchg will see frn=0 in all
start messages (in any_nodes_first_recovery() which
returns 0).
- If the final start message arrives after this message,
the start cycle will complete, running sync_state(), on
all current nodes with all having seen this message.
Old and new nodes in the current start cycle will see
this msg and use it (first_recovery_msg) instead of the
first_recovery_needed/master data in the start messages
(which may be inconsistent due to members sending their
start messages either before or after receiving this
message). */
mg->first_recovery_needed = 0;
mg->first_recovery_master = 0;
mg->first_recovery_msg = 1;
}
}
/* start messages from all nodes show zero started_count */
static int all_nodes_new(struct mountgroup *mg)
{
struct change *cg;
struct member *memb;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (memb->mg_info->started_count)
return 0;
}
return 1;
}
/* does the start message from any node with non-zero started_count have
first_recovery_needed set? (verify that all started nodes agree on
first_recovery_needed) */
static int any_nodes_first_recovery(struct mountgroup *mg)
{
struct change *cg;
struct member *memb;
int yes = 0, no = 0, master = 0;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (!memb->mg_info->started_count)
continue;
if (memb->mg_info->first_recovery_needed)
yes++;
else
no++;
}
if (no && yes) {
/* disagreement on first_recovery_needed, shouldn't happen */
log_error("any_nodes_first_recovery no %d yes %d", no, yes);
return 1;
}
if (no)
return 0;
/* sanity check: verify agreement on the master */
list_for_each_entry(memb, &cg->members, list) {
if (!memb->mg_info->started_count)
continue;
if (!master) {
master = memb->mg_info->first_recovery_master;
continue;
}
if (master == memb->mg_info->first_recovery_master)
continue;
/* disagreement on master, shouldn't happen */
log_error("any_nodes_first_recovery master %d vs %d",
master, memb->mg_info->first_recovery_master);
}
return 1;
}
/* If all nodes are new, there's no previous master, so pick the low nodeid.
If not all nodes are new, there will be a previous master; use that one
unless it's no longer a member, in which case pick the low nodeid.
The current master will already be set in mg->first_recovery_master for old
nodes, but new nodes will need to look in the start messages to find it. */
static int pick_first_recovery_master(struct mountgroup *mg, int all_new)
{
struct change *cg;
struct member *memb;
int old = 0, low = 0;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (memb->mg_info->started_count)
old = memb->mg_info->first_recovery_master;
if (!low)
low = memb->nodeid;
else if (memb->nodeid < low)
low = memb->nodeid;
}
memb = find_memb(cg, old);
if (!memb || all_new) {
log_group(mg, "pick_first_recovery_master low %d old %d",
low, old);
return low;
}
log_group(mg, "pick_first_recovery_master old %d", old);
return old;
}
/* use a start message from an old node to create node info for each old node */
static void create_old_nodes(struct mountgroup *mg)
{
struct change *cg;
struct member *memb;
struct node *node;
struct journal *j;
struct id_info *ids, *id;
int id_count, id_size, rv;
/* get ids from a start message of an old node */
rv = get_id_list(mg, &ids, &id_count, &id_size);
if (rv) {
/* all new nodes, no old nodes */
log_group(mg, "create_old_nodes all new");
return;
}
/* use id list to set info for all old nodes */
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (!memb->mg_info->started_count)
continue;
node = get_node_history(mg, memb->nodeid);
id = get_id_struct(ids, id_count, id_size, memb->nodeid);
if (!node || !id) {
/* shouldn't happen */
log_error("create_old_nodes %d node %d id %d",
memb->nodeid, !!node, !!id);
return;
}
if (!(id->flags & IDI_NODEID_IS_MEMBER) ||
(id->flags & IDI_JID_NEEDS_RECOVERY)) {
/* shouldn't happen */
log_error("create_old_nodes %d bad flags %x",
memb->nodeid, id->flags);
return;
}
node->jid = id->jid;
node->kernel_mount_done = !!(id->flags & IDI_MOUNT_DONE);
node->kernel_mount_error = !!(id->flags & IDI_MOUNT_ERROR);
node->ro = !!(id->flags & IDI_MOUNT_RO);
node->spectator = !!(id->flags & IDI_MOUNT_SPECTATOR);
j = malloc(sizeof(struct journal));
if (!j) {
log_error("create_old_nodes no mem");
return;
}
memset(j, 0, sizeof(struct journal));
j->nodeid = node->nodeid;
j->jid = node->jid;
list_add(&j->list, &mg->journals);
log_group(mg, "create_old_nodes %d jid %d ro %d spect %d "
"kernel_mount_done %d error %d",
node->nodeid, node->jid, node->ro, node->spectator,
node->kernel_mount_done, node->kernel_mount_error);
}
}
/* use start messages from new nodes to create node info for each new node */
static void create_new_nodes(struct mountgroup *mg)
{
struct change *cg;
struct member *memb;
struct id_info *ids, *id;
struct node *node;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (memb->mg_info->started_count)
continue;
node = get_node_history(mg, memb->nodeid);
if (!node) {
/* shouldn't happen */
log_error("create_new_nodes %d no node", memb->nodeid);
return;
}
ids = (struct id_info *)(memb->start_msg +
sizeof(struct gfs_header) +
memb->mg_info->mg_info_size);
id = get_id_struct(ids, memb->mg_info->id_info_count,
memb->mg_info->id_info_size, memb->nodeid);
if (!id) {
/* shouldn't happen */
log_error("create_new_nodes %d no id", memb->nodeid);
return;
}
if (!(id->flags & IDI_NODEID_IS_MEMBER) ||
(id->flags & IDI_JID_NEEDS_RECOVERY)) {
/* shouldn't happen */
log_error("create_new_nodes %d bad flags %x",
memb->nodeid, id->flags);
return;
}
node->jid = JID_NONE;
node->ro = !!(id->flags & IDI_MOUNT_RO);
node->spectator = !!(id->flags & IDI_MOUNT_SPECTATOR);
log_group(mg, "create_new_nodes %d ro %d spect %d",
node->nodeid, node->ro, node->spectator);
}
}
static void create_failed_journals(struct mountgroup *mg)
{
struct journal *j;
struct id_info *ids, *id;
int id_count, id_size;
int rv, i;
rv = get_id_list(mg, &ids, &id_count, &id_size);
if (rv) {
/* all new nodes, no old nodes */
return;
}
id = ids;
for (i = 0; i < id_count; i++) {
if (!(id->flags & IDI_JID_NEEDS_RECOVERY))
continue;
j = malloc(sizeof(struct journal));
if (!j) {
log_error("create_failed_journals no mem");
return;
}
memset(j, 0, sizeof(struct journal));
j->jid = id->jid;
j->needs_recovery = 1;
list_add(&j->list, &mg->journals);
id = (struct id_info *)((char *)id + id_size);
log_group(mg, "create_failed_journals jid %d", j->jid);
}
}
static void set_failed_journals(struct mountgroup *mg)
{
struct change *cg;
struct member *memb;
struct journal *j;
list_for_each_entry(cg, &mg->changes, list) {
list_for_each_entry(memb, &cg->removed, list) {
if (!memb->failed)
continue;
j = find_journal_by_nodeid(mg, memb->nodeid);
if (j) {
j->needs_recovery = 1;
j->failed_nodeid = j->nodeid;
j->nodeid = 0;
log_group(mg, "set_failed_journals jid %d "
"nodeid %d", j->jid, memb->nodeid);
} else {
log_group(mg, "set_failed_journals no journal "
"for nodeid %d ", memb->nodeid);
}
}
}
}
/* returns nodeid of new member with the next highest nodeid */
static int next_new_nodeid(struct mountgroup *mg, int prev)
{
struct change *cg;
struct member *memb;
int low = 0;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry(memb, &cg->members, list) {
if (memb->mg_info->started_count)
continue;
if (memb->nodeid <= prev)
continue;
if (!low)
low = memb->nodeid;
else if (memb->nodeid < low)
low = memb->nodeid;
}
return low;
}
/* returns lowest unused jid */
static int next_free_jid(struct mountgroup *mg)
{
int i;
for (i = 0; i < MAX_JOURNALS; i++) {
if (!find_journal(mg, i))
return i;
}
return -1;
}
static void create_new_journals(struct mountgroup *mg)
{
struct journal *j, *safe;
struct change *cg;
struct node *node;
int nodeid = 0;
cg = list_first_entry(&mg->changes, struct change, list);
/* first get rid of journal structs that are neither dirty nor in
use, i.e. journals from nodes that have unmounted/left, and
journals that have been recovered */
list_for_each_entry_safe(j, safe, &mg->journals, list) {
if (j->needs_recovery)
continue;
if (find_memb(cg, j->nodeid))
continue;
list_del(&j->list);
free(j);
}
while (1) {
nodeid = next_new_nodeid(mg, nodeid);
if (!nodeid)
break;
node = get_node_history(mg, nodeid);
if (!node) {
/* shouldn't happen */
log_error("create_new_journals no nodeid %d", nodeid);
continue;
}
if (node->spectator)
node->jid = JID_NONE;
else
node->jid = next_free_jid(mg);
if (node->nodeid == our_nodeid)
mg->our_jid = node->jid;
log_group(mg, "create_new_journals %d gets jid %d",
node->nodeid, node->jid);
if (node->jid == JID_NONE)
continue;
j = malloc(sizeof(struct journal));
if (!j) {
log_error("create_new_journals no mem");
continue;
}
memset(j, 0, sizeof(struct journal));
j->nodeid = nodeid;
j->jid = node->jid;
list_add(&j->list, &mg->journals);
}
}
/* recovery_result and mount_done messages are saved by new members until
they've completed the start cycle and have member state to apply them to.
The start messages from old nodes may not reflect the rr/md updates. */
static void apply_saved_messages(struct mountgroup *mg)
{
struct change *cg;
struct save_msg *sm, *safe;
struct gfs_header *hd;
cg = list_first_entry(&mg->changes, struct change, list);
list_for_each_entry_safe(sm, safe, &cg->saved_messages, list) {
hd = (struct gfs_header *)sm->buf;
switch (hd->type) {
case GFS_MSG_MOUNT_DONE:
receive_mount_done(mg, hd, sm->len);
break;
case GFS_MSG_RECOVERY_RESULT:
receive_recovery_result(mg, hd, sm->len);
break;
}
list_del(&sm->list);
free(sm);
}
}
/* this is run immediately after receiving the final start message in a start
cycle, so all nodes will run this in the same sequence wrt other messages
and confchgs */
static void sync_state(struct mountgroup *mg)
{
/* This is needed for the case where the first_recovery_done message
arrives while a change/start cycle is in progress. The
first_recovery data in the start messages (used by new nodes in this
cycle to determine the first_recovery state) may be inconsistent in
different start messages (because nodes sent their start messages at
different times wrt the first_recovery_done message.) But, in the
case where the new nodes received the first_recovery_done message,
they can just use that and don't need the (possibly inconsistent)
first recovery data in the start messages. */
if (mg->first_recovery_msg) {
if (mg->first_recovery_needed || mg->first_recovery_master) {
/* shouldn't happen */
log_error("sync_state first_recovery_msg needed %d "
"master %d", mg->first_recovery_needed,
mg->first_recovery_master);
}
log_group(mg, "sync_state first_recovery_msg");
goto out;
}
/* This is the path the initial start cycle for the group always
follows. It's the case where one or more nodes are all starting up
for the first time. No one has completed a start cycle yet because
everyone is joining, and one node needs to do first recovery. */
if (all_nodes_new(mg)) {
if (mg->first_recovery_needed || mg->first_recovery_master) {
/* shouldn't happen */
log_error("sync_state all_nodes_new first_recovery "
"needed %d master %d",
mg->first_recovery_needed,
mg->first_recovery_master);
}
mg->first_recovery_needed = 1;
mg->first_recovery_master = pick_first_recovery_master(mg, 1);
log_group(mg, "sync_state all_nodes_new first_recovery_needed "
"master %d", mg->first_recovery_master);
goto out;
}
/* This is for the case where new nodes are added to existing members
that have first_recovery_needed set. */
if (any_nodes_first_recovery(mg)) {
mg->first_recovery_needed = 1;
mg->first_recovery_master = pick_first_recovery_master(mg, 0);
log_group(mg, "sync_state first_recovery_needed master %d",
mg->first_recovery_master);
goto out;
}
/* Normal case where nodes join an established group that completed
first recovery sometime in the past. Existing nodes that weren't
around during first recovery come through here, and new nodes
being added in this cycle come through here. */
if (mg->first_recovery_needed) {
/* shouldn't happen */
log_error("sync_state frn should not be set");
}
out:
if (!mg->started_count) {
create_old_nodes(mg);
create_new_nodes(mg);
create_failed_journals(mg);
apply_saved_messages(mg);
create_new_journals(mg);
} else {
create_new_nodes(mg);
set_failed_journals(mg);
create_new_journals(mg);
}
}
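/* each change goes through two states: wait for local conditions and
send our start message, then wait for start messages from all
members before syncing state and clearing the change queue */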
static void apply_changes(struct mountgroup *mg)
{
struct change *cg;
cg = list_first_entry(&mg->changes, struct change, list);
switch (cg->state) {
case CGST_WAIT_CONDITIONS:
if (wait_conditions_done(mg)) {
send_start(mg);
cg->state = CGST_WAIT_MESSAGES;
}
break;
case CGST_WAIT_MESSAGES:
if (wait_messages_done(mg)) {
sync_state(mg);
cleanup_changes(mg);
}
break;
default:
log_error("apply_changes invalid state %d", cg->state);
}
}
/* We send messages with the info from kernel uevents or mount.gfs ipc,
and then process the uevent/ipc upon receiving the message for it, so
that it can be processed in the same order by all nodes. */
void process_recovery_uevent(char *table)
{
struct mountgroup *mg;
struct journal *j;
char *name = strstr(table, ":") + 1;
int jid, recover_status, first_done;
int rv;
mg = find_mg(name);
if (!mg) {
log_error("process_recovery_uevent mg not found %s", table);
return;
}
rv = read_sysfs_int(mg, "recover_done", &jid);
if (rv < 0) {
log_error("process_recovery_uevent recover_done read %d", rv);
return;
}
rv = read_sysfs_int(mg, "recover_status", &recover_status);
if (rv < 0) {
log_error("process_recovery_uevent recover_status read %d", rv);
return;
}
if (!mg->first_recovery_needed) {
if (!mg->local_recovery_busy) {
/* This will happen in two known situations:
- we get a recovery_done uevent for our own journal
when we mount (jid == mg->our_jid)
- the first mounter will read first_done and clear
first_recovery_needed before seeing the change
uevent from others_may_mount */
log_group(mg, "process_recovery_uevent jid %d ignore",
jid);
return;
}
mg->local_recovery_busy = 0;
if (mg->local_recovery_jid != jid) {
log_error("process_recovery_uevent jid %d exp %d",
jid, mg->local_recovery_jid);
return;
}
j = find_journal(mg, jid);
if (!j) {
log_error("process_recovery_uevent no journal %d", jid);
return;
}
log_group(mg, "process_recovery_uevent jid %d status %d",
jid, recover_status);
j->local_recovery_done = 1;
j->local_recovery_result = recover_status;
/* j->needs_recovery will be cleared when we receive this
recovery_result message */
send_recovery_result(mg, jid, recover_status);
} else {
/*
* Assumption here is that only the first mounter will get
* uevents when first_recovery_needed is set.
*/
/* make a local record of jid and recover_status; we may want
to check below that we've seen uevents for all jids
during first recovery before sending first_recovery_done. */
log_group(mg, "process_recovery_uevent jid %d status %d "
"ignore during first_recovery", jid, recover_status);
rv = read_sysfs_int(mg, "first_done", &first_done);
if (rv < 0) {
log_error("process_recovery_uevent first_done read %d",
rv);
return;
}
if (first_done) {
log_group(mg, "process_recovery_uevent first_done");
send_first_recovery_done(mg);
}
}
process_mountgroup(mg);
}
static void start_journal_recovery(struct mountgroup *mg, int jid)
{
int rv;
log_group(mg, "start_journal_recovery jid %d", jid);
rv = set_sysfs(mg, "recover", jid);
if (rv < 0) {
log_error("start_journal_recovery %d error %d", jid, rv);
return;
}
mg->local_recovery_busy = 1;
mg->local_recovery_jid = jid;
}
static int wait_recoveries_done(struct mountgroup *mg)
{
struct journal *j;
int wait_count = 0;
list_for_each_entry(j, &mg->journals, list) {
if (j->needs_recovery) {
log_group(mg, "wait_recoveries jid %d unrecovered",
j->jid);
wait_count++;
}
}
if (wait_count)
return 0;
log_group(mg, "wait_recoveries done");
return 1;
}
/* pick a jid that has not been successfully recovered by someone else
(received recovery_result success message) and hasn't been recovered
by us (local record); if nothing to recover, return 0 */
static int pick_journal_to_recover(struct mountgroup *mg, int *jid)
{
struct journal *j;
list_for_each_entry(j, &mg->journals, list) {
if (j->needs_recovery && !j->local_recovery_done) {
*jid = j->jid;
return 1;
}
}
return 0;
}
/* processing that happens after all changes have been dealt with */
static void recover_and_start(struct mountgroup *mg)
{
int jid;
if (mg->first_recovery_needed) {
if (mg->first_recovery_master == our_nodeid &&
!mg->mount_client_notified) {
log_group(mg, "recover_and_start first start_kernel");
mg->first_mounter = 1; /* adds first=1 to hostdata */
start_kernel(mg); /* includes reply to mount.gfs */
}
return;
}
/* The normal non-first-recovery mode. When a recovery_done message
is received, check whether any more journals need recovery. If
so, start recovery on the next one; if not, start the kernel. */
if (!wait_recoveries_done(mg)) {
if (!mg->kernel_mount_done || mg->kernel_mount_error)
return;
if (mg->spectator)
return;
if (mg->local_recovery_busy)
return;
if (pick_journal_to_recover(mg, &jid))
start_journal_recovery(mg, jid);
} else {
if (!mg->kernel_stopped)
return;
log_group(mg, "recover_and_start start_kernel");
start_kernel(mg);
}
}
static void process_mountgroup(struct mountgroup *mg)
{
if (!list_empty(&mg->changes))
apply_changes(mg);
if (mg->started_change && list_empty(&mg->changes))
recover_and_start(mg);
}
void process_mountgroups(void)
{
struct mountgroup *mg, *safe;
list_for_each_entry_safe(mg, safe, &mountgroups, list)
process_mountgroup(mg);
}
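/* translate a cpg confchg into a change struct: record the current
members, the removed members (marking failures), and the joined
members */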
static int add_change(struct mountgroup *mg,
struct cpg_address *member_list, int member_list_entries,
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries,
struct change **cg_out)
{
struct change *cg;
struct member *memb;
int i, error;
cg = malloc(sizeof(struct change));
if (!cg)
goto fail_nomem;
memset(cg, 0, sizeof(struct change));
INIT_LIST_HEAD(&cg->members);
INIT_LIST_HEAD(&cg->removed);
INIT_LIST_HEAD(&cg->saved_messages);
cg->state = CGST_WAIT_CONDITIONS;
cg->seq = ++mg->change_seq;
if (!cg->seq)
cg->seq = ++mg->change_seq;
cg->member_count = member_list_entries;
cg->joined_count = joined_list_entries;
cg->remove_count = left_list_entries;
for (i = 0; i < member_list_entries; i++) {
memb = malloc(sizeof(struct member));
if (!memb)
goto fail_nomem;
memset(memb, 0, sizeof(struct member));
memb->nodeid = member_list[i].nodeid;
list_add_tail(&memb->list, &cg->members);
}
for (i = 0; i < left_list_entries; i++) {
memb = malloc(sizeof(struct member));
if (!memb)
goto fail_nomem;
memset(memb, 0, sizeof(struct member));
memb->nodeid = left_list[i].nodeid;
if (left_list[i].reason == CPG_REASON_NODEDOWN ||
left_list[i].reason == CPG_REASON_PROCDOWN) {
memb->failed = 1;
cg->failed_count++;
}
list_add_tail(&memb->list, &cg->removed);
if (memb->failed)
node_history_fail(mg, memb->nodeid, cg,
left_list[i].reason);
else
node_history_left(mg, memb->nodeid, cg);
- log_group(mg, "add_change %u nodeid %d remove reason %d",
+ log_group(mg, "add_change cg %u remove nodeid %d reason %d",
cg->seq, memb->nodeid, left_list[i].reason);
if (left_list[i].reason == CPG_REASON_PROCDOWN)
kick_node_from_cluster(memb->nodeid);
}
for (i = 0; i < joined_list_entries; i++) {
memb = find_memb(cg, joined_list[i].nodeid);
if (!memb) {
log_error("no member %d", joined_list[i].nodeid);
error = -ENOENT;
goto fail;
}
memb->added = 1;
if (memb->nodeid == our_nodeid)
cg->we_joined = 1;
else
node_history_init(mg, memb->nodeid, cg);
- log_group(mg, "add_change %u nodeid %d joined", cg->seq,
+ log_group(mg, "add_change cg %u joined nodeid %d", cg->seq,
memb->nodeid);
}
if (cg->we_joined) {
- log_group(mg, "add_change %u we joined", cg->seq);
+ log_group(mg, "add_change cg %u we joined", cg->seq);
list_for_each_entry(memb, &cg->members, list)
node_history_init(mg, memb->nodeid, cg);
}
- log_group(mg, "add_change %u member %d joined %d remove %d failed %d",
- cg->seq, cg->member_count, cg->joined_count, cg->remove_count,
- cg->failed_count);
+ log_group(mg, "add_change cg %u counts member %d joined %d remove %d "
+ "failed %d", cg->seq, cg->member_count, cg->joined_count,
+ cg->remove_count, cg->failed_count);
list_add(&cg->list, &mg->changes);
*cg_out = cg;
return 0;
fail_nomem:
log_error("no memory");
error = -ENOMEM;
fail:
free_cg(cg);
return error;
}
static int we_left(struct cpg_address *left_list, int left_list_entries)
{
int i;
for (i = 0; i < left_list_entries; i++) {
if (left_list[i].nodeid == our_nodeid)
return 1;
}
return 0;
}
static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
struct cpg_address *member_list, int member_list_entries,
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries)
{
struct mountgroup *mg;
struct change *cg;
int rv;
mg = find_mg_handle(handle);
if (!mg) {
log_error("confchg_cb no mountgroup for cpg %s",
group_name->value);
return;
}
if (mg->leaving && we_left(left_list, left_list_entries)) {
/* we called cpg_leave(), and this should be the final
cpg callback we receive */
log_group(mg, "confchg for our leave");
dlmc_fs_unregister(dlmcontrol_fd, mg->name);
cpg_finalize(mg->cpg_handle);
client_dead(mg->cpg_client);
list_del(&mg->list);
free_mg(mg);
return;
}
rv = add_change(mg, member_list, member_list_entries,
left_list, left_list_entries,
joined_list, joined_list_entries, &cg);
if (rv)
return;
process_mountgroup(mg);
}
static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *data, int len)
{
struct mountgroup *mg;
struct gfs_header *hd;
mg = find_mg_handle(handle);
if (!mg) {
log_error("deliver_cb no mg for cpg %s", group_name->value);
return;
}
hd = (struct gfs_header *)data;
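/* convert the header from wire (little-endian) to host byte order
before validating it */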
hd->version[0] = le16_to_cpu(hd->version[0]);
hd->version[1] = le16_to_cpu(hd->version[1]);
hd->version[2] = le16_to_cpu(hd->version[2]);
hd->type = le16_to_cpu(hd->type);
hd->nodeid = le32_to_cpu(hd->nodeid);
hd->to_nodeid = le32_to_cpu(hd->to_nodeid);
hd->global_id = le32_to_cpu(hd->global_id);
hd->flags = le32_to_cpu(hd->flags);
hd->msgdata = le32_to_cpu(hd->msgdata);
if (hd->version[0] != protocol_active[0]) {
log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
nodeid, hd->version[0], hd->version[1],
hd->version[2], protocol_active[0],
protocol_active[1], protocol_active[2]);
return;
}
if (hd->nodeid != nodeid) {
log_error("bad msg nodeid %d %d", hd->nodeid, nodeid);
return;
}
switch (hd->type) {
case GFS_MSG_START:
receive_start(mg, hd, len);
break;
case GFS_MSG_MOUNT_DONE:
if (!mg->started_count)
save_message(mg, hd, len);
else
receive_mount_done(mg, hd, len);
break;
case GFS_MSG_FIRST_RECOVERY_DONE:
receive_first_recovery_done(mg, hd, len);
break;
case GFS_MSG_RECOVERY_RESULT:
if (!mg->started_count)
save_message(mg, hd, len);
else
receive_recovery_result(mg, hd, len);
break;
default:
log_error("unknown msg type %d", hd->type);
}
process_mountgroup(mg);
}
static cpg_callbacks_t cpg_callbacks = {
.cpg_deliver_fn = deliver_cb,
.cpg_confchg_fn = confchg_cb,
};
static void process_mountgroup_cpg(int ci)
{
struct mountgroup *mg;
cpg_error_t error;
mg = find_mg_ci(ci);
if (!mg) {
log_error("process_mountgroup_cpg no mountgroup for ci %d", ci);
return;
}
error = cpg_dispatch(mg->cpg_handle, CPG_DISPATCH_ALL);
if (error != CPG_OK) {
log_error("cpg_dispatch error %d", error);
return;
}
update_flow_control_status();
}
int gfs_join_mountgroup(struct mountgroup *mg)
{
cpg_error_t error;
cpg_handle_t h;
struct cpg_name name;
int i = 0, fd, ci, rv;
/* I think this registration with dlm_controld could be done
just about anywhere before we do the mount(2). */
rv = dlmc_fs_register(dlmcontrol_fd, mg->name);
if (rv) {
log_error("dlmc_fs_register failed %d", rv);
return rv;
}
error = cpg_initialize(&h, &cpg_callbacks);
if (error != CPG_OK) {
log_error("cpg_initialize error %d", error);
goto fail;
}
cpg_fd_get(h, &fd);
ci = client_add(fd, process_mountgroup_cpg, NULL);
mg->cpg_handle = h;
mg->cpg_client = ci;
mg->cpg_fd = fd;
mg->kernel_stopped = 1;
mg->joining = 1;
memset(&name, 0, sizeof(name));
sprintf(name.value, "gfs:%s", mg->name);
name.length = strlen(name.value) + 1;
/* TODO: allow global_id to be set in cluster.conf? */
mg->id = cpgname_to_crc(name.value, name.length);
retry:
error = cpg_join(h, &name);
if (error == CPG_ERR_TRY_AGAIN) {
sleep(1);
if (!(++i % 10))
log_error("cpg_join error retrying");
goto retry;
}
if (error != CPG_OK) {
log_error("cpg_join error %d", error);
goto fail_client;
}
return 0;
fail_client:
client_dead(ci);
cpg_finalize(h);
fail:
dlmc_fs_unregister(dlmcontrol_fd, mg->name);
return -ENOTCONN;
}
void gfs_leave_mountgroup(char *mgname, int mnterr)
{
struct mountgroup *mg;
cpg_error_t error;
struct cpg_name name;
int i = 0;
mg = find_mg(mgname);
if (!mg) {
log_error("leave: %s not found", mgname);
return;
}
mg->leaving = 1;
memset(&name, 0, sizeof(name));
sprintf(name.value, "gfs:%s", mg->name);
name.length = strlen(name.value) + 1;
retry:
error = cpg_leave(mg->cpg_handle, &name);
if (error == CPG_ERR_TRY_AGAIN) {
sleep(1);
if (!(++i % 10))
log_error("cpg_leave error retrying");
goto retry;
}
if (error != CPG_OK)
log_error("cpg_leave error %d", error);
}
int setup_cpg(void)
{
cpg_error_t error;
error = cpg_initialize(&libcpg_handle, &cpg_callbacks);
if (error != CPG_OK) {
log_error("setup_cpg cpg_initialize error %d", error);
return -1;
}
/* join "gfs_controld" cpg to interact with other daemons in
the cluster before we start processing uevents? Could this
also help in handling transient partitions? */
return 0;
}
int setup_dlmcontrol(void)
{
dlmcontrol_fd = dlmc_fs_connect();
return dlmcontrol_fd;
}
int set_mountgroup_info(struct mountgroup *mg, struct gfsc_mountgroup *out)
{
struct change *cg, *last = NULL;
strncpy(out->name, mg->name, GFS_MOUNTGROUP_LEN);
out->global_id = mg->id;
if (mg->joining)
out->flags |= GFSC_MF_JOINING;
if (mg->leaving)
out->flags |= GFSC_MF_LEAVING;
if (mg->kernel_stopped)
out->flags |= GFSC_MF_KERNEL_STOPPED;
if (mg->kernel_mount_done)
out->flags |= GFSC_MF_KERNEL_MOUNT_DONE;
if (mg->kernel_mount_error)
out->flags |= GFSC_MF_KERNEL_MOUNT_ERROR;
if (mg->first_recovery_needed)
out->flags |= GFSC_MF_FIRST_RECOVERY_NEEDED;
if (mg->first_recovery_msg)
out->flags |= GFSC_MF_FIRST_RECOVERY_MSG;
if (mg->local_recovery_busy)
out->flags |= GFSC_MF_LOCAL_RECOVERY_BUSY;
if (!mg->started_change)
goto next;
cg = mg->started_change;
out->cg_prev.member_count = cg->member_count;
out->cg_prev.joined_count = cg->joined_count;
out->cg_prev.remove_count = cg->remove_count;
out->cg_prev.failed_count = cg->failed_count;
out->cg_prev.combined_seq = cg->combined_seq;
out->cg_prev.seq = cg->seq;
next:
if (list_empty(&mg->changes))
goto out;
list_for_each_entry(cg, &mg->changes, list)
last = cg;
cg = list_first_entry(&mg->changes, struct change, list);
out->cg_next.member_count = cg->member_count;
out->cg_next.joined_count = cg->joined_count;
out->cg_next.remove_count = cg->remove_count;
out->cg_next.failed_count = cg->failed_count;
out->cg_next.combined_seq = last->seq;
out->cg_next.seq = cg->seq;
/* FIXME: use real definitions for these conditions
(also in dlm_controld) */
if (cg->state == CGST_WAIT_CONDITIONS)
out->cg_next.wait_condition = 4;
if (!mg->kernel_mount_done)
out->cg_next.wait_condition = 1;
if (mg->dlm_notify_nodeid)
out->cg_next.wait_condition = 2;
if (poll_dlm)
out->cg_next.wait_condition = 3;
if (cg->state == CGST_WAIT_MESSAGES)
out->cg_next.wait_messages = 1;
out:
return 0;
}
static int _set_node_info(struct mountgroup *mg, struct change *cg, int nodeid,
struct gfsc_node *node)
{
struct member *m = NULL;
struct node *n;
node->nodeid = nodeid;
if (cg)
m = find_memb(cg, nodeid);
if (!m)
goto history;
node->flags |= GFSC_NF_MEMBER;
if (m->start)
node->flags |= GFSC_NF_START;
if (m->disallowed)
node->flags |= GFSC_NF_DISALLOWED;
history:
n = get_node_history(mg, nodeid);
if (!n)
goto out;
node->jid = n->jid;
if (n->kernel_mount_done)
node->flags |= GFSC_NF_KERNEL_MOUNT_DONE;
if (n->kernel_mount_error)
node->flags |= GFSC_NF_KERNEL_MOUNT_ERROR;
if (n->check_dlm)
node->flags |= GFSC_NF_CHECK_DLM;
if (n->ro)
node->flags |= GFSC_NF_READONLY;
if (n->spectator)
node->flags |= GFSC_NF_SPECTATOR;
node->added_seq = n->added_seq;
node->removed_seq = n->removed_seq;
node->failed_reason = n->failed_reason;
out:
return 0;
}
int set_node_info(struct mountgroup *mg, int nodeid, struct gfsc_node *node)
{
struct change *cg;
if (!list_empty(&mg->changes)) {
cg = list_first_entry(&mg->changes, struct change, list);
return _set_node_info(mg, cg, nodeid, node);
}
return _set_node_info(mg, mg->started_change, nodeid, node);
}
int set_mountgroups(int *count, struct gfsc_mountgroup **mgs_out)
{
struct mountgroup *mg;
struct gfsc_mountgroup *mgs, *mgp;
int mg_count = 0;
list_for_each_entry(mg, &mountgroups, list)
mg_count++;
mgs = malloc(mg_count * sizeof(struct gfsc_mountgroup));
if (!mgs)
return -ENOMEM;
memset(mgs, 0, mg_count * sizeof(struct gfsc_mountgroup));
mgp = mgs;
list_for_each_entry(mg, &mountgroups, list) {
set_mountgroup_info(mg, mgp++);
}
*count = mg_count;
*mgs_out = mgs;
return 0;
}
int set_mountgroup_nodes(struct mountgroup *mg, int option, int *node_count,
struct gfsc_node **nodes_out)
{
struct change *cg;
struct node *n;
struct gfsc_node *nodes = NULL, *nodep;
struct member *memb;
int count = 0;
if (option == GFSC_NODES_ALL) {
if (!list_empty(&mg->changes))
cg = list_first_entry(&mg->changes, struct change,list);
else
cg = mg->started_change;
list_for_each_entry(n, &mg->node_history, list)
count++;
} else if (option == GFSC_NODES_MEMBERS) {
if (!mg->started_change)
goto out;
cg = mg->started_change;
count = cg->member_count;
} else if (option == GFSC_NODES_NEXT) {
if (list_empty(&mg->changes))
goto out;
cg = list_first_entry(&mg->changes, struct change, list);
count = cg->member_count;
} else
goto out;
nodes = malloc(count * sizeof(struct gfsc_node));
if (!nodes)
return -ENOMEM;
memset(nodes, 0, count * sizeof(struct gfsc_node));
nodep = nodes;
if (option == GFSC_NODES_ALL) {
list_for_each_entry(n, &mg->node_history, list)
_set_node_info(mg, cg, n->nodeid, nodep++);
} else {
list_for_each_entry(memb, &cg->members, list)
_set_node_info(mg, cg, memb->nodeid, nodep++);
}
out:
*node_count = count;
*nodes_out = nodes;
return 0;
}