Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F1841965
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
101 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/group/dlm_controld/action.c b/group/dlm_controld/action.c
index 11f6b078a..aaf183ad2 100644
--- a/group/dlm_controld/action.c
+++ b/group/dlm_controld/action.c
@@ -1,800 +1,831 @@
#include "dlm_daemon.h"
#include "config.h"
static int dir_members[MAX_NODES];
static int dir_members_count;
static int comms_nodes[MAX_NODES];
static int comms_nodes_count;
static char mg_name[DLM_LOCKSPACE_LEN+1];
#define DLM_SYSFS_DIR "/sys/kernel/dlm"
#define CLUSTER_DIR "/sys/kernel/config/dlm/cluster"
#define SPACES_DIR "/sys/kernel/config/dlm/cluster/spaces"
#define COMMS_DIR "/sys/kernel/config/dlm/cluster/comms"
/* Look for a mountgroup whose <fsdir>/<cluster>:<fsname>/lock_module/id
   file holds mg_id and copy the <fsname> part of that directory name
   into mg_name.  /sys/fs/gfs is scanned first; if nothing matches there
   the scan is repeated once on /sys/fs/gfs2.
   Returns 0 if mg_name was set, -1 if no matching mountgroup was found. */

static int get_mountgroup_name(uint32_t mg_id)
{
	char path[PATH_MAX];
	char *fsname, *fsdir;
	DIR *d;
	FILE *file;
	struct dirent *de;
	uint32_t id;
	int retry_gfs2 = 1;
	int rv, error;

	fsdir = "/sys/fs/gfs";
 retry:
	rv = -1;

	d = opendir(fsdir);
	if (!d) {
		/* path has not been filled in yet here, so report the
		   directory we actually tried to open */
		log_debug("%s: opendir failed: %d", fsdir, errno);
		goto out;
	}

	while ((de = readdir(d))) {
		if (de->d_name[0] == '.')
			continue;

		id = 0;
		memset(path, 0, PATH_MAX);
		snprintf(path, PATH_MAX, "%s/%s/lock_module/id",
			 fsdir, de->d_name);

		file = fopen(path, "r");
		if (!file) {
			log_error("can't open %s %d", path, errno);
			continue;
		}

		error = fscanf(file, "%u", &id);
		fclose(file);

		if (error != 1) {
			log_error("bad read %s %d", path, errno);
			continue;
		}

		if (id != mg_id) {
			log_debug("get_mountgroup_name skip %x %s",
				  id, de->d_name);
			continue;
		}

		/* take the fsname out of clustername:fsname */

		fsname = strstr(de->d_name, ":");
		if (!fsname) {
			log_debug("get_mountgroup_name skip2 %x %s",
				  id, de->d_name);
			continue;
		}
		fsname++;

		log_debug("get_mountgroup_name found %x %s %s",
			  id, de->d_name, fsname);

		/* strncpy does not terminate when fsname fills the buffer,
		   so copy one byte less and terminate explicitly */
		strncpy(mg_name, fsname, sizeof(mg_name) - 1);
		mg_name[sizeof(mg_name) - 1] = '\0';
		rv = 0;
		break;
	}

	closedir(d);

 out:
	if (rv && retry_gfs2) {
		retry_gfs2 = 0;
		fsdir = "/sys/fs/gfs2";
		goto retry;
	}

	return rv;
}
+/* This is for the case where dlm_controld exits/fails, abandoning dlm
+ lockspaces in the kernel, and then dlm_controld is restarted. When
+ dlm_controld exits and abandons lockspaces, that node needs to be
+ rebooted to clear the uncontrolled lockspaces from the kernel.
+
+ Every entry in DLM_SYSFS_DIR whose name does not start with a dot is
+ logged and counted as an uncontrolled lockspace. If any are found,
+ kick_node_from_cluster() is called for our_nodeid and -1 is returned.
+ Returns 0 when none are found or when DLM_SYSFS_DIR cannot be opened. */
+
+int check_uncontrolled_lockspaces(void)
+{
+ DIR *d;
+ struct dirent *de;
+ int count = 0;
+
+ d = opendir(DLM_SYSFS_DIR);
+ if (!d)
+ return 0;
+
+ while ((de = readdir(d))) {
+ if (de->d_name[0] == '.')
+ continue;
+
+ log_error("found uncontrolled lockspace %s", de->d_name);
+ count++;
+ }
+ closedir(d);
+
+ if (count) {
+ kick_node_from_cluster(our_nodeid);
+ return -1;
+ }
+ return 0;
+}
+
/* find the mountgroup with "mg_id" in sysfs, get its name, then look for
the ls with the same name in the lockspaces list and record mg_id in it */
void set_associated_id(uint32_t mg_id)
{
	struct lockspace *ls;

	log_debug("set_associated_id mg_id %x %d", mg_id, mg_id);

	/* start from an empty name; get_mountgroup_name() fills it in */
	memset(&mg_name, 0, sizeof(mg_name));

	if (get_mountgroup_name(mg_id) != 0) {
		log_error("no mountgroup found with id %x", mg_id);
		return;
	}

	ls = find_ls(mg_name);
	if (ls == NULL) {
		log_error("no lockspace found with name %s for mg_id %x",
			  mg_name, mg_id);
		return;
	}

	log_debug("set_associated_id mg %x is ls %x", mg_id, ls->global_id);

	/* remember which mountgroup this lockspace belongs to */
	ls->associated_mg_id = mg_id;
}
/* Write the string val, including its terminating NUL, to the sysfs file
   DLM_SYSFS_DIR/<name>/<file>.
   Returns the do_write() result, or -1 if the path does not fit in the
   local buffer or the file cannot be opened for writing. */

static int do_sysfs(char *name, char *file, char *val)
{
	char fname[512];
	int rv, fd, len;

	/* bounded formatting so an over-long name cannot overflow fname */
	len = snprintf(fname, sizeof(fname), "%s/%s/%s",
		       DLM_SYSFS_DIR, name, file);
	if (len < 0 || (size_t)len >= sizeof(fname)) {
		log_error("sysfs path too long for \"%s\" \"%s\"", name, file);
		return -1;
	}

	fd = open(fname, O_WRONLY);
	if (fd < 0) {
		log_error("open \"%s\" error %d %d", fname, fd, errno);
		return -1;
	}

	log_debug("write \"%s\" to \"%s\"", val, fname);

	rv = do_write(fd, val, strlen(val) + 1);
	close(fd);
	return rv;
}
/* Write val, formatted as a decimal string, to the "control" sysfs file
   of lockspace name.  Returns whatever do_sysfs() returns. */

int set_sysfs_control(char *name, int val)
{
	char text[32] = { 0 };

	snprintf(text, sizeof(text), "%d", val);

	return do_sysfs(name, "control", text);
}
/* Write val, formatted as a decimal string, to the "event_done" sysfs
   file of lockspace name.  Returns whatever do_sysfs() returns. */

int set_sysfs_event_done(char *name, int val)
{
	char text[32] = { 0 };

	snprintf(text, sizeof(text), "%d", val);

	return do_sysfs(name, "event_done", text);
}
/* Write id, formatted as an unsigned decimal string, to the "id" sysfs
   file of lockspace name.  Returns whatever do_sysfs() returns. */

int set_sysfs_id(char *name, uint32_t id)
{
	char text[32] = { 0 };

	snprintf(text, sizeof(text), "%u", id);

	return do_sysfs(name, "id", text);
}
static int update_dir_members(char *name)
{
char path[PATH_MAX];
DIR *d;
struct dirent *de;
int i = 0;
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s/nodes", SPACES_DIR, name);
d = opendir(path);
if (!d) {
log_debug("%s: opendir failed: %d", path, errno);
return -1;
}
memset(dir_members, 0, sizeof(dir_members));
dir_members_count = 0;
/* FIXME: we should probably read the nodeid in each dir instead */
while ((de = readdir(d))) {
if (de->d_name[0] == '.')
continue;
dir_members[i++] = atoi(de->d_name);
log_debug("dir_member %d", dir_members[i-1]);
}
closedir(d);
dir_members_count = i;
return 0;
}
/* Return 1 if id occurs among the first count entries of array,
   otherwise 0. */

static int id_exists(int id, int count, int *array)
{
	int pos = 0;

	while (pos < count) {
		if (array[pos++] == id)
			return 1;
	}
	return 0;
}
/* Create directory path with mode 0777 under a temporary umask of 0022.
   A failure is always logged; an already existing directory (EEXIST) is
   then treated as success.
   Returns 0 on success or EEXIST, otherwise the negative mkdir() result. */

static int create_path(char *path)
{
	mode_t old_umask;
	int rv, saved_errno;

	old_umask = umask(0022);
	rv = mkdir(path, 0777);
	/* capture errno right away: the calls below may overwrite it
	   before the EEXIST test */
	saved_errno = errno;
	umask(old_umask);

	if (rv < 0) {
		log_error("%s: mkdir failed: %d", path, saved_errno);
		if (saved_errno == EEXIST)
			rv = 0;
	}
	return rv;
}
/* Return 1 if stat() succeeds on path, 0 otherwise.  A stat() failure
   other than ENOENT is logged. */

static int path_exists(const char *path)
{
	struct stat st;

	if (stat(path, &st) >= 0)
		return 1;

	if (errno != ENOENT)
		log_error("%s: stat failed: %d", path, errno);

	return 0;
}
/* The "renew" nodes are those that have left and rejoined since the last
call to set_members(). We rmdir/mkdir for these nodes so dlm-kernel
can notice they've left and rejoined. */
/* Bring SPACES_DIR/<name>/nodes in line with new_members[0..new_count):
   node dirs not in new_members are removed, missing ones are created and
   their "nodeid" and "weight" files written.  With new_count == 0 the
   lockspace dir itself is removed as well.
   Returns 0 on success; a negative/non-zero value from the first failing
   step otherwise (a failed rmdir of the lockspace dir is only logged). */
int set_configfs_members(char *name, int new_count, int *new_members,
int renew_count, int *renew_members)
{
char path[PATH_MAX];
char buf[32];
int i, w, fd, rv, id, old_count, *old_members;
int do_renew;
/*
* create lockspace dir if it doesn't exist yet
*/
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s", SPACES_DIR, name);
if (!path_exists(path)) {
if (create_path(path))
return -1;
}
/*
* remove/add lockspace members
*/
rv = update_dir_members(name);
if (rv)
return rv;
/* update_dir_members() just refreshed these file-scope arrays from the
   node dirs currently present for this lockspace */
old_members = dir_members;
old_count = dir_members_count;
/* drop every existing node dir that is not in the new member list */
for (i = 0; i < old_count; i++) {
id = old_members[i];
if (id_exists(id, new_count, new_members))
continue;
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s/nodes/%d",
SPACES_DIR, name, id);
log_debug("set_members rmdir \"%s\"", path);
rv = rmdir(path);
if (rv) {
log_error("%s: rmdir failed: %d", path, errno);
goto out;
}
}
/*
* remove lockspace dir after we've removed all the nodes
* (when we're shutting down and adding no new nodes)
*/
if (!new_count) {
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s", SPACES_DIR, name);
log_debug("set_members lockspace rmdir \"%s\"", path);
rv = rmdir(path);
if (rv)
log_error("%s: rmdir failed: %d", path, errno);
}
/* add (or renew) node dirs; members that already have a dir and are not
   in the renew list are skipped */
for (i = 0; i < new_count; i++) {
id = new_members[i];
do_renew = 0;
if (id_exists(id, renew_count, renew_members))
do_renew = 1;
else if (id_exists(id, old_count, old_members))
continue;
/* NOTE(review): presumably refreshes cman membership state for a node
   not yet known as a member -- confirm against cman_statechange() */
if (!is_cman_member(id))
cman_statechange();
/*
* create node's dir
*/
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s/nodes/%d",
SPACES_DIR, name, id);
if (do_renew) {
log_debug("set_members renew rmdir \"%s\"", path);
rv = rmdir(path);
if (rv) {
log_error("%s: renew rmdir failed: %d",
path, errno);
goto out;
}
}
log_debug("set_members mkdir \"%s\"", path);
rv = create_path(path);
if (rv)
goto out;
/*
* set node's nodeid
*/
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s/nodes/%d/nodeid",
SPACES_DIR, name, id);
/* rv mirrors fd so a failed open is returned through "out" */
rv = fd = open(path, O_WRONLY);
if (rv < 0) {
log_error("%s: open failed: %d", path, errno);
goto out;
}
memset(buf, 0, 32);
snprintf(buf, 32, "%d", id);
rv = do_write(fd, buf, strlen(buf));
if (rv < 0) {
log_error("%s: write failed: %d, %s", path, errno, buf);
close(fd);
goto out;
}
close(fd);
/*
* set node's weight
*/
w = get_weight(id, name);
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%s/nodes/%d/weight",
SPACES_DIR, name, id);
/* rv mirrors fd so a failed open is returned through "out" */
rv = fd = open(path, O_WRONLY);
if (rv < 0) {
log_error("%s: open failed: %d", path, errno);
goto out;
}
memset(buf, 0, 32);
snprintf(buf, 32, "%d", w);
rv = do_write(fd, buf, strlen(buf));
if (rv < 0) {
log_error("%s: write failed: %d, %s", path, errno, buf);
close(fd);
goto out;
}
close(fd);
}
/* every step succeeded (or only the lockspace rmdir failed) */
rv = 0;
out:
return rv;
}
#if 0
/* Disabled IPv4-only address formatter: it treats addr as a
   struct sockaddr_in and formats sin_addr with AF_INET.  The str_ip()
   defined after this block also handles AF_INET6. */
char *str_ip(char *addr)
{
static char ip[256];
struct sockaddr_in *sin = (struct sockaddr_in *) addr;
memset(ip, 0, sizeof(ip));
inet_ntop(AF_INET, &sin->sin_addr, ip, 256);
return ip;
}
#endif
/* Format the socket address stored at addr as text.  An ss_family of
   AF_INET6 is read as a sockaddr_in6, anything else as a sockaddr_in.
   The result lives in a static buffer that is overwritten by the next
   call. */

static char *str_ip(char *addr)
{
	static char str_ip_buf[INET6_ADDRSTRLEN];
	int family = ((struct sockaddr_storage *)addr)->ss_family;
	void *raw;

	raw = (family == AF_INET6)
		? (void *)&((struct sockaddr_in6 *)addr)->sin6_addr
		: (void *)&((struct sockaddr_in *)addr)->sin_addr;

	inet_ntop(family, raw, str_ip_buf, sizeof(str_ip_buf));

	return str_ip_buf;
}
/* record the nodeids that are currently listed under
config/dlm/cluster/comms/ so that we can remove all of them */
static int update_comms_nodes(void)
{
char path[PATH_MAX];
DIR *d;
struct dirent *de;
int i = 0;
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, COMMS_DIR);
d = opendir(path);
if (!d) {
log_debug("%s: opendir failed: %d", path, errno);
return -1;
}
memset(comms_nodes, 0, sizeof(comms_nodes));
comms_nodes_count = 0;
while ((de = readdir(d))) {
if (de->d_name[0] == '.')
continue;
comms_nodes[i++] = atoi(de->d_name);
}
closedir(d);
comms_nodes_count = i;
return 0;
}
/* Remove every node directory found under config/dlm/cluster/comms/.
   Individual rmdir failures are logged and do not stop the sweep. */

static void clear_configfs_comms(void)
{
	char path[PATH_MAX];
	int n;

	if (update_comms_nodes() < 0)
		return;

	for (n = 0; n < comms_nodes_count; n++) {
		snprintf(path, sizeof(path), "%s/%d", COMMS_DIR,
			 comms_nodes[n]);

		log_debug("clear_configfs_nodes rmdir \"%s\"", path);

		if (rmdir(path))
			log_error("%s: rmdir failed: %d", path, errno);
	}
}
/* Remove every node directory under SPACES_DIR/<name>/nodes.
   Individual rmdir failures are logged and do not stop the sweep. */

static void clear_configfs_space_nodes(char *name)
{
	char path[PATH_MAX];
	int n;

	if (update_dir_members(name) < 0)
		return;

	for (n = 0; n < dir_members_count; n++) {
		snprintf(path, sizeof(path), "%s/%s/nodes/%d",
			 SPACES_DIR, name, dir_members[n]);

		log_debug("clear_configfs_space_nodes rmdir \"%s\"", path);

		if (rmdir(path))
			log_error("%s: rmdir failed: %d", path, errno);
	}
}
/* Remove every lockspace found under config/dlm/cluster/spaces/: first
   its node directories, then the lockspace directory itself.  rmdir
   failures are logged and the sweep continues. */

static void clear_configfs_spaces(void)
{
	char path[PATH_MAX];
	struct dirent *ent;
	DIR *dir;

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s", SPACES_DIR);

	dir = opendir(path);
	if (dir == NULL) {
		log_debug("%s: opendir failed: %d", path, errno);
		return;
	}

	for (ent = readdir(dir); ent != NULL; ent = readdir(dir)) {
		if (ent->d_name[0] == '.')
			continue;

		/* the lockspace dir can only go once its nodes are gone */
		clear_configfs_space_nodes(ent->d_name);

		snprintf(path, sizeof(path), "%s/%s", SPACES_DIR,
			 ent->d_name);

		log_debug("clear_configfs_spaces rmdir \"%s\"", path);

		if (rmdir(path))
			log_error("%s: rmdir failed: %d", path, errno);
	}

	closedir(dir);
}
/* Make sure /sys/kernel/config/dlm/cluster exists, creating it if
   needed.  Returns -1 when configfs or its dlm directory is missing,
   otherwise 0 or the create_path() result. */

static int add_configfs_base(void)
{
	if (!path_exists("/sys/kernel/config")) {
		log_error("No /sys/kernel/config, is configfs loaded?");
		return -1;
	}

	if (!path_exists("/sys/kernel/config/dlm")) {
		log_error("No /sys/kernel/config/dlm, is the dlm loaded?");
		return -1;
	}

	if (path_exists("/sys/kernel/config/dlm/cluster"))
		return 0;

	return create_path("/sys/kernel/config/dlm/cluster");
}
/* Create COMMS_DIR/<nodeid> and fill in its "nodeid" and "addr" files,
   plus "local" (set to "1") when local is non-zero.  addr is written
   zero-padded to sizeof(struct sockaddr_storage) bytes.
   Returns 0 on success, -1 on the first failure, including an addrlen
   that does not fit in a struct sockaddr_storage. */

int add_configfs_node(int nodeid, char *addr, int addrlen, int local)
{
	char path[PATH_MAX];
	char padded_addr[sizeof(struct sockaddr_storage)];
	char buf[32];
	int rv, fd;

	/* reject lengths that would overrun padded_addr in the memcpy
	   below; do it before touching configfs */
	if (addrlen < 0 || (size_t)addrlen > sizeof(padded_addr)) {
		log_error("add_configfs_node %d bad addrlen %d",
			  nodeid, addrlen);
		return -1;
	}

	log_debug("set_configfs_node %d %s local %d",
		  nodeid, str_ip(addr), local);

	/*
	 * create comm dir for this node
	 */

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/%d", COMMS_DIR, nodeid);

	rv = create_path(path);
	if (rv)
		return -1;

	/*
	 * set the nodeid
	 */

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/%d/nodeid", COMMS_DIR, nodeid);

	fd = open(path, O_WRONLY);
	if (fd < 0) {
		log_error("%s: open failed: %d", path, errno);
		return -1;
	}

	memset(buf, 0, sizeof(buf));
	snprintf(buf, 32, "%d", nodeid);

	rv = do_write(fd, buf, strlen(buf));
	if (rv < 0) {
		log_error("%s: write failed: %d, %s", path, errno, buf);
		close(fd);
		return -1;
	}
	close(fd);

	/*
	 * set the address
	 */

	memset(padded_addr, 0, sizeof(padded_addr));
	memcpy(padded_addr, addr, addrlen);

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/%d/addr", COMMS_DIR, nodeid);

	fd = open(path, O_WRONLY);
	if (fd < 0) {
		log_error("%s: open failed: %d", path, errno);
		return -1;
	}

	rv = do_write(fd, padded_addr, sizeof(struct sockaddr_storage));
	if (rv < 0) {
		log_error("%s: write failed: %d %d", path, errno, rv);
		close(fd);
		return -1;
	}
	close(fd);

	/*
	 * set local
	 */

	if (!local)
		goto out;

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/%d/local", COMMS_DIR, nodeid);

	fd = open(path, O_WRONLY);
	if (fd < 0) {
		log_error("%s: open failed: %d", path, errno);
		return -1;
	}

	rv = do_write(fd, "1", strlen("1"));
	if (rv < 0) {
		log_error("%s: write failed: %d", path, errno);
		close(fd);
		return -1;
	}
	close(fd);
 out:
	return 0;
}
/* Remove the directory COMMS_DIR/<nodeid>; a failure is only logged. */

void del_configfs_node(int nodeid)
{
	char path[PATH_MAX];

	snprintf(path, sizeof(path), "%s/%d", COMMS_DIR, nodeid);

	log_debug("del_configfs_node rmdir \"%s\"", path);

	if (rmdir(path) != 0)
		log_error("%s: rmdir failed: %d", path, errno);
}
/* Write proto as a decimal string to CLUSTER_DIR/protocol.
   Returns 0 on success, the negative open() result if the file cannot
   be opened, or the negative do_write() result if the write fails. */

static int set_configfs_protocol(int proto)
{
	char path[PATH_MAX];
	char buf[32];
	int fd, rv;

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/protocol", CLUSTER_DIR);

	fd = open(path, O_WRONLY);
	if (fd < 0) {
		log_error("%s: open failed: %d", path, errno);
		return fd;
	}

	memset(buf, 0, sizeof(buf));
	snprintf(buf, 32, "%d", proto);

	rv = do_write(fd, buf, strlen(buf));
	if (rv < 0) {
		log_error("%s: write failed: %d", path, errno);
		/* do not leak the descriptor on the error path */
		close(fd);
		return rv;
	}
	close(fd);
	log_debug("set protocol %d", proto);
	return 0;
}
/* Write cs as a decimal string to CLUSTER_DIR/timewarn_cs.
   Returns 0 on success, the negative open() result if the file cannot
   be opened, or the negative do_write() result if the write fails. */

static int set_configfs_timewarn(int cs)
{
	char path[PATH_MAX];
	char buf[32];
	int fd, rv;

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/timewarn_cs", CLUSTER_DIR);

	fd = open(path, O_WRONLY);
	if (fd < 0) {
		log_error("%s: open failed: %d", path, errno);
		return fd;
	}

	memset(buf, 0, sizeof(buf));
	snprintf(buf, 32, "%d", cs);

	rv = do_write(fd, buf, strlen(buf));
	if (rv < 0) {
		log_error("%s: write failed: %d", path, errno);
		/* do not leak the descriptor on the error path */
		close(fd);
		return rv;
	}
	close(fd);
	log_debug("set timewarn_cs %d", cs);
	return 0;
}
/* Write val as a decimal string to CLUSTER_DIR/log_debug.
   Returns 0 on success, the negative open() result if the file cannot
   be opened, or the negative do_write() result if the write fails. */

static int set_configfs_debug(int val)
{
	char path[PATH_MAX];
	char buf[32];
	int fd, rv;

	memset(path, 0, PATH_MAX);
	snprintf(path, PATH_MAX, "%s/log_debug", CLUSTER_DIR);

	fd = open(path, O_WRONLY);
	if (fd < 0) {
		log_error("%s: open failed: %d", path, errno);
		return fd;
	}

	memset(buf, 0, sizeof(buf));
	snprintf(buf, 32, "%d", val);

	rv = do_write(fd, buf, strlen(buf));
	if (rv < 0) {
		log_error("%s: write failed: %d", path, errno);
		/* do not leak the descriptor on the error path */
		close(fd);
		return rv;
	}
	close(fd);
	log_debug("set log_debug %d", val);
	return 0;
}
void clear_configfs(void)
{
clear_configfs_comms();
clear_configfs_spaces();
rmdir("/sys/kernel/config/dlm/cluster");
}
/* Reset the dlm configfs tree and recreate the cluster directory, then
   apply any debug/timewarn/protocol settings that were configured.
   Returns the negative add_configfs_base() result on failure, else 0;
   failures of the individual settings are not propagated. */

int setup_configfs(void)
{
	int base_rv;

	clear_configfs();

	base_rv = add_configfs_base();
	if (base_rv < 0)
		return base_rv;

	/* add configfs entries for existing nodes */
	cman_statechange();

	/* the kernel has its own defaults for these values which we
	   don't want to change unless these have been set; -1 means
	   they have not been set on command line or config file */

	if (cfgk_debug != -1)
		set_configfs_debug(cfgk_debug);

	if (cfgk_timewarn != -1)
		set_configfs_timewarn(cfgk_timewarn);

	if (cfgk_protocol != -1)
		set_configfs_protocol(cfgk_protocol);

	return 0;
}
diff --git a/group/dlm_controld/cpg.c b/group/dlm_controld/cpg.c
index 6aa340220..f80bf91bb 100644
--- a/group/dlm_controld/cpg.c
+++ b/group/dlm_controld/cpg.c
@@ -1,1727 +1,1730 @@
#include "dlm_daemon.h"
#include "config.h"
#include "libfenced.h"
uint32_t cpgname_to_crc(const char *data, int len);
int message_flow_control_on;
static cpg_handle_t daemon_handle;
static unsigned int protocol_active[3] = {1, 0, 0};
/* One node as seen by a single change; linked on that change's
   "members" or "removed" list. */
struct member {
struct list_head list;
int nodeid;
int start; /* 1 if we received a start message for this change */
int added; /* 1 if added by this change */
int failed; /* 1 if failed in this change */
int disallowed;
/* flags from the node's start message; set_plock_ckpt_node() tests
   DLM_MFLG_HAVEPLOCK here */
uint32_t start_flags;
};
/* Per-node record kept on a lockspace's node_history list. */
struct node {
struct list_head list;
int nodeid;
/* set by node_history_fail(), cleared by check_fencing_done() */
int check_fencing;
/* set by node_history_fail(), cleared by check_quorum_done() */
int check_quorum;
/* set by node_history_fail(), cleared by check_fs_done() once
   fs_notified is set */
int check_fs;
int fs_notified;
/* time(NULL) recorded by node_history_start(); zeroed when the node
   leaves or has been fenced */
uint64_t add_time;
uint32_t added_seq; /* for queries */
uint32_t removed_seq; /* for queries */
int failed_reason; /* for queries */
};
/* One of these change structs is created for every confchg a cpg gets. */
#define CGST_WAIT_CONDITIONS 1
#define CGST_WAIT_MESSAGES 2
struct change {
struct list_head list;
/* list of struct member: the nodes present after this change */
struct list_head members;
struct list_head removed; /* nodes removed by this change */
int member_count;
int joined_count;
int remove_count;
int failed_count;
/* NOTE(review): presumably one of the CGST_* values above -- confirm */
int state;
int we_joined;
uint32_t seq; /* used as a reference for debugging, and for queries */
uint32_t combined_seq; /* for queries */
};
/* Change summary carried in a message; every field is converted with
   le32_to_cpu() by ls_info_in().  id_info_size is the stride ids_in()
   uses to step through the id_info_count entries that accompany it. */
struct ls_info {
uint32_t ls_info_size;
uint32_t id_info_size;
uint32_t id_info_count;
uint32_t started_count;
int member_count;
int joined_count;
int remove_count;
int failed_count;
};
/* One node entry accompanying a struct ls_info; entries are spaced
   ls_info.id_info_size bytes apart (see ids_in/get_id_struct). */
struct id_info {
int nodeid;
};
/* Convert every field of *li in place with le32_to_cpu(). */

static void ls_info_in(struct ls_info *li)
{
	/* per-change counters */
	li->member_count  = le32_to_cpu(li->member_count);
	li->joined_count  = le32_to_cpu(li->joined_count);
	li->remove_count  = le32_to_cpu(li->remove_count);
	li->failed_count  = le32_to_cpu(li->failed_count);

	/* sizes and counts describing the message layout */
	li->ls_info_size  = le32_to_cpu(li->ls_info_size);
	li->id_info_size  = le32_to_cpu(li->id_info_size);
	li->id_info_count = le32_to_cpu(li->id_info_count);
	li->started_count = le32_to_cpu(li->started_count);
}
/* Convert the nodeid of *id in place with le32_to_cpu(). */
static void id_info_in(struct id_info *id)
{
id->nodeid = le32_to_cpu(id->nodeid);
}
/* Apply id_info_in() to each of the li->id_info_count entries starting
   at ids; consecutive entries are li->id_info_size bytes apart. */

static void ids_in(struct ls_info *li, struct id_info *ids)
{
	char *cursor = (char *)ids;
	int n;

	for (n = 0; n < li->id_info_count; n++) {
		id_info_in((struct id_info *)cursor);
		cursor += li->id_info_size;
	}
}
/* Map a DLM_MSG_* message type to a short name for log output;
   unrecognised types yield "unknown". */

char *msg_name(int type)
{
	char *name;

	switch (type) {
	case DLM_MSG_START:
		name = "start";
		break;
	case DLM_MSG_PLOCK:
		name = "plock";
		break;
	case DLM_MSG_PLOCK_OWN:
		name = "plock_own";
		break;
	case DLM_MSG_PLOCK_DROP:
		name = "plock_drop";
		break;
	case DLM_MSG_PLOCK_SYNC_LOCK:
		name = "plock_sync_lock";
		break;
	case DLM_MSG_PLOCK_SYNC_WAITER:
		name = "plock_sync_waiter";
		break;
	case DLM_MSG_PLOCKS_STORED:
		name = "plocks_stored";
		break;
	case DLM_MSG_DEADLK_CYCLE_START:
		name = "deadlk_cycle_start";
		break;
	case DLM_MSG_DEADLK_CYCLE_END:
		name = "deadlk_cycle_end";
		break;
	case DLM_MSG_DEADLK_CHECKPOINT_READY:
		name = "deadlk_checkpoint_ready";
		break;
	case DLM_MSG_DEADLK_CANCEL_LOCK:
		name = "deadlk_cancel_lock";
		break;
	default:
		name = "unknown";
		break;
	}

	return name;
}
/* Multicast buf/len on cpg handle h with CPG_TYPE_AGREED.  While cpg
   answers CPG_ERR_TRY_AGAIN the send is repeated after a 1000us sleep,
   with an error logged on every 100th retry; there is no retry limit.
   type is only used to name the message in log output.
   Returns 0 on CPG_OK, -1 on any other error. */

static int _send_message(cpg_handle_t h, void *buf, int len, int type)
{
	struct iovec iov;
	cpg_error_t error;
	int retries = 0;

	iov.iov_base = buf;
	iov.iov_len = len;

	for (;;) {
		error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
		if (error != CPG_ERR_TRY_AGAIN)
			break;

		retries++;
		usleep(1000);
		if (retries % 100 == 0)
			log_error("cpg_mcast_joined retry %d %s",
				  retries, msg_name(type));
	}

	if (error != CPG_OK) {
		log_error("cpg_mcast_joined error %d handle %llx %s",
			  error, (unsigned long long)h, msg_name(type));
		return -1;
	}

	if (retries)
		log_debug("cpg_mcast_joined retried %d %s",
			  retries, msg_name(type));

	return 0;
}
/* Complete the dlm_header at the start of buf and multicast the message
   on the lockspace's cpg handle.
   header fields caller needs to set: type, to_nodeid, flags, msgdata
   (all in host byte order; they are converted here).  The send result
   is not reported back to the caller. */

void dlm_send_message(struct lockspace *ls, char *buf, int len)
{
	struct dlm_header *hd = (struct dlm_header *) buf;
	int host_type = hd->type;	/* kept for logging in _send_message */
	int v;

	for (v = 0; v < 3; v++)
		hd->version[v] = cpu_to_le16(protocol_active[v]);

	/* fields filled in by the caller */
	hd->type      = cpu_to_le16(hd->type);
	hd->to_nodeid = cpu_to_le32(hd->to_nodeid);
	hd->flags     = cpu_to_le32(hd->flags);
	hd->msgdata   = cpu_to_le32(hd->msgdata);

	/* fields supplied here */
	hd->nodeid    = cpu_to_le32(our_nodeid);
	hd->global_id = cpu_to_le32(ls->global_id);

	_send_message(ls->cpg_handle, buf, len, host_type);
}
/* Return the entry of cg->members whose nodeid matches, or NULL. */

static struct member *find_memb(struct change *cg, int nodeid)
{
	struct member *cur;
	struct member *found = NULL;

	list_for_each_entry(cur, &cg->members, list) {
		if (cur->nodeid != nodeid)
			continue;
		found = cur;
		break;
	}
	return found;
}
/* Return the lockspace on the global lockspaces list whose cpg_handle
   equals h, or NULL. */

static struct lockspace *find_ls_handle(cpg_handle_t h)
{
	struct lockspace *cur;
	struct lockspace *found = NULL;

	list_for_each_entry(cur, &lockspaces, list) {
		if (cur->cpg_handle != h)
			continue;
		found = cur;
		break;
	}
	return found;
}
static struct lockspace *find_ls_ci(int ci)
{
struct lockspace *ls;
list_for_each_entry(ls, &lockspaces, list) {
if (ls->cpg_client == ci)
return ls;
}
return NULL;
}
/* Free a change together with every member on its "members" and
   "removed" lists. */

static void free_cg(struct change *cg)
{
	struct list_head *heads[2];
	struct member *cur, *next;
	int h;

	heads[0] = &cg->members;
	heads[1] = &cg->removed;

	for (h = 0; h < 2; h++) {
		list_for_each_entry_safe(cur, next, heads[h], list) {
			list_del(&cur->list);
			free(cur);
		}
	}

	free(cg);
}
static void free_ls(struct lockspace *ls)
{
struct change *cg, *cg_safe;
struct node *node, *node_safe;
list_for_each_entry_safe(cg, cg_safe, &ls->changes, list) {
list_del(&cg->list);
free_cg(cg);
}
if (ls->started_change)
free_cg(ls->started_change);
list_for_each_entry_safe(node, node_safe, &ls->node_history, list) {
list_del(&node->list);
free(node);
}
free(ls);
}
/* Problem scenario:
nodes A,B,C are in fence domain
node C has gfs foo mounted
node C fails
nodes A,B begin fencing C (slow, not completed)
node B mounts gfs foo
We may end up having gfs foo mounted and being used on B before
C has been fenced. C could wake up corrupt fs.
So, we need to prevent any new gfs mounts while there are any
outstanding, incomplete fencing operations.
We also need to check that the specific failed nodes we know about have
been fenced (since fenced may not even have been notified that the node
has failed yet).
So, check that:
1. has fenced fenced the node after it joined this lockspace?
2. fenced has no outstanding fencing ops
For 1:
- record the time of the first good start message we see from node X
- node X fails
- wait for X to be removed from all dlm cpg's (probably not necessary)
- check that the fencing time is later than the recorded time above
Tracking fencing state when there are spurious partitions/merges...
from a spurious leave/join of node X, a lockspace will see:
- node X is a lockspace member
- node X fails, may be waiting for all cpgs to see failure or for fencing to
complete
- node X joins the lockspace - we want to process the change as usual, but
don't want to disrupt the code waiting for the fencing, and we want to
continue running properly once the remerged node is properly reset
ls->node_history
when we see a node not in this list, add entry for it with zero add_time
record the time we get a good start message from the node, add_time
clear add_time if the node leaves
if node fails with non-zero add_time, set check_fencing
when a node is fenced, clear add_time and clear check_fencing
if a node remerges after this, no good start message, no new add_time set
if a node fails with zero add_time, it doesn't need fencing
if a node remerges before it's been fenced, no good start message, no new
add_time set
*/
/* Return the node_history entry of ls for nodeid, or NULL if the node
   has no entry yet. */

static struct node *get_node_history(struct lockspace *ls, int nodeid)
{
	struct node *cur;
	struct node *found = NULL;

	list_for_each_entry(cur, &ls->node_history, list) {
		if (cur->nodeid != nodeid)
			continue;
		found = cur;
		break;
	}
	return found;
}
/* Make sure ls has a node_history entry for nodeid, creating a zeroed
   one if needed, and stamp it with the seq of the change that added the
   node.  Does nothing if a new entry cannot be allocated. */

static void node_history_init(struct lockspace *ls, int nodeid,
			      struct change *cg)
{
	struct node *node = get_node_history(ls, nodeid);

	if (!node) {
		node = malloc(sizeof(struct node));
		if (!node)
			return;

		memset(node, 0, sizeof(struct node));
		node->nodeid = nodeid;
		node->add_time = 0;
		list_add_tail(&node->list, &ls->node_history);
	}

	node->added_seq = cg->seq;	/* for queries */
}
/* Record the current time as add_time of nodeid's history entry; logs
   an error if the node has no entry. */

static void node_history_start(struct lockspace *ls, int nodeid)
{
	struct node *node = get_node_history(ls, nodeid);

	if (node) {
		node->add_time = time(NULL);
		return;
	}

	log_error("node_history_start no nodeid %d", nodeid);
}
/* Note that nodeid left in change cg: clear its add_time and remember
   the change seq.  Logs an error if the node has no history entry. */

static void node_history_left(struct lockspace *ls, int nodeid,
			      struct change *cg)
{
	struct node *node = get_node_history(ls, nodeid);

	if (node) {
		node->add_time = 0;
		node->removed_seq = cg->seq;	/* for queries */
		return;
	}

	log_error("node_history_left no nodeid %d", nodeid);
}
/* Note that nodeid failed in change cg and flag which recovery
   conditions must be waited for before the lockspace can restart:
   fencing (only if the node had a non-zero add_time, i.e. we saw a good
   start message from it), quorum (only when fencing is disabled) and
   fs notification.  Logs an error if the node has no history entry. */

static void node_history_fail(struct lockspace *ls, int nodeid,
			      struct change *cg, int reason)
{
	struct node *node;

	node = get_node_history(ls, nodeid);
	if (!node) {
		log_error("node_history_fail no nodeid %d", nodeid);
		return;
	}

	/* a node that fails with zero add_time never started in this
	   lockspace (or was already fenced) and needs no fencing; only a
	   non-zero add_time requires waiting for fenced */
	if (cfgd_enable_fencing && node->add_time)
		node->check_fencing = 1;

	/* fenced will take care of making sure the quorum value
	   is adjusted for all the failures */

	if (cfgd_enable_quorum && !cfgd_enable_fencing)
		node->check_quorum = 1;

	node->check_fs = 1;

	node->removed_seq = cg->seq;	/* for queries */
	node->failed_reason = reason;	/* for queries */
}
/* Return 1 when fencing no longer blocks ls, 0 when it still does.
   Fencing is done when it is disabled, or when every history node with
   check_fencing set has a last_fenced_time later than its add_time and
   fenced reports no outstanding victims. */

static int check_fencing_done(struct lockspace *ls)
{
	struct node *node;
	struct fenced_node nodeinfo;
	struct fenced_domain domain;
	int pending = 0;
	int rv;

	if (!cfgd_enable_fencing)
		return 1;

	list_for_each_entry(node, &ls->node_history, list) {
		if (!node->check_fencing)
			continue;

		/* check with fenced to see if the node has been
		   fenced since node->add_time; nodeinfo stays zeroed
		   if the query fails */

		memset(&nodeinfo, 0, sizeof(nodeinfo));

		rv = fenced_node_info(node->nodeid, &nodeinfo);
		if (rv < 0)
			log_error("fenced_node_info error %d", rv);

		if (nodeinfo.last_fenced_time <= node->add_time) {
			log_group(ls, "check_fencing %d needs fencing",
				  node->nodeid);
			pending++;
			continue;
		}

		node->check_fencing = 0;
		node->add_time = 0;
	}

	if (pending)
		return 0;

	/* now check if there are any outstanding fencing ops (for nodes
	   we may not have seen in any lockspace), and return 0 if there
	   are any */

	rv = fenced_domain_info(&domain);
	if (rv < 0) {
		log_error("fenced_domain_info error %d", rv);
		return 0;
	}

	return domain.victim_count ? 0 : 1;
}
/* Return 1 when quorum no longer blocks ls, 0 when it still does.
   Quorum is done when it is disabled, or when no node flagged with
   check_quorum is still a cman member and cman_quorate is set. */

static int check_quorum_done(struct lockspace *ls)
{
	struct node *node;
	int pending = 0;

	if (!cfgd_enable_quorum)
		return 1;

	/* wait for cman to see all the same nodes failed, so we know that
	   cman_quorate is adjusted for the same failures we've seen
	   (see comment in fenced about the assumption here) */

	list_for_each_entry(node, &ls->node_history, list) {
		if (!node->check_quorum)
			continue;

		if (is_cman_member(node->nodeid)) {
			log_group(ls, "check_quorum %d is_cman_member",
				  node->nodeid);
			pending++;
			continue;
		}

		node->check_quorum = 0;
	}

	if (pending)
		return 0;

	if (!cman_quorate) {
		log_group(ls, "check_quorum not quorate");
		return 0;
	}

	log_group(ls, "check_quorum done");
	return 1;
}
/* wait for local fs_controld to ack each failed node.
   Returns 1 when ls has no registered fs or every node flagged with
   check_fs has fs_notified set; 0 while any is still outstanding. */

static int check_fs_done(struct lockspace *ls)
{
	struct node *node;
	int pending = 0;

	/* no corresponding fs for this lockspace */
	if (!ls->fs_registered)
		return 1;

	list_for_each_entry(node, &ls->node_history, list) {
		if (!node->check_fs)
			continue;

		if (!node->fs_notified) {
			log_group(ls, "check_fs %d needs fs notify",
				  node->nodeid);
			pending++;
			continue;
		}

		node->check_fs = 0;
	}

	if (pending)
		return 0;

	log_group(ls, "check_fs done");
	return 1;
}
static int member_ids[MAX_NODES];
static int member_count;
static int renew_ids[MAX_NODES];
static int renew_count;
static void format_member_ids(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct member *memb;
memset(member_ids, 0, sizeof(member_ids));
member_count = 0;
list_for_each_entry(memb, &cg->members, list)
member_ids[member_count++] = memb->nodeid;
}
/* list of nodeids that have left and rejoined since last start_kernel;
   is any member of startcg in the left list of any other cg's?
   (if it is, then it presumably must be flagged added in another)

   Fills renew_ids[] / renew_count.  Each rejoined nodeid is stored once,
   even if it appears in the removed list of several changes, and at most
   MAX_NODES ids are stored. */

static void format_renew_ids(struct lockspace *ls)
{
	struct change *cg, *startcg;
	struct member *memb, *leftmemb;
	int rejoined;

	startcg = list_first_entry(&ls->changes, struct change, list);

	memset(renew_ids, 0, sizeof(renew_ids));
	renew_count = 0;

	list_for_each_entry(memb, &startcg->members, list) {
		rejoined = 0;

		list_for_each_entry(cg, &ls->changes, list) {
			if (cg == startcg)
				continue;
			list_for_each_entry(leftmemb, &cg->removed, list) {
				if (memb->nodeid == leftmemb->nodeid) {
					rejoined = 1;
					break;
				}
			}
			/* one match is enough; do not record duplicates */
			if (rejoined)
				break;
		}

		if (!rejoined)
			continue;

		/* renew_ids[] holds MAX_NODES ints; never write past it */
		if (renew_count >= MAX_NODES) {
			log_error("format_renew_ids more than %d nodes",
				  MAX_NODES);
			break;
		}
		renew_ids[renew_count++] = memb->nodeid;
	}
}
/* Push the membership of the first queued change to the kernel for a
   lockspace that is currently stopped: write the lockspace id (when
   joining), update the configfs member dirs, then set the sysfs
   "control" file to 1.  When joining, "event_done" is set to 0
   afterwards.  Logs an error and does nothing if the kernel is not
   stopped.  Results of the sysfs/configfs calls are not checked. */
static void start_kernel(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
if (!ls->kernel_stopped) {
log_error("start_kernel %u not stopped", cg->seq);
return;
}
log_group(ls, "start_kernel %u member_count %d",
cg->seq, cg->member_count);
/* needs to happen before setting control which starts recovery */
if (ls->joining)
set_sysfs_id(ls->name, ls->global_id);
/* these fill the file-scope member_ids[]/renew_ids[] arrays that are
   passed to set_configfs_members() below */
format_member_ids(ls);
format_renew_ids(ls);
set_configfs_members(ls->name, member_count, member_ids,
renew_count, renew_ids);
set_sysfs_control(ls->name, 1);
ls->kernel_stopped = 0;
if (ls->joining) {
set_sysfs_event_done(ls->name, 0);
ls->joining = 0;
}
}
/* Set the lockspace's sysfs "control" file to 0 and mark it stopped,
   unless it is already marked stopped.  seq is only used for logging. */

static void stop_kernel(struct lockspace *ls, uint32_t seq)
{
	if (ls->kernel_stopped)
		return;

	log_group(ls, "stop_kernel %u", seq);
	set_sysfs_control(ls->name, 0);
	ls->kernel_stopped = 1;
}
/* the first condition is that the local lockspace is stopped which we
   don't need to check for because stop_kernel(), which is synchronous,
   was done when the change was created.

   Checks fencing, quorum and fs notification in that order.  The first
   check that is not done sets its poll_* flag and makes this return 0;
   each check that passes clears its flag.  Returns 1 when all pass. */

static int wait_conditions_done(struct lockspace *ls)
{
	/* the fencing/quorum/fs conditions need to account for all the changes
	   that have occurred since the last change applied to dlm-kernel, not
	   just the latest change */

	poll_fencing = check_fencing_done(ls) ? 0 : 1;
	if (poll_fencing)
		return 0;

	/* even though fencing also waits for quorum, checking fencing isn't
	   sufficient because we don't want to start new lockspaces in an
	   inquorate cluster */

	poll_quorum = check_quorum_done(ls) ? 0 : 1;
	if (poll_quorum)
		return 0;

	poll_fs = check_fs_done(ls) ? 0 : 1;
	if (poll_fs)
		return 0;

	return 1;
}
/* Return 1 once every member of the first queued change has its start
   flag set, 0 while start messages are still missing. */

static int wait_messages_done(struct lockspace *ls)
{
	struct change *cg = list_first_entry(&ls->changes, struct change, list);
	struct member *memb;
	int total = 0, started = 0;

	list_for_each_entry(memb, &cg->members, list) {
		total++;
		if (memb->start)
			started++;
	}

	if (started != total) {
		log_group(ls, "wait_messages_done need %d of %d",
			  total - started, total);
		return 0;
	}

	log_group(ls, "wait_messages_done got all %d", total);
	return 1;
}
/* Promote the first queued change to ls->started_change (freeing the
   previous one), bump started_count, and free every other queued
   change.  combined_seq of the new started_change ends up as the seq of
   the last change freed, or its own seq if none were queued. */
static void cleanup_changes(struct lockspace *ls)
{
struct change *cg = list_first_entry(&ls->changes, struct change, list);
struct change *safe;
list_del(&cg->list);
if (ls->started_change)
free_cg(ls->started_change);
ls->started_change = cg;
ls->started_count++;
/* skip over 0 if the counter wrapped */
if (!ls->started_count)
ls->started_count++;
cg->combined_seq = cg->seq; /* for queries */
/* cg is reused as the loop cursor from here on; the promoted change is
   reachable only through ls->started_change */
list_for_each_entry_safe(cg, safe, &ls->changes, list) {
ls->started_change->combined_seq = cg->seq; /* for queries */
list_del(&cg->list);
free_cg(cg);
}
}
/* There's a stream of confchg and messages. At one of these
messages, the low node needs to store plocks and new nodes
need to begin saving plock messages. A second message is
needed to say that the plocks are ready to be read.
When the last start message is recvd for a change, the low node
stores plocks and the new nodes begin saving messages. When the
store is done, low node sends plocks_stored message. When
new nodes recv this, they read the plocks and their saved messages.
plocks_stored message should identify a specific change, like start
messages do; if it doesn't match ls->started_change, then it's ignored.
If a confchg adding a new node arrives after plocks are stored but
before plocks_stored msg recvd, then the message is ignored. The low
node will send another plocks_stored message for the latest change
(although it may be able to reuse the ckpt if no plock state has changed).
*/
/* Pick the plock checkpoint node for ls: the lowest nodeid among the
   members of the first queued change whose start_flags carry
   DLM_MFLG_HAVEPLOCK (0 if there is none).  If we were the checkpoint
   node and no longer are, our checkpoint is closed first. */

static void set_plock_ckpt_node(struct lockspace *ls)
{
	struct change *cg = list_first_entry(&ls->changes, struct change, list);
	struct member *memb;
	int lowest = 0;

	list_for_each_entry(memb, &cg->members, list) {
		if ((memb->start_flags & DLM_MFLG_HAVEPLOCK) &&
		    (!lowest || memb->nodeid < lowest))
			lowest = memb->nodeid;
	}

	log_group(ls, "set_plock_ckpt_node from %d to %d",
		  ls->plock_ckpt_node, lowest);

	if (ls->plock_ckpt_node == our_nodeid && lowest != our_nodeid) {
		/* Close ckpt so it will go away when the new ckpt_node
		   unlinks it prior to creating a new one; if we fail
		   our open ckpts are automatically closed. At this point
		   the ckpt has not been unlinked, but won't be held open by
		   anyone. We use the max "retentionDuration" to stop the
		   system from cleaning up ckpts that are open by no one. */
		close_plock_checkpoint(ls);
	}

	ls->plock_ckpt_node = lowest;
}
/* Search an array of `count` id_info entries, each `size` bytes apart, for
   the entry whose nodeid equals `nodeid`.  The stride is passed explicitly
   rather than taken from sizeof(struct id_info).  Returns the matching
   entry, or NULL if none matches. */
static struct id_info *get_id_struct(struct id_info *ids, int count, int size,
				     int nodeid)
{
	struct id_info *entry = ids;
	int left;

	for (left = count; left > 0; left--) {
		if (entry->nodeid == nodeid)
			return entry;
		entry = (struct id_info *)((char *)entry + size);
	}
	return NULL;
}
/* Decide whether the change described by a received message (header hd,
   ls_info li, id_info array ids) is the same change as cg.  Returns 1 on
   a match and 0 otherwise; every rejection is logged. */
static int match_change(struct lockspace *ls, struct change *cg,
			struct dlm_header *hd, struct ls_info *li,
			struct id_info *ids)
{
	struct id_info *id;
	uint32_t seq = hd->msgdata;
	int n;

	/* We can ignore messages if we're not in the list of members.
	   The one known time this will happen is after we've joined
	   the cpg, we can get messages for changes prior to the change
	   in which we're added. */

	if (!get_id_struct(ids, li->id_info_count, li->id_info_size,
			   our_nodeid)) {
		log_debug("match_change fail %d:%u we are not in members",
			  hd->nodeid, seq);
		return 0;
	}

	/* the sender itself has to be a member of cg */
	if (!find_memb(cg, hd->nodeid)) {
		log_group(ls, "match_change fail %d:%u sender not member",
			  hd->nodeid, seq);
		return 0;
	}

	/* verify this is the right change by matching the counts
	   and the nodeids of the current members */

	if (li->member_count != cg->member_count ||
	    li->joined_count != cg->joined_count ||
	    li->remove_count != cg->remove_count ||
	    li->failed_count != cg->failed_count) {
		log_group(ls, "match_change fail %d:%u expect counts "
			  "%d %d %d %d", hd->nodeid, seq,
			  cg->member_count, cg->joined_count,
			  cg->remove_count, cg->failed_count);
		return 0;
	}

	/* every nodeid listed in the message must be a member of cg */
	id = ids;
	for (n = 0; n < li->id_info_count; n++) {
		if (!find_memb(cg, id->nodeid)) {
			log_group(ls, "match_change fail %d:%u no memb %d",
				  hd->nodeid, seq, id->nodeid);
			return 0;
		}
		id = (struct id_info *)((char *)id + li->id_info_size);
	}

	log_group(ls, "match_change done %d:%u", hd->nodeid, seq);
	return 1;
}
/* Unfortunately, there's no really simple way to match a message with the
specific change that it was sent for. We hope that by passing all the
details of the change in the message, we will be able to uniquely match
it to the correct change. */
/* A start message will usually be for the first (current) change on our list.
In some cases it will be for a non-current change, and we can ignore it:
1. A,B,C get confchg1 adding C
2. C sends start for confchg1
3. A,B,C get confchg2 adding D
4. A,B,C,D recv start from C for confchg1 - ignored
5. C,D send start for confchg2
6. A,B send start for confchg2
7. A,B,C,D recv all start messages for confchg2, and start kernel
In step 4, how do the nodes know whether the start message from C is
for confchg1 or confchg2? Hopefully by comparing the counts and members. */
/* Walk ls->changes in reverse list order and return the first change that
   match_change() accepts for this message, or NULL (logged) if none does. */
static struct change *find_change(struct lockspace *ls, struct dlm_header *hd,
				  struct ls_info *li, struct id_info *ids)
{
	struct change *candidate;

	list_for_each_entry_reverse(candidate, &ls->changes, list) {
		if (match_change(ls, candidate, hd, li, ids))
			return candidate;
	}

	log_group(ls, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
	return NULL;
}
/* Return 1 if nodeid is marked as added in any pending change of the
   lockspace, 0 otherwise. */
static int is_added(struct lockspace *ls, int nodeid)
{
	struct change *cg;

	list_for_each_entry(cg, &ls->changes, list) {
		struct member *m = find_memb(cg, nodeid);

		if (!m)
			continue;
		if (m->added)
			return 1;
	}
	return 0;
}
static void receive_start(struct lockspace *ls, struct dlm_header *hd, int len)
{
struct change *cg;
struct member *memb;
struct ls_info *li;
struct id_info *ids;
uint32_t seq = hd->msgdata;
int added;
log_group(ls, "receive_start %d:%u len %d", hd->nodeid, seq, len);
li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
ls_info_in(li);
ids_in(li, ids);
cg = find_change(ls, hd, li, ids);
if (!cg)
return;
memb = find_memb(cg, hd->nodeid);
if (!memb) {
/* this should never happen since match_change checks it */
log_error("receive_start no member %d", hd->nodeid);
return;
}
memb->start_flags = hd->flags;
added = is_added(ls, hd->nodeid);
if (added && li->started_count) {
log_error("receive_start %d:%u add node with started_count %u",
hd->nodeid, seq, li->started_count);
/* observe this scheme working before using it; I'm not sure
that a joining node won't ever see an existing node as added
under normal circumstances */
/*
memb->disallowed = 1;
return;
*/
}
node_history_start(ls, hd->nodeid);
memb->start = 1;
}
static void receive_plocks_stored(struct lockspace *ls, struct dlm_header *hd,
int len)
{
struct ls_info *li;
struct id_info *ids;
log_group(ls, "receive_plocks_stored %d:%u need_plocks %d",
hd->nodeid, hd->msgdata, ls->need_plocks);
if (!ls->need_plocks)
return;
/* a confchg arrived between the last start and the plocks_stored msg,
so we ignore this plocks_stored msg and wait to read the ckpt until
the next plocks_stored msg following the current start */
if (!list_empty(&ls->changes) || !ls->started_change) {
log_group(ls, "receive_plocks_stored %d:%u ignore",
hd->nodeid, hd->msgdata);
return;
}
li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
ls_info_in(li);
ids_in(li, ids);
if (!match_change(ls, ls->started_change, hd, li, ids)) {
log_group(ls, "receive_plocks_stored %d:%u ignore no match",
hd->nodeid, hd->msgdata);
return;
}
retrieve_plocks(ls);
process_saved_plocks(ls);
ls->need_plocks = 0;
ls->save_plocks = 0;
}
/* Build and send a message of the given type (DLM_MSG_START or
   DLM_MSG_PLOCKS_STORED) describing the current (first) change.  The
   message is a dlm_header, followed by an ls_info with the change's
   counts, followed by one id_info per member.  An allocation failure is
   logged and nothing is sent. */
static void send_info(struct lockspace *ls, int type)
{
	struct change *cg = list_first_entry(&ls->changes, struct change, list);
	struct dlm_header *hd;
	struct ls_info *li;
	struct id_info *slot;
	struct member *m;
	char *msg;
	int nodes = cg->member_count;
	int len;

	len = sizeof(struct dlm_header) + sizeof(struct ls_info) +
	      nodes * sizeof(struct id_info);

	/* zero-filled buffer, so unset fields and padding go out as 0 */
	msg = calloc(1, len);
	if (!msg) {
		log_error("send_info len %d no mem", len);
		return;
	}

	hd = (struct dlm_header *)msg;
	li = (struct ls_info *)(hd + 1);
	slot = (struct id_info *)(li + 1);

	/* fill in header (dlm_send_message handles part of header) */

	hd->type = type;
	hd->msgdata = cg->seq;

	if (ls->joining)
		hd->flags |= DLM_MFLG_JOINING;
	if (!ls->need_plocks)
		hd->flags |= DLM_MFLG_HAVEPLOCK;

	/* fill in ls_info */

	li->ls_info_size = cpu_to_le32(sizeof(struct ls_info));
	li->id_info_size = cpu_to_le32(sizeof(struct id_info));
	li->id_info_count = cpu_to_le32(nodes);
	li->started_count = cpu_to_le32(ls->started_count);
	li->member_count = cpu_to_le32(cg->member_count);
	li->joined_count = cpu_to_le32(cg->joined_count);
	li->remove_count = cpu_to_le32(cg->remove_count);
	li->failed_count = cpu_to_le32(cg->failed_count);

	/* fill in id_info entries */

	list_for_each_entry(m, &cg->members, list) {
		slot->nodeid = cpu_to_le32(m->nodeid);
		slot++;
	}

	log_group(ls, "send_%s %u flags %x counts %u %d %d %d %d",
		  type == DLM_MSG_START ? "start" : "plocks_stored",
		  cg->seq, hd->flags, ls->started_count, cg->member_count,
		  cg->joined_count, cg->remove_count, cg->failed_count);

	dlm_send_message(ls, msg, len);

	free(msg);
}
/* Send a DLM_MSG_START message describing the current change of ls
   (see send_info). */
static void send_start(struct lockspace *ls)
{
send_info(ls, DLM_MSG_START);
}
/* Send a DLM_MSG_PLOCKS_STORED message describing the current change of ls
   (see send_info). */
static void send_plocks_stored(struct lockspace *ls)
{
send_info(ls, DLM_MSG_PLOCKS_STORED);
}
/* Return 1 if any pending change of the lockspace has a non-zero
   joined_count, 0 otherwise. */
static int nodes_added(struct lockspace *ls)
{
	struct change *cg;
	int found = 0;

	list_for_each_entry(cg, &ls->changes, list) {
		if (cg->joined_count) {
			found = 1;
			break;
		}
	}
	return found;
}
/* Set up plock checkpoint handling for the change that is being started.
   Does nothing when plocks are disabled (cfgd_enable_plock).  Otherwise it
   determines the checkpoint node, turns on saving of plock messages if we
   still need plock state, and, if we are the checkpoint node, stores the
   plocks when nodes were added and sends the plocks_stored message. */
static void prepare_plocks(struct lockspace *ls)
{
	struct change *cg = list_first_entry(&ls->changes, struct change, list);
	struct member *m;

	if (!cfgd_enable_plock)
		return;

	/* if we're the only node in the lockspace, then we are the ckpt_node
	   and we don't need plocks */

	if (cg->member_count == 1) {
		list_for_each_entry(m, &cg->members, list) {
			if (m->nodeid == our_nodeid)
				continue;
			log_error("prepare_plocks other member %d",
				  m->nodeid);
		}
		ls->plock_ckpt_node = our_nodeid;
		ls->need_plocks = 0;
		return;
	}

	/* the low node that indicated it had plock state in its last
	   start message is the ckpt_node */

	set_plock_ckpt_node(ls);

	/* We save all plock messages from the time that the low node saves
	   existing plock state in the ckpt to the time that we read that state
	   from the ckpt. */

	if (ls->need_plocks) {
		ls->save_plocks = 1;
		return;
	}

	/* At each start, a ckpt is written if there have been nodes added
	   since the last start/ckpt.  If no nodes have been added, no one
	   does anything with ckpts.  If the node that wrote the last ckpt
	   is no longer the ckpt_node, the new ckpt_node will unlink and
	   write a new one.  If the node that wrote the last ckpt is still
	   the ckpt_node and no plock state has changed since the last ckpt,
	   it will just leave the old ckpt and not write a new one.

	   A new ckpt_node will send a stored message even if it doesn't
	   write a ckpt because new nodes in the previous start may be
	   waiting to read the ckpt from the previous ckpt_node after ignoring
	   the previous stored message.  They will read the ckpt from the
	   previous ckpt_node upon receiving the stored message from us. */

	if (ls->plock_ckpt_node == our_nodeid) {
		if (nodes_added(ls))
			store_plocks(ls);
		send_plocks_stored(ls);
	}
}
/* Advance the state machine of the current (first) pending change:
   - CGST_WAIT_CONDITIONS: once the conditions are met, send our start
     message and move to CGST_WAIT_MESSAGES;
   - CGST_WAIT_MESSAGES: once all start messages are in, start the kernel,
     prepare plocks and fold the pending changes into the started change.
   Does nothing when there is no pending change. */
static void apply_changes(struct lockspace *ls)
{
	struct change *cg;

	if (list_empty(&ls->changes))
		return;
	cg = list_first_entry(&ls->changes, struct change, list);

	if (cg->state == CGST_WAIT_CONDITIONS) {
		if (!wait_conditions_done(ls))
			return;
		send_start(ls);
		cg->state = CGST_WAIT_MESSAGES;
	} else if (cg->state == CGST_WAIT_MESSAGES) {
		if (!wait_messages_done(ls))
			return;
		start_kernel(ls);
		prepare_plocks(ls);
		cleanup_changes(ls);
	} else {
		log_error("apply_changes invalid state %d", cg->state);
	}
}
void process_lockspace_changes(void)
{
struct lockspace *ls, *safe;
list_for_each_entry_safe(ls, safe, &lockspaces, list) {
if (!list_empty(&ls->changes))
apply_changes(ls);
}
}
/* Build a struct change from the lists of a cpg configuration change and
   put it at the head of ls->changes.

   - every entry of member_list becomes a member on cg->members;
   - every entry of left_list becomes a member on cg->removed; entries that
     left with CPG_REASON_NODEDOWN or CPG_REASON_PROCDOWN are marked failed
     and counted in cg->failed_count, and the node history is updated with
     node_history_fail() or node_history_left(); a node that left with
     CPG_REASON_PROCDOWN is also passed to kick_node_from_cluster();
   - every entry of joined_list must already be on cg->members and is
     marked added.

   On success returns 0 and stores the new change in *cg_out.  Returns
   -ENOMEM if an allocation fails and -ENOENT if a joined node is not in
   the member list; *cg_out is not written on failure.

   NOTE(review): when the very first malloc fails, control reaches
   free_cg(cg) with cg == NULL.  free_cg() is not visible here - confirm
   that it tolerates a NULL argument. */
static int add_change(struct lockspace *ls,
struct cpg_address *member_list, int member_list_entries,
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries,
struct change **cg_out)
{
struct change *cg;
struct member *memb;
int i, error;
cg = malloc(sizeof(struct change));
if (!cg)
goto fail_nomem;
memset(cg, 0, sizeof(struct change));
INIT_LIST_HEAD(&cg->members);
INIT_LIST_HEAD(&cg->removed);
cg->state = CGST_WAIT_CONDITIONS;
/* sequence numbers skip 0 when change_seq wraps */
cg->seq = ++ls->change_seq;
if (!cg->seq)
cg->seq = ++ls->change_seq;
cg->member_count = member_list_entries;
cg->joined_count = joined_list_entries;
cg->remove_count = left_list_entries;
/* current members */
for (i = 0; i < member_list_entries; i++) {
memb = malloc(sizeof(struct member));
if (!memb)
goto fail_nomem;
memset(memb, 0, sizeof(struct member));
memb->nodeid = member_list[i].nodeid;
list_add_tail(&memb->list, &cg->members);
}
/* nodes that left or failed */
for (i = 0; i < left_list_entries; i++) {
memb = malloc(sizeof(struct member));
if (!memb)
goto fail_nomem;
memset(memb, 0, sizeof(struct member));
memb->nodeid = left_list[i].nodeid;
if (left_list[i].reason == CPG_REASON_NODEDOWN ||
left_list[i].reason == CPG_REASON_PROCDOWN) {
memb->failed = 1;
cg->failed_count++;
}
list_add_tail(&memb->list, &cg->removed);
if (memb->failed)
node_history_fail(ls, memb->nodeid, cg,
left_list[i].reason);
else
node_history_left(ls, memb->nodeid, cg);
log_group(ls, "add_change %u nodeid %d remove reason %d",
cg->seq, memb->nodeid, left_list[i].reason);
+
+ if (left_list[i].reason == CPG_REASON_PROCDOWN)
+ kick_node_from_cluster(memb->nodeid);
}
/* nodes that joined; each must already be on the members list */
for (i = 0; i < joined_list_entries; i++) {
memb = find_memb(cg, joined_list[i].nodeid);
if (!memb) {
log_error("no member %d", joined_list[i].nodeid);
error = -ENOENT;
goto fail;
}
memb->added = 1;
if (memb->nodeid == our_nodeid)
cg->we_joined = 1;
else
node_history_init(ls, memb->nodeid, cg);
log_group(ls, "add_change %u nodeid %d joined", cg->seq,
memb->nodeid);
}
/* when we are the joining node, initialize history for every member */
if (cg->we_joined) {
log_group(ls, "add_change %u we joined", cg->seq);
list_for_each_entry(memb, &cg->members, list)
node_history_init(ls, memb->nodeid, cg);
}
log_group(ls, "add_change %u member %d joined %d remove %d failed %d",
cg->seq, cg->member_count, cg->joined_count, cg->remove_count,
cg->failed_count);
/* newest change goes to the front of the list */
list_add(&cg->list, &ls->changes);
*cg_out = cg;
return 0;
fail_nomem:
log_error("no memory");
error = -ENOMEM;
fail:
free_cg(cg);
return error;
}
/* Return 1 if our_nodeid appears in the list of nodes that left the cpg,
   0 otherwise. */
static int we_left(struct cpg_address *left_list, int left_list_entries)
{
	int idx = left_list_entries;

	while (idx-- > 0) {
		if (left_list[idx].nodeid == our_nodeid)
			return 1;
	}
	return 0;
}
/* cpg configuration-change callback for a lockspace cpg.

   If we are leaving and are in the left list, this is the final callback:
   the kernel is stopped, configfs/sysfs are updated, the cpg handle and
   client are released, our plocks are purged and the lockspace is freed.

   Otherwise the confchg is recorded as a new change, the kernel is stopped
   for it, and plocks of removed nodes are purged. */
static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
		       struct cpg_address *member_list, int member_list_entries,
		       struct cpg_address *left_list, int left_list_entries,
		       struct cpg_address *joined_list, int joined_list_entries)
{
	struct lockspace *ls = find_ls_handle(handle);
	struct change *cg;
	struct member *gone;

	if (!ls) {
		log_error("confchg_cb no lockspace for cpg %s",
			  group_name->value);
		return;
	}

	if (ls->leaving && we_left(left_list, left_list_entries)) {
		/* we called cpg_leave(), and this should be the final
		   cpg callback we receive */
		log_group(ls, "confchg for our leave");
		stop_kernel(ls, 0);
		set_configfs_members(ls->name, 0, NULL, 0, NULL);
		set_sysfs_event_done(ls->name, 0);
		cpg_finalize(ls->cpg_handle);
		client_dead(ls->cpg_client);
		purge_plocks(ls, our_nodeid, 1);
		list_del(&ls->list);
		free_ls(ls);
		return;
	}

	/* a failure to record the change has already been logged */
	if (add_change(ls, member_list, member_list_entries,
		       left_list, left_list_entries,
		       joined_list, joined_list_entries, &cg))
		return;

	stop_kernel(ls, cg->seq);

	list_for_each_entry(gone, &cg->removed, list)
		purge_plocks(ls, gone->nodeid, 0);

#if 0
	/* deadlock code needs to adjust per a confchg, is this the right
	   way/place for this? */

	deadlk_confchg(ls, member_list, member_list_entries,
		       left_list, left_list_entries,
		       joined_list, joined_list_entries);
#endif
}
/* cpg message-delivery callback for a lockspace cpg.

   Converts the dlm_header at the start of data from little-endian to host
   order in place, rejects messages that are shorter than a header, whose
   major protocol version differs from ours, or whose header nodeid does
   not match the cpg sender, and then dispatches on the message type.

   nodeid/pid identify the cpg sender; data/len are the received buffer. */
static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
		       uint32_t nodeid, uint32_t pid, void *data, int len)
{
	struct lockspace *ls;
	struct dlm_header *hd;

	ls = find_ls_handle(handle);
	if (!ls) {
		log_error("deliver_cb no ls for cpg %s", group_name->value);
		return;
	}

	/* the header is read and rewritten in place below, so it must be
	   entirely inside the received buffer */
	if (len < (int)sizeof(struct dlm_header)) {
		log_error("deliver_cb short message from %u len %d",
			  nodeid, len);
		return;
	}

	hd = (struct dlm_header *)data;

	hd->version[0] = le16_to_cpu(hd->version[0]);
	hd->version[1] = le16_to_cpu(hd->version[1]);
	hd->version[2] = le16_to_cpu(hd->version[2]);
	hd->type = le16_to_cpu(hd->type);
	hd->nodeid = le32_to_cpu(hd->nodeid);
	hd->to_nodeid = le32_to_cpu(hd->to_nodeid);
	hd->global_id = le32_to_cpu(hd->global_id);
	hd->flags = le32_to_cpu(hd->flags);
	hd->msgdata = le32_to_cpu(hd->msgdata);

	/* only the major version has to match */
	if (hd->version[0] != protocol_active[0]) {
		log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
			  nodeid, hd->version[0], hd->version[1],
			  hd->version[2], protocol_active[0],
			  protocol_active[1], protocol_active[2]);
		return;
	}

	if (hd->nodeid != nodeid) {
		log_error("bad msg nodeid %d %d", hd->nodeid, nodeid);
		return;
	}

	switch (hd->type) {
	case DLM_MSG_START:
		receive_start(ls, hd, len);
		break;
	case DLM_MSG_PLOCK:
		receive_plock(ls, hd, len);
		break;
	case DLM_MSG_PLOCK_OWN:
		receive_own(ls, hd, len);
		break;
	case DLM_MSG_PLOCK_DROP:
		receive_drop(ls, hd, len);
		break;
	case DLM_MSG_PLOCK_SYNC_LOCK:
	case DLM_MSG_PLOCK_SYNC_WAITER:
		receive_sync(ls, hd, len);
		break;
	case DLM_MSG_PLOCKS_STORED:
		receive_plocks_stored(ls, hd, len);
		break;
	case DLM_MSG_DEADLK_CYCLE_START:
		receive_cycle_start(ls, hd, len);
		break;
	case DLM_MSG_DEADLK_CYCLE_END:
		receive_cycle_end(ls, hd, len);
		break;
	case DLM_MSG_DEADLK_CHECKPOINT_READY:
		receive_checkpoint_ready(ls, hd, len);
		break;
	case DLM_MSG_DEADLK_CANCEL_LOCK:
		receive_cancel_lock(ls, hd, len);
		break;
	default:
		log_error("unknown msg type %d", hd->type);
	}
}
/* Callback table passed to cpg_initialize(): deliver_cb receives messages,
   confchg_cb receives membership changes. */
static cpg_callbacks_t cpg_callbacks = {
.cpg_deliver_fn = deliver_cb,
.cpg_confchg_fn = confchg_cb,
};
/* Query the cpg flow control state through daemon_handle and mirror it in
   message_flow_control_on (1 = enabled, 0 = not enabled), logging each
   transition.  On a query error the flag is left untouched. */
void update_flow_control_status(void)
{
	cpg_flow_control_state_t state;
	cpg_error_t rv;
	int enabled;

	rv = cpg_flow_control_state_get(daemon_handle, &state);
	if (rv != CPG_OK) {
		log_error("cpg_flow_control_state_get %d", rv);
		return;
	}

	enabled = (state == CPG_FLOW_CONTROL_ENABLED);

	/* only log when the state actually changes */
	if (enabled && message_flow_control_on == 0)
		log_debug("flow control on");
	else if (!enabled && message_flow_control_on)
		log_debug("flow control off");

	message_flow_control_on = enabled;
}
/* Client work function for a lockspace's cpg fd: dispatch all pending cpg
   callbacks for the lockspace that owns client index ci, then advance its
   pending changes and refresh the flow control status.

   cpg_dispatch() can run confchg_cb(), which frees the lockspace when the
   confchg is for our own leave.  The lockspace is therefore looked up
   again after dispatching and nothing more is done with it if it is gone.
   NOTE(review): this relies on find_ls_ci() no longer finding a lockspace
   that confchg_cb() has removed from the lockspaces list - confirm. */
static void process_lockspace_cpg(int ci)
{
	struct lockspace *ls;
	cpg_error_t error;

	ls = find_ls_ci(ci);
	if (!ls) {
		log_error("process_lockspace_cpg no lockspace for ci %d", ci);
		return;
	}

	error = cpg_dispatch(ls->cpg_handle, CPG_DISPATCH_ALL);
	if (error != CPG_OK) {
		log_error("cpg_dispatch error %d", error);
		return;
	}

	/* the lockspace may have been freed by a callback; don't touch the
	   old pointer */
	ls = find_ls_ci(ci);
	if (!ls)
		return;

	apply_changes(ls);

	update_flow_control_status();
}
/* received an "online" uevent from dlm-kernel

   Create a cpg connection for the lockspace, register its fd as a client,
   add the lockspace to the lockspaces list and join the cpg "dlm:<name>".
   cpg_join() is retried once a second for as long as it returns
   CPG_ERR_TRY_AGAIN.

   Returns 0 on success.  On failure the cpg error code is returned and the
   lockspace has been freed, so the caller must not use ls again. */
int dlm_join_lockspace(struct lockspace *ls)
{
	cpg_error_t error;
	cpg_handle_t h;
	struct cpg_name name;
	int i = 0, fd, ci;

	error = cpg_initialize(&h, &cpg_callbacks);
	if (error != CPG_OK) {
		log_error("cpg_initialize error %d", error);
		goto fail_free;
	}

	cpg_fd_get(h, &fd);

	ci = client_add(fd, process_lockspace_cpg, NULL);

	list_add(&ls->list, &lockspaces);

	ls->cpg_handle = h;
	ls->cpg_client = ci;
	ls->cpg_fd = fd;
	ls->kernel_stopped = 1;
	ls->need_plocks = 1;
	ls->joining = 1;

	memset(&name, 0, sizeof(name));
	sprintf(name.value, "dlm:%s", ls->name);
	name.length = strlen(name.value) + 1;

	/* TODO: allow global_id to be set in cluster.conf? */
	ls->global_id = cpgname_to_crc(name.value, name.length);

 retry:
	error = cpg_join(h, &name);
	if (error == CPG_ERR_TRY_AGAIN) {
		sleep(1);
		if (!(++i % 10))
			log_error("cpg_join error retrying");
		goto retry;
	}
	if (error != CPG_OK) {
		log_error("cpg_join error %d", error);
		/* the handle is finalized exactly once, on the fail path */
		goto fail;
	}

	return 0;

 fail:
	list_del(&ls->list);
	client_dead(ci);
	cpg_finalize(h);
 fail_free:
	free_ls(ls);
	return error;
}
/* received an "offline" uevent from dlm-kernel

   Mark the lockspace as leaving and leave the cpg "dlm:<name>", retrying
   once a second for as long as cpg_leave() returns CPG_ERR_TRY_AGAIN.
   Any other cpg_leave() error is only logged; the function always
   returns 0. */
int dlm_leave_lockspace(struct lockspace *ls)
{
	struct cpg_name name;
	cpg_error_t rv;
	int tries = 0;

	ls->leaving = 1;

	memset(&name, 0, sizeof(name));
	sprintf(name.value, "dlm:%s", ls->name);
	name.length = strlen(name.value) + 1;

	for (;;) {
		rv = cpg_leave(ls->cpg_handle, &name);
		if (rv != CPG_ERR_TRY_AGAIN)
			break;
		sleep(1);
		/* report every tenth retry */
		if (!(++tries % 10))
			log_error("cpg_leave error retrying");
	}

	if (rv != CPG_OK)
		log_error("cpg_leave error %d", rv);
	return 0;
}
int setup_cpg(void)
{
cpg_error_t error;
error = cpg_initialize(&daemon_handle, &cpg_callbacks);
if (error != CPG_OK) {
log_error("setup_cpg cpg_initialize error %d", error);
return -1;
}
/* join "dlm_controld" cpg to interact with other daemons in
the cluster before we start processing uevents? Could this
also help in handling transient partitions? */
return 0;
}
/* fs_controld has seen nodedown for nodeid; it's now ok for dlm to do
   recovery for the failed node.

   Returns 0 after setting fs_notified in the node's history, -ESRCH if
   there is no history for nodeid, -EAGAIN if check_fs is not set for the
   node yet. */
int set_fs_notified(struct lockspace *ls, int nodeid)
{
	struct node *hist = get_node_history(ls, nodeid);

	/* this shouldn't happen */
	if (!hist)
		return -ESRCH;

	/* this can happen, we haven't seen a nodedown for this node yet,
	   but we should soon */
	if (!hist->check_fs)
		return -EAGAIN;

	hist->fs_notified = 1;
	return 0;
}
/* Fill a dlmc_lockspace query record from the lockspace state: name,
   global id, state flags, the last started change (cg_prev) and, if any
   change is pending, the current pending change (cg_next).  Flags are
   OR-ed into lockspace->flags, so the caller supplies a zeroed record.
   Always returns 0. */
int set_lockspace_info(struct lockspace *ls, struct dlmc_lockspace *lockspace)
{
	struct change *cg, *first, *tail = NULL;

	strncpy(lockspace->name, ls->name, DLM_LOCKSPACE_LEN);
	lockspace->global_id = ls->global_id;

	if (ls->joining)
		lockspace->flags |= DLMC_LF_JOINING;
	if (ls->leaving)
		lockspace->flags |= DLMC_LF_LEAVING;
	if (ls->kernel_stopped)
		lockspace->flags |= DLMC_LF_KERNEL_STOPPED;
	if (ls->fs_registered)
		lockspace->flags |= DLMC_LF_FS_REGISTERED;
	if (ls->need_plocks)
		lockspace->flags |= DLMC_LF_NEED_PLOCKS;
	if (ls->save_plocks)
		lockspace->flags |= DLMC_LF_SAVE_PLOCKS;

	/* the change that was last started, if any */
	cg = ls->started_change;
	if (cg) {
		lockspace->cg_prev.member_count = cg->member_count;
		lockspace->cg_prev.joined_count = cg->joined_count;
		lockspace->cg_prev.remove_count = cg->remove_count;
		lockspace->cg_prev.failed_count = cg->failed_count;
		lockspace->cg_prev.combined_seq = cg->combined_seq;
		lockspace->cg_prev.seq = cg->seq;
	}

	if (list_empty(&ls->changes))
		return 0;

	/* combined_seq is taken from the last entry of the pending list,
	   everything else from the first */
	list_for_each_entry(cg, &ls->changes, list)
		tail = cg;
	first = list_first_entry(&ls->changes, struct change, list);

	lockspace->cg_next.member_count = first->member_count;
	lockspace->cg_next.joined_count = first->joined_count;
	lockspace->cg_next.remove_count = first->remove_count;
	lockspace->cg_next.failed_count = first->failed_count;
	lockspace->cg_next.combined_seq = tail->seq;
	lockspace->cg_next.seq = first->seq;

	if (first->state == CGST_WAIT_CONDITIONS)
		lockspace->cg_next.wait_condition = 4;

	/* a specific poll_* reason overrides the generic value above */
	if (poll_fencing)
		lockspace->cg_next.wait_condition = 1;
	else if (poll_quorum)
		lockspace->cg_next.wait_condition = 2;
	else if (poll_fs)
		lockspace->cg_next.wait_condition = 3;

	if (first->state == CGST_WAIT_MESSAGES)
		lockspace->cg_next.wait_messages = 1;

	return 0;
}
/* Fill a dlmc_node query record for nodeid.  Membership flags come from
   the change cg (which may be NULL); the check and notification flags and
   the sequence numbers come from the node's history entry, if it has one.
   Flags are OR-ed into node->flags, so the caller supplies a zeroed
   record.  Always returns 0. */
static int _set_node_info(struct lockspace *ls, struct change *cg, int nodeid,
			  struct dlmc_node *node)
{
	struct member *m = NULL;
	struct node *hist;

	node->nodeid = nodeid;

	if (cg)
		m = find_memb(cg, nodeid);

	if (m) {
		node->flags |= DLMC_NF_MEMBER;

		if (m->start)
			node->flags |= DLMC_NF_START;
		if (m->disallowed)
			node->flags |= DLMC_NF_DISALLOWED;
	}

	hist = get_node_history(ls, nodeid);
	if (hist) {
		if (hist->check_fencing)
			node->flags |= DLMC_NF_CHECK_FENCING;
		if (hist->check_quorum)
			node->flags |= DLMC_NF_CHECK_QUORUM;
		if (hist->check_fs)
			node->flags |= DLMC_NF_CHECK_FS;
		if (hist->fs_notified)
			node->flags |= DLMC_NF_FS_NOTIFIED;

		node->added_seq = hist->added_seq;
		node->removed_seq = hist->removed_seq;
		node->failed_reason = hist->failed_reason;
	}

	return 0;
}
/* Fill a dlmc_node record for nodeid, using the current pending change if
   there is one and the last started change otherwise. */
int set_node_info(struct lockspace *ls, int nodeid, struct dlmc_node *node)
{
	struct change *cg = ls->started_change;

	if (!list_empty(&ls->changes))
		cg = list_first_entry(&ls->changes, struct change, list);

	return _set_node_info(ls, cg, nodeid, node);
}
int set_lockspaces(int *count, struct dlmc_lockspace **lss_out)
{
struct lockspace *ls;
struct dlmc_lockspace *lss, *lsp;
int ls_count = 0;
list_for_each_entry(ls, &lockspaces, list)
ls_count++;
lss = malloc(ls_count * sizeof(struct dlmc_lockspace));
if (!lss)
return -ENOMEM;
memset(lss, 0, ls_count * sizeof(struct dlmc_lockspace));
lsp = lss;
list_for_each_entry(ls, &lockspaces, list) {
set_lockspace_info(ls, lsp++);
}
*count = ls_count;
*lss_out = lss;
return 0;
}
/* Build an array of dlmc_node records for a lockspace.

   option selects the set of nodes:
   - DLMC_NODES_ALL: every node in the node history, described relative to
     the current pending change, or to the started change if nothing is
     pending;
   - DLMC_NODES_MEMBERS: the members of the started change;
   - DLMC_NODES_NEXT: the members of the current pending change.

   On success returns 0, stores the number of records in *node_count and
   the newly allocated array in *nodes_out; the caller frees it.  When the
   selected set is empty or unavailable, or option is unknown, *node_count
   is 0 and *nodes_out is NULL: no zero-sized allocation is attempted,
   because malloc(0) may legitimately return NULL and would be mistaken for
   an out-of-memory condition.

   Returns -ENOMEM if the allocation fails; the outputs are then left
   untouched. */
int set_lockspace_nodes(struct lockspace *ls, int option, int *node_count,
			struct dlmc_node **nodes_out)
{
	struct change *cg = NULL;
	struct node *n;
	struct dlmc_node *nodes = NULL, *nodep;
	struct member *memb;
	int count = 0;

	if (option == DLMC_NODES_ALL) {
		if (!list_empty(&ls->changes))
			cg = list_first_entry(&ls->changes, struct change, list);
		else
			cg = ls->started_change;

		list_for_each_entry(n, &ls->node_history, list)
			count++;

	} else if (option == DLMC_NODES_MEMBERS) {
		if (!ls->started_change)
			goto out;
		cg = ls->started_change;
		count = cg->member_count;

	} else if (option == DLMC_NODES_NEXT) {
		if (list_empty(&ls->changes))
			goto out;
		cg = list_first_entry(&ls->changes, struct change, list);
		count = cg->member_count;
	} else
		goto out;

	/* nothing to report: return an empty result without allocating */
	if (!count)
		goto out;

	/* zero-filled: _set_node_info() ORs flags into the records */
	nodes = calloc(count, sizeof(struct dlmc_node));
	if (!nodes)
		return -ENOMEM;
	nodep = nodes;

	if (option == DLMC_NODES_ALL) {
		list_for_each_entry(n, &ls->node_history, list)
			_set_node_info(ls, cg, n->nodeid, nodep++);
	} else {
		list_for_each_entry(memb, &cg->members, list)
			_set_node_info(ls, cg, memb->nodeid, nodep++);
	}
 out:
	*node_count = count;
	*nodes_out = nodes;
	return 0;
}
diff --git a/group/dlm_controld/dlm_daemon.h b/group/dlm_controld/dlm_daemon.h
index bcd94a37b..0b3feb9ae 100644
--- a/group/dlm_controld/dlm_daemon.h
+++ b/group/dlm_controld/dlm_daemon.h
@@ -1,315 +1,317 @@
#ifndef __DLM_DAEMON_DOT_H__
#define __DLM_DAEMON_DOT_H__
#include <sys/types.h>
#include <asm/types.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/utsname.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>
#include <fcntl.h>
#include <netdb.h>
#include <limits.h>
#include <unistd.h>
#include <time.h>
#include <syslog.h>
#include <sched.h>
#include <signal.h>
#include <sys/time.h>
#include <dirent.h>
#include <openais/saAis.h>
#include <openais/saCkpt.h>
#include <corosync/cpg.h>
#include <corosync/engine/logsys.h>
#include <linux/dlmconstants.h>
#include "libdlmcontrol.h"
#include "dlm_controld.h"
#include "list.h"
#include "linux_endian.h"
/* DLM_LOCKSPACE_LEN: maximum lockspace name length, from linux/dlmconstants.h.
Copied in libdlm.h so apps don't need to include the kernel header.
The libcpg limit is larger at CPG_MAX_NAME_LENGTH 128. Our cpg name includes
a "dlm:" prefix before the lockspace name. */
/* Maximum members of a lockspace, should match CPG_MEMBERS_MAX in corosync/cpg.h.
There are no max defines in dlm-kernel for lockspace members. */
#define MAX_NODES 128
/* Maximum number of IP addresses per node, when using SCTP and multi-ring in
corosync. In dlm-kernel this is DLM_MAX_ADDR_COUNT, currently 3. */
#define MAX_NODE_ADDRESSES 4
/* Max string length printed on a line, for debugging/dump output. */
#define MAXLINE 256
/* Values of the group_mode variable. */
#define GROUP_LIBGROUP 2
#define GROUP_LIBCPG 3
/* Daemon-wide state shared between the source files. */
/* non-zero: the log_* macros also print to stderr */
extern int daemon_debug_opt;
extern int daemon_quit;
/* set_lockspace_info() reports these as wait_condition 1, 2 and 3 */
extern int poll_fencing;
extern int poll_quorum;
extern int poll_fs;
extern int poll_ignore_plock;
extern int plock_fd;
extern int plock_ci;
/* list of all struct lockspace, linked through lockspace.list */
extern struct list_head lockspaces;
extern int cman_quorate;
extern int our_nodeid;
/* scratch buffer the log_* macros format each line into */
extern char daemon_debug_buf[256];
extern char dump_buf[DLMC_DUMP_SIZE];
extern int dump_point;
extern int dump_wrap;
extern char plock_dump_buf[DLMC_DUMP_SIZE];
extern int plock_dump_len;
/* GROUP_LIBGROUP or GROUP_LIBCPG */
extern int group_mode;
/* called by log_debug/log_group after daemon_debug_buf has been filled */
void daemon_dump_save(void);
/* Format a debug line ("<time> <message>\n") into daemon_debug_buf, save it
   with daemon_dump_save(), print it to stderr when daemon_debug_opt is set
   and pass it to log_printf() at LOG_DEBUG when cfgd_debug_logsys is set.
   fmt must be a string literal.  The time is cast to long so that it
   matches the %ld conversion even where time_t is not a long. */
#define log_debug(fmt, args...) \
do { \
	snprintf(daemon_debug_buf, 255, "%ld " fmt "\n", \
		 (long)time(NULL), ##args); \
	daemon_dump_save(); \
	if (daemon_debug_opt) \
		fprintf(stderr, "%s", daemon_debug_buf); \
	if (cfgd_debug_logsys) \
		log_printf(LOG_DEBUG, "%s", daemon_debug_buf); \
} while (0)
/* Like log_debug(), but the line is prefixed with the lockspace name:
   "<time> <ls->name> <message>\n".  fmt must be a string literal.  The
   time is cast to long so that it matches the %ld conversion even where
   time_t is not a long. */
#define log_group(ls, fmt, args...) \
do { \
	snprintf(daemon_debug_buf, 255, "%ld %s " fmt "\n", \
		 (long)time(NULL), (ls)->name, ##args); \
	daemon_dump_save(); \
	if (daemon_debug_opt) \
		fprintf(stderr, "%s", daemon_debug_buf); \
	if (cfgd_debug_logsys) \
		log_printf(LOG_DEBUG, "%s", daemon_debug_buf); \
} while (0)
/* Log an error: the message goes through log_debug() and is also passed to
   log_printf() at LOG_ERR.  The arguments are expanded twice, so they must
   not have side effects. */
#define log_error(fmt, args...) \
do { \
log_debug(fmt, ##args); \
log_printf(LOG_ERR, fmt, ##args); \
} while (0)
/* Format a plock debug line ("<time> <ls->name> <message>\n") into
   daemon_debug_buf and print it to stderr when both daemon_debug_opt and
   cfgd_plock_debug are set.  fmt must be a string literal.  The time is
   cast to long so that it matches the %ld conversion even where time_t is
   not a long. */
#define log_plock(ls, fmt, args...) \
do { \
	snprintf(daemon_debug_buf, 255, "%ld %s " fmt "\n", \
		 (long)time(NULL), (ls)->name, ##args); \
	if (daemon_debug_opt && cfgd_plock_debug) \
		fprintf(stderr, "%s", daemon_debug_buf); \
} while (0)
/* dlm_header types: values of dlm_header.type, numbered from 1 in this
   order.  deliver_cb() dispatches received messages on them. */
enum {
DLM_MSG_START = 1,
DLM_MSG_PLOCK,
DLM_MSG_PLOCK_OWN,
DLM_MSG_PLOCK_DROP,
DLM_MSG_PLOCK_SYNC_LOCK,
DLM_MSG_PLOCK_SYNC_WAITER,
DLM_MSG_PLOCKS_STORED,
DLM_MSG_DEADLK_CYCLE_START,
DLM_MSG_DEADLK_CYCLE_END,
DLM_MSG_DEADLK_CHECKPOINT_READY,
DLM_MSG_DEADLK_CANCEL_LOCK
};
/* dlm_header flags: bits of dlm_header.flags, set by send_info() */
#define DLM_MFLG_JOINING 1 /* accompanies start, we are joining */
#define DLM_MFLG_HAVEPLOCK 2 /* accompanies start, we have plock state */
/* Header at the start of every message.  deliver_cb() converts version,
   type, nodeid, to_nodeid, global_id, flags and msgdata from little-endian
   to host order when a message is received. */
struct dlm_header {
uint16_t version[3];
uint16_t type; /* DLM_MSG_ */
uint32_t nodeid; /* sender */
uint32_t to_nodeid; /* recipient, 0 for all */
uint32_t global_id; /* global unique id for this lockspace */
uint32_t flags; /* DLM_MFLG_ */
uint32_t msgdata; /* in-header payload depends on MSG type; lkid
for deadlock, seq for lockspace membership */
uint32_t pad1;
uint64_t pad2;
};
/* Per-lockspace state of the daemon; instances are linked on the global
   lockspaces list. */
struct lockspace {
struct list_head list;
char name[DLM_LOCKSPACE_LEN+1];
uint32_t global_id; /* crc of the cpg name, set in dlm_join_lockspace */
/* lockspace membership stuff */
cpg_handle_t cpg_handle;
int cpg_client; /* client index returned by client_add */
int cpg_fd;
int joining; /* set in dlm_join_lockspace */
int leaving; /* set in dlm_leave_lockspace */
int kernel_stopped;
int fs_registered;
uint32_t change_seq; /* seq of the newest change, 0 is skipped */
uint32_t started_count; /* number of started changes, 0 is skipped */
struct change *started_change; /* last change that was started */
struct list_head changes; /* pending changes, newest first */
struct list_head node_history;
/* plock stuff */
int plock_ckpt_node; /* nodeid chosen by set_plock_ckpt_node */
int need_plocks; /* we have not read plock state yet */
int save_plocks; /* set in prepare_plocks while need_plocks is set */
uint32_t associated_mg_id;
struct list_head saved_messages;
struct list_head plock_resources;
time_t last_checkpoint_time;
time_t last_plock_time;
struct timeval drop_resources_last;
uint64_t plock_ckpt_handle;
/* save copy of groupd member callback data for queries */
int cb_member_count;
int cb_members[MAX_NODES];
/* deadlock stuff */
int deadlk_low_nodeid;
struct list_head deadlk_nodes;
uint64_t deadlk_ckpt_handle;
int deadlk_confchg_init;
struct list_head transactions;
struct list_head resources;
struct timeval cycle_start_time;
struct timeval cycle_end_time;
struct timeval last_send_cycle_start;
int cycle_running;
int all_checkpoints_ready;
};
/* action.c */
void set_associated_id(uint32_t mg_id);
int set_sysfs_control(char *name, int val);
int set_sysfs_event_done(char *name, int val);
int set_sysfs_id(char *name, uint32_t id);
int set_configfs_members(char *name, int new_count, int *new_members,
int renew_count, int *renew_members);
int add_configfs_node(int nodeid, char *addr, int addrlen, int local);
void del_configfs_node(int nodeid);
void clear_configfs(void);
int setup_configfs(void);
+int check_uncontrolled_lockspaces(void);
/* config.c */
int setup_ccs(void);
void close_ccs(void);
void read_ccs_name(char *path, char *name);
void read_ccs_yesno(char *path, int *yes, int *no);
void read_ccs_int(char *path, int *config_val);
int get_weight(int nodeid, char *lockspace);
/* cpg.c */
int setup_cpg(void);
void process_lockspace_changes(void);
void dlm_send_message(struct lockspace *ls, char *buf, int len);
int dlm_join_lockspace(struct lockspace *ls);
int dlm_leave_lockspace(struct lockspace *ls);
char *msg_name(int type);
void update_flow_control_status(void);
int set_node_info(struct lockspace *ls, int nodeid, struct dlmc_node *node);
int set_lockspace_info(struct lockspace *ls, struct dlmc_lockspace *lockspace);
int set_lockspaces(int *count, struct dlmc_lockspace **lss_out);
int set_lockspace_nodes(struct lockspace *ls, int option, int *node_count,
struct dlmc_node **nodes_out);
int set_fs_notified(struct lockspace *ls, int nodeid);
/* deadlock.c */
void setup_deadlock(void);
void send_cycle_start(struct lockspace *ls);
void receive_checkpoint_ready(struct lockspace *ls, struct dlm_header *hd,
int len);
void receive_cycle_start(struct lockspace *ls, struct dlm_header *hd, int len);
void receive_cycle_end(struct lockspace *ls, struct dlm_header *hd, int len);
void receive_cancel_lock(struct lockspace *ls, struct dlm_header *hd, int len);
/* main.c */
int do_read(int fd, void *buf, size_t count);
int do_write(int fd, void *buf, size_t count);
void client_dead(int ci);
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci));
int client_fd(int ci);
void client_ignore(int ci, int fd);
void client_back(int ci, int fd);
struct lockspace *find_ls(char *name);
struct lockspace *find_ls_id(uint32_t id);
char *dlm_mode_str(int mode);
void cluster_dead(int ci);
/* member_cman.c */
int setup_cman(void);
void close_cman(void);
void process_cman(int ci);
void cman_statechange(void);
int is_cman_member(int nodeid);
char *nodeid2name(int nodeid);
+void kick_node_from_cluster(int nodeid);
/* netlink.c */
int setup_netlink(void);
void process_netlink(int ci);
/* plock.c */
int setup_plocks(void);
void process_plocks(int ci);
int limit_plocks(void);
void receive_plock(struct lockspace *ls, struct dlm_header *hd, int len);
void receive_own(struct lockspace *ls, struct dlm_header *hd, int len);
void receive_sync(struct lockspace *ls, struct dlm_header *hd, int len);
void receive_drop(struct lockspace *ls, struct dlm_header *hd, int len);
void process_saved_plocks(struct lockspace *ls);
void close_plock_checkpoint(struct lockspace *ls);
void store_plocks(struct lockspace *ls);
void retrieve_plocks(struct lockspace *ls);
void purge_plocks(struct lockspace *ls, int nodeid, int unmount);
int fill_plock_dump_buf(struct lockspace *ls);
/* group.c */
int setup_groupd(void);
void close_groupd(void);
void process_groupd(int ci);
int dlm_join_lockspace_group(struct lockspace *ls);
int dlm_leave_lockspace_group(struct lockspace *ls);
int set_node_info_group(struct lockspace *ls, int nodeid,
struct dlmc_node *node);
int set_lockspace_info_group(struct lockspace *ls,
struct dlmc_lockspace *lockspace);
int set_lockspaces_group(int *count, struct dlmc_lockspace **lss_out);
int set_lockspace_nodes_group(struct lockspace *ls, int option, int *node_count,
struct dlmc_node **nodes);
void set_group_mode(void);
/* logging.c */
/* every prototype spells out (void) so that a call passing arguments is
   diagnosed by the compiler instead of being silently accepted */
void init_logging(void);
void setup_logging(void);
void close_logging(void);
#endif
diff --git a/group/dlm_controld/main.c b/group/dlm_controld/main.c
index 7b044ddc5..a05cfe1aa 100644
--- a/group/dlm_controld/main.c
+++ b/group/dlm_controld/main.c
@@ -1,1277 +1,1278 @@
#include "dlm_daemon.h"
#include "config.h"
#include <pthread.h>
#include "copyright.cf"
#include <linux/dlmconstants.h>
#include <linux/netlink.h>
#include <linux/genetlink.h>
#include <linux/dlm_netlink.h>
#define LOCKFILE_NAME "/var/run/dlm_controld.pid"
#define CLIENT_NALLOC 32
static int client_maxi;
static int client_size = 0;
static struct client *client = NULL;
static struct pollfd *pollfd = NULL;
static pthread_t query_thread;
static pthread_mutex_t query_mutex;
static struct list_head fs_register_list;
/* One slot of the client table.  The slot index is the "ci" value that is
   passed to the work and dead callbacks; pollfd[ci] is the matching poll
   entry. */
struct client {
int fd; /* descriptor owned by this slot, -1 when the slot is free */
void *workfn; /* invoked by loop() as workfn(ci) when POLLIN is reported */
void *deadfn; /* invoked by loop() as deadfn(ci) on POLLERR/POLLHUP/POLLNVAL */
struct lockspace *ls; /* not referenced by the code visible in this file */
};
/*
 * Read exactly count bytes from fd into buf.
 *
 * Short reads are continued and reads interrupted by a signal (EINTR) are
 * retried.  Returns 0 once count bytes have been stored, or -1 on
 * end-of-file or on any other read() error (errno is left as read() set it).
 */
int do_read(int fd, void *buf, size_t count)
{
	char *dst = buf;	/* byte pointer: arithmetic on void * is not ISO C */
	size_t off = 0;		/* same type as count, so no signed/unsigned mix */
	ssize_t rv;

	while (off < count) {
		rv = read(fd, dst + off, count - off);
		if (rv == 0)
			return -1;
		if (rv == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		off += (size_t)rv;
	}
	return 0;
}
/*
 * Write count bytes from buf to fd, continuing after partial writes and
 * retrying writes interrupted by a signal.
 *
 * Returns 0 when everything has been written, otherwise the negative
 * write() result after logging errno.
 */
int do_write(int fd, void *buf, size_t count)
{
	int sent = 0;
	int n;

	for (;;) {
		n = write(fd, (char *)buf + sent, count);

		if (n == -1 && errno == EINTR)
			continue;

		if (n < 0) {
			log_error("write errno %d", errno);
			return n;
		}

		if (n == count)
			return 0;

		/* partial write: advance past what was accepted */
		sent += n;
		count -= n;
	}
}
/*
 * Grow the client and pollfd tables by CLIENT_NALLOC slots and mark the new
 * slots as free (fd == -1).
 *
 * Returns 0 on success or -ENOMEM when either table cannot be enlarged.  On
 * failure the tables that existed before the call remain valid and
 * client_size is unchanged, so slots already in use are not disturbed.
 */
static int client_alloc(void)
{
	int new_size = client_size + CLIENT_NALLOC;
	struct client *new_client;
	struct pollfd *new_pollfd;
	int i;

	/* realloc(NULL, n) behaves like malloc(n), so the very first call
	   needs no special case.  The result goes through a temporary so the
	   old table is not lost (and not dereferenced as NULL) on failure. */
	new_client = realloc(client, new_size * sizeof(*new_client));
	if (!new_client) {
		log_error("can't alloc for client array");
		return -ENOMEM;
	}
	client = new_client;

	new_pollfd = realloc(pollfd, new_size * sizeof(*new_pollfd));
	if (!new_pollfd) {
		log_error("can't alloc for pollfd");
		return -ENOMEM;
	}
	pollfd = new_pollfd;

	for (i = client_size; i < new_size; i++) {
		client[i].workfn = NULL;
		client[i].deadfn = NULL;
		client[i].fd = -1;
		pollfd[i].fd = -1;
		pollfd[i].events = 0;
		pollfd[i].revents = 0;
	}
	client_size = new_size;
	return 0;
}

/*
 * Default dead callback: close the descriptor of slot ci and mark the slot
 * free so that client_add() can reuse it and poll() skips it.
 */
void client_dead(int ci)
{
	close(client[ci].fd);
	client[ci].workfn = NULL;
	client[ci].fd = -1;
	pollfd[ci].fd = -1;
}

/*
 * Register fd in the first free slot of the client table.
 *
 * workfn is called with the slot index when the fd becomes readable; deadfn
 * is called when poll reports an error or hangup, and defaults to
 * client_dead() when NULL.
 *
 * Returns the slot index (ci), or -ENOMEM if the table is full and cannot be
 * grown; in that case fd is not registered and remains owned by the caller.
 */
int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
{
	int i;

	for (;;) {
		for (i = 0; i < client_size; i++) {
			if (client[i].fd != -1)
				continue;

			client[i].workfn = workfn;
			client[i].deadfn = deadfn ? deadfn : client_dead;
			client[i].fd = fd;
			pollfd[i].fd = fd;
			pollfd[i].events = POLLIN;
			if (i > client_maxi)
				client_maxi = i;
			return i;
		}

		/* no free slot (or nothing allocated yet): grow and rescan */
		if (client_alloc() < 0)
			return -ENOMEM;
	}
}
int client_fd(int ci)
{
return client[ci].fd;
}
/* Stop polling slot ci without closing it: the poll entry is disabled while
   the client entry keeps its descriptor.  The fd argument is unused. */
void client_ignore(int ci, int fd)
{
	struct pollfd *entry = &pollfd[ci];

	entry->events = 0;
	entry->fd = -1;
}
/* Resume polling slot ci for input on descriptor fd (the counterpart of
   client_ignore()). */
void client_back(int ci, int fd)
{
	struct pollfd *entry = &pollfd[ci];

	entry->events = POLLIN;
	entry->fd = fd;
}
/* SIGTERM handler (installed in main()): only raises the daemon_quit flag,
   which loop() inspects after poll() returns. */
static void sigterm_handler(int sig)
{
daemon_quit = 1;
}
/* Allocate a zeroed lockspace called name (at most DLM_LOCKSPACE_LEN bytes
   of it are copied) with all of its list heads initialised.  Returns NULL
   when memory cannot be allocated. */
static struct lockspace *create_ls(char *name)
{
	struct lockspace *new_ls = malloc(sizeof(struct lockspace));

	if (new_ls == NULL)
		return NULL;

	memset(new_ls, 0, sizeof(struct lockspace));
	strncpy(new_ls->name, name, DLM_LOCKSPACE_LEN);

	INIT_LIST_HEAD(&new_ls->changes);
	INIT_LIST_HEAD(&new_ls->node_history);
	INIT_LIST_HEAD(&new_ls->saved_messages);
	INIT_LIST_HEAD(&new_ls->plock_resources);
	INIT_LIST_HEAD(&new_ls->deadlk_nodes);
	INIT_LIST_HEAD(&new_ls->transactions);
	INIT_LIST_HEAD(&new_ls->resources);

	return new_ls;
}
struct lockspace *find_ls(char *name)
{
struct lockspace *ls;
list_for_each_entry(ls, &lockspaces, list) {
if ((strlen(ls->name) == strlen(name)) &&
!strncmp(ls->name, name, strlen(name)))
return ls;
}
return NULL;
}
struct lockspace *find_ls_id(uint32_t id)
{
struct lockspace *ls;
list_for_each_entry(ls, &lockspaces, list) {
if (ls->global_id == id)
return ls;
}
return NULL;
}
/* Entry on fs_register_list: the name recorded by a DLMC_CMD_FS_REGISTER
   request (see fs_register_add/fs_register_del). */
struct fs_reg {
struct list_head list; /* linkage on fs_register_list */
char name[DLM_LOCKSPACE_LEN+1]; /* registered name, one extra byte for the NUL */
};
/* Return 1 when name is present on fs_register_list, 0 otherwise. */
static int fs_register_check(char *name)
{
	struct fs_reg *entry;

	list_for_each_entry(entry, &fs_register_list, list) {
		if (strcmp(name, entry->name) != 0)
			continue;
		return 1;
	}

	return 0;
}
/*
 * Record name on fs_register_list.
 *
 * Returns 0 on success, -EALREADY when the name is already registered and
 * -ENOMEM when the list entry cannot be allocated.
 */
static int fs_register_add(char *name)
{
	struct fs_reg *fs;

	if (fs_register_check(name))
		return -EALREADY;

	fs = malloc(sizeof(struct fs_reg));
	if (!fs)
		return -ENOMEM;

	strncpy(fs->name, name, DLM_LOCKSPACE_LEN);
	/* malloc memory is uninitialised and strncpy does not terminate when
	   it copies a full DLM_LOCKSPACE_LEN bytes, so terminate explicitly;
	   the later strcmp() calls on fs->name depend on it */
	fs->name[DLM_LOCKSPACE_LEN] = '\0';

	list_add(&fs->list, &fs_register_list);
	return 0;
}
static void fs_register_del(char *name)
{
struct fs_reg *fs;
list_for_each_entry(fs, &fs_register_list, list) {
if (!strcmp(name, fs->name)) {
list_del(&fs->list);
free(fs);
return;
}
}
}
#define MAXARGS 8

/*
 * Split buf in place on sep, storing pointers to the pieces in argv and the
 * number of pieces in *argc.  At most MAXARGS pieces are produced.
 *
 * When the want'th separator is reached, splitting stops there and the text
 * following that separator is returned untouched.  Otherwise the return
 * value points just past the NUL that ends the last piece.
 */
static char *get_args(char *buf, int *argc, char **argv, char sep, int want)
{
	char *cursor = buf;
	char *hit;
	int n = 1;

	argv[0] = buf;

	while (n < MAXARGS) {
		hit = strchr(cursor, sep);
		if (hit == NULL)
			break;

		*hit = '\0';

		if (n == want) {
			/* caller asked for the remainder after this separator */
			*argc = n;
			return hit + 1;
		}

		argv[n] = hit + 1;
		cursor = hit + 1;
		n++;
	}

	*argc = n;

	/* we ended by hitting \0, return the point following that */
	return cursor + strlen(cursor) + 1;
}
/* Map a DLM_LOCK_* mode value to its two-letter name; any other value
   yields "??".  The returned string is a literal and must not be modified. */
char *dlm_mode_str(int mode)
{
	if (mode == DLM_LOCK_IV)
		return "IV";
	if (mode == DLM_LOCK_NL)
		return "NL";
	if (mode == DLM_LOCK_CR)
		return "CR";
	if (mode == DLM_LOCK_CW)
		return "CW";
	if (mode == DLM_LOCK_PR)
		return "PR";
	if (mode == DLM_LOCK_PW)
		return "PW";
	if (mode == DLM_LOCK_EX)
		return "EX";

	return "??";
}
/* recv "online" (join) and "offline" (leave)
   messages from dlm via uevents and pass them on to groupd */

/*
 * Work callback for the uevent netlink socket in client slot ci.
 *
 * Reads one uevent, ignores it unless it is a four-part
 * "action/x/dlm/name" message for the dlm subsystem, and then joins
 * ("online@") or leaves ("offline@") the named lockspace using the group or
 * cpg implementation selected by group_mode.  Failures are logged; nothing
 * is returned.
 */
static void process_uevent(int ci)
{
	struct lockspace *ls;
	char buf[MAXLINE];
	char *argv[MAXARGS], *act, *sys;
	int rv, argc = 0;

	memset(buf, 0, sizeof(buf));
	memset(argv, 0, sizeof(char *) * MAXARGS);

 retry_recv:
	/* read one byte less than the buffer holds so the zeroed last byte
	   always terminates the string handed to strstr()/strchr() below */
	rv = recv(client[ci].fd, buf, sizeof(buf) - 1, 0);
	/* the error code is reported through errno, never through rv */
	if (rv == -1 && errno == EINTR)
		goto retry_recv;
	if (rv == -1 && errno == EAGAIN)
		return;
	if (rv < 0) {
		log_error("uevent recv error %d errno %d", rv, errno);
		return;
	}

	if (!strstr(buf, "dlm"))
		return;

	log_debug("uevent: %s", buf);

	get_args(buf, &argc, argv, '/', 4);
	if (argc != 4) {
		/* with fewer than four parts argv[2]/argv[3] are NULL and
		   must not be passed to the string functions below */
		log_error("uevent message has %d args", argc);
		return;
	}
	act = argv[0];
	sys = argv[2];

	if (strcmp(sys, "dlm"))
		return;

	log_debug("kernel: %s %s", act, argv[3]);

	rv = 0;

	if (!strcmp(act, "online@")) {
		ls = find_ls(argv[3]);
		if (ls) {
			rv = -EEXIST;
			goto out;
		}

		ls = create_ls(argv[3]);
		if (!ls) {
			rv = -ENOMEM;
			goto out;
		}

		if (fs_register_check(ls->name))
			ls->fs_registered = 1;

		if (group_mode == GROUP_LIBGROUP)
			rv = dlm_join_lockspace_group(ls);
		else
			rv = dlm_join_lockspace(ls);
		if (rv) {
			/* ls already freed */
			goto out;
		}

	} else if (!strcmp(act, "offline@")) {
		ls = find_ls(argv[3]);
		if (!ls) {
			rv = -ENOENT;
			goto out;
		}

		if (group_mode == GROUP_LIBGROUP)
			dlm_leave_lockspace_group(ls);
		else
			dlm_leave_lockspace(ls);
	}
 out:
	if (rv < 0)
		log_error("process_uevent %s error %d errno %d",
			  act, rv, errno);
}
static int setup_uevent(void)
{
struct sockaddr_nl snl;
int s, rv;
s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_KOBJECT_UEVENT);
if (s < 0) {
log_error("uevent netlink socket");
return s;
}
memset(&snl, 0, sizeof(snl));
snl.nl_family = AF_NETLINK;
snl.nl_pid = getpid();
snl.nl_groups = 1;
rv = bind(s, (struct sockaddr *) &snl, sizeof(snl));
if (rv < 0) {
log_error("uevent bind error %d errno %d", rv, errno);
close(s);
return rv;
}
return s;
}
/* Fill in a reply header: zero it, then set magic, version, total length
   (header plus extra_len bytes of payload), command and result.  name may
   be NULL; otherwise at most DLM_LOCKSPACE_LEN bytes of it are copied. */
static void init_header(struct dlmc_header *h, int cmd, char *name, int result,
			int extra_len)
{
	const size_t hdr_size = sizeof(*h);

	memset(h, 0, hdr_size);

	if (name)
		strncpy(h->name, name, DLM_LOCKSPACE_LEN);

	h->data = result;
	h->command = cmd;
	h->len = hdr_size + extra_len;
	h->version = DLMC_VERSION;
	h->magic = DLMC_MAGIC;
}
/* Send the in-memory debug log to a query client on fd: a dlmc_header whose
   length covers the log text, followed by the text itself.  When the buffer
   has wrapped (dump_wrap set) the part from dump_point to the end of
   dump_buf is written first and the part from the start of dump_buf up to
   dump_point second.  Results of do_write() are not checked. */
static void query_dump_debug(int fd)
{
struct dlmc_header h;
int extra_len;
int len;
/* in the case of dump_wrap, extra_len will go in two writes,
first the log tail, then the log head */
if (dump_wrap)
extra_len = DLMC_DUMP_SIZE;
else
extra_len = dump_point;
init_header(&h, DLMC_CMD_DUMP_DEBUG, NULL, 0, extra_len);
do_write(fd, &h, sizeof(h));
if (dump_wrap) {
len = DLMC_DUMP_SIZE - dump_point;
do_write(fd, dump_buf + dump_point, len);
len = dump_point;
} else
len = dump_point;
/* NUL terminate the debug string */
/* NOTE(review): the NUL is stored at dump_buf[dump_point] but the write
   below sends only len (== dump_point) bytes, so the NUL itself is not
   transmitted; in the wrapped case it overwrites the byte at dump_point
   inside the buffer - confirm this is intended */
dump_buf[dump_point] = '\0';
do_write(fd, dump_buf, len);
}
static void query_dump_plocks(int fd, char *name)
{
struct lockspace *ls;
struct dlmc_header h;
int rv;
ls = find_ls(name);
if (!ls) {
plock_dump_len = 0;
rv = -ENOENT;
} else {
/* writes to plock_dump_buf and sets plock_dump_len */
rv = fill_plock_dump_buf(ls);
}
init_header(&h, DLMC_CMD_DUMP_PLOCKS, name, rv, plock_dump_len);
do_write(fd, &h, sizeof(h));
if (plock_dump_len)
do_write(fd, plock_dump_buf, plock_dump_len);
}
/* Build one message made of a dlmc_header (command, name, result, option)
   followed by buflen bytes copied from buf, and send it to fd with a single
   do_write() call.  Silently does nothing if the message buffer cannot be
   allocated. */
static void do_reply(int fd, int cmd, char *name, int result, int option,
		     char *buf, int buflen)
{
	int total = sizeof(struct dlmc_header) + buflen;
	char *msg = malloc(total);
	struct dlmc_header *hdr;

	if (msg == NULL)
		return;

	memset(msg, 0, total);

	hdr = (struct dlmc_header *)msg;
	init_header(hdr, cmd, name, result, buflen);
	hdr->option = option;

	/* payload starts right behind the header */
	if (buflen && buf)
		memcpy(hdr + 1, buf, buflen);

	do_write(fd, msg, total);
	free(msg);
}
/*
 * Reply on fd with a struct dlmc_lockspace describing lockspace name.
 * The reply result is the value returned by the set_lockspace_info*()
 * helper, or -ENOENT when no such lockspace exists (the payload is then
 * all zeros).
 */
static void query_lockspace_info(int fd, char *name)
{
	struct lockspace *ls;
	struct dlmc_lockspace lockspace;
	int rv;

	/* zero first: the struct is sent to the client on every path,
	   including the not-found path, and must not carry stack contents */
	memset(&lockspace, 0, sizeof(lockspace));

	ls = find_ls(name);
	if (!ls) {
		rv = -ENOENT;
		goto out;
	}

	lockspace.group_mode = group_mode;

	if (group_mode == GROUP_LIBGROUP)
		rv = set_lockspace_info_group(ls, &lockspace);
	else
		rv = set_lockspace_info(ls, &lockspace);
 out:
	do_reply(fd, DLMC_CMD_LOCKSPACE_INFO, name, rv, 0,
		 (char *)&lockspace, sizeof(lockspace));
}
/*
 * Reply on fd with a struct dlmc_node for nodeid in lockspace name.
 * The reply result is the value returned by the set_node_info*() helper,
 * or -ENOENT when no such lockspace exists (the payload is then all zeros).
 */
static void query_node_info(int fd, char *name, int nodeid)
{
	struct lockspace *ls;
	struct dlmc_node node;
	int rv;

	/* the struct is sent to the client on every path, so it must never
	   hold uninitialised stack contents */
	memset(&node, 0, sizeof(node));

	ls = find_ls(name);
	if (!ls) {
		rv = -ENOENT;
		goto out;
	}

	if (group_mode == GROUP_LIBGROUP)
		rv = set_node_info_group(ls, nodeid, &node);
	else
		rv = set_node_info(ls, nodeid, &node);
 out:
	do_reply(fd, DLMC_CMD_NODE_INFO, name, rv, 0,
		 (char *)&node, sizeof(node));
}
/*
 * Reply on fd with an array of struct dlmc_lockspace.
 *
 * max is the number of entries the client can accept.  The reply result is
 * the number of entries returned, -E2BIG when more exist than max (max
 * entries are still sent), or the negative value from the set_lockspaces*()
 * helper (no entries sent).
 */
static void query_lockspaces(int fd, int max)
{
	int ls_count = 0;
	struct dlmc_lockspace *lss = NULL;
	int rv, result;

	/* max is taken from the client's request header; a negative value
	   would otherwise become a negative entry count and reply length */
	if (max < 0)
		max = 0;

	if (group_mode == GROUP_LIBGROUP)
		rv = set_lockspaces_group(&ls_count, &lss);
	else
		rv = set_lockspaces(&ls_count, &lss);

	if (rv < 0) {
		result = rv;
		ls_count = 0;
		goto out;
	}

	if (ls_count > max) {
		result = -E2BIG;
		ls_count = max;
	} else {
		result = ls_count;
	}
 out:
	do_reply(fd, DLMC_CMD_LOCKSPACES, NULL, result, 0,
		 (char *)lss, ls_count * sizeof(struct dlmc_lockspace));

	free(lss);
}
/*
 * Reply on fd with an array of struct dlmc_node for lockspace name.
 *
 * option is passed through to the set_lockspace_nodes*() helper; max is the
 * number of entries the client can accept.  The reply result is the number
 * of entries returned, -E2BIG when more exist than max (max entries are
 * still sent), -ENOENT for an unknown lockspace, or the negative value from
 * the helper.
 */
static void query_lockspace_nodes(int fd, char *name, int option, int max)
{
	struct lockspace *ls;
	int node_count = 0;
	struct dlmc_node *nodes = NULL;
	int rv, result;

	/* max is taken from the client's request header; a negative value
	   would otherwise become a negative entry count and reply length */
	if (max < 0)
		max = 0;

	ls = find_ls(name);
	if (!ls) {
		result = -ENOENT;
		node_count = 0;
		goto out;
	}

	if (group_mode == GROUP_LIBGROUP)
		rv = set_lockspace_nodes_group(ls, option, &node_count, &nodes);
	else
		rv = set_lockspace_nodes(ls, option, &node_count, &nodes);

	if (rv < 0) {
		result = rv;
		node_count = 0;
		goto out;
	}

	/* node_count is the number of structs copied/returned; the caller's
	   max may be less than that, in which case we copy as many as they
	   asked for and return -E2BIG */

	if (node_count > max) {
		result = -E2BIG;
		node_count = max;
	} else {
		result = node_count;
	}
 out:
	do_reply(fd, DLMC_CMD_LOCKSPACE_NODES, name, result, 0,
		 (char *)nodes, node_count * sizeof(struct dlmc_node));

	free(nodes);
}
/*
 * Work callback for an accepted client connection in slot ci.
 *
 * Reads one dlmc_header (plus any extra bytes announced by h.len), checks
 * magic and major version, and dispatches on h.command: fs register /
 * unregister, fs notified and deadlock check.  Malformed requests are
 * logged and dropped.
 */
static void process_connection(int ci)
{
	struct dlmc_header h;
	char name[sizeof(h.name) + 1];
	char *extra = NULL;
	int rv, extra_len;
	struct lockspace *ls;

	rv = do_read(client[ci].fd, &h, sizeof(h));
	if (rv < 0) {
		log_debug("connection %d read error %d", ci, rv);
		goto out;
	}

	if (h.magic != DLMC_MAGIC) {
		log_debug("connection %d magic error %x", ci, h.magic);
		goto out;
	}

	if ((h.version & 0xFFFF0000) != (DLMC_VERSION & 0xFFFF0000)) {
		log_debug("connection %d version error %x", ci, h.version);
		goto out;
	}

	/* h.name is filled in by the client and is not guaranteed to contain
	   a NUL; every use below treats the name as a C string, so work from
	   a terminated copy */
	memcpy(name, h.name, sizeof(h.name));
	name[sizeof(h.name)] = '\0';

	/* NOTE(review): extra_len is derived from the client-supplied h.len
	   without an upper bound, and the extra data is not used by any of
	   the commands handled below - consider rejecting oversized requests */
	if (h.len > sizeof(h)) {
		extra_len = h.len - sizeof(h);
		extra = malloc(extra_len);
		if (!extra) {
			log_error("process_connection no mem %d", extra_len);
			goto out;
		}
		memset(extra, 0, extra_len);

		rv = do_read(client[ci].fd, extra, extra_len);
		if (rv < 0) {
			log_debug("connection %d extra read error %d", ci, rv);
			goto out;
		}
	}

	switch (h.command) {
	case DLMC_CMD_FS_REGISTER:
		if (group_mode == GROUP_LIBGROUP) {
			rv = -EINVAL;
		} else {
			rv = fs_register_add(name);
			ls = find_ls(name);
			if (ls)
				ls->fs_registered = 1;
		}
		do_reply(client[ci].fd, DLMC_CMD_FS_REGISTER, name, rv, 0,
			 NULL, 0);
		break;

	case DLMC_CMD_FS_UNREGISTER:
		if (group_mode == GROUP_LIBGROUP)
			break;
		fs_register_del(name);
		ls = find_ls(name);
		if (ls)
			ls->fs_registered = 0;
		break;

	case DLMC_CMD_FS_NOTIFIED:
		ls = find_ls(name);
		if (ls)
			rv = set_fs_notified(ls, h.data);
		else
			rv = -ENOENT;
		/* pass back the nodeid provided by caller in option field */
		do_reply(client[ci].fd, DLMC_CMD_FS_NOTIFIED, name, rv,
			 h.option, NULL, 0);
		break;

	case DLMC_CMD_DEADLOCK_CHECK:
		ls = find_ls(name);
		if (ls)
			send_cycle_start(ls);
		client_dead(ci);
		break;

	default:
		log_error("process_connection %d unknown command %d",
			  ci, h.command);
	}
 out:
	free(extra);
}
static void process_listener(int ci)
{
int fd, i;
fd = accept(client[ci].fd, NULL, NULL);
if (fd < 0) {
log_error("process_listener: accept error %d %d", fd, errno);
return;
}
i = client_add(fd, process_connection, NULL);
log_debug("client connection %d fd %d", i, fd);
}
/*
 * Create a listening AF_LOCAL stream socket named sock_path.  The name is
 * stored starting at sun_path[1], leaving sun_path[0] as a zero byte.
 *
 * Returns the listening socket, the negative socket()/bind()/listen()
 * result on failure, or -ENAMETOOLONG when sock_path does not fit in
 * sun_path.
 */
static int setup_listener(char *sock_path)
{
	struct sockaddr_un addr;
	socklen_t addrlen;
	int rv, s;

	/* the name is copied to sun_path[1] together with its NUL, so it may
	   use at most sizeof(sun_path) - 2 characters */
	if (strlen(sock_path) > sizeof(addr.sun_path) - 2) {
		log_error("socket path too long");
		return -ENAMETOOLONG;
	}

	/* we listen for new client connections on socket s */

	s = socket(AF_LOCAL, SOCK_STREAM, 0);
	if (s < 0) {
		log_error("socket error %d %d", s, errno);
		return s;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_LOCAL;
	strcpy(&addr.sun_path[1], sock_path);
	addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1;

	rv = bind(s, (struct sockaddr *) &addr, addrlen);
	if (rv < 0) {
		log_error("bind error %d %d", rv, errno);
		close(s);
		return rv;
	}

	rv = listen(s, 5);
	if (rv < 0) {
		log_error("listen error %d %d", rv, errno);
		close(s);
		return rv;
	}
	return s;
}
/* Take query_mutex; loop() holds it while running client callbacks and
   process_queries() holds it while answering a query. */
void query_lock(void)
{
pthread_mutex_lock(&query_mutex);
}
/* Release query_mutex taken by query_lock(). */
void query_unlock(void)
{
pthread_mutex_unlock(&query_mutex);
}
/* This is a thread, so we have to be careful, don't call log_ functions.
   We need a thread to process queries because the main thread may block
   for long periods when writing to sysfs to stop dlm-kernel (any maybe
   other places). */

/*
 * Thread body for the query socket.  arg points to the listening socket
 * descriptor.  Each accepted connection carries one request header; valid
 * requests are answered under query_lock() and the connection is closed
 * afterwards.  Never returns.
 */
static void *process_queries(void *arg)
{
	struct dlmc_header h;
	char name[sizeof(h.name) + 1];
	int s = *((int *)arg);
	int f, rv;

	for (;;) {
		f = accept(s, NULL, NULL);
		if (f < 0)
			continue;	/* nothing to read from or close */

		rv = do_read(f, &h, sizeof(h));
		if (rv < 0) {
			goto out;
		}

		if (h.magic != DLMC_MAGIC) {
			goto out;
		}

		if ((h.version & 0xFFFF0000) != (DLMC_VERSION & 0xFFFF0000)) {
			goto out;
		}

		/* h.name is filled in by the client and is not guaranteed to
		   contain a NUL; the query functions treat the name as a C
		   string, so hand them a terminated copy */
		memcpy(name, h.name, sizeof(h.name));
		name[sizeof(h.name)] = '\0';

		query_lock();

		switch (h.command) {
		case DLMC_CMD_DUMP_DEBUG:
			query_dump_debug(f);
			break;
		case DLMC_CMD_DUMP_PLOCKS:
			query_dump_plocks(f, name);
			break;
		case DLMC_CMD_LOCKSPACE_INFO:
			query_lockspace_info(f, name);
			break;
		case DLMC_CMD_NODE_INFO:
			query_node_info(f, name, h.data);
			break;
		case DLMC_CMD_LOCKSPACES:
			query_lockspaces(f, h.data);
			break;
		case DLMC_CMD_LOCKSPACE_NODES:
			query_lockspace_nodes(f, name, h.option, h.data);
			break;
		default:
			break;
		}
		query_unlock();

 out:
		close(f);
	}
}
/*
 * Create the query listening socket, initialise query_mutex and start the
 * process_queries() thread on that socket.
 *
 * Returns 0 on success, the negative setup_listener() result, or the
 * negated pthread_create() error number.
 */
static int setup_queries(void)
{
	/* static storage: the thread reads the descriptor through the
	   pointer given to pthread_create(), possibly after this function
	   has returned and its stack frame is gone */
	static int s;
	int rv;

	rv = setup_listener(DLMC_QUERY_SOCK_PATH);
	if (rv < 0)
		return rv;
	s = rv;

	pthread_mutex_init(&query_mutex, NULL);

	/* pthread_create() reports failure with a positive error number,
	   never with a negative value */
	rv = pthread_create(&query_thread, NULL, process_queries, &s);
	if (rv) {
		log_error("can't create query thread");
		close(s);
		return -rv;
	}
	return 0;
}
/* Dead callback used for the cluster connections (registered for cman and
   groupd in loop()): logs the loss and sets daemon_quit.  The ci argument
   is not used. */
void cluster_dead(int ci)
{
log_error("cluster is down, exiting");
daemon_quit = 1;
}
/* Daemon main loop: sets up the query thread, the client listener and the
   cluster/kernel connections, registers each descriptor with client_add(),
   then polls all registered descriptors and dispatches their work/dead
   callbacks until daemon_quit is set.  On exit, or on any setup failure,
   control reaches "out", where the cluster connections, configfs state and
   logging are torn down. */
static void loop(void)
{
int poll_timeout = -1;
int rv, i;
void (*workfn) (int ci);
void (*deadfn) (int ci);
- /* FIXME: add code that looks for uncontrolled instances of
- dlm lockspaces in the kernel */
-
rv = setup_queries();
if (rv < 0)
goto out;
rv = setup_listener(DLMC_SOCK_PATH);
if (rv < 0)
goto out;
client_add(rv, process_listener, NULL);
rv = setup_cman();
if (rv < 0)
goto out;
client_add(rv, process_cman, cluster_dead);
rv = setup_ccs();
if (rv < 0)
goto out;
setup_logging();
+ rv = check_uncontrolled_lockspaces();
+ if (rv < 0)
+ goto out;
+
rv = setup_configfs();
if (rv < 0)
goto out;
rv = setup_uevent();
if (rv < 0)
goto out;
client_add(rv, process_uevent, NULL);
/* libcpg is the default; libgroup is selected only when groupd
   compatibility is configured, and set_group_mode() is additionally
   called when the compat setting is 2 */
group_mode = GROUP_LIBCPG;
if (cfgd_groupd_compat) {
rv = setup_groupd();
if (rv < 0)
goto out;
client_add(rv, process_groupd, cluster_dead);
group_mode = GROUP_LIBGROUP;
if (cfgd_groupd_compat == 2)
set_group_mode();
}
log_debug("group_mode %d compat %d", group_mode, cfgd_groupd_compat);
if (group_mode == GROUP_LIBCPG) {
rv = setup_cpg();
if (rv < 0)
goto out;
/* client_add(rv, process_cpg, cluster_dead); */
if (cfgd_enable_deadlk) {
rv = setup_netlink();
if (rv < 0)
goto out;
client_add(rv, process_netlink, NULL);
setup_deadlock();
}
rv = setup_plocks();
if (rv < 0)
goto out;
plock_fd = rv;
plock_ci = client_add(rv, process_plocks, NULL);
}
for (;;) {
rv = poll(pollfd, client_maxi + 1, poll_timeout);
/* an interrupted poll ends the loop only when a quit was requested and
   no lockspaces remain; otherwise the quit request is dropped */
if (rv == -1 && errno == EINTR) {
if (daemon_quit && list_empty(&lockspaces))
goto out;
daemon_quit = 0;
continue;
}
if (rv < 0) {
log_error("poll errno %d", errno);
goto out;
}
/* FIXME: lock/unlock around operations that take a while */
query_lock();
for (i = 0; i <= client_maxi; i++) {
if (client[i].fd < 0)
continue;
if (pollfd[i].revents & POLLIN) {
workfn = client[i].workfn;
workfn(i);
}
if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
deadfn = client[i].deadfn;
deadfn(i);
}
}
query_unlock();
if (daemon_quit)
break;
/* block indefinitely unless one of the poll_* flags below asks for a
   one second wakeup */
poll_timeout = -1;
if (poll_fencing || poll_quorum || poll_fs) {
process_lockspace_changes();
poll_timeout = 1000;
}
if (poll_ignore_plock) {
if (!limit_plocks()) {
poll_ignore_plock = 0;
client_back(plock_ci, plock_fd);
}
poll_timeout = 1000;
}
}
out:
/* NOTE(review): this teardown runs for every exit path, including setup
   failures that happen before the corresponding setup_*() call */
if (cfgd_groupd_compat)
close_groupd();
clear_configfs();
close_logging();
close_ccs();
close_cman();
if (!list_empty(&lockspaces))
log_error("lockspaces abandoned");
}
/* Create LOCKFILE_NAME, take a write lock on the whole file and store our
   pid in it.  Any failure prints a message to stderr and exits the process;
   a lock held by someone else is reported as "already running".  The
   descriptor is left open on purpose, it is what holds the lock. */
static void lockfile(void)
{
	char pidstr[33];
	struct flock fl;
	int lfd, rv;

	memset(pidstr, 0, sizeof(pidstr));

	lfd = open(LOCKFILE_NAME, O_CREAT|O_WRONLY,
		   S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
	if (lfd < 0) {
		fprintf(stderr, "cannot open/create lock file %s\n",
			LOCKFILE_NAME);
		exit(EXIT_FAILURE);
	}

	/* l_start 0 with l_len 0 covers the entire file */
	fl.l_whence = SEEK_SET;
	fl.l_start = 0;
	fl.l_len = 0;
	fl.l_type = F_WRLCK;

	rv = fcntl(lfd, F_SETLK, &fl);
	if (rv != 0) {
		fprintf(stderr, "dlm_controld is already running\n");
		exit(EXIT_FAILURE);
	}

	rv = ftruncate(lfd, 0);
	if (rv != 0) {
		fprintf(stderr, "cannot clear lock file %s\n", LOCKFILE_NAME);
		exit(EXIT_FAILURE);
	}

	sprintf(pidstr, "%d\n", getpid());

	rv = write(lfd, pidstr, strlen(pidstr));
	if (rv <= 0) {
		fprintf(stderr, "cannot write lock file %s\n", LOCKFILE_NAME);
		exit(EXIT_FAILURE);
	}
}
/* Print the command line help to stdout.  The option letters listed here
   correspond to OPTION_STRING as parsed by read_arguments(); defaults are
   taken from the DEFAULT_* macros. */
static void print_usage(void)
{
printf("Usage:\n");
printf("\n");
printf("dlm_controld [options]\n");
printf("\n");
printf("Options:\n");
printf("\n");
printf(" -D Enable daemon debugging and don't fork\n");
printf(" -L <num> Enable (1) or disable (0) debugging to logsys (default %d)\n", DEFAULT_DEBUG_LOGSYS);
printf(" -K Enable kernel dlm debugging messages\n");
printf(" -g <num> groupd compatibility mode, 0 off, 1 on, 2 detect\n");
printf(" 0: use libcpg, no backward compat, best performance\n");
printf(" 1: use libgroup for compat with cluster2/rhel5\n");
printf(" 2: use groupd to detect old, or mode 1, nodes that\n"
" require compat, use libcpg if none found\n");
printf(" Default is %d\n", DEFAULT_GROUPD_COMPAT);
printf(" -f <num> Enable (1) or disable (0) fencing recovery dependency\n");
printf(" Default is %d\n", DEFAULT_ENABLE_FENCING);
printf(" -q <num> Enable (1) or disable (0) quorum recovery dependency\n");
printf(" Default is %d\n", DEFAULT_ENABLE_QUORUM);
printf(" -d <num> Enable (1) or disable (0) deadlock detection code\n");
printf(" Default is %d\n", DEFAULT_ENABLE_DEADLK);
printf(" -p <num> Enable (1) or disable (0) plock code for cluster fs\n");
printf(" Default is %d\n", DEFAULT_ENABLE_PLOCK);
printf(" -P Enable plock debugging\n");
printf(" -l <limit> Limit the rate of plock operations\n");
printf(" Default is %d, set to 0 for no limit\n", DEFAULT_PLOCK_RATE_LIMIT);
printf(" -o <n> Enable (1) or disable (0) plock ownership\n");
printf(" Default is %d\n", DEFAULT_PLOCK_OWNERSHIP);
printf(" -t <ms> plock ownership drop resources time (milliseconds)\n");
printf(" Default is %u\n", DEFAULT_DROP_RESOURCES_TIME);
printf(" -c <num> plock ownership drop resources count\n");
printf(" Default is %u\n", DEFAULT_DROP_RESOURCES_COUNT);
printf(" -a <ms> plock ownership drop resources age (milliseconds)\n");
printf(" Default is %u\n", DEFAULT_DROP_RESOURCES_AGE);
printf(" -h Print this help, then exit\n");
printf(" -V Print program version information, then exit\n");
}
/* getopt() option letters; a trailing ':' marks an option that takes an
   argument */
#define OPTION_STRING "L:DKg:f:q:d:p:Pl:o:t:c:a:hV"
/* Parse the command line.  For each recognised option the matching optd_ or
   optk_ flag is set to 1 and the cfgd_/cfgk_ value is stored (numeric
   arguments are converted with atoi(), so malformed numbers are not
   detected).  -h and -V print and exit successfully; an unknown option or
   missing argument exits with failure.  If -L was not given, the
   DLM_CONTROLD_DEBUG environment variable supplies the logsys debug value. */
static void read_arguments(int argc, char **argv)
{
int cont = 1;
int optchar;
/* we don't allow these to be set on command line, should we? */
/* NOTE(review): the same variable is cleared twice; the comment above says
   "these", so the second statement was presumably meant for a different
   variable - confirm against the option declarations */
optk_timewarn = 0;
optk_timewarn = 0;
while (cont) {
optchar = getopt(argc, argv, OPTION_STRING);
switch (optchar) {
case 'D':
daemon_debug_opt = 1;
break;
case 'L':
optd_debug_logsys = 1;
cfgd_debug_logsys = atoi(optarg);
break;
case 'g':
optd_groupd_compat = 1;
cfgd_groupd_compat = atoi(optarg);
break;
case 'K':
optk_debug = 1;
cfgk_debug = 1;
break;
case 'f':
optd_enable_fencing = 1;
cfgd_enable_fencing = atoi(optarg);
break;
case 'q':
optd_enable_quorum = 1;
cfgd_enable_quorum = atoi(optarg);
break;
case 'd':
optd_enable_deadlk = 1;
cfgd_enable_deadlk = atoi(optarg);
break;
case 'p':
optd_enable_plock = 1;
cfgd_enable_plock = atoi(optarg);
break;
case 'P':
optd_plock_debug = 1;
cfgd_plock_debug = 1;
break;
case 'l':
optd_plock_rate_limit = 1;
cfgd_plock_rate_limit = atoi(optarg);
break;
case 'o':
optd_plock_ownership = 1;
cfgd_plock_ownership = atoi(optarg);
break;
case 't':
optd_drop_resources_time = 1;
cfgd_drop_resources_time = atoi(optarg);
break;
case 'c':
optd_drop_resources_count = 1;
cfgd_drop_resources_count = atoi(optarg);
break;
case 'a':
optd_drop_resources_age = 1;
cfgd_drop_resources_age = atoi(optarg);
break;
case 'h':
print_usage();
exit(EXIT_SUCCESS);
break;
case 'V':
printf("dlm_controld %s (built %s %s)\n",
RELEASE_VERSION, __DATE__, __TIME__);
printf("%s\n", REDHAT_COPYRIGHT);
exit(EXIT_SUCCESS);
break;
case ':':
case '?':
fprintf(stderr, "Please use '-h' for usage.\n");
exit(EXIT_FAILURE);
break;
/* getopt() has run out of options */
case EOF:
cont = 0;
break;
default:
fprintf(stderr, "unknown option: %c\n", optchar);
exit(EXIT_FAILURE);
break;
};
}
/* the environment is consulted only when -L was not given */
if (!optd_debug_logsys && getenv("DLM_CONTROLD_DEBUG")) {
optd_debug_logsys = 1;
cfgd_debug_logsys = atoi(getenv("DLM_CONTROLD_DEBUG"));
}
}
/* Write val to /proc/self/oom_adj; silently does nothing when the file
   cannot be opened for writing. */
static void set_oom_adj(int val)
{
	FILE *adj = fopen("/proc/self/oom_adj", "w");

	if (adj != NULL) {
		fprintf(adj, "%i", val);
		fclose(adj);
	}
}
/* Switch this process to SCHED_RR at the maximum priority that policy
   allows.  Failures are logged and otherwise ignored. */
static void set_scheduler(void)
{
	struct sched_param sp;
	int prio = sched_get_priority_max(SCHED_RR);

	if (prio == -1) {
		log_error("could not get maximum scheduler priority err %d",
			  errno);
		return;
	}

	sp.sched_priority = prio;

	if (sched_setscheduler(0, SCHED_RR, &sp) == -1)
		log_error("could not set SCHED_RR priority %d err %d",
			  sp.sched_priority, errno);
}
/* Daemon entry point: initialise the global lists and logging, parse the
   command line, take the pid lock file, detach from the terminal unless -D
   was given, install the SIGTERM handler, raise scheduling priority, adjust
   the OOM score and run loop() until it returns. */
int main(int argc, char **argv)
{
INIT_LIST_HEAD(&lockspaces);
INIT_LIST_HEAD(&fs_register_list);
init_logging();
read_arguments(argc, argv);
lockfile();
/* -D (daemon_debug_opt) keeps the process in the foreground */
if (!daemon_debug_opt) {
if (daemon(0, 0) < 0) {
perror("daemon error");
exit(EXIT_FAILURE);
}
}
signal(SIGTERM, sigterm_handler);
set_scheduler();
set_oom_adj(-16);
loop();
return 0;
}
/* Append the current contents of daemon_debug_buf to the circular dump_buf.
   When the write position reaches DLMC_DUMP_SIZE it restarts at 0 and
   dump_wrap is set to record that old data is being overwritten. */
void daemon_dump_save(void)
{
	const char *src;

	for (src = daemon_debug_buf; *src != '\0'; src++) {
		dump_buf[dump_point++] = *src;

		if (dump_point != DLMC_DUMP_SIZE)
			continue;

		dump_wrap = 1;
		dump_point = 0;
	}
}
int daemon_debug_opt;
int daemon_quit;
int poll_fencing;
int poll_quorum;
int poll_fs;
int poll_ignore_plock;
int plock_fd;
int plock_ci;
struct list_head lockspaces;
int cman_quorate;
int our_nodeid;
char daemon_debug_buf[256];
char dump_buf[DLMC_DUMP_SIZE];
int dump_point;
int dump_wrap;
char plock_dump_buf[DLMC_DUMP_SIZE];
int plock_dump_len;
int group_mode;
diff --git a/group/dlm_controld/member_cman.c b/group/dlm_controld/member_cman.c
index ee25d4df3..7fe0b95ca 100644
--- a/group/dlm_controld/member_cman.c
+++ b/group/dlm_controld/member_cman.c
@@ -1,216 +1,235 @@
#include "dlm_daemon.h"
#include "config.h"
#include <libcman.h>
static cman_handle_t ch;
+static cman_handle_t ch_admin;
static cman_node_t old_nodes[MAX_NODES];
static int old_node_count;
static cman_node_t cman_nodes[MAX_NODES];
static int cman_node_count;
/* Ask cman, through the admin handle, to eject a node: nodeid 0 shuts down
   cman on the local node (CMAN_SHUTDOWN_ANYWAY), any other nodeid is handed
   to cman_kill_node().  The results of both cman calls are ignored. */
+void kick_node_from_cluster(int nodeid)
+{
+ if (!nodeid) {
+ log_error("telling cman to shut down cluster locally");
+ cman_shutdown(ch_admin, CMAN_SHUTDOWN_ANYWAY);
+ } else {
+ log_error("telling cman to remove nodeid %d from cluster",
+ nodeid);
+ cman_kill_node(ch_admin, nodeid);
+ }
+}
+
/* Search the first count entries of node_list for nodeid and return that
   entry's cn_member value; returns 0 when nodeid is not listed. */
static int is_member(cman_node_t *node_list, int count, int nodeid)
{
	int idx = 0;

	while (idx < count) {
		cman_node_t *entry = &node_list[idx];

		if (entry->cn_nodeid == nodeid)
			return entry->cn_member;
		idx++;
	}

	return 0;
}
/* Membership of nodeid according to old_nodes, the node list saved by
   statechange() before it fetched the current one. */
static int is_old_member(int nodeid)
{
return is_member(old_nodes, old_node_count, nodeid);
}
/* Membership of nodeid according to cman_nodes, the node list most recently
   fetched by statechange(). */
int is_cman_member(int nodeid)
{
return is_member(cman_nodes, cman_node_count, nodeid);
}
/* Return the cman_nodes entry whose cn_nodeid equals nodeid, or NULL when
   the current node list has no such entry. */
static cman_node_t *find_cman_node(int nodeid)
{
	int idx = 0;

	while (idx < cman_node_count) {
		if (cman_nodes[idx].cn_nodeid == nodeid)
			return &cman_nodes[idx];
		idx++;
	}

	return NULL;
}
/* Return the cn_name of nodeid from the current cman node list, or NULL
   when the node is not listed.  The string lives inside cman_nodes. */
char *nodeid2name(int nodeid)
{
	cman_node_t *node = find_cman_node(nodeid);

	return node ? node->cn_name : NULL;
}
/* add a configfs dir for cluster members that don't have one,
   del the configfs dir for cluster members that are now gone */

/*
 * Refresh quorum state and the cman node list, then bring configfs in line
 * with the membership change: nodes that were members in the previous list
 * but are not now are removed, and nodes that are members now but were not
 * before are added with every address cman reports for them (or with the
 * single cn_address when the address query fails).
 */
static void statechange(void)
{
	int i, j, rv;
	struct cman_node_address addrs[MAX_NODE_ADDRESSES];
	int num_addrs;
	struct cman_node_address *addrptr;

	cman_quorate = cman_is_quorate(ch);

	old_node_count = cman_node_count;
	memcpy(&old_nodes, &cman_nodes, sizeof(old_nodes));

	cman_node_count = 0;
	memset(&cman_nodes, 0, sizeof(cman_nodes));
	rv = cman_get_nodes(ch, MAX_NODES, &cman_node_count, cman_nodes);
	if (rv < 0) {
		log_debug("cman_get_nodes error %d %d", rv, errno);
		return;
	}

	/* Never allow node ID 0 to be considered a member #315711 */
	for (i = 0; i < cman_node_count; i++) {
		if (cman_nodes[i].cn_nodeid == 0) {
			cman_nodes[i].cn_member = 0;
			break;
		}
	}

	for (i = 0; i < old_node_count; i++) {
		if (old_nodes[i].cn_member &&
		    !is_cman_member(old_nodes[i].cn_nodeid)) {

			log_debug("cman: node %d removed",
				   old_nodes[i].cn_nodeid);

			del_configfs_node(old_nodes[i].cn_nodeid);
		}
	}

	for (i = 0; i < cman_node_count; i++) {
		if (cman_nodes[i].cn_member &&
		    !is_old_member(cman_nodes[i].cn_nodeid)) {

			/* start from the address array for every node; a
			   fallback chosen for an earlier node must not be
			   reused for this one */
			addrptr = addrs;

			rv = cman_get_node_addrs(ch, cman_nodes[i].cn_nodeid,
						 MAX_NODE_ADDRESSES,
						 &num_addrs, addrs);
			if (rv < 0) {
				log_debug("cman_get_node_addrs failed, falling back to single-homed. ");
				num_addrs = 1;
				addrptr = &cman_nodes[i].cn_address;
			}

			log_debug("cman: node %d added",
				  cman_nodes[i].cn_nodeid);

			for (j = 0; j < num_addrs; j++) {
				add_configfs_node(cman_nodes[i].cn_nodeid,
						  addrptr[j].cna_address,
						  addrptr[j].cna_addrlen,
						  (cman_nodes[i].cn_nodeid ==
						   our_nodeid));
			}
		}
	}
}
/* Notification callback registered with cman_start_notification().
   A state change triggers statechange(); a shutdown request is answered
   with 1 when no lockspaces exist and with 0 otherwise.  Other reasons are
   ignored. */
static void cman_callback(cman_handle_t h, void *private, int reason, int arg)
{
	if (reason == CMAN_REASON_STATECHANGE) {
		statechange();
		return;
	}

	if (reason != CMAN_REASON_TRY_SHUTDOWN)
		return;

	if (!list_empty(&lockspaces)) {
		log_debug("no to cman shutdown");
		cman_replyto_shutdown(ch, 0);
		return;
	}

	cman_replyto_shutdown(ch, 1);
}
/* Work callback for the cman descriptor: dispatch all pending cman events.
   When dispatch fails with EHOSTDOWN the cluster is treated as dead;
   cluster_dead() is called with 0 rather than ci. */
void process_cman(int ci)
{
int rv;
rv = cman_dispatch(ch, CMAN_DISPATCH_ALL);
if (rv == -1 && errno == EHOSTDOWN)
cluster_dead(0);
}
/* Connect to cman: open the admin handle (retried up to two more times,
   one second apart) and the regular handle, wait for cman to become
   active, start notifications and record our own node id in our_nodeid.
   Returns the cman descriptor to poll, -ENOTCONN when a handle cannot be
   opened or cman does not become active, or the negative cman error. */
int setup_cman(void)
{
cman_node_t node;
int rv, fd;
int init = 0, active = 0;
retry_init:
- ch = cman_init(NULL);
- if (!ch) {
+ ch_admin = cman_admin_init(NULL);
+ if (!ch_admin) {
if (init++ < 2) {
sleep(1);
goto retry_init;
}
+ log_error("cman_admin_init error %d", errno);
+ return -ENOTCONN;
+ }
+
/* NOTE(review): with this change only cman_admin_init() is retried, and
   ch_admin is not finished on any of the failure paths below */
+ ch = cman_init(NULL);
+ if (!ch) {
log_error("cman_init error %d", errno);
return -ENOTCONN;
}
retry_active:
rv = cman_is_active(ch);
if (!rv) {
if (active++ < 2) {
sleep(1);
goto retry_active;
}
log_error("cman_is_active error %d", errno);
cman_finish(ch);
return -ENOTCONN;
}
rv = cman_start_notification(ch, cman_callback);
if (rv < 0) {
log_error("cman_start_notification error %d %d", rv, errno);
cman_finish(ch);
return rv;
}
fd = cman_get_fd(ch);
/* FIXME: wait here for us to be a member of the cluster */
memset(&node, 0, sizeof(node));
rv = cman_get_node(ch, CMAN_NODEID_US, &node);
if (rv < 0) {
log_error("cman_get_node us error %d %d", rv, errno);
cman_stop_notification(ch);
cman_finish(ch);
/* the negative error replaces the descriptor as the return value */
fd = rv;
goto out;
}
our_nodeid = node.cn_nodeid;
old_node_count = 0;
memset(&old_nodes, 0, sizeof(old_nodes));
cman_node_count = 0;
memset(&cman_nodes, 0, sizeof(cman_nodes));
out:
return fd;
}
void close_cman(void)
{
cman_finish(ch);
}
/* Force re-read of cman nodes */
/* Public wrapper that runs the same statechange() processing the cman
   notification callback performs on CMAN_REASON_STATECHANGE. */
void cman_statechange(void)
{
statechange();
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 11:04 AM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1018587
Default Alt Text
(101 KB)
Attached To
Mode
rR Resource Agents
Attached
Detach File
Event Timeline
Log In to Comment