Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4624364
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
237 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/cib/callbacks.c b/cib/callbacks.c
index c28bd9fadd..daa736a927 100644
--- a/cib/callbacks.c
+++ b/cib/callbacks.c
@@ -1,1421 +1,1419 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <crm/crm.h>
#include <crm/cib.h>
#include <crm/msg_xml.h>
-#include <crm/common/ipc.h>
-#include <crm/common/ipcs.h>
#include <crm/cluster/internal.h>
#include <crm/common/xml.h>
#include <cibio.h>
#include <callbacks.h>
#include <cibmessages.h>
#include <notify.h>
#include "common.h"
/* Daemon-wide state defined in other translation units */
extern GMainLoop *mainloop;
extern gboolean cib_shutdown_flag;
extern gboolean stand_alone;
extern const char *cib_root;

/* Bumped before every locally originated broadcast; the value is stamped on
 * the outgoing message and used as the key into local_notify_queue. */
static unsigned long cib_local_bcast_num = 0;

/* A client notification deferred until our own broadcast is seen again */
typedef struct cib_local_notify_s {
    xmlNode *notify_src;        /* message to deliver; released with the entry */
    char *client_id;            /* private copy of the recipient's client ID */
    gboolean from_peer;         /* handed through to do_local_notify() */
    gboolean sync_reply;        /* handed through to do_local_notify() */
} cib_local_notify_t;

/* IPC service handles */
qb_ipcs_service_t *ipcs_ro = NULL;
qb_ipcs_service_t *ipcs_rw = NULL;
qb_ipcs_service_t *ipcs_shm = NULL;

extern crm_cluster_t crm_cluster;
extern int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset);
extern void GHFunc_count_peers(gpointer key, gpointer value, gpointer user_data);

gint cib_GCompareFunc(gconstpointer a, gconstpointer b);
gboolean can_write(int flags);
void send_cib_replace(const xmlNode * sync_request, const char *host);

/* Parameter names match the definition: the second argument forces a
 * synchronous call, the third carries the privilege level, and the fourth
 * is not read by the implementation. */
void cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged,
                         gboolean unused, crm_client_t * cib_client);

/* Pending notifications keyed by broadcast number; created lazily */
extern GHashTable *local_notify_queue;
int next_client_id = 0;
extern const char *cib_our_uname;

/* Operation statistics */
extern unsigned long cib_num_ops, cib_num_local, cib_num_updates, cib_num_fail;
extern unsigned long cib_bad_connects, cib_num_timeouts;

/* pcmk_ok while the CIB is usable; otherwise the error reported to callers */
extern int cib_status;

int cib_process_command(xmlNode * request, xmlNode ** reply,
                        xmlNode ** cib_diff, gboolean privileged);

/* Return type matches the definition, which returns int32_t */
int32_t cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged);
/*
 * connection_accept handler shared by both IPC handler tables.
 *
 * Refuses new clients with -EPERM once shutdown has begun, otherwise
 * registers a client record for the connection.  Returns 0 on success and
 * -EIO when the record could not be created.
 */
static int32_t
cib_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    crm_trace("Connection %p", c);

    if (cib_shutdown_flag) {
        crm_info("Ignoring new client [%d] during shutdown", crm_ipcs_client_pid(c));
        return -EPERM;
    }

    return (crm_client_new(c, uid, gid) != NULL) ? 0 : -EIO;
}
/*
 * connection_created handler: nothing to set up, the event is only traced.
 */
static void
cib_ipc_created(qb_ipcs_connection_t *c)
{
    crm_trace("Connection %p", c);
}
/*
 * msg_process handler for the read/write handler table: hands the message to
 * cib_common_callback() with privileged=TRUE and returns its result.
 */
static int32_t
cib_ipc_dispatch_rw(qb_ipcs_connection_t *c, void *data, size_t size)
{
    crm_client_t *client = crm_client_get(c);

    /* cib_common_callback() copes with a missing client record, so the
     * trace must not dereference it unconditionally */
    crm_trace("%p message from %s", c, client ? client->id : "unknown client");
    return cib_common_callback(c, data, size, TRUE);
}
/*
 * msg_process handler for the read-only handler table: hands the message to
 * cib_common_callback() with privileged=FALSE and returns its result.
 */
static int32_t
cib_ipc_dispatch_ro(qb_ipcs_connection_t *c, void *data, size_t size)
{
    crm_client_t *client = crm_client_get(c);

    /* cib_common_callback() copes with a missing client record, so the
     * trace must not dereference it unconditionally */
    crm_trace("%p message from %s", c, client ? client->id : "unknown client");
    return cib_common_callback(c, data, size, FALSE);
}
/* Error code means? */
/*
 * connection_closed handler shared by both IPC handler tables.
 *
 * Looks up the client record attached to the connection and destroys it.
 * Always returns 0.
 */
static int32_t
-cib_ipc_closed(qb_ipcs_connection_t *c)
+cib_ipc_closed(qb_ipcs_connection_t *c)
{
crm_client_t *client = crm_client_get(c);
crm_trace("Connection %p", c);
/* NOTE(review): client is passed on without a NULL check - confirm that
 * crm_client_destroy() tolerates NULL */
crm_client_destroy(client);
return 0;
}
/*
 * connection_destroyed handler shared by both IPC handler tables.
 *
 * Traces the event and, when a shutdown is already in progress, calls
 * cib_shutdown(0) again.
 */
static void
-cib_ipc_destroy(qb_ipcs_connection_t *c)
+cib_ipc_destroy(qb_ipcs_connection_t *c)
{
crm_trace("Connection %p", c);
if (cib_shutdown_flag) {
/* presumably lets the shutdown logic re-check for remaining clients -
 * TODO confirm against cib_shutdown() */
cib_shutdown(0);
}
}
/*
 * Handler table whose messages are dispatched through cib_ipc_dispatch_ro(),
 * i.e. processed with privileged=FALSE.  All other handlers are shared with
 * ipc_rw_callbacks.
 */
-struct qb_ipcs_service_handlers ipc_ro_callbacks =
+struct qb_ipcs_service_handlers ipc_ro_callbacks =
{
.connection_accept = cib_ipc_accept,
.connection_created = cib_ipc_created,
.msg_process = cib_ipc_dispatch_ro,
.connection_closed = cib_ipc_closed,
.connection_destroyed = cib_ipc_destroy
};
/*
 * Handler table whose messages are dispatched through cib_ipc_dispatch_rw(),
 * i.e. processed with privileged=TRUE.  All other handlers are shared with
 * ipc_ro_callbacks.
 */
-struct qb_ipcs_service_handlers ipc_rw_callbacks =
+struct qb_ipcs_service_handlers ipc_rw_callbacks =
{
.connection_accept = cib_ipc_accept,
.connection_created = cib_ipc_created,
.msg_process = cib_ipc_dispatch_rw,
.connection_closed = cib_ipc_closed,
.connection_destroyed = cib_ipc_destroy
};
/*
 * Act on one decoded client request.
 *
 * Registration requests and notification-filter changes are answered here
 * (an acknowledgement is sent only when the client asked for a response);
 * every other operation is passed to cib_process_request().
 */
void
cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, crm_client_t * cib_client, gboolean privileged)
{
    const char *op = crm_element_value(op_request, F_CIB_OPERATION);

    if (crm_str_eq(op, CRM_OP_REGISTER, TRUE)) {
        if (flags & crm_ipc_client_response) {
            xmlNode *ack = create_xml_node(NULL, __FUNCTION__);

            crm_xml_add(ack, F_CIB_OPERATION, CRM_OP_REGISTER);
            crm_xml_add(ack, F_CIB_CLIENTID, cib_client->id);
            crm_ipcs_send(cib_client, id, ack, FALSE);
            cib_client->request_id = 0;
            free_xml(ack);
        }
        return;
    }

    if (crm_str_eq(op, T_CIB_NOTIFY, TRUE)) {
        /* Switch one notification category on or off for this client */
        const struct {
            const char *name;
            long long flag;
        } filters[] = {
            { T_CIB_POST_NOTIFY, cib_notify_post },
            { T_CIB_PRE_NOTIFY, cib_notify_pre },
            { T_CIB_UPDATE_CONFIRM, cib_notify_confirm },
            { T_CIB_DIFF_NOTIFY, cib_notify_diff },
            { T_CIB_REPLACE_NOTIFY, cib_notify_replace },
        };
        int idx = 0;
        int on_off = 0;
        long long bit = 0;      /* stays 0 for an unrecognised type */
        const char *type = crm_element_value(op_request, F_CIB_NOTIFY_TYPE);

        crm_element_value_int(op_request, F_CIB_NOTIFY_ACTIVATE, &on_off);
        crm_debug("Setting %s callbacks for %s (%s): %s",
                  type, cib_client->name, cib_client->id, on_off ? "on" : "off");

        for (idx = 0; idx < DIMOF(filters); idx++) {
            if (safe_str_eq(type, filters[idx].name)) {
                bit = filters[idx].flag;
                break;
            }
        }

        if (on_off) {
            set_bit(cib_client->options, bit);
        } else {
            clear_bit(cib_client->options, bit);
        }

        if (flags & crm_ipc_client_response) {
            /* TODO - include rc */
            crm_ipcs_send_ack(cib_client, id, "ack", __FUNCTION__, __LINE__);
            cib_client->request_id = 0;
        }
        return;
    }

    cib_process_request(op_request, FALSE, privileged, FALSE, cib_client);
}
/*
 * Common entry point for messages arriving on either IPC channel.
 *
 * Decodes the message, records the request ID when the client expects a
 * response, makes sure the client has a name, stamps the request with the
 * client's ID and name, and hands it to cib_common_callback_worker().
 *
 * c:          connection the message arrived on
 * data, size: raw message as delivered by the IPC layer
 * privileged: passed through to the worker
 *
 * Always returns 0.
 */
int32_t
cib_common_callback(qb_ipcs_connection_t *c, void *data, size_t size, gboolean privileged)
{
    uint32_t id = 0;
    uint32_t flags = 0;
    int call_options = 0;
    crm_client_t *cib_client = crm_client_get(c);
    xmlNode *op_request = crm_ipcs_recv(cib_client, data, size, &id, &flags);

    if (op_request) {
        crm_element_value_int(op_request, F_CIB_CALLOPTS, &call_options);
    }

    crm_trace("Inbound: %.200s", data);
    if (op_request == NULL || cib_client == NULL) {
        crm_ipcs_send_ack(cib_client, id, "nack", __FUNCTION__, __LINE__);
        /* The message may have parsed even though the client lookup failed;
         * release it so this path does not leak */
        free_xml(op_request);
        return 0;
    }

    /* NOTE(review): a synchronous call without the response flag trips this
     * assertion - confirm a misbehaving client should be able to do that */
    if (is_set(call_options, cib_sync_call)) {
        CRM_ASSERT(flags & crm_ipc_client_response);
    }

    if (flags & crm_ipc_client_response) {
        CRM_LOG_ASSERT(cib_client->request_id == 0);    /* This means the client has two synchronous events in-flight */
        cib_client->request_id = id;    /* Reply only to the last one */
    }

    if (cib_client->name == NULL) {
        /* First message from this client: take the name it supplied, or
         * fall back to its PID */
        const char *value = crm_element_value(op_request, F_CIB_CLIENTNAME);

        if (value == NULL) {
            cib_client->name = crm_itoa(cib_client->pid);
        } else {
            cib_client->name = strdup(value);
        }
    }

    crm_xml_add(op_request, F_CIB_CLIENTID, cib_client->id);
    crm_xml_add(op_request, F_CIB_CLIENTNAME, cib_client->name);

#if ENABLE_ACL
    determine_request_user(cib_client->user, op_request, F_CIB_USER);
#endif

    crm_log_xml_trace(op_request, "Client[inbound]");

    cib_common_callback_worker(id, flags, op_request, cib_client, privileged);
    free_xml(op_request);

    return 0;
}
/*
 * Deliver notify_src to the locally connected client named by client_id.
 *
 * notify_src: message to send; not freed here
 * client_id:  ID of the recipient; nothing is sent when it is NULL or no
 *             client with that ID is found
 * sync_reply: TRUE when this answers a request the client is waiting on,
 *             FALSE when it is delivered as an event
 * from_peer:  only affects the trace output
 *
 * A failed IPC send is logged as a warning; nothing is reported to the caller.
 */
static void
do_local_notify(xmlNode * notify_src, const char *client_id,
                gboolean sync_reply, gboolean from_peer)
{
    int rid = 0;
    int local_rc = pcmk_ok;
    crm_client_t *client_obj = NULL;
    const char *origin_note = from_peer ? "(originator of delegated request)" : "";

    if (client_id != NULL) {
        client_obj = crm_client_get_by_id(client_id);
    }

    if (client_obj == NULL) {
        /* Nobody to tell, and therefore nothing to warn about either */
        crm_trace("No client to send the response to. F_CIB_CLIENTID not set.");
        return;
    }

    if (sync_reply) {
        if (client_obj->ipcs) {
            /* Answer the request recorded for this client and clear it so
             * the next synchronous request starts clean */
            CRM_LOG_ASSERT(client_obj->request_id);

            rid = client_obj->request_id;
            client_obj->request_id = 0;

            crm_trace("Sending response %d to %s %s",
                      rid, client_obj->name, origin_note);
        } else {
            crm_trace("Sending response to %s %s",
                      client_obj->name, origin_note);
        }

    } else {
        crm_trace("Sending an event to %s %s",
                  client_obj->name, origin_note);
    }

    switch (client_obj->kind) {
        case CRM_CLIENT_IPC:
            if (crm_ipcs_send(client_obj, rid, notify_src, !sync_reply) < 0) {
                local_rc = -ENOMSG;
            }
            break;
# ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
# endif
        case CRM_CLIENT_TCP:
            crm_remote_send(client_obj->remote, notify_src);
            break;
        default:
            crm_err("Unknown transport %d for %s", client_obj->kind, client_obj->name);
    }

    if (local_rc != pcmk_ok) {
        crm_warn("%sSync reply to %s failed: %s",
                 sync_reply ? "" : "A-",
                 client_obj->name, pcmk_strerror(local_rc));
    }
}
/*
 * Value destructor for local_notify_queue: releases a queued notification
 * together with the message and client ID it owns.
 */
static void
local_notify_destroy_callback(gpointer data)
{
    cib_local_notify_t *entry = (cib_local_notify_t *) data;

    free(entry->client_id);
    free_xml(entry->notify_src);
    free(entry);
}
static void
check_local_notify(int bcast_id)
{
cib_local_notify_t *notify = NULL;
if (!local_notify_queue) {
return;
}
notify = g_hash_table_lookup(local_notify_queue, GINT_TO_POINTER(bcast_id));
if (notify) {
do_local_notify(notify->notify_src, notify->client_id, notify->sync_reply, notify->from_peer);
g_hash_table_remove(local_notify_queue, GINT_TO_POINTER(bcast_id));
}
}
/*
 * Park a client notification until the broadcast numbered
 * cib_local_bcast_num is seen again.
 *
 * notify_src: message to deliver later; the queue entry takes ownership
 * client_id:  recipient's client ID; copied
 * sync_reply, from_peer: stored and handed to do_local_notify() on delivery
 *
 * The queue is created on first use with local_notify_destroy_callback() as
 * its value destructor.
 */
static void
queue_local_notify(xmlNode * notify_src, const char *client_id, gboolean sync_reply, gboolean from_peer)
{
    cib_local_notify_t *notify = calloc(1, sizeof *notify);

    /* Stop here on allocation failure instead of writing through NULL */
    CRM_ASSERT(notify != NULL);

    notify->notify_src = notify_src;
    notify->client_id = strdup(client_id);
    notify->sync_reply = sync_reply;
    notify->from_peer = from_peer;

    if (!local_notify_queue) {
        local_notify_queue = g_hash_table_new_full(g_direct_hash,
                                                   g_direct_equal, NULL,
                                                   local_notify_destroy_callback);
    }

    g_hash_table_insert(local_notify_queue, GINT_TO_POINTER(cib_local_bcast_num), notify);
}
/*
 * Work out how a request from a local client has to be handled.
 *
 * *needs_reply is always written: TRUE for modifying operations that are not
 * broadcast-inhibited, FALSE otherwise.  The remaining output flags are only
 * touched on the paths shown below, so callers must pre-initialise them.
 */
static void
parse_local_options(crm_client_t * cib_client, int call_type, int call_options, const char *host,
                    const char *op, gboolean * local_notify, gboolean * needs_reply,
                    gboolean * process, gboolean * needs_forward)
{
    *needs_reply = FALSE;
    if (cib_op_modifies(call_type)
        && (call_options & cib_inhibit_bcast) == 0) {
        /* we need to send an update anyway */
        *needs_reply = TRUE;
    }

    if (host == NULL && (call_options & cib_scope_local)) {
        crm_trace("Processing locally scoped %s op from %s", op, cib_client->name);
        *local_notify = TRUE;
        return;
    }

    if (host == NULL && cib_is_master) {
        crm_trace("Processing master %s op locally from %s", op, cib_client->name);
        *local_notify = TRUE;
        return;
    }

    if (safe_str_eq(host, cib_our_uname)) {
        crm_trace("Processing locally addressed %s op from %s", op, cib_client->name);
        *local_notify = TRUE;
        return;
    }

    if (stand_alone) {
        /* Stand-alone mode: handle everything here */
        *needs_forward = FALSE;
        *local_notify = TRUE;
        *process = TRUE;
        return;
    }

    crm_trace("%s op from %s needs to be forwarded to %s",
              op, cib_client->name, host ? host : "the master instance");
    *needs_forward = TRUE;
    *process = FALSE;
}
/*
 * Work out how a message received from a cluster peer has to be handled.
 *
 * call_type and needs_forward are accepted but not used.  The output flags
 * are only written on the paths shown below, so callers must pre-initialise
 * them.
 *
 * Returns TRUE when the caller should carry on with the message and FALSE
 * when it should be dropped.
 */
static gboolean
parse_peer_options(int call_type, xmlNode * request,
                   gboolean * local_notify, gboolean * needs_reply, gboolean * process,
                   gboolean * needs_forward)
{
    const char *op = NULL;
    const char *host = NULL;
    const char *delegated = NULL;
    const char *originator = crm_element_value(request, F_ORIG);
    const char *reply_to = crm_element_value(request, F_CIB_ISREPLY);
    const char *update = crm_element_value(request, F_CIB_GLOBAL_UPDATE);

    gboolean is_reply = safe_str_eq(reply_to, cib_our_uname);

    if (crm_is_true(update)) {
        /* A broadcast update: apply it, and tell our own client if the
         * change originated here */
        *needs_reply = FALSE;
        if (is_reply) {
            *local_notify = TRUE;
            crm_trace("Processing global/peer update from %s"
                      " that originated from us", originator);
        } else {
            crm_trace("Processing global/peer update from %s", originator);
        }
        return TRUE;
    }

    host = crm_element_value(request, F_CIB_HOST);
    if (host != NULL && safe_str_eq(host, cib_our_uname)) {
        crm_trace("Processing request sent to us from %s", originator);
        return TRUE;

    } else if (host == NULL && cib_is_master == TRUE) {
        crm_trace("Processing request sent to master instance from %s", originator);
        return TRUE;
    }

    op = crm_element_value(request, F_CIB_OPERATION);
    if (safe_str_eq(op, "cib_shutdown_req")) {
        /* Always process these */
        *local_notify = FALSE;
        if (reply_to == NULL || is_reply) {
            *process = TRUE;
        }
        if (is_reply) {
            *needs_reply = FALSE;
        }
        return *process;
    }

    if (is_reply) {
        crm_trace("Forward reply sent from %s to local clients", originator);
        *process = FALSE;
        *needs_reply = FALSE;
        *local_notify = TRUE;
        return TRUE;
    }

    /* Shutdown requests have already returned above, so only messages that
     * are not meant for this instance can reach this point */
    delegated = crm_element_value(request, F_CIB_DELEGATED);
    if (delegated != NULL) {
        crm_trace("Ignoring msg for master instance");

    } else if (host != NULL) {
        /* this is for a specific instance and we're not it */
        crm_trace("Ignoring msg for instance on %s", crm_str(host));

    } else if (reply_to == NULL && cib_is_master == FALSE) {
        /* this is for the master instance and we're not it */
        crm_trace("Ignoring request for the master instance from %s", crm_str(originator));

    } else {
        crm_err("Nothing for us to do?");
        crm_log_xml_err(request, "Peer[inbound]");
    }

    return FALSE;
}
/*
 * Relay a local client's request over the cluster layer.
 *
 * The request is tagged with our node name as the delegating instance for
 * the duration of the send.  It goes to the node named in F_CIB_HOST when
 * one is set, otherwise it is sent with no specific target.  cib_client is
 * not used.
 */
static void
forward_request(xmlNode * request, crm_client_t * cib_client, int call_options)
{
    const char *op = crm_element_value(request, F_CIB_OPERATION);
    const char *host = crm_element_value(request, F_CIB_HOST);

    crm_xml_add(request, F_CIB_DELEGATED, cib_our_uname);

    if (host == NULL) {
        crm_trace("Forwarding %s op to master instance", op);
        send_cluster_message(NULL, crm_msg_cib, request, FALSE);

    } else {
        crm_node_t *peer = NULL;

        crm_trace("Forwarding %s op to %s", op, host);
        peer = crm_get_peer(0, host);
        send_cluster_message(peer, crm_msg_cib, request, FALSE);
    }

    /* Return the request to its original state */
    xml_remove_prop(request, F_CIB_DELEGATED);

    if (call_options & cib_discard_reply) {
        crm_trace("Client not interested in reply");
    }
}
/*
 * Send the outcome of an operation to the cluster.
 *
 * broadcast == TRUE:  msg is turned into a global apply-diff update carrying
 *                     result_diff and sent with no specific target.
 * broadcast == FALSE: msg is sent to originator only; nothing is sent when
 *                     originator is NULL.
 *
 * Returns what send_cluster_message() returned, or FALSE when nothing was
 * sent.
 */
static gboolean
send_peer_reply(xmlNode * msg, xmlNode * result_diff, const char *originator, gboolean broadcast)
{
    int add_admin_epoch = 0;
    int add_epoch = 0;
    int add_updates = 0;
    int del_admin_epoch = 0;
    int del_epoch = 0;
    int del_updates = 0;
    const char *digest = NULL;

    CRM_ASSERT(msg != NULL);

    if (!broadcast) {
        if (originator == NULL) {
            return FALSE;
        }

        /* send reply via HA to originating node */
        crm_trace("Sending request result to originator only");
        crm_xml_add(msg, F_CIB_ISREPLY, originator);
        return send_cluster_message(crm_get_peer(0, originator), crm_msg_cib, msg, FALSE);
    }

    /* this (successful) call modified the CIB _and_ the
     * change needs to be broadcast...
     *   send via HA to other nodes
     */
    CRM_LOG_ASSERT(result_diff != NULL);
    digest = crm_element_value(result_diff, XML_ATTR_DIGEST);
    cib_diff_version_details(result_diff,
                             &add_admin_epoch, &add_epoch, &add_updates,
                             &del_admin_epoch, &del_epoch, &del_updates);

    crm_trace("Sending update diff %d.%d.%d -> %d.%d.%d %s",
              del_admin_epoch, del_epoch, del_updates,
              add_admin_epoch, add_epoch, add_updates, digest);

    crm_xml_add(msg, F_CIB_ISREPLY, originator);
    crm_xml_add(msg, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE);
    crm_xml_add(msg, F_CIB_OPERATION, CIB_OP_APPLY_DIFF);

    CRM_ASSERT(digest != NULL);

    add_message_xml(msg, F_CIB_UPDATE_DIFF, result_diff);
    crm_log_xml_explicit(msg, "copy");
    return send_cluster_message(NULL, crm_msg_cib, msg, TRUE);
}
/*
 * Process one CIB request from a local client or a cluster peer.
 *
 * request:           the request; F_ORIG is set on it for local requests and
 *                    it may be reused as the broadcast message
 * force_synchronous: when TRUE, cib_sync_call is added to the call options
 * privileged:        passed through to cib_process_command()
 * unused:            not read
 * cib_client:        originating local client, or NULL for a peer request
 *
 * Depending on the parsed options the request is forwarded, processed
 * locally, answered to the originating peer, broadcast as a diff, and/or
 * reported to the local client.
 */
void
cib_process_request(xmlNode * request, gboolean force_synchronous, gboolean privileged,
gboolean unused, crm_client_t * cib_client)
{
int call_type = 0;
int call_options = 0;
gboolean process = TRUE;
gboolean is_update = TRUE;
gboolean from_peer = TRUE;
gboolean needs_reply = TRUE;
gboolean local_notify = FALSE;
gboolean needs_forward = FALSE;
gboolean global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE));
xmlNode *op_reply = NULL;
xmlNode *result_diff = NULL;
int rc = pcmk_ok;
const char *op = crm_element_value(request, F_CIB_OPERATION);
const char *originator = crm_element_value(request, F_ORIG);
const char *host = crm_element_value(request, F_CIB_HOST);
const char *target = NULL;
const char *client_id = crm_element_value(request, F_CIB_CLIENTID);
/* A client object is only supplied for requests that arrived over IPC */
if(cib_client) {
from_peer = FALSE;
}
cib_num_ops++;
/* The operation counter wrapped: restart the dependent statistics */
if (cib_num_ops == 0) {
cib_num_fail = 0;
cib_num_local = 0;
cib_num_updates = 0;
crm_info("Stats wrapped around");
}
crm_element_value_int(request, F_CIB_CALLOPTS, &call_options);
if (force_synchronous) {
call_options |= cib_sync_call;
}
/* Treat an empty host the same as no host at all */
if (host != NULL && strlen(host) == 0) {
host = NULL;
}
/* target is only used for the trace messages below */
if(host) {
target = host;
} else if(call_options & cib_scope_local) {
target = "local host";
} else {
target = "master";
}
if(from_peer) {
crm_trace("Processing peer %s operation from %s on %s intended for %s",
op, client_id, originator, target);
} else {
crm_xml_add(request, F_ORIG, cib_our_uname);
crm_trace("Processing local %s operation from %s intended for %s",
op, client_id, target);
}
rc = cib_get_operation_id(op, &call_type);
if (rc != pcmk_ok) {
/* TODO: construct error reply? */
crm_err("Pre-processing of command failed: %s", pcmk_strerror(rc));
return;
}
/* Decide whether to process, forward, reply and/or notify */
if (from_peer == FALSE) {
parse_local_options(cib_client, call_type, call_options, host, op,
&local_notify, &needs_reply, &process, &needs_forward);
} else if (parse_peer_options(call_type, request, &local_notify,
&needs_reply, &process, &needs_forward) == FALSE) {
return;
}
is_update = cib_op_modifies(call_type);
if (is_update) {
cib_num_updates++;
}
/* The caller does not want a reply: only updates still produce one, and
 * the local client is not notified */
if (call_options & cib_discard_reply) {
needs_reply = is_update;
local_notify = FALSE;
}
if (needs_forward) {
/* NOTE(review): this host shadows the outer one and is not normalised
 * for the empty string */
const char *host = crm_element_value(request, F_CIB_HOST);
const char *section = crm_element_value(request, F_CIB_SECTION);
crm_info("Forwarding %s operation for section %s to %s (origin=%s/%s/%s)",
op,
section ? section : "'all'",
host ? host : "master",
originator ? originator : "local",
crm_element_value(request, F_CIB_CLIENTNAME),
crm_element_value(request, F_CIB_CALLID));
forward_request(request, cib_client, call_options);
return;
}
if (cib_status != pcmk_ok) {
/* The CIB is unusable: answer with the stored error instead of
 * running the operation */
rc = cib_status;
crm_err("Operation ignored, cluster configuration is invalid."
" Please repair and restart: %s", pcmk_strerror(cib_status));
op_reply = cib_construct_reply(request, the_cib, cib_status);
} else if (process) {
int now = time(NULL);
int level = LOG_INFO;
const char *section = crm_element_value(request, F_CIB_SECTION);
cib_num_local++;
rc = cib_process_command(request, &op_reply, &result_diff, privileged);
/* Pick the log level for the completion message below */
if (global_update) {
switch (rc) {
case pcmk_ok:
level = LOG_INFO;
break;
case -pcmk_err_old_data:
case -pcmk_err_diff_resync:
case -pcmk_err_diff_failed:
level = LOG_TRACE;
break;
default:
level = LOG_ERR;
}
} else if (rc != pcmk_ok && is_update) {
cib_num_fail++;
level = LOG_WARNING;
/*
} else if (safe_str_eq(op, CIB_OP_QUERY)) {
level = LOG_TRACE;
} else if (safe_str_eq(op, CIB_OP_SLAVE)) {
level = LOG_TRACE;
} else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) {
level = LOG_TRACE;
*/
}
do_crm_log(level,
"Completed %s operation for section %s: %s (rc=%d, origin=%s/%s/%s, version=%s.%s.%s)",
op, section ? section : "'all'", pcmk_strerror(rc), rc,
originator ? originator : "local",
crm_element_value(request, F_CIB_CLIENTNAME),
crm_element_value(request, F_CIB_CALLID),
the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION_ADMIN) : "0",
the_cib ? crm_element_value(the_cib, XML_ATTR_GENERATION) : "0",
the_cib ? crm_element_value(the_cib, XML_ATTR_NUMUPDATES) : "0"
);
/* Write the blackbox when the operation took noticeably long */
if((now + 1) < time(NULL)) {
crm_write_blackbox(0, NULL);
}
if (op_reply == NULL && (needs_reply || local_notify)) {
crm_err("Unexpected NULL reply to message");
crm_log_xml_err(request, "null reply");
needs_reply = FALSE;
local_notify = FALSE;
}
}
/* from now on we are the server */
if (needs_reply == FALSE || stand_alone) {
/* nothing more to do...
* this was a non-originating slave update
*/
crm_trace("Completed slave update");
} else if (rc == pcmk_ok && result_diff != NULL && !(call_options & cib_inhibit_bcast)) {
gboolean broadcast = FALSE;
/* Number the broadcast so the deferred client notification can be
 * matched up when the message comes back to us */
cib_local_bcast_num++;
crm_xml_add_int(request, F_CIB_LOCAL_NOTIFY_ID, cib_local_bcast_num);
broadcast = send_peer_reply(request, result_diff, originator, TRUE);
if (broadcast &&
client_id &&
local_notify &&
op_reply) {
/* If we have been asked to sync the reply,
* and a bcast msg has gone out, we queue the local notify
* until we know the bcast message has been received */
local_notify = FALSE;
crm_trace("Queuing local %ssync notification for %s", (call_options & cib_sync_call)?"":"a-", client_id);
queue_local_notify(op_reply, client_id, (call_options & cib_sync_call), from_peer);
op_reply = NULL; /* the reply is queued, so don't free here */
}
} else if (call_options & cib_discard_reply) {
crm_trace("Caller isn't interested in reply");
} else if (from_peer) {
/* The branches below only explain why no broadcast happened; the
 * reply is sent to the originator in every case */
if (is_update == FALSE || result_diff == NULL) {
crm_trace("Request not broadcast: R/O call");
} else if (call_options & cib_inhibit_bcast) {
crm_trace("Request not broadcast: inhibited");
} else if (rc != pcmk_ok) {
crm_trace("Request not broadcast: call failed: %s", pcmk_strerror(rc));
} else {
crm_trace("Directing reply to %s", originator);
}
send_peer_reply(op_reply, result_diff, originator, FALSE);
}
if (local_notify && client_id) {
crm_trace("Performing local %ssync notification for %s", (call_options & cib_sync_call)?"":"a-", client_id);
/* When nothing was processed here, the request itself is what gets
 * passed on to the client */
if (process == FALSE) {
do_local_notify(request, client_id, call_options & cib_sync_call, from_peer);
} else {
do_local_notify(op_reply, client_id, call_options & cib_sync_call, from_peer);
}
}
free_xml(op_reply);
free_xml(result_diff);
return;
}
/*
 * Build a "cib-reply" message for request.
 *
 * The operation, call ID, client ID and call options are copied over from
 * the request, rc is stored as the result code, and output (when not NULL)
 * is attached as the call data.  Returns the new reply.
 */
xmlNode *
cib_construct_reply(xmlNode * request, xmlNode * output, int rc)
{
    const char *copied_fields[] = {
        F_CIB_OPERATION,
        F_CIB_CALLID,
        F_CIB_CLIENTID,
        F_CIB_CALLOPTS
    };
    int field_count = DIMOF(copied_fields);
    int idx = 0;
    xmlNode *reply = NULL;

    crm_trace("Creating a basic reply");
    reply = create_xml_node(NULL, "cib-reply");
    crm_xml_add(reply, F_TYPE, T_CIB);

    while (idx < field_count) {
        const char *field = copied_fields[idx];

        crm_xml_add(reply, field, crm_element_value(request, field));
        idx++;
    }

    crm_xml_add_int(reply, F_CIB_RC, rc);

    if (output != NULL) {
        crm_trace("Attaching reply output");
        add_message_xml(reply, F_CIB_CALLDATA, output);
    }

    return reply;
}
/*
 * Run one CIB operation against the live CIB.
 *
 * request:    the operation to run
 * reply:      set to a newly built reply, or left NULL when the caller asked
 *             for the reply to be discarded
 * cib_diff:   set by cib_perform_op() for modifying operations; NULL for
 *             read-only ones
 * privileged: whether the request arrived on a privileged channel
 *
 * Must only be called while cib_status is pcmk_ok.  Read-only operations are
 * performed (optionally on an ACL-filtered copy) and answered directly.
 * Modifying operations are performed on a copy, activated unless this is a
 * dry run, and followed by the relevant notifications.
 *
 * Returns pcmk_ok or a negative error code.
 */
int
cib_process_command(xmlNode * request, xmlNode ** reply, xmlNode ** cib_diff, gboolean privileged)
{
    xmlNode *input = NULL;
    xmlNode *output = NULL;
    xmlNode *result_cib = NULL;
    xmlNode *current_cib = NULL;

#if ENABLE_ACL
    xmlNode *filtered_current_cib = NULL;
#endif

    int call_type = 0;
    int call_options = 0;
    int log_level = LOG_TRACE;

    const char *op = NULL;
    const char *section = NULL;

    int rc = pcmk_ok;
    int rc2 = pcmk_ok;

    gboolean send_r_notify = FALSE;
    gboolean global_update = FALSE;
    gboolean config_changed = FALSE;
    gboolean manage_counters = TRUE;

    CRM_ASSERT(cib_status == pcmk_ok);

    *reply = NULL;
    *cib_diff = NULL;
    current_cib = the_cib;

    /* Start processing the request... */
    op = crm_element_value(request, F_CIB_OPERATION);
    crm_element_value_int(request, F_CIB_CALLOPTS, &call_options);
    rc = cib_get_operation_id(op, &call_type);

    /* NOTE(review): global_update is still FALSE at this point; it is only
     * read from the request further down */
    if (rc == pcmk_ok && privileged == FALSE) {
        rc = cib_op_can_run(call_type, call_options, privileged, global_update);
    }

    /* Always prepare, so the cleanup at "done" has something consistent to
     * work with, but keep the first error */
    rc2 = cib_op_prepare(call_type, request, &input, &section);
    if (rc == pcmk_ok) {
        rc = rc2;
    }

    if (rc != pcmk_ok) {
        crm_trace("Call setup failed: %s", pcmk_strerror(rc));
        goto done;

    } else if (cib_op_modifies(call_type) == FALSE) {
        /* Read-only operation: no diff, no activation, no notifications */
#if ENABLE_ACL
        if (acl_enabled(config_hash) == FALSE
            || acl_filter_cib(request, current_cib, current_cib, &filtered_current_cib) == FALSE) {
            rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE,
                                section, request, input, FALSE, &config_changed,
                                current_cib, &result_cib, NULL, &output);

        } else if (filtered_current_cib == NULL) {
            crm_debug("Pre-filtered the entire cib");
            rc = -EACCES;

        } else {
            crm_debug("Pre-filtered the queried cib according to the ACLs");
            rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE,
                                section, request, input, FALSE, &config_changed,
                                filtered_current_cib, &result_cib, NULL, &output);
        }
#else
        rc = cib_perform_op(op, call_options, cib_op_func(call_type), TRUE,
                            section, request, input, FALSE, &config_changed,
                            current_cib, &result_cib, NULL, &output);
#endif

        CRM_CHECK(result_cib == NULL, free_xml(result_cib));
        goto done;
    }

    /* Handle a valid write action */
    global_update = crm_is_true(crm_element_value(request, F_CIB_GLOBAL_UPDATE));
    if (global_update) {
        /* Updates replicated from a peer carry their own version counters */
        manage_counters = FALSE;
        call_options |= cib_force_diff;

        CRM_CHECK(call_type == 3 || call_type == 4, crm_err("Call type: %d", call_type);
                  crm_log_xml_err(request, "bad op"));
    }
#ifdef SUPPORT_PRENOTIFY
    if ((call_options & cib_inhibit_notify) == 0) {
        cib_pre_notify(call_options, op, the_cib, input);
    }
#endif

    if (rc == pcmk_ok) {
        if (call_options & cib_inhibit_bcast) {
            /* skip */
            crm_trace("Skipping update: inhibit broadcast");
            manage_counters = FALSE;
        }

        /* result_cib must not be modified after cib_perform_op() returns */
        rc = cib_perform_op(op, call_options, cib_op_func(call_type), FALSE,
                            section, request, input, manage_counters, &config_changed,
                            current_cib, &result_cib, cib_diff, &output);

#if ENABLE_ACL
        if (acl_enabled(config_hash) == TRUE
            && acl_check_diff(request, current_cib, result_cib, *cib_diff) == FALSE) {
            rc = -EACCES;
        }
#endif

        if (manage_counters == FALSE) {
            /* If the diff is NULL at this point, its because nothing changed */
            config_changed = cib_config_changed(NULL, NULL, cib_diff);
        }

        /* Always write to disk for replace ops,
         * this also negates the need to detect ordering changes
         */
        if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) {
            config_changed = TRUE;
        }
    }

    if (rc == pcmk_ok && (call_options & cib_dryrun) == 0) {
        /* Make the result the live CIB */
        rc = activateCibXml(result_cib, config_changed, op);
        if (rc == pcmk_ok && cib_internal_config_changed(*cib_diff)) {
            cib_read_config(config_hash, result_cib);
        }

        /* Whole-CIB, nodes and status replacements, and erases, are
         * announced with a replace notification */
        if (crm_str_eq(CIB_OP_REPLACE, op, TRUE)) {
            if (section == NULL) {
                send_r_notify = TRUE;

            } else if (safe_str_eq(section, XML_TAG_CIB)) {
                send_r_notify = TRUE;

            } else if (safe_str_eq(section, XML_CIB_TAG_NODES)) {
                send_r_notify = TRUE;

            } else if (safe_str_eq(section, XML_CIB_TAG_STATUS)) {
                send_r_notify = TRUE;
            }

        } else if (crm_str_eq(CIB_OP_ERASE, op, TRUE)) {
            send_r_notify = TRUE;
        }

    } else if (rc == -pcmk_err_dtd_validation) {
        /* Validation failed: hand the rejected result back as the output */
        if (output != NULL) {
            crm_log_xml_info(output, "cib:output");
            free_xml(output);
        }
#if ENABLE_ACL
        {
            xmlNode *filtered_result_cib = NULL;

            if (acl_enabled(config_hash) == FALSE
                || acl_filter_cib(request, current_cib, result_cib,
                                  &filtered_result_cib) == FALSE) {
                output = result_cib;

            } else {
                crm_debug("Filtered the result cib for output according to the ACLs");
                output = filtered_result_cib;
                if (result_cib != NULL) {
                    free_xml(result_cib);
                }
            }
        }
#else
        output = result_cib;
#endif

    } else {
        free_xml(result_cib);
    }

    if ((call_options & cib_inhibit_notify) == 0) {
        const char *call_id = crm_element_value(request, F_CIB_CALLID);
        const char *client = crm_element_value(request, F_CIB_CLIENTNAME);

#ifdef SUPPORT_POSTNOTIFY
        cib_post_notify(call_options, op, input, rc, the_cib);
#endif
        cib_diff_notify(call_options, client, call_id, op, input, rc, *cib_diff);
    }

    if (send_r_notify) {
        const char *origin = crm_element_value(request, F_ORIG);

        cib_replace_notify(origin, the_cib, rc, *cib_diff);
    }

    /* Choose how loudly to log the diff */
    if (rc != pcmk_ok) {
        log_level = LOG_TRACE;
        if (rc == -pcmk_err_dtd_validation && global_update) {
            log_level = LOG_WARNING;
            crm_log_xml_info(input, "cib:global_update");
        }

    } else if (config_changed) {
        log_level = LOG_TRACE;
        if (cib_is_master) {
            log_level = LOG_NOTICE;
        }

    } else if (cib_is_master) {
        log_level = LOG_TRACE;
    }

    log_cib_diff(log_level, *cib_diff, "cib:diff");

  done:
    if ((call_options & cib_discard_reply) == 0) {
        *reply = cib_construct_reply(request, output, rc);
        crm_log_xml_explicit(*reply, "cib:reply");
    }
#if ENABLE_ACL
    if (filtered_current_cib != NULL) {
        free_xml(filtered_current_cib);
    }
#endif

    if (call_type >= 0) {
        cib_op_cleanup(call_type, call_options, &input, &output);
    }
    return rc;
}
/*
 * Three-way comparison of two CIB messages by their numeric call ID.
 * Returns -1, 0 or 1 when a's call ID is smaller than, equal to or larger
 * than b's.
 */
gint
cib_GCompareFunc(gconstpointer a, gconstpointer b)
{
    const xmlNode *lhs_msg = a;
    const xmlNode *rhs_msg = b;
    int lhs_id = crm_parse_int(crm_element_value_const(lhs_msg, F_CIB_CALLID), NULL);
    int rhs_id = crm_parse_int(crm_element_value_const(rhs_msg, F_CIB_CALLID), NULL);

    if (lhs_id < rhs_id) {
        return -1;
    }
    return (lhs_id == rhs_id) ? 0 : 1;
}
#if SUPPORT_HEARTBEAT
/*
 * Heartbeat adapter: converts the incoming HA message to XML, runs it
 * through cib_peer_callback() and frees the converted copy.
 */
void
cib_ha_peer_callback(HA_Message * msg, void *private_data)
{
    xmlNode *converted = convert_ha_message(NULL, msg, __FUNCTION__);

    cib_peer_callback(converted, private_data);
    free_xml(converted);
}
#endif
/*
 * Handle a CIB message delivered by the cluster layer.
 *
 * Messages without an originator, or originating from this node, only
 * release a queued local notification (when they carry a broadcast ID).
 * Messages from other nodes are discarded with a warning while the peer
 * cache does not exist, and are otherwise processed as privileged peer
 * requests.
 */
void
cib_peer_callback(xmlNode * msg, void *private_data)
{
    const char *originator = crm_element_value(msg, F_ORIG);

    if (originator == NULL || crm_str_eq(originator, cib_our_uname, TRUE)) {
        /* message is from ourselves */
        int bcast_id = 0;

        if (crm_element_value_int(msg, F_CIB_LOCAL_NOTIFY_ID, &bcast_id) == 0) {
            check_local_notify(bcast_id);
        }
        return;
    }

    if (crm_peer_cache == NULL) {
        const char *reason = "membership not established";
        const char *seq = crm_element_value(msg, F_SEQ);
        const char *op = crm_element_value(msg, F_CIB_OPERATION);

        crm_warn("Discarding %s message (%s) from %s: %s", op, seq, originator, reason);
        return;
    }

    /* Peers do not always name a client; fall back to the node name */
    if (crm_element_value(msg, F_CIB_CLIENTNAME) == NULL) {
        crm_xml_add(msg, F_CIB_CLIENTNAME, originator);
    }

    /* crm_log_xml_trace("Peer[inbound]", msg); */
    cib_process_request(msg, FALSE, TRUE, TRUE, NULL);
}
#if SUPPORT_HEARTBEAT
/* CCM event token, defined elsewhere */
extern oc_ev_t *cib_ev_token;
/* Library handle passed to find_library_function() when resolving the CCM
 * entry points below */
static void *ccm_library = NULL;
/* CCM entry points, resolved on first use by the callbacks in this file */
int (*ccm_api_callback_done) (void *cookie) = NULL;
int (*ccm_api_handle_event) (const oc_ev_t * token) = NULL;
/*
 * Client status callback: records whether the CIB process on node is up.
 *
 * Only reports about the CIB subsystem are acted on.  Join and leave
 * statuses are translated to online and offline before the peer's process
 * state is updated; any other status is passed on unchanged.
 */
void
cib_client_status_callback(const char *node, const char *client, const char *status, void *private)
{
    crm_node_t *peer = NULL;

    if (!safe_str_eq(client, CRM_SYSTEM_CIB)) {
        return;
    }

    crm_info("Status update: Client %s/%s now has status [%s]", node, client, status);

    if (safe_str_eq(status, JOINSTATUS)) {
        status = ONLINESTATUS;

    } else if (safe_str_eq(status, LEAVESTATUS)) {
        status = OFFLINESTATUS;
    }

    peer = crm_get_peer(0, node);
    crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cib, status);
}
/*
 * Dispatch pending CCM events for the token passed in user_data.
 *
 * The CCM event handler is looked up on first use.  Returns 0 when the
 * handler reports success; any other result is logged and followed by
 * crm_exit(2).
 */
int
cib_ccm_dispatch(gpointer user_data)
{
    oc_ev_t *ccm_token = (oc_ev_t *) user_data;
    int rc = 0;

    crm_trace("received callback");

    if (ccm_api_handle_event == NULL) {
        ccm_api_handle_event =
            find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_handle_event", 1);
    }

    rc = (*ccm_api_handle_event) (ccm_token);
    if (rc != 0) {
        crm_err("CCM connection appears to have failed: rc=%d.", rc);

        /* eventually it might be nice to recover and reconnect... but until then... */
        crm_err("Exiting to recover from CCM connection failure");
        crm_exit(2);

        return -1;
    }

    return 0;
}
/* Membership instance ID taken from the most recent applied CCM event */
int current_instance = 0;

/*
 * CCM membership callback.
 *
 * New-membership, invalid and primary-restored events refresh the peer
 * cache: nodes in the "out" list are marked lost and current members are
 * marked active.  Other events are only logged.  The CCM library is told
 * the callback has finished in every case that reaches the end of the
 * function.
 */
void
cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data)
{
    const oc_ev_membership_t *membership = data;
    gboolean refresh_peers = FALSE;

    CRM_ASSERT(membership != NULL);

    crm_info("Processing CCM event=%s (id=%d)", ccm_event_name(event), membership->m_instance);

    if (current_instance > membership->m_instance) {
        crm_err("Membership instance ID went backwards! %d->%d",
                current_instance, membership->m_instance);
        CRM_ASSERT(current_instance <= membership->m_instance);
    }

    switch (event) {
        case OC_EV_MS_NEW_MEMBERSHIP:
        case OC_EV_MS_INVALID:
        case OC_EV_MS_PRIMARY_RESTORED:
            refresh_peers = TRUE;
            break;
        case OC_EV_MS_NOT_PRIMARY:
            crm_trace("Ignoring transitional CCM event: %s", ccm_event_name(event));
            break;
        case OC_EV_MS_EVICTED:
            crm_err("Evicted from CCM: %s", ccm_event_name(event));
            break;
        default:
            crm_err("Unknown CCM event: %d", event);
    }

    if (refresh_peers) {
        unsigned int idx = 0;

        CRM_CHECK(membership != NULL, return);

        current_instance = membership->m_instance;

        /* Departed nodes first, then the current members */
        for (idx = 0; idx < membership->m_n_out; idx++) {
            crm_update_ccm_node(membership, idx + membership->m_out_idx, CRM_NODE_LOST,
                                current_instance);
        }

        for (idx = 0; idx < membership->m_n_member; idx++) {
            crm_update_ccm_node(membership, idx + membership->m_memb_idx, CRM_NODE_ACTIVE,
                                current_instance);
        }
    }

    if (ccm_api_callback_done == NULL) {
        ccm_api_callback_done =
            find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_callback_done", 1);
    }
    (*ccm_api_callback_done) (cookie);
}
#endif
/*
 * Report whether the CIB may currently be written to.
 *
 * The answer is unconditionally TRUE; 'flags' is accepted but never consulted.
 */
gboolean
can_write(int flags)
{
    const gboolean writable = TRUE;

    return writable;
}
/*
 * Timeout callback armed by initiate_exit(): stop waiting and terminate the
 * CIB in "fast" mode. 'data' is unused.
 *
 * Always returns FALSE.
 */
static gboolean
cib_force_exit(gpointer data)
{
    const gboolean run_again = FALSE;

    crm_notice("Forcing exit!");
    terminate_cib(__FUNCTION__, TRUE);

    return run_again;
}
/*
 * g_hash_table_foreach() callback used by cib_shutdown() on each entry of
 * client_connections.
 *
 * Disconnecting is not implemented: the function only logs the client's name
 * at error level. 'key' and 'user_data' are unused.
 */
static void
disconnect_remote_client(gpointer key, gpointer value, gpointer user_data)
{
    crm_client_t *remote = (crm_client_t *) value;

    crm_err("Disconnecting %s... Not implemented", crm_str(remote->name));
}
/*
 * Start, or continue, shutting the CIB down. 'nsig' is unused.
 *
 * The first call sets cib_shutdown_flag and disconnects every IPC connection
 * on the r/w, r/o and shared-memory services. Every call, including later
 * ones, then checks client_connections: when it is empty initiate_exit() is
 * invoked, otherwise the number of remaining clients is logged.
 */
void
cib_shutdown(int nsig)
{
struct qb_ipcs_stats srv_stats;
/* The disconnect pass runs only once; repeated calls skip straight to the check below */
if (cib_shutdown_flag == FALSE) {
int disconnects = 0;
qb_ipcs_connection_t *c = NULL;
cib_shutdown_flag = TRUE;
/* For each service: fetch the successor before disconnecting the current connection */
c = qb_ipcs_connection_first_get(ipcs_rw);
while(c != NULL) {
qb_ipcs_connection_t *last = c;
c = qb_ipcs_connection_next_get(ipcs_rw, last);
crm_debug("Disconnecting r/w client %p...", last);
qb_ipcs_disconnect(last);
/* NOTE(review): presumably drops the reference taken by first_get/next_get - confirm against libqb */
qb_ipcs_connection_unref(last);
disconnects++;
}
c = qb_ipcs_connection_first_get(ipcs_ro);
while(c != NULL) {
qb_ipcs_connection_t *last = c;
c = qb_ipcs_connection_next_get(ipcs_ro, last);
crm_debug("Disconnecting r/o client %p...", last);
qb_ipcs_disconnect(last);
qb_ipcs_connection_unref(last);
disconnects++;
}
c = qb_ipcs_connection_first_get(ipcs_shm);
while(c != NULL) {
qb_ipcs_connection_t *last = c;
c = qb_ipcs_connection_next_get(ipcs_shm, last);
crm_debug("Disconnecting non-blocking r/w client %p...", last);
qb_ipcs_disconnect(last);
qb_ipcs_connection_unref(last);
disconnects++;
}
/* Entries of client_connections are counted as well, although
 * disconnect_remote_client() only logs "Not implemented" for each of them */
disconnects += crm_hash_table_size(client_connections);
crm_debug("Disconnecting %d remote clients", crm_hash_table_size(client_connections));
g_hash_table_foreach(client_connections, disconnect_remote_client, NULL);
crm_info("Disconnected %d clients", disconnects);
}
/* The statistics of the r/w service are only used for the log messages below */
qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE);
-
+
/* Proceed to exit only once no client is left in client_connections */
if(crm_hash_table_size(client_connections) == 0) {
crm_info("All clients disconnected (%d)", srv_stats.active_connections);
initiate_exit();
-
+
} else {
crm_info("Waiting on %d clients to disconnect (%d)",
crm_hash_table_size(client_connections), srv_stats.active_connections);
}
}
/*
 * Leave the cluster in an orderly fashion.
 *
 * With fewer than two active peers the CIB is terminated immediately
 * (non-fast). Otherwise a "cib_shutdown_req" message is sent to the cluster
 * and a 5 second timer is armed that forces the exit through cib_force_exit().
 */
void
initiate_exit(void)
{
    xmlNode *notice = NULL;
    int peer_count = crm_active_peers();

    if (peer_count < 2) {
        terminate_cib(__FUNCTION__, FALSE);
        return;
    }

    crm_info("Sending disconnect notification to %d peers...", peer_count);

    notice = create_xml_node(NULL, "exit-notification");
    crm_xml_add(notice, F_TYPE, "cib");
    crm_xml_add(notice, F_CIB_OPERATION, "cib_shutdown_req");

    send_cluster_message(NULL, crm_msg_cib, notice, TRUE);
    free_xml(notice);

    /* Fallback timer: force the exit after 5 seconds */
    g_timeout_add(crm_get_msec("5s"), cib_force_exit, NULL);
}
extern int remote_fd;
extern int remote_tls_fd;
extern void terminate_cs_connection(void);
/*
 * Tear the CIB daemon down.
 *
 * caller: name of the calling function, used only as a log prefix.
 * fast:   when TRUE the cluster disconnect is skipped and the process exits
 *         with EX_USAGE; when FALSE the cluster connection is closed first
 *         and the mainloop is stopped if it is running, otherwise the
 *         process exits with EX_OK.
 */
void
terminate_cib(const char *caller, gboolean fast)
{
/* Close the remote listener descriptors and reset them to 0 so they are not closed twice */
if (remote_fd > 0) {
close(remote_fd);
remote_fd = 0;
}
if (remote_tls_fd > 0) {
close(remote_tls_fd);
remote_tls_fd = 0;
}
-
+
if(!fast) {
crm_info("%s: Disconnecting from cluster infrastructure", caller);
crm_cluster_disconnect(&crm_cluster);
}
uninitializeCib();
crm_info("%s: Exiting%s...", caller, fast?" fast":mainloop?" from mainloop":"");
/* Orderly path: stop the mainloop and return to the caller.
 * Otherwise destroy the IPC services here and exit directly. */
if(fast == FALSE && mainloop != NULL && g_main_is_running(mainloop)) {
g_main_quit(mainloop);
} else {
qb_ipcs_destroy(ipcs_ro);
qb_ipcs_destroy(ipcs_rw);
qb_ipcs_destroy(ipcs_shm);
if (fast) {
crm_exit(EX_USAGE);
} else {
crm_exit(EX_OK);
}
}
}
diff --git a/cib/callbacks.h b/cib/callbacks.h
index f2f95cc37b..8a58213aa1 100644
--- a/cib/callbacks.h
+++ b/cib/callbacks.h
@@ -1,77 +1,78 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <crm/crm.h>
#include <crm/cib.h>
#include <crm/common/xml.h>
#include <crm/cluster.h>
+#include <crm/common/ipcs.h>
#include <crm/common/mainloop.h>
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include <gnutls/gnutls.h>
#endif
extern gboolean cib_is_master;
extern GHashTable *peer_hash;
extern GHashTable *config_hash;
/*
 * Notification kinds. Each constant occupies its own bit, so several can be
 * combined in one options word and tested individually with is_set().
 */
-enum cib_notifications
+enum cib_notifications
{
cib_notify_pre = 0x0001,
cib_notify_post = 0x0002,
cib_notify_replace = 0x0004,
cib_notify_confirm = 0x0008,
cib_notify_diff = 0x0010,
};
/*
 * Descriptor of one CIB operation: its name, three boolean properties and
 * three function pointers.
 * NOTE(review): the field meanings below are inferred from their names only;
 * the code that consumes this table is not visible here - confirm.
 */
typedef struct cib_operation_s {
/* operation name */
const char *operation;
/* presumably: the operation changes the CIB */
gboolean modifies_cib;
/* presumably: only privileged clients may run it */
gboolean needs_privileges;
/* presumably: quorum is required to run it */
gboolean needs_quorum;
/* hooks around the handler; their contracts are not visible here */
int (*prepare) (xmlNode *, xmlNode **, const char **);
int (*cleanup) (int, xmlNode **, xmlNode **);
/* handler; the parameter list matches the cib_process_*() functions
 * (op, options, section, req, input, existing_cib, result_cib, answer) */
int (*fn) (const char *, int, const char *, xmlNode *,
xmlNode *, xmlNode *, xmlNode **, xmlNode **);
} cib_operation_t;
extern struct qb_ipcs_service_handlers ipc_ro_callbacks;
extern struct qb_ipcs_service_handlers ipc_rw_callbacks;
extern qb_ipcs_service_t *ipcs_ro;
extern qb_ipcs_service_t *ipcs_rw;
extern qb_ipcs_service_t *ipcs_shm;
extern void cib_peer_callback(xmlNode * msg, void *private_data);
extern void cib_client_status_callback(const char *node, const char *client,
const char *status, void *private);
extern void cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, crm_client_t * cib_client, gboolean privileged);
void cib_shutdown(int nsig);
void initiate_exit(void);
void terminate_cib(const char *caller, gboolean fast);
#if SUPPORT_HEARTBEAT
extern void cib_ha_peer_callback(HA_Message * msg, void *private_data);
extern int cib_ccm_dispatch(gpointer user_data);
extern void cib_ccm_msg_callback(oc_ed_t event, void *cookie, size_t size, const void *data);
#endif
diff --git a/cib/main.c b/cib/main.c
index 22053e97fa..d0bee819cd 100644
--- a/cib/main.c
+++ b/cib/main.c
@@ -1,584 +1,583 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/utsname.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <crm/crm.h>
#include <crm/cib/internal.h>
#include <crm/msg_xml.h>
-#include <crm/common/ipc.h>
#include <crm/cluster/internal.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
#include <cibio.h>
#include <callbacks.h>
#include <pwd.h>
#if HAVE_LIBXML2
# include <libxml/parser.h>
#endif
#ifdef HAVE_GETOPT_H
# include <getopt.h>
#endif
#if HAVE_BZLIB_H
# include <bzlib.h>
#endif
extern int init_remote_listener(int port, gboolean encrypted);
extern gboolean stand_alone;
gboolean cib_shutdown_flag = FALSE;
int cib_status = pcmk_ok;
crm_cluster_t crm_cluster;
#if SUPPORT_HEARTBEAT
oc_ev_t *cib_ev_token;
ll_cluster_t *hb_conn = NULL;
extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t, int);
gboolean cib_register_ha(ll_cluster_t * hb_cluster, const char *client_name);
#else
void *hb_conn = NULL;
#endif
extern void terminate_cib(const char *caller, gboolean fast);
GMainLoop *mainloop = NULL;
const char *cib_root = NULL;
char *cib_our_uname = NULL;
gboolean preserve_status = FALSE;
gboolean cib_writes_enabled = TRUE;
int remote_fd = 0;
int remote_tls_fd = 0;
int cib_init(void);
void cib_shutdown(int nsig);
gboolean startCib(const char *filename);
extern int write_cib_contents(gpointer p);
GHashTable *config_hash = NULL;
GHashTable *local_notify_queue = NULL;
char *channel1 = NULL;
char *channel2 = NULL;
char *channel3 = NULL;
char *channel4 = NULL;
char *channel5 = NULL;
#define OPTARGS "maswr:V?"
void cib_cleanup(void);
/*
 * Signal handler (registered for SIGPIPE in main()): switch disk writes of
 * the CIB back on. 'nsig' is unused.
 */
static void
cib_enable_writes(int nsig)
{
    cib_writes_enabled = TRUE;
    crm_info("(Re)enabling disk writes");
}
/*
 * g_hash_table_foreach() callback: log the name of the client stored in
 * 'value' at info level. 'key' and 'user_data' are unused.
 */
static void
log_cib_client(gpointer key, gpointer value, gpointer user_data)
{
    const char *client_name = crm_client_name(value);

    crm_info("Client %s", client_name);
}
/* *INDENT-OFF* */
/*
 * Command-line options of the daemon, handed to crm_set_options() in main().
 * The fourth field is the character that main() switches on; "cib-root" is
 * the only entry whose second field is 1, and it is the only option main()
 * reads optarg for. The table ends with an all-zero entry.
 * NOTE(review): main() also handles 'm' (print metadata), which has no entry
 * in this table - confirm whether that is intentional.
 */
static struct crm_option long_options[] = {
/* Top-level Options */
{"help", 0, 0, '?', "\tThis text"},
{"verbose", 0, 0, 'V', "\tIncrease debug output"},
{"per-action-cib", 0, 0, 'a', "\tAdvanced use only"},
{"stand-alone", 0, 0, 's', "\tAdvanced use only"},
{"disk-writes", 0, 0, 'w', "\tAdvanced use only"},
{"cib-root", 1, 0, 'r', "\tAdvanced use only"},
{0, 0, 0, 0}
};
/* *INDENT-ON* */
/*
 * Entry point of the CIB daemon.
 *
 * Sets up logging, signal handlers and the disk-write trigger, parses the
 * command line, selects and validates the configuration directory and then
 * hands control to cib_init(). After cib_init() any remaining clients are
 * logged and global state is released via cib_cleanup().
 *
 * Returns the result of cib_init(), 0 after printing metadata, or 100 when
 * switching user/group for stand-alone mode fails or cib_root fails the
 * permission check.
 */
int
main(int argc, char **argv)
{
int flag;
int rc = 0;
int index = 0;
int argerr = 0;
struct passwd *pwentry = NULL;
crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE);
crm_set_options(NULL, "[options]",
long_options, "Daemon for storing and replicating the cluster configuration");
-
+
/* SIGTERM starts an orderly shutdown; SIGPIPE re-enables disk writes */
mainloop_add_signal(SIGTERM, cib_shutdown);
mainloop_add_signal(SIGPIPE, cib_enable_writes);
cib_writer = mainloop_add_trigger(G_PRIORITY_LOW, write_cib_contents, NULL);
crm_peer_init();
while (1) {
flag = crm_get_option(argc, argv, &index);
if (flag == -1)
break;
switch (flag) {
case 'V':
crm_bump_log_level(argc, argv);
break;
case 's':
/* Stand-alone mode: set preserve_status, disable disk writes and switch
 * to the group and then the user of CRM_DAEMON_USER */
stand_alone = TRUE;
preserve_status = TRUE;
cib_writes_enabled = FALSE;
pwentry = getpwnam(CRM_DAEMON_USER);
CRM_CHECK(pwentry != NULL,
crm_perror(LOG_ERR, "Invalid uid (%s) specified", CRM_DAEMON_USER);
return 100);
rc = setgid(pwentry->pw_gid);
if (rc < 0) {
crm_perror(LOG_ERR, "Could not set group to %d", pwentry->pw_gid);
return 100;
}
rc = setuid(pwentry->pw_uid);
if (rc < 0) {
crm_perror(LOG_ERR, "Could not set user to %d", pwentry->pw_uid);
return 100;
}
break;
case '?': /* Help message */
crm_help(flag, EX_OK);
break;
case 'w':
cib_writes_enabled = TRUE;
break;
case 'r':
cib_root = optarg;
break;
case 'm':
cib_metadata();
return 0;
default:
++argerr;
break;
}
}
/* A single positional argument "metadata" also prints the metadata */
if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) {
cib_metadata();
return 0;
}
if (optind > argc) {
++argerr;
}
if (argerr) {
crm_help('?', EX_USAGE);
}
/* Without -r: use CRM_CONFIG_DIR if it already holds a cib.xml, else the
 * legacy directory if that holds one, else fall back to CRM_CONFIG_DIR */
if(cib_root == NULL) {
char *path = g_strdup_printf("%s/cib.xml", CRM_CONFIG_DIR);
char *legacy = g_strdup_printf("%s/cib.xml", CRM_LEGACY_CONFIG_DIR);
if(g_file_test(path, G_FILE_TEST_EXISTS)) {
cib_root = CRM_CONFIG_DIR;
} else if(g_file_test(legacy, G_FILE_TEST_EXISTS)) {
cib_root = CRM_LEGACY_CONFIG_DIR;
crm_notice("Using legacy config location: %s", cib_root);
} else {
cib_root = CRM_CONFIG_DIR;
crm_notice("Using new config location: %s", cib_root);
}
g_free(legacy);
g_free(path);
} else {
crm_notice("Using custom config location: %s", cib_root);
}
if (crm_is_writable(cib_root, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) {
crm_err("Bad permissions on %s. Terminating", cib_root);
fprintf(stderr, "ERROR: Bad permissions on %s. See logs for details\n", cib_root);
fflush(stderr);
return 100;
}
/* read local config file */
/* cib_init() also runs the mainloop.
 * NOTE(review): cib_init() ends in "return crm_exit(0)"; whether control ever
 * reaches the statements below depends on crm_exit() returning - confirm */
rc = cib_init();
CRM_CHECK(crm_hash_table_size(client_connections) == 0, crm_warn("Not all clients gone at exit"));
g_hash_table_foreach(client_connections, log_cib_client, NULL);
cib_cleanup();
#if SUPPORT_HEARTBEAT
if (hb_conn) {
hb_conn->llc_ops->delete(hb_conn);
}
#endif
crm_info("Done");
return rc;
}
void
cib_cleanup(void)
{
crm_peer_destroy();
if (local_notify_queue) {
g_hash_table_destroy(local_notify_queue);
}
crm_client_cleanup();
g_hash_table_destroy(config_hash);
free(cib_our_uname);
free(channel1);
free(channel2);
free(channel3);
free(channel4);
free(channel5);
}
unsigned long cib_num_ops = 0;
const char *cib_stat_interval = "10min";
unsigned long cib_num_local = 0, cib_num_updates = 0, cib_num_fail = 0;
unsigned long cib_bad_connects = 0, cib_num_timeouts = 0;
#if SUPPORT_HEARTBEAT
gboolean ccm_connect(void);
/*
 * Destroy callback of the CCM file descriptor: the connection is gone, so
 * reconnect synchronously. A failed reconnect trips the assertion.
 * 'user_data' is unused.
 */
static void
ccm_connection_destroy(gpointer user_data)
{
    crm_err("CCM connection failed... blocking while we reconnect");

    /* ccm_connect() blocks while it retries */
    CRM_ASSERT(ccm_connect());
}
/* Handle handed to find_library_function() for every CCM_LIBRARY lookup in this file */
static void *ccm_library = NULL;
/*
 * Connect to CCM: register, install cib_ccm_msg_callback() for the
 * membership class, activate the token and add the resulting descriptor to
 * the mainloop.
 *
 * A failed attempt is retried after a 3 second sleep, up to max_ccm_fails
 * (30) attempts in total.
 *
 * Returns TRUE once the descriptor has been added to the mainloop, FALSE when
 * all attempts failed.
 */
gboolean
ccm_connect(void)
{
gboolean did_fail = TRUE;
int num_ccm_fails = 0;
int max_ccm_fails = 30;
int ret;
int cib_ev_fd;
/* Resolve every CCM entry point by name from CCM_LIBRARY */
int (*ccm_api_register) (oc_ev_t ** token) =
find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_register", 1);
int (*ccm_api_set_callback) (const oc_ev_t * token,
oc_ev_class_t class,
oc_ev_callback_t * fn,
oc_ev_callback_t ** prev_fn) =
find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_set_callback", 1);
void (*ccm_api_special) (const oc_ev_t *, oc_ev_class_t, int) =
find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_special", 1);
int (*ccm_api_activate) (const oc_ev_t * token, int *fd) =
find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_activate", 1);
int (*ccm_api_unregister) (oc_ev_t * token) =
find_library_function(&ccm_library, CCM_LIBRARY, "oc_ev_unregister", 1);
- static struct mainloop_fd_callbacks ccm_fd_callbacks =
+ static struct mainloop_fd_callbacks ccm_fd_callbacks =
{
.dispatch = cib_ccm_dispatch,
.destroy = ccm_connection_destroy,
};
-
+
/* Each pass performs register -> set callback -> activate, stopping at the first failure */
while (did_fail) {
did_fail = FALSE;
crm_info("Registering with CCM...");
ret = (*ccm_api_register) (&cib_ev_token);
if (ret != 0) {
did_fail = TRUE;
}
if (did_fail == FALSE) {
crm_trace("Setting up CCM callbacks");
ret = (*ccm_api_set_callback) (cib_ev_token, OC_EV_MEMB_CLASS,
cib_ccm_msg_callback, NULL);
if (ret != 0) {
crm_warn("CCM callback not set");
did_fail = TRUE;
}
}
if (did_fail == FALSE) {
(*ccm_api_special) (cib_ev_token, OC_EV_MEMB_CLASS, 0);
crm_trace("Activating CCM token");
ret = (*ccm_api_activate) (cib_ev_token, &cib_ev_fd);
if (ret != 0) {
crm_warn("CCM Activation failed");
did_fail = TRUE;
}
}
if (did_fail) {
num_ccm_fails++;
/* NOTE(review): also reached when registration itself failed; whether
 * cib_ev_token is valid to unregister in that case is not visible here - confirm */
(*ccm_api_unregister) (cib_ev_token);
if (num_ccm_fails < max_ccm_fails) {
crm_warn("CCM Connection failed %d times (%d max)", num_ccm_fails, max_ccm_fails);
sleep(3);
} else {
crm_err("CCM Activation failed %d (max) times", num_ccm_fails);
return FALSE;
}
}
}
crm_debug("CCM Activation passed... all set to go!");
/* The token is passed as user data; cib_ccm_dispatch() receives it back */
mainloop_add_fd("heartbeat-ccm", G_PRIORITY_MEDIUM, cib_ev_fd, cib_ev_token, &ccm_fd_callbacks);
return TRUE;
}
#endif
#if SUPPORT_COROSYNC
/*
 * Corosync message dispatcher.
 *
 * Messages of kind crm_class_cluster are parsed as XML, tagged with the
 * sender in F_ORIG and passed to cib_peer_callback(); unparsable data is
 * logged. Messages of any other kind are ignored.
 *
 * Always returns TRUE.
 */
static gboolean
cib_ais_dispatch(int kind, const char *from, const char *data)
{
    xmlNode *peer_msg = NULL;

    if (kind == crm_class_cluster) {
        peer_msg = string2xml(data);
        if (peer_msg == NULL) {
            crm_err("Invalid XML: '%.120s'", data);
            return TRUE;
        }

        crm_xml_add(peer_msg, F_ORIG, from);
        cib_peer_callback(peer_msg, NULL);
    }

    free_xml(peer_msg);
    return TRUE;
}
/*
 * Destroy callback of the Corosync connection.
 *
 * During shutdown the disconnect is expected and only logged; at any other
 * time the CIB is terminated in fast mode. 'user_data' is unused.
 */
static void
cib_ais_destroy(gpointer user_data)
{
    if (!cib_shutdown_flag) {
        crm_err("Corosync connection lost! Exiting.");
        terminate_cib(__FUNCTION__, TRUE);
        return;
    }

    crm_info("Corosync disconnection complete");
}
#endif
/*
 * Peer status callback (installed with crm_set_status_callback() in
 * cib_init()).
 *
 * With the diagnostic section compiled out, the arguments are not used: the
 * function only checks whether a pending shutdown can complete, i.e. the
 * shutdown flag is set, fewer than two peers are active and no client is
 * left in client_connections.
 */
static void
cib_peer_update_callback(enum crm_status_type type, crm_node_t * node, const void *data)
{
/* Disabled diagnostics: would log node state and cib process changes and
 * return early for every other kind of update */
#if 0
/* crm_active_peers(crm_proc_cib) appears to give the wrong answer
* sometimes, this might help figure out why
*/
if(type == crm_status_nstate) {
crm_info("status: %s is now %s (was %s)", node->uname, node->state, (const char *)data);
if (safe_str_eq(CRMD_JOINSTATE_MEMBER, node->state)) {
return;
}
} else if(type == crm_status_processes) {
uint32_t old = 0;
if (data) {
old = *(const uint32_t *)data;
}
-
+
if ((node->processes ^ old) & crm_proc_cib) {
crm_info("status: cib process on %s is now %sactive",
node->uname, is_set(node->processes, crm_proc_cib)?"":"in");
} else {
return;
}
} else {
return;
}
#endif
/* Finish a pending shutdown once the last peer and the last client are gone */
if(cib_shutdown_flag && crm_active_peers() < 2 && crm_hash_table_size(client_connections) == 0) {
crm_info("No more peers");
terminate_cib(__FUNCTION__, FALSE);
}
}
#if SUPPORT_HEARTBEAT
/*
 * Destroy callback of the Heartbeat connection.
 *
 * The CIB is terminated in both cases: orderly (non-fast) when a shutdown is
 * already in progress, fast when the connection was lost unexpectedly.
 * 'user_data' is unused.
 */
static void
cib_ha_connection_destroy(gpointer user_data)
{
    gboolean fast = TRUE;

    if (cib_shutdown_flag) {
        crm_info("Heartbeat disconnection complete... exiting");
        fast = FALSE;

    } else {
        crm_err("Heartbeat connection lost! Exiting.");
    }

    terminate_cib(__FUNCTION__, fast);
}
#endif
/*
 * Initialise the daemon and run its mainloop.
 *
 * Steps, in order: install the cluster callbacks for the detected stack,
 * create config_hash, load "cib.xml" via startCib(), connect to the cluster
 * (unless stand_alone), create the three IPC services and run the mainloop.
 * Failure to load the CIB, to join the cluster or to create the IPC services
 * ends the process through crm_exit().
 *
 * Returns the value of crm_exit(0) once the mainloop has stopped.
 */
int
cib_init(void)
{
/* Callbacks must be in place before crm_cluster_connect() is called below */
if (is_openais_cluster()) {
#if SUPPORT_COROSYNC
crm_cluster.destroy = cib_ais_destroy;
crm_cluster.cs_dispatch = cib_ais_dispatch;
#endif
} else if(is_heartbeat_cluster()) {
#if SUPPORT_HEARTBEAT
crm_cluster.hb_dispatch = cib_ha_peer_callback;
crm_cluster.destroy = cib_ha_connection_destroy;
#endif
}
/* config_hash is filled by startCib() through cib_read_config() */
config_hash =
g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str);
if (startCib("cib.xml") == FALSE) {
crm_crit("Cannot start CIB... terminating");
crm_exit(1);
}
if (stand_alone == FALSE) {
if (crm_cluster_connect(&crm_cluster) == FALSE) {
crm_crit("Cannot sign in to the cluster... terminating");
crm_exit(100);
}
/* NOTE(review): here cib_our_uname aliases crm_cluster.uname, while the
 * stand-alone branch stores a strdup()ed copy; cib_cleanup() free()s it in
 * both cases - confirm who owns crm_cluster.uname */
cib_our_uname = crm_cluster.uname;
if (is_openais_cluster()) {
crm_set_status_callback(&cib_peer_update_callback);
}
#if SUPPORT_HEARTBEAT
if (is_heartbeat_cluster()) {
/* was_error only gates the later steps inside this block; a failure here
 * is logged but does not stop cib_init() */
gboolean was_error = FALSE;
hb_conn = crm_cluster.hb_conn;
if (was_error == FALSE) {
if (HA_OK !=
hb_conn->llc_ops->set_cstatus_callback(hb_conn, cib_client_status_callback,
hb_conn)) {
crm_err("Cannot set cstatus callback: %s", hb_conn->llc_ops->errmsg(hb_conn));
was_error = TRUE;
}
}
if (was_error == FALSE) {
was_error = (ccm_connect() == FALSE);
}
if (was_error == FALSE) {
/* Async get client status information in the cluster */
crm_info("Requesting the list of configured nodes");
hb_conn->llc_ops->client_status(hb_conn, NULL, CRM_SYSTEM_CIB, -1);
}
}
#endif
} else {
cib_our_uname = strdup("localhost");
}
/* Read-only, read-write and shared-memory services; the last two share the r/w callbacks */
ipcs_ro = mainloop_add_ipc_server(cib_channel_ro, QB_IPC_NATIVE, &ipc_ro_callbacks);
ipcs_rw = mainloop_add_ipc_server(cib_channel_rw, QB_IPC_NATIVE, &ipc_rw_callbacks);
ipcs_shm = mainloop_add_ipc_server(cib_channel_shm, QB_IPC_SHM, &ipc_rw_callbacks);
/* A stand-alone CIB is marked as master right away */
if (stand_alone) {
cib_is_master = TRUE;
}
if (ipcs_ro != NULL && ipcs_rw != NULL && ipcs_shm != NULL) {
/* Create the mainloop and run it... */
mainloop = g_main_new(FALSE);
crm_info("Starting %s mainloop", crm_system_name);
g_main_run(mainloop);
} else {
crm_err("Failed to create IPC servers: shutting down and inhibiting respawn");
crm_exit(100);
}
/* Reached after the mainloop has been stopped (terminate_cib() calls g_main_quit()) */
qb_ipcs_destroy(ipcs_ro);
qb_ipcs_destroy(ipcs_rw);
qb_ipcs_destroy(ipcs_shm);
return crm_exit(0);
}
/*
 * Load 'filename' from cib_root and make it the active CIB.
 *
 * On successful activation the configuration is read into config_hash and a
 * remote listener is started for each of the "remote-tls-port" and
 * "remote-clear-port" attributes present on the CIB.
 *
 * Returns TRUE when the CIB was activated, FALSE otherwise. Asserts when the
 * file could not be read into XML.
 */
gboolean
startCib(const char *filename)
{
    const char *port_value = NULL;
    xmlNode *cib = readCibXmlFile(cib_root, filename, !preserve_status);

    CRM_ASSERT(cib != NULL);

    if (activateCibXml(cib, TRUE, "start") != 0) {
        return FALSE;
    }

    cib_read_config(config_hash, cib);

    /* Encrypted listener */
    port_value = crm_element_value(cib, "remote-tls-port");
    if (port_value != NULL) {
        remote_tls_fd = init_remote_listener(crm_parse_int(port_value, "0"), TRUE);
    }

    /* Clear-text listener */
    port_value = crm_element_value(cib, "remote-clear-port");
    if (port_value != NULL) {
        remote_fd = init_remote_listener(crm_parse_int(port_value, "0"), FALSE);
    }

    crm_info("CIB Initialization completed successfully");
    return TRUE;
}
diff --git a/cib/messages.c b/cib/messages.c
index 29ad5d7eac..e4afac65d9 100644
--- a/cib/messages.c
+++ b/cib/messages.c
@@ -1,466 +1,467 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <sys/param.h>
#include <sys/types.h>
#include <crm/crm.h>
#include <crm/cib/internal.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
+#include <crm/common/ipcs.h>
#include <crm/cluster/internal.h>
#include <cibio.h>
#include <cibmessages.h>
#include <callbacks.h>
#define MAX_DIFF_RETRY 5
#ifdef CIBPIPE
gboolean cib_is_master = TRUE;
#else
gboolean cib_is_master = FALSE;
#endif
xmlNode *the_cib = NULL;
gboolean syncd_once = FALSE;
extern const char *cib_our_uname;
int revision_check(xmlNode * cib_update, xmlNode * cib_copy, int flags);
int get_revision(xmlNode * xml_obj, int cur_revision);
int updateList(xmlNode * local_cib, xmlNode * update_command, xmlNode * failed,
int operation, const char *section);
gboolean check_generation(xmlNode * newCib, xmlNode * oldCib);
gboolean update_results(xmlNode * failed, xmlNode * target, const char *operation, int return_code);
int cib_update_counter(xmlNode * xml_obj, const char *field, gboolean reset);
int sync_our_cib(xmlNode * request, gboolean all);
extern xmlNode *cib_msg_copy(const xmlNode * msg, gboolean with_data);
extern gboolean cib_shutdown_flag;
/*
 * Handle a shutdown message from a peer.
 *
 * A message without F_CIB_ISREPLY is a request and is only logged. A reply
 * (ACK) terminates the CIB when we are shutting down ourselves; otherwise it
 * is unexpected and rejected with -EINVAL. *answer is always set to NULL.
 * Built with CIBPIPE the function just returns -EINVAL.
 */
int
cib_process_shutdown_req(const char *op, int options, const char *section, xmlNode * req,
                         xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                         xmlNode ** answer)
{
#ifdef CIBPIPE
    return -EINVAL;
#else
    const char *peer = crm_element_value(req, F_ORIG);

    *answer = NULL;

    if (crm_element_value(req, F_CIB_ISREPLY) == NULL) {
        crm_info("Shutdown REQ from %s", peer);
        return pcmk_ok;
    }

    if (cib_shutdown_flag) {
        crm_info("Shutdown ACK from %s", peer);
        terminate_cib(__FUNCTION__, FALSE);
        return pcmk_ok;
    }

    crm_err("Shutdown ACK from %s - not shutting down", peer);
    return -EINVAL;
#endif
}
/*
 * Fallback handler for operations without a dedicated implementation.
 *
 * Returns -EINVAL when no operation name was given, pcmk_ok for
 * CRM_OP_NOOP (compared case-insensitively) and -EPROTONOSUPPORT for
 * anything else. *answer is always set to NULL.
 */
int
cib_process_default(const char *op, int options, const char *section, xmlNode * req,
                    xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                    xmlNode ** answer)
{
    crm_trace("Processing \"%s\" event", op);
    *answer = NULL;

    if (op == NULL) {
        crm_err("No operation specified");
        return -EINVAL;
    }

    if (strcasecmp(CRM_OP_NOOP, op) != 0) {
        crm_err("Action [%s] is not supported by the CIB", op);
        return -EPROTONOSUPPORT;
    }

    return pcmk_ok;
}
/*
 * Handle a quit request: log it and call crm_exit(0).
 *
 * pcmk_ok is returned only if crm_exit() comes back. None of the arguments
 * other than 'op' (used for tracing) are consulted.
 */
int
cib_process_quit(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                 xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
    crm_trace("Processing \"%s\" event", op);
    crm_warn("The CRMd has asked us to exit... complying");

    crm_exit(0);
    return pcmk_ok;
}
/*
 * Query or change the master (read/write) state of this CIB instance.
 *
 * CIB_OP_ISMASTER: returns pcmk_ok when we are master, -EPERM otherwise.
 * CIB_OP_MASTER:   switches to R/W mode (and sets syncd_once) if not already
 *                  master.
 * any other op:    switches to R/O mode if currently master.
 *
 * The state-changing variants always return pcmk_ok. Built with CIBPIPE the
 * function just returns -EINVAL.
 */
int
cib_process_readwrite(const char *op, int options, const char *section, xmlNode * req,
                      xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                      xmlNode ** answer)
{
#ifdef CIBPIPE
    return -EINVAL;
#else
    crm_trace("Processing \"%s\" event", op);

    if (safe_str_eq(op, CIB_OP_ISMASTER)) {
        return (cib_is_master == TRUE) ? pcmk_ok : -EPERM;
    }

    if (safe_str_eq(op, CIB_OP_MASTER)) {
        if (cib_is_master == FALSE) {
            crm_info("We are now in R/W mode");
            cib_is_master = TRUE;
            syncd_once = TRUE;

        } else {
            crm_debug("We are still in R/W mode");
        }

    } else if (cib_is_master) {
        crm_info("We are now in R/O mode");
        cib_is_master = FALSE;
    }

    return pcmk_ok;
#endif
}
/*
 * Answer a ping: *answer receives a new XML_CRM_TAG_PING node carrying the
 * status "ok" and CRM_SYSTEM_CIB as the originating system.
 *
 * Returns pcmk_ok. Built with CIBPIPE the function just returns -EINVAL.
 */
int
cib_process_ping(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                 xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
#ifdef CIBPIPE
    return -EINVAL;
#else
    xmlNode *pong = NULL;

    crm_trace("Processing \"%s\" event", op);

    pong = create_xml_node(NULL, XML_CRM_TAG_PING);
    crm_xml_add(pong, XML_PING_ATTR_STATUS, "ok");
    crm_xml_add(pong, XML_PING_ATTR_SYSFROM, CRM_SYSTEM_CIB);

    *answer = pong;
    return pcmk_ok;
#endif
}
/*
 * Handle a sync request by sending our CIB to all peers; returns the result
 * of sync_our_cib(). Only 'req' is used. Built with CIBPIPE the function
 * just returns -EINVAL.
 */
int
cib_process_sync(const char *op, int options, const char *section, xmlNode * req, xmlNode * input,
                 xmlNode * existing_cib, xmlNode ** result_cib, xmlNode ** answer)
{
#ifndef CIBPIPE
    return sync_our_cib(req, TRUE);
#else
    return -EINVAL;
#endif
}
/*
 * Handle a sync request by sending our CIB to the requesting peer only;
 * returns the result of sync_our_cib(). Only 'req' is used. Built with
 * CIBPIPE the function just returns -EINVAL.
 */
int
cib_process_sync_one(const char *op, int options, const char *section, xmlNode * req,
                     xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                     xmlNode ** answer)
{
#ifndef CIBPIPE
    return sync_our_cib(req, FALSE);
#else
    return -EINVAL;
#endif
}
/*
 * Non-zero while a requested full re-sync is outstanding. It is incremented
 * for every diff skipped in the meantime and reset once more than
 * MAX_DIFF_RETRY diffs were skipped, when we are master, or when a full CIB
 * replace succeeds.
 */
int sync_in_progress = 0;

/*
 * Apply a diff received from a peer.
 *
 * While a re-sync is outstanding the diff is not applied and
 * -pcmk_err_diff_resync is returned. Otherwise the diff is passed to
 * cib_process_diff(); if that asks for a re-sync and we are not master, a
 * CIB_OP_SYNC_ONE request is sent to the cluster (-ENOTCONN if sending
 * fails). A master never requests a re-sync and reports
 * -pcmk_err_diff_failed instead.
 */
int
cib_server_process_diff(const char *op, int options, const char *section, xmlNode * req,
                        xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                        xmlNode ** answer)
{
    int rc = pcmk_ok;

    /* A master never waits for a re-sync, and after too many skipped diffs
     * the earlier request is assumed lost so that a new one can be sent */
    if (cib_is_master || sync_in_progress > MAX_DIFF_RETRY) {
        sync_in_progress = 0;
    }

    if (sync_in_progress) {
        int add_admin_epoch = 0;
        int add_epoch = 0;
        int add_updates = 0;
        int del_admin_epoch = 0;
        int del_epoch = 0;
        int del_updates = 0;

        cib_diff_version_details(input,
                                 &add_admin_epoch, &add_epoch, &add_updates,
                                 &del_admin_epoch, &del_epoch, &del_updates);

        sync_in_progress++;
        crm_notice("Not applying diff %d.%d.%d -> %d.%d.%d (sync in progress)",
                   del_admin_epoch, del_epoch, del_updates,
                   add_admin_epoch, add_epoch, add_updates);
        return -pcmk_err_diff_resync;
    }

    rc = cib_process_diff(op, options, section, req, input, existing_cib, result_cib, answer);
    if (rc != -pcmk_err_diff_resync) {
        return rc;
    }

    if (cib_is_master == FALSE) {
        xmlNode *resync_request = create_xml_node(NULL, "sync-me");

        /* Discard whatever the failed diff produced */
        free_xml(*result_cib);
        *result_cib = NULL;

        crm_info("Requesting re-sync from peer");
        sync_in_progress++;

        crm_xml_add(resync_request, F_TYPE, "cib");
        crm_xml_add(resync_request, F_CIB_OPERATION, CIB_OP_SYNC_ONE);
        crm_xml_add(resync_request, F_CIB_DELEGATED, cib_our_uname);

        if (send_cluster_message(NULL, crm_msg_cib, resync_request, FALSE) == FALSE) {
            rc = -ENOTCONN;
        }
        free_xml(resync_request);

    } else {
        rc = -pcmk_err_diff_failed;
        if (options & cib_force_diff) {
            crm_warn("Not requesting full refresh in R/W mode");
        }
    }

    return rc;
}
/*
 * Server-side wrapper around cib_process_replace().
 *
 * When the replace succeeds and the input element was a whole CIB
 * (XML_TAG_CIB), any outstanding re-sync is considered complete and
 * sync_in_progress is cleared. Returns the result of cib_process_replace().
 */
int
cib_process_replace_svr(const char *op, int options, const char *section, xmlNode * req,
                        xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                        xmlNode ** answer)
{
    int rc = pcmk_ok;

    /* The element name is read before the replace is performed */
    const char *input_tag = crm_element_name(input);

    rc = cib_process_replace(op, options, section, req, input, existing_cib, result_cib, answer);
    if (rc != pcmk_ok) {
        return rc;
    }

    if (safe_str_eq(input_tag, XML_TAG_CIB)) {
        sync_in_progress = 0;
    }
    return rc;
}
/*
 * Delete from 'parent' the objects described by 'delete_spec'.
 *
 * The counterpart of delete_spec is looked up below parent: by element name
 * and id when the spec has an id, by element name only otherwise. If the
 * spec has no children the counterpart is freed; if it has children, the
 * function recurses so that only the leaves of the spec are removed.
 * A counterpart that does not exist is not an error.
 *
 * Returns pcmk_ok on success, -EINVAL when either argument is NULL, or the
 * first error reported while processing the children.
 */
static int
delete_cib_object(xmlNode * parent, xmlNode * delete_spec)
{
    const char *object_name = NULL;
    const char *object_id = NULL;
    xmlNode *equiv_node = NULL;
    xmlNode *child = NULL;
    int result = pcmk_ok;

    /* Both lookups are guarded, so a NULL spec is never handed to the XML
     * accessors; previously only the name lookup was protected */
    if (delete_spec != NULL) {
        object_name = crm_element_name(delete_spec);
        object_id = crm_element_value(delete_spec, XML_ATTR_ID);
    }

    crm_trace("Processing: <%s id=%s>", crm_str(object_name), crm_str(object_id));

    if (delete_spec == NULL || parent == NULL) {
        return -EINVAL;
    }

    if (object_id == NULL) {
        /* placeholder object: match on the element name alone */
        equiv_node = find_xml_node(parent, object_name, FALSE);

    } else {
        equiv_node = find_entity(parent, object_name, object_id);
    }

    if (equiv_node == NULL) {
        /* nothing to delete */
        return pcmk_ok;
    }

    if (xml_has_children(delete_spec) == FALSE) {
        /* only leaves are deleted */
        crm_debug("Removing leaf: <%s id=%s>", crm_str(object_name), crm_str(object_id));
        free_xml(equiv_node);
        return pcmk_ok;
    }

    for (child = __xml_first_child(delete_spec); child != NULL; child = __xml_next(child)) {
        int tmp_result = delete_cib_object(equiv_node, child);

        /* only the first error is likely to be interesting */
        if (tmp_result != pcmk_ok && result == pcmk_ok) {
            result = tmp_result;
        }
    }

    return result;
}
/*
 * Delete the objects described by 'input' from *result_cib.
 *
 * The deletion starts at the root of 'section'; the section is ignored (the
 * whole CIB is used) when it names all sections or the CIB itself, or when
 * the input is itself a CIB element. Only CIB_OP_DELETE is accepted as 'op'.
 *
 * Returns pcmk_ok on success, -EINVAL for a wrong operation or missing
 * input, or the error from delete_cib_object(). On failure *answer receives
 * the list of failures.
 */
int
cib_process_delete_absolute(const char *op, int options, const char *section, xmlNode * req,
                            xmlNode * input, xmlNode * existing_cib, xmlNode ** result_cib,
                            xmlNode ** answer)
{
    int result = pcmk_ok;
    xmlNode *failures = NULL;
    xmlNode *section_root = NULL;

    crm_trace("Processing \"%s\" event for section=%s", op, crm_str(section));

    if (safe_str_eq(XML_CIB_TAG_SECTION_ALL, section)
        || safe_str_eq(XML_TAG_CIB, section)
        || safe_str_eq(crm_element_name(input), XML_TAG_CIB)) {
        section = NULL;
    }

    CRM_CHECK(strcasecmp(CIB_OP_DELETE, op) == 0, return -EINVAL);

    if (input == NULL) {
        crm_err("Cannot perform modification with no data");
        return -EINVAL;
    }

    failures = create_xml_node(NULL, XML_TAG_FAILED);
    section_root = get_object_root(section, *result_cib);

    result = delete_cib_object(section_root, input);
    update_results(failures, input, op, result);

    /* Recorded failures must never be reported as success */
    if (xml_has_children(failures)) {
        CRM_CHECK(result != pcmk_ok, result = -EINVAL);
    }

    if (result == pcmk_ok) {
        free_xml(failures);

    } else {
        crm_log_xml_err(failures, "CIB Update failures");
        *answer = failures;
    }

    return result;
}
/*
 * Check that 'newCib' is not older than 'oldCib'.
 *
 * Returns TRUE when cib_compare_generation() reports the new CIB as equal or
 * newer; otherwise logs a warning and returns FALSE.
 */
gboolean
check_generation(xmlNode * newCib, xmlNode * oldCib)
{
    if (cib_compare_generation(newCib, oldCib) < 0) {
        crm_warn("Generation from update is older than the existing one");
        return FALSE;
    }

    return TRUE;
}
#ifndef CIBPIPE
/*
 * Send our copy of the CIB to the cluster as a CIB_OP_REPLACE message.
 *
 * request: the sync request being answered; its F_ORIG names the requester.
 * all:     TRUE to address every peer, FALSE to address only the requester.
 *
 * The message is built from a data-less copy of the request, records the
 * original operation, and carries the CIB plus its digest.
 *
 * Returns pcmk_ok, or -ENOTCONN when the message could not be sent.
 */
int
sync_our_cib(xmlNode * request, gboolean all)
{
    const char *requester = crm_element_value(request, F_ORIG);
    const char *original_op = crm_element_value(request, F_CIB_OPERATION);
    xmlNode *replace_request = cib_msg_copy(request, FALSE);
    char *cib_digest = NULL;
    gboolean sent = FALSE;

    CRM_CHECK(the_cib != NULL,;);
    CRM_CHECK(replace_request != NULL,;);

    crm_debug("Syncing CIB to %s", all ? "all peers" : requester);
    if (all == FALSE && requester == NULL) {
        crm_log_xml_err(request, "bad sync");
    }

    /* Mark the message as a reply whenever the requester is known, not just
     * for one-to-one syncs: without the mark the local client of a sync_from
     * was never notified. Per the original author, this does not stop other
     * nodes from applying the update when all == TRUE.
     */
    if (requester != NULL) {
        crm_xml_add(replace_request, F_CIB_ISREPLY, requester);
    }

    crm_xml_add(replace_request, F_CIB_OPERATION, CIB_OP_REPLACE);
    crm_xml_add(replace_request, "original_" F_CIB_OPERATION, original_op);
    crm_xml_add(replace_request, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE);
    crm_xml_add(replace_request, XML_ATTR_CRM_VERSION, CRM_FEATURE_SET);

    cib_digest = calculate_xml_versioned_digest(the_cib, FALSE, TRUE, CRM_FEATURE_SET);
    crm_xml_add(replace_request, XML_ATTR_DIGEST, cib_digest);

    add_message_xml(replace_request, F_CIB_CALLDATA, the_cib);

    sent = send_cluster_message(all ? NULL : crm_get_peer(0, requester), crm_msg_cib,
                                replace_request, FALSE);

    free_xml(replace_request);
    free(cib_digest);

    return (sent == FALSE) ? -ENOTCONN : pcmk_ok;
}
#endif
diff --git a/cib/notify.c b/cib/notify.c
index bc01086640..272971948e 100644
--- a/cib/notify.c
+++ b/cib/notify.c
@@ -1,378 +1,377 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <crm/crm.h>
#include <crm/cib/internal.h>
#include <crm/msg_xml.h>
#include <crm/common/xml.h>
#include <cibio.h>
#include <callbacks.h>
#include <notify.h>
-#include <crm/common/ipcs.h>
int pending_updates = 0;
-struct cib_notification_s
+struct cib_notification_s
{
/* The notification as XML; cib_notify_send_one() passes it to crm_remote_send() for TCP/TLS clients */
xmlNode *msg;
/* The same notification as produced by crm_ipcs_prepare(); passed to crm_ipcs_sendv() for IPC clients */
struct iovec *iov;
};
void attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib);
void do_cib_notify(int options, const char *op, xmlNode * update,
int result, xmlNode * result_data, const char *msg_type);
/*
 * GHashTable iterator: flag whether any client asked for pre-notifications.
 *
 * value:     a crm_client_t
 * user_data: gboolean* that is set to TRUE when the client has the
 *            cib_notify_pre option; it is never reset to FALSE here
 */
static void
need_pre_notify(gpointer key, gpointer value, gpointer user_data)
{
    crm_client_t *client = value;
    gboolean *needed = user_data;

    if (is_set(client->options, cib_notify_pre)) {
        *needed = TRUE;
    }
}
/*
 * GHashTable iterator: flag whether any client asked for post-notifications.
 *
 * value:     a crm_client_t
 * user_data: gboolean* that is set to TRUE when the client has the
 *            cib_notify_post option; it is never reset to FALSE here
 */
static void
need_post_notify(gpointer key, gpointer value, gpointer user_data)
{
    crm_client_t *client = value;
    gboolean *needed = user_data;

    if (is_set(client->options, cib_notify_post)) {
        *needed = TRUE;
    }
}
/*
 * g_hash_table_foreach_remove() callback: deliver one notification to one
 * client if that client subscribed to the notification's subtype.
 *
 * value:     the crm_client_t to consider
 * user_data: struct cib_notification_s holding both encodings of the message
 *
 * Returns TRUE only when a CRM_CHECK on the arguments fails (which, for a
 * foreach_remove callback, asks for the entry to be removed); FALSE in every
 * other case, whether or not anything was sent.
 */
static gboolean
cib_notify_send_one(gpointer key, gpointer value, gpointer user_data)
{
    crm_client_t *client = value;
    struct cib_notification_s *update = user_data;
    const char *type = NULL;
    gboolean wanted = FALSE;

    CRM_CHECK(client != NULL, return TRUE);
    CRM_CHECK(update != NULL, return TRUE);

    if (client->ipcs == NULL && client->remote == NULL) {
        crm_warn("Skipping client with NULL channel");
        return FALSE;
    }

    type = crm_element_value(update->msg, F_SUBTYPE);
    CRM_LOG_ASSERT(type != NULL);

    /* A client receives the message when its option bit matches the subtype */
    wanted = (is_set(client->options, cib_notify_diff) && safe_str_eq(type, T_CIB_DIFF_NOTIFY))
        || (is_set(client->options, cib_notify_replace) && safe_str_eq(type, T_CIB_REPLACE_NOTIFY))
        || (is_set(client->options, cib_notify_confirm) && safe_str_eq(type, T_CIB_UPDATE_CONFIRM))
        || (is_set(client->options, cib_notify_pre) && safe_str_eq(type, T_CIB_PRE_NOTIFY))
        || (is_set(client->options, cib_notify_post) && safe_str_eq(type, T_CIB_POST_NOTIFY));

    if (!wanted) {
        return FALSE;
    }

    switch (client->kind) {
        case CRM_CLIENT_IPC:
            /* Local IPC clients get the pre-built iovec */
            if (crm_ipcs_sendv(client, update->iov, crm_ipc_server_event) < 0) {
                crm_warn("Notification of client %s/%s failed", client->name, client->id);
            }
            break;
# ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
# endif
        case CRM_CLIENT_TCP:
            /* Remote clients get the XML form; note the log precedes the send */
            crm_debug("Sent %s notification to client %s/%s", type, client->name, client->id);
            crm_remote_send(client->remote, update->msg);
            break;
        default:
            crm_err("Unknown transport %d for %s", client->kind, client->name);
    }
    return FALSE;
}
/*
 * Deliver a notification to every connected client that subscribed to it.
 *
 * The XML is converted once with crm_ipcs_prepare() and the result is shared
 * by all IPC clients; remote clients receive the XML itself (see
 * cib_notify_send_one()).
 *
 * xml: the notification message; ownership stays with the caller
 */
static void
cib_notify_send(xmlNode *xml)
{
    /* Start from NULL so the cleanup below is well-defined even if
     * crm_ipcs_prepare() fails without assigning its output parameter
     * (its implementation is not visible from this file).
     */
    struct iovec *iov = NULL;
    struct cib_notification_s update;
    ssize_t rc = crm_ipcs_prepare(0, xml, &iov);

    crm_trace("Notifying clients");
    if (rc > 0) {
        update.msg = xml;
        update.iov = iov;
        g_hash_table_foreach_remove(client_connections, cib_notify_send_one, &update);
    } else {
        /* rc is ssize_t; convert explicitly to match the %d conversion */
        crm_notice("Notification failed: %s (%d)", pcmk_strerror((int) rc), (int) rc);
    }

    if (iov) {
        free(iov[0].iov_base);
        free(iov[1].iov_base);
        free(iov);
    }
    crm_trace("Notify complete");
}
/*
 * Send a "pre-notify" message describing an operation that is about to run.
 *
 * Nothing is built or sent unless at least one client has cib_notify_pre set.
 *
 * options:  not used by this function
 * op:       name of the operation, stored as F_CIB_OPERATION
 * existing: current object, attached as F_CIB_EXISTING when non-NULL
 * update:   incoming object, attached as F_CIB_UPDATE when non-NULL; also
 *           the preferred source for the object id and type
 */
void
cib_pre_notify(int options, const char *op, xmlNode * existing, xmlNode * update)
{
    xmlNode *notice = NULL;
    xmlNode *subject = (update != NULL) ? update : existing;
    const char *obj_type = NULL;
    const char *obj_id = NULL;
    gboolean wanted = FALSE;

    g_hash_table_foreach(client_connections, need_pre_notify, &wanted);
    if (!wanted) {
        return;
    }

    /* TODO: consider pre-notification for removal */
    notice = create_xml_node(NULL, "pre-notify");

    /* Only the update (never the existing object) supplies the id */
    if (update != NULL) {
        obj_id = crm_element_value(update, XML_ATTR_ID);
    }

    crm_xml_add(notice, F_TYPE, T_CIB_NOTIFY);
    crm_xml_add(notice, F_SUBTYPE, T_CIB_PRE_NOTIFY);
    crm_xml_add(notice, F_CIB_OPERATION, op);

    if (obj_id != NULL) {
        crm_xml_add(notice, F_CIB_OBJID, obj_id);
    }

    /* The object type comes from the update, falling back to the existing object */
    if (subject != NULL) {
        crm_xml_add(notice, F_CIB_OBJTYPE, crm_element_name(subject));
    }
    obj_type = crm_element_value(notice, F_CIB_OBJTYPE);

    attach_cib_generation(notice, "cib_generation", the_cib);

    if (existing != NULL) {
        add_message_xml(notice, F_CIB_EXISTING, existing);
    }
    if (update != NULL) {
        add_message_xml(notice, F_CIB_UPDATE, update);
    }

    cib_notify_send(notice);

    if (update != NULL) {
        crm_trace("Performing %s on <%s%s%s>", op, obj_type, obj_id ? " id=" : "", obj_id ? obj_id : "");
    } else {
        crm_trace("Performing operation %s (on section=%s)", op, obj_type);
    }

    free_xml(notice);
}
/*
 * Send a post-operation notification, but only when at least one client has
 * the cib_notify_post option set.
 *
 * The arguments are forwarded unchanged to do_cib_notify().
 *
 * NOTE(review): the message goes out with subtype T_CIB_UPDATE_CONFIRM,
 * which cib_notify_send_one() matches against cib_notify_confirm rather
 * than cib_notify_post - confirm this is intended.
 */
void
cib_post_notify(int options, const char *op, xmlNode * update,
                int result, xmlNode * new_obj)
{
    gboolean wanted = FALSE;

    g_hash_table_foreach(client_connections, need_post_notify, &wanted);
    if (wanted) {
        do_cib_notify(options, op, update, result, new_obj, T_CIB_UPDATE_CONFIRM);
    }
}
/*
 * Log the version change described by a diff and notify clients about it.
 *
 * Does nothing when diff is NULL. Failed operations (result != pcmk_ok) are
 * logged at LOG_WARNING, successful ones at LOG_DEBUG_2.
 *
 * client / call_id: identify the originator; used in the log message only
 * op, update, result, diff: forwarded to do_cib_notify() with subtype
 *                           T_CIB_DIFF_NOTIFY
 */
void
cib_diff_notify(int options, const char *client, const char *call_id, const char *op,
                xmlNode * update, int result, xmlNode * diff)
{
    int new_admin_epoch = 0, new_epoch = 0, new_updates = 0;
    int old_admin_epoch = 0, old_epoch = 0, old_updates = 0;
    int log_level = 0;

    if (diff == NULL) {
        return;
    }

    log_level = (result == pcmk_ok) ? LOG_DEBUG_2 : LOG_WARNING;

    cib_diff_version_details(diff, &new_admin_epoch, &new_epoch, &new_updates,
                             &old_admin_epoch, &old_epoch, &old_updates);

    if (new_updates == old_updates) {
        /* Equal update counters on both sides of the diff */
        do_crm_log(log_level,
                   "Local-only Change (client:%s%s%s): %d.%d.%d (%s)",
                   client, call_id ? ", call: " : "", call_id ? call_id : "",
                   new_admin_epoch, new_epoch, new_updates, pcmk_strerror(result));
    } else {
        do_crm_log(log_level,
                   "Update (client: %s%s%s): %d.%d.%d -> %d.%d.%d (%s)",
                   client, call_id ? ", call:" : "", call_id ? call_id : "",
                   old_admin_epoch, old_epoch, old_updates,
                   new_admin_epoch, new_epoch, new_updates, pcmk_strerror(result));
    }

    do_cib_notify(options, op, update, result, diff, T_CIB_DIFF_NOTIFY);
}
/*
 * Build a "notify" message and hand it to cib_notify_send().
 *
 * options:     not used by this function
 * op:          operation name, stored as F_CIB_OPERATION
 * update:      the submitted object; attached as F_CIB_UPDATE when non-NULL
 *              and the preferred source of the object type
 * result:      return code, stored as F_CIB_RC
 * result_data: resulting object; attached as F_CIB_UPDATE_RESULT when
 *              non-NULL and the source of the object id
 * msg_type:    value stored as F_SUBTYPE
 */
void
do_cib_notify(int options, const char *op, xmlNode * update,
              int result, xmlNode * result_data, const char *msg_type)
{
    xmlNode *notice = create_xml_node(NULL, "notify");
    const char *obj_id = NULL;

    if (result_data != NULL) {
        obj_id = crm_element_value(result_data, XML_ATTR_ID);
    }

    crm_xml_add(notice, F_TYPE, T_CIB_NOTIFY);
    crm_xml_add(notice, F_SUBTYPE, msg_type);
    crm_xml_add(notice, F_CIB_OPERATION, op);
    crm_xml_add_int(notice, F_CIB_RC, result);

    if (obj_id != NULL) {
        crm_xml_add(notice, F_CIB_OBJID, obj_id);
    }

    /* Object type: the update's element name, else the result's, else none */
    if (update == NULL && result_data == NULL) {
        crm_trace("Not Setting type");

    } else if (update == NULL) {
        crm_trace("Setting type to new_obj->name: %s", crm_element_name(result_data));
        crm_xml_add(notice, F_CIB_OBJTYPE, crm_element_name(result_data));

    } else {
        crm_trace("Setting type to update->name: %s", crm_element_name(update));
        crm_xml_add(notice, F_CIB_OBJTYPE, crm_element_name(update));
    }

    attach_cib_generation(notice, "cib_generation", the_cib);

    if (update != NULL) {
        add_message_xml(notice, F_CIB_UPDATE, update);
    }
    if (result_data != NULL) {
        add_message_xml(notice, F_CIB_UPDATE_RESULT, result_data);
    }

    cib_notify_send(notice);
    free_xml(notice);
}
/*
 * Attach a generation element to a message.
 *
 * A temporary XML_CIB_TAG_GENERATION_TUPPLE node is created, given the
 * properties of a_cib (when a_cib is non-NULL), added to msg under 'field'
 * via add_message_xml(), and then freed.
 */
void
attach_cib_generation(xmlNode * msg, const char *field, xmlNode * a_cib)
{
    xmlNode *tuple = create_xml_node(NULL, XML_CIB_TAG_GENERATION_TUPPLE);

    if (a_cib != NULL) {
        copy_in_properties(tuple, a_cib);
    }

    add_message_xml(msg, field, tuple);
    free_xml(tuple);
}
/*
 * Log a CIB replacement and send a T_CIB_REPLACE_NOTIFY message to clients.
 *
 * Does nothing when diff is NULL.
 *
 * origin: where the replacement came from; used in the log message only
 * update: the replacement CIB; its properties become the
 *         "cib-replace-generation" element of the notification
 * result: return code, stored as F_CIB_RC
 * diff:   supplies the old and new version numbers that are logged
 */
void
cib_replace_notify(const char *origin, xmlNode * update, int result, xmlNode * diff)
{
    xmlNode *notice = NULL;
    int new_admin_epoch = 0, new_epoch = 0, new_updates = 0;
    int old_admin_epoch = 0, old_epoch = 0, old_updates = 0;

    if (diff == NULL) {
        return;
    }

    cib_diff_version_details(diff, &new_admin_epoch, &new_epoch, &new_updates,
                             &old_admin_epoch, &old_epoch, &old_updates);

    if (old_updates < 0) {
        crm_log_xml_debug(diff, "Bad replace diff");
    }

    if (new_updates == old_updates) {
        crm_info("Local-only Replace: %d.%d.%d from %s",
                 new_admin_epoch, new_epoch, new_updates, crm_str(origin));
    } else {
        crm_info("Replaced: %d.%d.%d -> %d.%d.%d from %s",
                 old_admin_epoch, old_epoch, old_updates,
                 new_admin_epoch, new_epoch, new_updates, crm_str(origin));
    }

    notice = create_xml_node(NULL, "notify-replace");
    crm_xml_add(notice, F_TYPE, T_CIB_NOTIFY);
    crm_xml_add(notice, F_SUBTYPE, T_CIB_REPLACE_NOTIFY);
    crm_xml_add(notice, F_CIB_OPERATION, CIB_OP_REPLACE);
    crm_xml_add_int(notice, F_CIB_RC, result);
    attach_cib_generation(notice, "cib-replace-generation", update);

    crm_log_xml_trace(notice, "CIB Replaced");

    cib_notify_send(notice);
    free_xml(notice);
}
diff --git a/crmd/crmd_fsa.h b/crmd/crmd_fsa.h
index 37de62a7f7..2550a0d8c2 100644
--- a/crmd/crmd_fsa.h
+++ b/crmd/crmd_fsa.h
@@ -1,134 +1,134 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef CRMD_FSA__H
# define CRMD_FSA__H
# include <fsa_defines.h>
# include <crm/crm.h>
# include <crm/cib.h>
# include <crm/common/xml.h>
# include <crm/common/mainloop.h>
# include <crm/cluster.h>
-# include <crm/common/ipc.h>
+# include <crm/common/ipcs.h>
# if SUPPORT_HEARTBEAT
extern ll_cluster_t *fsa_cluster_conn;
# endif
/* copy from struct client_child in heartbeat.h
*
* Plus a couple of other things
*/
/* Per-subsystem bookkeeping; this header declares cib_subsystem,
 * te_subsystem and pe_subsystem as pointers to this structure. */
struct crm_subsystem_s {
pid_t pid; /* Process id of child process */
const char *name; /* executable name */
const char *path; /* Command location */
const char *command; /* Command with path */
const char *args; /* Command arguments */
crm_client_t *client; /* Client connection object */
gboolean sent_kill; /* NOTE(review): presumably set once a kill signal was sent to pid - confirm */
mainloop_io_t *source; /* How can we communicate with it */
long long flag_connected; /* NOTE(review): presumably the register bit meaning "connected" - confirm */
long long flag_required; /* NOTE(review): presumably the register bit meaning "required" - confirm */
};
/* Description of one FSA timer; this header declares several instances
 * (election_trigger, integration_timer, recheck_timer, ...). */
typedef struct fsa_timer_s fsa_timer_t;
struct fsa_timer_s {
guint source_id; /* timer source id */
int period_ms; /* timer period */
enum crmd_fsa_input fsa_input; /* NOTE(review): presumably the input raised when the timer fires - confirm */
gboolean(*callback) (gpointer data); /* function associated with the timer; takes one opaque argument */
gboolean repeat; /* NOTE(review): presumably whether the timer re-arms itself - confirm */
int counter; /* NOTE(review): meaning not evident from this header - confirm */
};
/* Tag stored in fsa_data_s.data_type alongside the untyped fsa_data_s.data
 * pointer. The per-value notes are inferred from the names - confirm. */
enum fsa_data_type {
fsa_dt_none, /* presumably: no payload */
fsa_dt_ha_msg, /* presumably: an ha_msg_input_t */
fsa_dt_xml, /* presumably: an xmlNode */
fsa_dt_lrm, /* presumably: LRM-related data */
};
/* One FSA input record; the crmd_fsa_stall() macro in crmd_messages.h reads
 * the fsa_cause and data members of the current record. */
typedef struct fsa_data_s fsa_data_t;
struct fsa_data_s {
int id; /* NOTE(review): presumably a sequence number for the entry - confirm */
enum crmd_fsa_input fsa_input; /* the input carried by this record */
enum crmd_fsa_cause fsa_cause; /* the cause recorded for this input */
long long actions; /* NOTE(review): presumably extra action bits to apply with the input - confirm */
const char *origin; /* NOTE(review): presumably the name of the function that raised the input - confirm */
void *data; /* untyped payload; see data_type */
enum fsa_data_type data_type; /* kind of object 'data' points to */
};
extern enum crmd_fsa_state s_crmd_fsa(enum crmd_fsa_cause cause);
/* Global FSA stuff */
extern volatile gboolean do_fsa_stall;
extern volatile enum crmd_fsa_state fsa_state;
extern volatile long long fsa_input_register;
extern volatile long long fsa_actions;
extern cib_t *fsa_cib_conn;
extern char *fsa_our_uname;
extern char *fsa_our_uuid;
extern char *fsa_pe_ref; /* the last invocation of the PE */
extern char *fsa_our_dc;
extern char *fsa_our_dc_version;
extern GListPtr fsa_message_queue;
extern fsa_timer_t *election_trigger; /* */
extern fsa_timer_t *election_timeout; /* */
extern fsa_timer_t *shutdown_escalation_timer; /* */
extern fsa_timer_t *transition_timer;
extern fsa_timer_t *integration_timer;
extern fsa_timer_t *finalization_timer;
extern fsa_timer_t *wait_timer;
extern fsa_timer_t *recheck_timer;
extern crm_trigger_t *fsa_source;
extern crm_trigger_t *config_read;
extern struct crm_subsystem_s *cib_subsystem;
extern struct crm_subsystem_s *te_subsystem;
extern struct crm_subsystem_s *pe_subsystem;
extern GHashTable *welcomed_nodes;
extern GHashTable *integrated_nodes;
extern GHashTable *finalized_nodes;
extern GHashTable *confirmed_nodes;
extern GHashTable *crmd_peer_state;
/* these two should be moved elsewhere... */
extern void do_update_cib_nodes(gboolean overwrite, const char *caller);
extern gboolean do_dc_heartbeat(gpointer data);
# define AM_I_DC is_set(fsa_input_register, R_THE_DC)
# define AM_I_OPERATIONAL (is_set(fsa_input_register, R_STARTING)==FALSE)
extern unsigned long long saved_ccm_membership_id;
extern gboolean ever_had_quorum;
# include <fsa_proto.h>
# include <crmd_utils.h>
/*
 * Log a trace message and set the given mainloop trigger.
 *
 * Wrapped in do { } while (0) so the macro expands to a single statement:
 * in the two-statement form, "if (cond) trigger_fsa(src);" would have
 * called mainloop_set_trigger() unconditionally. Invoke it with a trailing
 * semicolon like a function call.
 */
#define trigger_fsa(source) do {                                \
        crm_trace("Triggering FSA: %s", __FUNCTION__);          \
        mainloop_set_trigger(source);                           \
    } while (0)
#endif
diff --git a/crmd/crmd_messages.h b/crmd/crmd_messages.h
index a4837bbd53..50a56cd35d 100644
--- a/crmd/crmd_messages.h
+++ b/crmd/crmd_messages.h
@@ -1,111 +1,111 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef XML_CRM_MESSAGES__H
# define XML_CRM_MESSAGES__H
# include <crm/crm.h>
-# include <crm/common/ipc.h>
+# include <crm/common/ipcs.h>
# include <crm/common/xml.h>
# include <crm/cluster/internal.h>
# include <crmd_fsa.h>
/* Pair of XML pointers handled by new_ha_msg_input(), copy_ha_msg_input()
 * and delete_ha_msg_input(), which are declared in this header. */
typedef struct ha_msg_input_s {
xmlNode *msg; /* NOTE(review): presumably the complete message - confirm */
xmlNode *xml; /* NOTE(review): presumably the payload carried by msg - confirm */
} ha_msg_input_t;
extern ha_msg_input_t *new_ha_msg_input(xmlNode * orig);
extern void delete_ha_msg_input(ha_msg_input_t * orig);
extern void *fsa_typed_data_adv(fsa_data_t * fsa_data, enum fsa_data_type a_type,
const char *caller);
# define fsa_typed_data(x) fsa_typed_data_adv(msg_data, x, __FUNCTION__)
extern void register_fsa_error_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
fsa_data_t * cur_data, void *new_data, const char *raised_from);
# define register_fsa_error(cause, input, new_data) register_fsa_error_adv(cause, input, msg_data, new_data, __FUNCTION__)
extern int register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
void *data, long long with_actions,
gboolean prepend, const char *raised_from);
extern void fsa_dump_queue(int log_level);
extern void route_message(enum crmd_fsa_cause cause, xmlNode * input);
/*
 * Queue an I_WAIT_FOR_EVENT input at the front of the FSA queue
 * (prepend == TRUE).
 *
 * The macro relies on two names from the calling scope: 'msg_data' (the
 * current fsa_data_t, may be NULL) and 'action'.
 *
 * suppress: when FALSE and msg_data is non-NULL, the cause and data of the
 *           current input are re-queued; otherwise a C_FSA_INTERNAL input
 *           without data is queued. The argument is parenthesised so that
 *           compound expressions are compared with FALSE as a whole.
 */
# define crmd_fsa_stall(suppress) do {                                  \
        if((suppress) == FALSE && msg_data != NULL) {                   \
            register_fsa_input_adv(                                     \
                ((fsa_data_t*)msg_data)->fsa_cause, I_WAIT_FOR_EVENT,   \
                ((fsa_data_t*)msg_data)->data, action, TRUE, __FUNCTION__); \
        } else {                                                        \
            register_fsa_input_adv(                                     \
                C_FSA_INTERNAL, I_WAIT_FOR_EVENT,                       \
                NULL, action, TRUE, __FUNCTION__);                      \
        }                                                               \
    } while(0)
# define register_fsa_input(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__)
/*
 * OR an action into fsa_actions, set the fsa_source trigger when it
 * exists, and log the addition.
 *
 * Wrapped in do { } while (0) so that "register_fsa_action(x);" is one
 * statement and can be followed by 'else'; the bare-brace form left a stray
 * ';' behind that broke if/else chains. 'action' is still evaluated twice,
 * so do not pass expressions with side effects.
 */
# define register_fsa_action(action) do {                       \
        fsa_actions |= (action);                                \
        if(fsa_source) {                                        \
            mainloop_set_trigger(fsa_source);                   \
        }                                                       \
        crm_debug("%s added action %s to the FSA",              \
                  __FUNCTION__, fsa_action2string(action));     \
    } while(0)
# define register_fsa_input_before(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, TRUE, __FUNCTION__)
# define register_fsa_input_later(cause, input, data) register_fsa_input_adv(cause, input, data, A_NOTHING, FALSE, __FUNCTION__)
void delete_fsa_input(fsa_data_t * fsa_data);
GListPtr put_message(fsa_data_t * new_message);
fsa_data_t *get_message(void);
gboolean is_message(void);
gboolean have_wait_message(void);
extern gboolean relay_message(xmlNode * relay_message, gboolean originated_locally);
extern void process_message(xmlNode * msg, gboolean originated_locally, const char *src_node_name);
extern gboolean crm_dc_process_message(xmlNode * whole_message,
xmlNode * action,
const char *host_from,
const char *sys_from,
const char *sys_to, const char *op, gboolean dc_mode);
extern gboolean send_msg_via_ipc(xmlNode * msg, const char *sys);
extern gboolean add_pending_outgoing_reply(const char *originating_node_name,
const char *crm_msg_reference,
const char *sys_to, const char *sys_from);
extern gboolean crmd_authorize_message(xmlNode * client_msg, crm_client_t * curr_client);
extern gboolean send_request(xmlNode * msg, char **msg_reference);
extern enum crmd_fsa_input handle_message(xmlNode * stored_msg);
extern ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t * orig);
#endif
diff --git a/include/crm/common/mainloop.h b/include/crm/common/mainloop.h
index 144e028ef3..d08fb0761b 100644
--- a/include/crm/common/mainloop.h
+++ b/include/crm/common/mainloop.h
@@ -1,106 +1,106 @@
/*
* Copyright (C) 2009 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef CRM_COMMON_MAINLOOP__H
# define CRM_COMMON_MAINLOOP__H
/**
* \file
* \brief Wrappers for and extensions to glib mainloop
* \ingroup core
*/
# include <glib.h>
typedef struct trigger_s crm_trigger_t;
typedef struct mainloop_io_s mainloop_io_t;
typedef struct mainloop_child_s mainloop_child_t;
crm_trigger_t *mainloop_add_trigger(int priority, int(*dispatch) (gpointer user_data),
gpointer userdata);
void mainloop_set_trigger(crm_trigger_t * source);
void mainloop_trigger_complete(crm_trigger_t *trig);
gboolean mainloop_destroy_trigger(crm_trigger_t * source);
gboolean crm_signal(int sig, void (*dispatch) (int sig));
gboolean mainloop_add_signal(int sig, void (*dispatch) (int sig));
gboolean mainloop_destroy_signal(int sig);
#include <crm/common/ipc.h>
-#include <crm/common/ipcs.h>
+#include <qb/qbipcs.h>
/* Callback table passed to mainloop_add_ipc_client(). */
struct ipc_client_callbacks
{
/* Receives a buffer, its length and the userdata given at registration.
 * NOTE(review): meaning of the return value is not shown here - confirm */
int (*dispatch)(const char *buffer, ssize_t length, gpointer userdata);
/* NOTE(review): presumably invoked with the userdata when the client is torn down - confirm */
void (*destroy) (gpointer);
};
qb_ipcs_service_t *mainloop_add_ipc_server(
const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks);
void mainloop_del_ipc_server(qb_ipcs_service_t *server);
mainloop_io_t *mainloop_add_ipc_client(
const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks);
void mainloop_del_ipc_client(mainloop_io_t *client);
crm_ipc_t *mainloop_get_ipc_client(mainloop_io_t *client);
/* Callback table passed to mainloop_add_fd(). */
struct mainloop_fd_callbacks
{
/* Receives the userdata given at registration.
 * NOTE(review): presumably called when the descriptor is readable, with a
 * negative return requesting removal - confirm */
int (*dispatch)(gpointer userdata);
/* NOTE(review): presumably invoked when the descriptor is removed from the mainloop - confirm */
void (*destroy)(gpointer userdata);
};
mainloop_io_t *mainloop_add_fd(
const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks);
void mainloop_del_fd(mainloop_io_t *client);
/*
* Create a new tracked process
* To track a process group, use -pid
*/
void
mainloop_add_child(pid_t pid,
int timeout,
const char *desc,
void *userdata,
void (*callback)(mainloop_child_t* p,
int status,
int signo,
int exitcode));
void *
mainloop_get_child_userdata(mainloop_child_t *child);
int
mainloop_get_child_timeout(mainloop_child_t *child);
pid_t
mainloop_get_child_pid(mainloop_child_t *child);
void
mainloop_clear_child_userdata(mainloop_child_t *child);
#define G_PRIORITY_MEDIUM (G_PRIORITY_HIGH/2)
#endif
diff --git a/lib/cib/cib_remote.c b/lib/cib/cib_remote.c
index a19da594b0..0c19724e36 100644
--- a/lib/cib/cib_remote.c
+++ b/lib/cib/cib_remote.c
@@ -1,619 +1,619 @@
/*
* Copyright (c) 2008 Andrew Beekhof
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
- *
+ *
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
- *
+ *
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#include <crm_internal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <netdb.h>
#include <termios.h>
#include <sys/socket.h>
#include <glib.h>
#include <crm/crm.h>
#include <crm/cib/internal.h>
#include <crm/msg_xml.h>
-#include <crm/common/ipc.h>
+#include <crm/common/ipcs.h>
#include <crm/common/mainloop.h>
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include <gnutls/gnutls.h>
gnutls_anon_client_credentials anon_cred_c;
#define DEFAULT_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */
const int kx_prio[] = {
GNUTLS_KX_ANON_DH,
0
};
static gboolean remote_gnutls_credentials_init = FALSE;
#else
typedef void gnutls_session;
#endif
#include <arpa/inet.h>
#include <sgtty.h>
#define DH_BITS 1024
/* Private state of a remote CIB connection, stored in cib_t.variant_opaque
 * by cib_remote_new(). */
typedef struct cib_remote_opaque_s {
int flags; /* not referenced in the code visible here */
int socket; /* not referenced in the code visible here */
int port; /* server port, as given to cib_remote_new() */
char *server; /* copy of the server name, or NULL */
char *user; /* copy of the user name, or NULL */
char *passwd; /* copy of the password, or NULL (then read from the terminal at signon) */
gboolean encrypted; /* TRUE to run the connection over an anonymous TLS session */
crm_remote_t command; /* channel used for synchronous calls and their replies */
crm_remote_t callback; /* channel used for asynchronous calls and notifications */
} cib_remote_opaque_t;
void cib_remote_connection_destroy(gpointer user_data);
int cib_remote_callback_dispatch(gpointer user_data);
int cib_remote_command_dispatch(gpointer user_data);
int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type);
int cib_remote_signoff(cib_t * cib);
int cib_remote_free(cib_t * cib);
int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section,
xmlNode * data, xmlNode ** output_data, int call_options, const char *name);
/*
 * Return the socket of the callback channel, i.e. the descriptor on which
 * asynchronous replies and notifications arrive.
 */
static int
cib_remote_inputfd(cib_t * cib)
{
    const cib_remote_opaque_t *remote = cib->variant_opaque;

    return remote->callback.tcp_socket;
}
/*
 * Disconnect notification is not implemented for remote connections:
 * both arguments are ignored and -EPROTONOSUPPORT is always returned.
 */
static int
cib_remote_set_connection_dnotify(cib_t * cib, void (*dnotify) (gpointer user_data))
{
    const int unsupported = -EPROTONOSUPPORT;

    return unsupported;
}
/*
 * Ask the server to switch one notification type on or off.
 *
 * callback: notification type, sent as F_CIB_NOTIFY_TYPE
 * enabled:  sent as F_CIB_NOTIFY_ACTIVATE
 *
 * The request goes out on the callback channel. The result of the send is
 * not inspected; pcmk_ok is always returned.
 */
static int
cib_remote_register_notification(cib_t * cib, const char *callback, int enabled)
{
    cib_remote_opaque_t *private = cib->variant_opaque;
    xmlNode *request = create_xml_node(NULL, "cib_command");

    crm_xml_add(request, F_CIB_OPERATION, T_CIB_NOTIFY);
    crm_xml_add(request, F_CIB_NOTIFY_TYPE, callback);
    crm_xml_add_int(request, F_CIB_NOTIFY_ACTIVATE, enabled);

    crm_remote_send(&private->callback, request);
    free_xml(request);

    return pcmk_ok;
}
/*
 * Create a CIB connection object that talks to a remote server.
 *
 * server, user, passwd: copied when non-NULL; a NULL passwd makes
 *                       cib_remote_signon() prompt on the terminal
 * port:                 server port
 * encrypted:            TRUE to use TLS for the connection
 *
 * Returns the new object (not yet connected), or NULL if memory for the
 * object or its private data could not be obtained.
 */
cib_t *
cib_remote_new(const char *server, const char *user, const char *passwd, int port,
               gboolean encrypted)
{
    cib_remote_opaque_t *private = NULL;
    cib_t *cib = cib_new_variant();

    if (cib == NULL) {
        return NULL;
    }

    private = calloc(1, sizeof(cib_remote_opaque_t));
    if (private == NULL) {
        /* Release the generic part the same way cib_remote_free() does */
        free(cib->cmds);
        free(cib);
        return NULL;
    }

    cib->variant = cib_remote;
    cib->variant_opaque = private;

    if (server) {
        private->server = strdup(server);
    }

    if (user) {
        private->user = strdup(user);
    }

    if (passwd) {
        private->passwd = strdup(passwd);
    }

    private->port = port;
    private->encrypted = encrypted;

    /* assign variant specific ops */
    cib->delegate_fn = cib_remote_perform_op;
    cib->cmds->signon = cib_remote_signon;
    cib->cmds->signoff = cib_remote_signoff;
    cib->cmds->free = cib_remote_free;
    cib->cmds->inputfd = cib_remote_inputfd;

    cib->cmds->register_notification = cib_remote_register_notification;
    cib->cmds->set_connection_dnotify = cib_remote_set_connection_dnotify;

    return cib;
}
/*
 * Tear down both channels (command and callback) of a remote connection.
 *
 * For encrypted connections built with GnuTLS each TLS session is shut
 * down and released, and the shared anonymous credentials are dropped if
 * they were initialised. Then every non-zero socket is shut down and
 * closed, and the receive buffers are freed. All released members are
 * reset, so calling this again is harmless.
 *
 * Always returns 0.
 */
static int
cib_tls_close(cib_t * cib)
{
    cib_remote_opaque_t *private = cib->variant_opaque;
    crm_remote_t *channels[2];
    int lpc = 0;

    channels[0] = &private->command;
    channels[1] = &private->callback;

#ifdef HAVE_GNUTLS_GNUTLS_H
    if (private->encrypted) {
        for (lpc = 0; lpc < 2; lpc++) {
            if (channels[lpc]->tls_session) {
                gnutls_bye(*(channels[lpc]->tls_session), GNUTLS_SHUT_RDWR);
                gnutls_deinit(*(channels[lpc]->tls_session));
                gnutls_free(channels[lpc]->tls_session);
            }
            channels[lpc]->tls_session = NULL;
        }

        if (remote_gnutls_credentials_init) {
            gnutls_anon_free_client_credentials(anon_cred_c);
            gnutls_global_deinit();
            remote_gnutls_credentials_init = FALSE;
        }
    }
#endif

    for (lpc = 0; lpc < 2; lpc++) {
        /* A socket value of 0 is used throughout this file to mean "not open" */
        if (channels[lpc]->tcp_socket) {
            shutdown(channels[lpc]->tcp_socket, SHUT_RDWR);     /* no more receptions */
            close(channels[lpc]->tcp_socket);
        }
        channels[lpc]->tcp_socket = 0;

        free(channels[lpc]->buffer);
        channels[lpc]->buffer = NULL;
    }

    return 0;
}
/*
 * Open and authenticate one channel of a remote CIB connection.
 *
 * cib:           the connection object; server, port, credentials and the
 *                encryption flag are read from its private data
 * connection:    the channel to set up (command or callback)
 * event_channel: TRUE registers cib_remote_callback_dispatch() as the
 *                mainloop dispatcher, FALSE cib_remote_command_dispatch()
 *
 * Steps: TCP connect, optional TLS handshake, send an "authenticate"
 * command, wait for the registration reply and remember the client id it
 * carries as the channel token, then add the socket to the mainloop.
 *
 * Returns 0 on success. On failure both channels are closed through
 * cib_tls_close() and one of these is returned: -ENOTCONN (connect failed
 * or peer disconnected), -EPROTO (missing or invalid reply), -1 (TLS
 * handshake failed). -EPROTONOSUPPORT is returned, without closing, when
 * encryption is requested in a build without GnuTLS.
 */
static int
cib_tls_signon(cib_t * cib, crm_remote_t *connection, gboolean event_channel)
{
    int sock;
    cib_remote_opaque_t *private = cib->variant_opaque;
    int rc = 0;
    int disconnected = 0;
    xmlNode *answer = NULL;
    xmlNode *login = NULL;

    static struct mainloop_fd_callbacks cib_fd_callbacks = { 0, };

    cib_fd_callbacks.dispatch =
        event_channel ? cib_remote_callback_dispatch : cib_remote_command_dispatch;
    cib_fd_callbacks.destroy = cib_remote_connection_destroy;

    connection->tcp_socket = 0;
#ifdef HAVE_GNUTLS_GNUTLS_H
    connection->tls_session = NULL;
#endif
    sock = crm_remote_tcp_connect(private->server, private->port);
    if (sock <= 0) {
        crm_perror(LOG_ERR, "remote tcp connection to %s:%d failed", private->server, private->port);
        /* Do not carry on with an invalid descriptor: release whatever the
         * other channel holds, as the remaining failure paths do, and
         * report the failure. */
        cib_tls_close(cib);
        return -ENOTCONN;
    }

    connection->tcp_socket = sock;

    if (private->encrypted) {
        /* initialize GnuTls lib */
#ifdef HAVE_GNUTLS_GNUTLS_H
        if (remote_gnutls_credentials_init == FALSE) {
            gnutls_global_init();
            gnutls_anon_allocate_client_credentials(&anon_cred_c);
            remote_gnutls_credentials_init = TRUE;
        }

        /* bind the socket to GnuTls lib */
        connection->tls_session = crm_create_anon_tls_session(sock, GNUTLS_CLIENT, anon_cred_c);

        if (crm_initiate_client_tls_handshake(connection, DEFAULT_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
            crm_err("Session creation for %s:%d failed", private->server, private->port);

            /* No gnutls_bye() for a session that never completed its handshake */
            gnutls_deinit(*connection->tls_session);
            gnutls_free(connection->tls_session);
            connection->tls_session = NULL;
            cib_tls_close(cib);
            return -1;
        }
#else
        return -EPROTONOSUPPORT;
#endif
    }

    /* login to server */
    login = create_xml_node(NULL, "cib_command");
    crm_xml_add(login, "op", "authenticate");
    crm_xml_add(login, "user", private->user);
    crm_xml_add(login, "password", private->passwd);
    crm_xml_add(login, "hidden", "password");

    crm_remote_send(connection, login);
    free_xml(login);

    crm_remote_recv(connection, -1, &disconnected);

    if (disconnected) {
        rc = -ENOTCONN;
    }

    answer = crm_remote_parse_buffer(connection);

    crm_log_xml_trace(answer, "Reply");
    if (answer == NULL) {
        rc = -EPROTO;

    } else {
        /* grab the token */
        const char *msg_type = crm_element_value(answer, F_CIB_OPERATION);
        const char *tmp_ticket = crm_element_value(answer, F_CIB_CLIENTID);

        if (safe_str_neq(msg_type, CRM_OP_REGISTER)) {
            crm_err("Invalid registration message: %s", msg_type);
            rc = -EPROTO;

        } else if (tmp_ticket == NULL) {
            rc = -EPROTO;

        } else {
            /* Drop the token of any earlier sign-on before storing the new
             * one; it is NULL on first use because the private data is
             * calloc()ed in cib_remote_new(). */
            free(connection->token);
            connection->token = strdup(tmp_ticket);
        }
    }

    free_xml(answer);
    answer = NULL;

    if (rc != 0) {
        cib_tls_close(cib);
        return rc;
    }

    crm_trace("remote client connection established");
    connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH, connection->tcp_socket, cib, &cib_fd_callbacks);
    return rc;
}
/*
 * Mainloop destroy callback for a remote CIB channel.
 *
 * user_data: the cib_t that was registered with mainloop_add_fd()
 *
 * Logs the loss and closes the connection. cib_tls_close() guards its
 * TLS-specific part itself, so it is called in every build; when the call
 * was compiled only with GnuTLS, plain-TCP builds never released the
 * sockets and buffers here.
 */
void
cib_remote_connection_destroy(gpointer user_data)
{
    crm_err("Connection destroyed");
    cib_tls_close(user_data);
}
/*
 * Mainloop dispatcher for the command channel.
 *
 * Synchronous replies are read directly by the caller that issued the
 * request, so anything that shows up here is treated as a late reply: it
 * is received, thrown away and reported.
 *
 * Returns -1 when the peer disconnected, 0 otherwise.
 */
int
cib_remote_command_dispatch(gpointer user_data)
{
    cib_t *cib = user_data;
    cib_remote_opaque_t *private = cib->variant_opaque;
    int disconnected = 0;

    crm_remote_recv(&private->command, -1, &disconnected);

    free(private->command.buffer);
    private->command.buffer = NULL;
    crm_err("received late reply for remote cib connection, discarding");

    return disconnected ? -1 : 0;
}
/*
 * Mainloop dispatcher for the callback channel.
 *
 * Receives pending data and processes every complete message in the
 * buffer: T_CIB messages go to cib_native_callback(), T_CIB_NOTIFY
 * messages to each entry of cib->notify_list; anything else is logged as
 * an error.
 *
 * Returns -1 when the peer disconnected, 0 otherwise.
 */
int
cib_remote_callback_dispatch(gpointer user_data)
{
    cib_t *cib = user_data;
    cib_remote_opaque_t *private = cib->variant_opaque;
    xmlNode *msg = NULL;
    int disconnected = 0;

    crm_info("Message on callback channel");

    crm_remote_recv(&private->callback, -1, &disconnected);

    for (msg = crm_remote_parse_buffer(&private->callback);
         msg != NULL;
         msg = crm_remote_parse_buffer(&private->callback)) {

        const char *type = crm_element_value(msg, F_TYPE);

        crm_trace("Activating %s callbacks...", type);

        if (safe_str_eq(type, T_CIB)) {
            cib_native_callback(cib, msg, 0, 0);

        } else if (safe_str_eq(type, T_CIB_NOTIFY)) {
            g_list_foreach(cib->notify_list, cib_native_notify, msg);

        } else {
            crm_err("Unknown message type: %s", type);
        }

        free_xml(msg);
    }

    return disconnected ? -1 : 0;
}
/*
 * Sign on to a remote CIB server.
 *
 * cib:  connection object created by cib_remote_new()
 * name: client name, sent as F_CIB_CLIENTNAME and used in log output
 * type: not used by this function
 *
 * When no password was supplied, one is read from standard input with
 * terminal echo switched off. Both channels (command, then callback) are
 * then opened with cib_tls_signon() and a CRM_OP_REGISTER message carrying
 * the callback token is sent on the command channel.
 *
 * Returns pcmk_ok on success, -EINVAL when the server or user is missing,
 * or the error reported by cib_tls_signon().
 */
int
cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type)
{
    int rc = pcmk_ok;
    cib_remote_opaque_t *private = cib->variant_opaque;

    if (private->passwd == NULL) {
        struct termios settings;
        int items = 0;

        /* Only touch the terminal if its current settings could be read */
        gboolean have_settings = (tcgetattr(0, &settings) == 0);

        if (have_settings) {
            settings.c_lflag &= ~ECHO;
            tcsetattr(0, TCSANOW, &settings);
        }

        fprintf(stderr, "Password: ");

        private->passwd = calloc(1, 1024);
        if (private->passwd != NULL) {
            /* Field width = buffer size - 1, leaving room for the NUL */
            items = scanf("%1023s", private->passwd);
        }
        fprintf(stdout, "\n");

        if (items < 1) {
            /* Nothing was read: release the buffer instead of leaking it */
            free(private->passwd);
            private->passwd = NULL;
        }

        if (have_settings) {
            settings.c_lflag |= ECHO;
            tcsetattr(0, TCSANOW, &settings);
        }
    }

    if (private->server == NULL || private->user == NULL) {
        rc = -EINVAL;
    }

    if (rc == pcmk_ok) {
        rc = cib_tls_signon(cib, &(private->command), FALSE);
    }

    if (rc == pcmk_ok) {
        rc = cib_tls_signon(cib, &(private->callback), TRUE);
    }

    if (rc == pcmk_ok) {
        /* Tell the server which callback channel belongs to this client */
        xmlNode *hello =
            cib_create_op(0, private->callback.token, CRM_OP_REGISTER, NULL, NULL, NULL, 0, NULL);
        crm_xml_add(hello, F_CIB_CLIENTNAME, name);
        crm_remote_send(&private->command, hello);
        free_xml(hello);
    }

    if (rc == pcmk_ok) {
        crm_notice("%s: Opened connection to %s:%d\n", name, private->server, private->port);
        cib->state = cib_connected_command;
        cib->type = cib_command;

    } else {
        fprintf(stderr, "%s: Connection to %s:%d failed: %s\n",
                name, private->server, private->port, pcmk_strerror(rc));
    }

    return rc;
}
/*
 * Sign off from the remote CIB server.
 *
 * Closes both channels and marks the object as disconnected.
 * cib_tls_close() guards its TLS-specific part itself, so it is called in
 * every build; when the call was compiled only with GnuTLS, plain-TCP
 * builds left both sockets open after sign-off.
 *
 * Always returns pcmk_ok.
 */
int
cib_remote_signoff(cib_t * cib)
{
    int rc = pcmk_ok;

    crm_debug("Signing out of the CIB Service");
    cib_tls_close(cib);

    cib->state = cib_disconnected;
    cib->type = cib_none;

    return rc;
}
/*
 * Destroy a remote CIB connection object.
 *
 * A connection that is still open is signed off first. The object, its
 * command table and its private data are released whenever that succeeded
 * or was not needed; in the nested form nothing at all was freed for a
 * connection that had already been signed off.
 *
 * Returns pcmk_ok, or the error from cib_remote_signoff(), in which case
 * nothing is freed.
 */
int
cib_remote_free(cib_t * cib)
{
    int rc = pcmk_ok;

    crm_warn("Freeing CIB");
    if (cib->state != cib_disconnected) {
        rc = cib_remote_signoff(cib);
    }

    if (rc == pcmk_ok) {
        cib_remote_opaque_t *private = cib->variant_opaque;

        free(private->server);
        free(private->user);
        free(private->passwd);
        free(cib->cmds);
        free(private);
        free(cib);
    }

    return rc;
}
/*
 * Send one operation to the remote CIB and, for synchronous calls, wait for
 * the matching reply.
 *
 * cib:          connection object (must not be in the disconnected state)
 * op:           operation name; must not be NULL
 * host:         passed through to cib_create_op()
 * section:      passed through to cib_create_op()
 * data:         passed through to cib_create_op()
 * output_data:  if non-NULL, receives a copy of the reply's F_CIB_CALLDATA
 *               payload (or NULL); the caller owns the copy
 * call_options: cib_sync_call selects the command channel and waits for a
 *               reply; cib_discard_reply returns right after sending
 * name:         not used by this implementation
 *
 * Returns the call id for asynchronous calls, pcmk_ok for discarded replies,
 * the reply's F_CIB_RC for synchronous calls, or a negative errno value
 * (-ENOTCONN, -EINVAL, -EPROTO, -ENOMSG) on failure.
 */
int
cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section,
                      xmlNode * data, xmlNode ** output_data, int call_options, const char *name)
{
    int rc = pcmk_ok;
    int disconnected = 0;
    int timeout = 0;
    int remaining_time = 0;
    time_t start_time;

    xmlNode *op_msg = NULL;
    xmlNode *op_reply = NULL;

    cib_remote_opaque_t *private = cib->variant_opaque;

    if (cib->state == cib_disconnected) {
        return -ENOTCONN;
    }

    if (output_data != NULL) {
        *output_data = NULL;
    }

    if (op == NULL) {
        crm_err("No operation specified");
        return -EINVAL;
    }

    cib->call_id++;
    /* prevent call_id from being negative (or zero) and conflicting
     * with the cib_errors enum
     */
    if (cib->call_id < 1) {
        cib->call_id = 1;
    }

    op_msg =
        cib_create_op(cib->call_id, private->callback.token, op, host, section, data, call_options,
                      NULL);
    if (op_msg == NULL) {
        return -EPROTO;
    }

    crm_trace("Sending %s message to CIB service", op);
    if (!(call_options & cib_sync_call)) {
        crm_remote_send(&private->callback, op_msg);
    } else {
        crm_remote_send(&private->command, op_msg);
    }
    free_xml(op_msg);

    if ((call_options & cib_discard_reply)) {
        crm_trace("Discarding reply");
        return pcmk_ok;

    } else if (!(call_options & cib_sync_call)) {
        return cib->call_id;
    }

    crm_trace("Waiting for a syncronous reply");

    start_time = time(NULL);
    timeout = cib->call_timeout ? cib->call_timeout : 60;
    remaining_time = timeout;

    while (remaining_time > 0 && !disconnected) {
        int reply_id = -1;
        int msg_id = cib->call_id;

        crm_remote_recv(&private->command, remaining_time * 1000, &disconnected);
        op_reply = crm_remote_parse_buffer(&private->command);

        if (!op_reply) {
            break;
        }

        crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id);

        if (reply_id == msg_id) {
            break;

        } else if (reply_id < msg_id) {
            crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id);
            crm_log_xml_trace(op_reply, "Old reply");

        } else if ((reply_id - 10000) > msg_id) {
            /* wrap-around case */
            crm_debug("Received old reply: %d (wanted %d)", reply_id, msg_id);
            crm_log_xml_trace(op_reply, "Old reply");

        } else {
            crm_err("Received a __future__ reply:" " %d (wanted %d)", reply_id, msg_id);
        }

        free_xml(op_reply);
        op_reply = NULL;

        /* Wasn't the right reply, try and read some more - but only for
         * whatever is left of the timeout. Assigning the elapsed time here
         * (as before) ended the loop after the first mismatched reply. */
        remaining_time = timeout - (int) (time(NULL) - start_time);
    }

    if (disconnected) {
        crm_err("Disconnected while waiting for reply.");
        if (op_reply != NULL) {
            /* A reply may have been parsed before the disconnect was seen */
            free_xml(op_reply);
        }
        return -ENOTCONN;

    } else if (op_reply == NULL) {
        crm_err("No reply message - empty");
        return -ENOMSG;
    }

    crm_trace("Syncronous reply received");

    /* Start processing the reply... */
    if (crm_element_value_int(op_reply, F_CIB_RC, &rc) != 0) {
        rc = -EPROTO;
    }

    if (rc == -pcmk_err_diff_resync) {
        /* This is an internal value that clients do not and should not care about */
        rc = pcmk_ok;
    }

    if (rc == pcmk_ok || rc == -EPERM) {
        crm_log_xml_debug(op_reply, "passed");

    } else {
        crm_err("Call failed: %s", pcmk_strerror(rc));
        crm_log_xml_warn(op_reply, "failed");
    }

    if (output_data == NULL) {
        /* do nothing more */

    } else if (!(call_options & cib_discard_reply)) {
        xmlNode *tmp = get_message_xml(op_reply, F_CIB_CALLDATA);

        if (tmp == NULL) {
            /* call_id was incremented before sending, so it is this call's id */
            crm_trace("No output in reply to \"%s\" command %d", op, cib->call_id);
        } else {
            *output_data = copy_xml(tmp);
        }
    }

    free_xml(op_reply);

    return rc;
}
diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c
index 887beb9355..1fa58e5d21 100644
--- a/lib/common/mainloop.c
+++ b/lib/common/mainloop.c
@@ -1,879 +1,879 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
- *
+ *
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
- *
+ *
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <sys/wait.h>
#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
-#include <crm/common/ipc.h>
+#include <crm/common/ipcs.h>
/* State kept for each child process registered with mainloop_add_child() */
struct mainloop_child_s {
/* Process ID being watched */
pid_t pid;
/* Heap copy of the description; appears in log messages */
char *desc;
/* Id of the pending g_timeout_add() timer, or 0 when none is armed */
unsigned timerid;
/* Id returned by g_child_watch_add() */
unsigned watchid;
/* Set to TRUE by child_timeout_callback() the first time the timer fires */
gboolean timeout;
/* Opaque caller data, returned by mainloop_get_child_userdata() */
void *privatedata;
/* Called when a process dies */
void (*callback)(mainloop_child_t* p, int status, int signo, int exitcode);
};
/* A GSource extended with trigger state
 * (crm_trigger_t is presumably a typedef of this struct - confirm in the header)
 */
struct trigger_s {
/* Kept as the first member: the trigger code casts GSource * to crm_trigger_t * */
GSource source;
/* TRUE after a handler returned a negative value, until mainloop_trigger_complete() */
gboolean running;
/* TRUE when the trigger has been set and is waiting to be dispatched */
gboolean trigger;
/* Argument handed to the dispatch callback */
void *user_data;
/* Id returned by g_source_attach(); reset to 0 when the source is removed */
guint id;
};
/*
 * GSource "prepare" hook for triggers.
 *
 * Caps the poll timeout at 500ms and reports whether the trigger is set.
 */
static gboolean
crm_trigger_prepare(GSource * source, gint * timeout)
{
    crm_trigger_t *self = (crm_trigger_t *) source;

    /* cluster-glue's FD and IPC related sources use g_source_add_poll()
     * but do not set a timeout in their prepare functions.
     *
     * As a result mainloop's poll() blocks until one of those sources has
     * an event, and any other kind of source (this one, g_idle_*, ...) that
     * does not use g_source_add_poll() is not processed until something
     * fd-based happens.
     *
     * The timeout set here applies to all sources and therefore puts an
     * upper limit on how long poll() can take.
     *
     * So unconditionally set a small-ish timeout - not so small that we are
     * in constant motion - which acts as an upper bound on how long signal
     * handling might be delayed.
     */
    *timeout = 500;             /* Timeout in ms */

    return self->trigger;
}
/* GSource "check" hook: the source is ready whenever the trigger flag is set */
static gboolean
crm_trigger_check(GSource * source)
{
    return ((crm_trigger_t *) source)->trigger;
}
/*
 * GSource "dispatch" hook for triggers.
 *
 * Clears the trigger flag and runs the callback with the trigger's
 * user_data. A negative callback result marks the job as still running and
 * keeps the source; otherwise the callback's result is returned as-is.
 */
static gboolean
crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
{
    crm_trigger_t *self = (crm_trigger_t *) source;
    int result = TRUE;

    if (self->running) {
        /* Wait until the existing job is complete before starting the next one */
        return TRUE;
    }

    self->trigger = FALSE;
    if (callback == NULL) {
        return result;
    }

    result = callback(self->user_data);
    if (result < 0) {
        crm_trace("Trigger handler %p not yet complete", self);
        self->running = TRUE;
        result = TRUE;
    }

    return result;
}
/* GSourceFuncs table for plain triggers: prepare, check and dispatch hooks;
 * the fourth slot is left NULL
 */
static GSourceFuncs crm_trigger_funcs = {
crm_trigger_prepare,
crm_trigger_check,
crm_trigger_dispatch,
NULL
};
/*
 * Initialise the trigger fields of a freshly created source, install the
 * optional dispatch callback, and attach the source to the default context.
 *
 * Returns the source viewed as a crm_trigger_t.
 */
static crm_trigger_t *
mainloop_setup_trigger(GSource * source, int priority, int(*dispatch) (gpointer user_data),
                       gpointer userdata)
{
    crm_trigger_t *trig = (crm_trigger_t *) source;

    trig->id = 0;
    trig->trigger = FALSE;
    trig->user_data = userdata;

    if (dispatch != NULL) {
        g_source_set_callback(source, dispatch, trig, NULL);
    }

    g_source_set_priority(source, priority);
    g_source_set_can_recurse(source, FALSE);

    trig->id = g_source_attach(source, NULL);
    return trig;
}
/* Mark a trigger's job as finished by clearing its 'running' flag, so that
 * crm_trigger_dispatch() will run the handler again the next time it fires
 */
void
-mainloop_trigger_complete(crm_trigger_t *trig)
+mainloop_trigger_complete(crm_trigger_t *trig)
{
crm_trace("Trigger handler %p complete", trig);
trig->running = FALSE;
}
/*
 * Create a new trigger source and attach it to mainloop.
 *
 * The meaning of the dispatch function's return value:
 *  -1: Job running but not complete
 *   0: Remove the trigger from mainloop
 *   1: Leave the trigger in mainloop
 */
crm_trigger_t *
mainloop_add_trigger(int priority, int(*dispatch) (gpointer user_data), gpointer userdata)
{
    GSource *src = NULL;

    CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));

    src = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
    CRM_ASSERT(src != NULL);

    return mainloop_setup_trigger(src, priority, dispatch, userdata);
}
/* Flag the trigger as set so that its prepare/check hooks report it ready */
void
mainloop_set_trigger(crm_trigger_t * source)
{
    source->trigger = TRUE;
}
/*
 * Clear the trigger and, if it is attached, remove its source from mainloop.
 * Always returns TRUE.
 */
gboolean
mainloop_destroy_trigger(crm_trigger_t * source)
{
    source->trigger = FALSE;

    if (source->id <= 0) {
        return TRUE;
    }

    g_source_remove(source->id);
    source->id = 0;
    return TRUE;
}
/* A trigger specialised for signal delivery */
typedef struct signal_s {
crm_trigger_t trigger; /* must be first */
/* User handler run from mainloop context by crm_signal_dispatch() */
void (*handler) (int sig);
/* Signal number this entry was registered for */
int signal;
} crm_signal_t;
/* Registered signal sources, indexed by signal number; NULL when none is installed */
static crm_signal_t *crm_signals[NSIG];
/*
 * GSource "dispatch" hook for signal sources: clears the pending flag and
 * runs the registered handler, if any. Always keeps the source installed.
 */
static gboolean
crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
{
    crm_signal_t *entry = (crm_signal_t *) source;

    crm_info("Invoking handler for signal %d: %s", entry->signal, strsignal(entry->signal));

    entry->trigger.trigger = FALSE;
    if (entry->handler != NULL) {
        entry->handler(entry->signal);
    }

    return TRUE;
}
/*
 * The actual signal handler installed with sigaction(): it only sets the
 * trigger of the matching entry so the user handler runs later from mainloop.
 */
static void
mainloop_signal_handler(int sig)
{
    if (sig <= 0 || sig >= NSIG || crm_signals[sig] == NULL) {
        return;
    }

    mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
}
/* GSourceFuncs table for signal sources: shares the trigger prepare/check
 * hooks but dispatches through crm_signal_dispatch(); the fourth slot is NULL
 */
static GSourceFuncs crm_signal_funcs = {
crm_trigger_prepare,
crm_trigger_check,
crm_signal_dispatch,
NULL
};
/*
 * Install 'dispatch' as the handler for signal 'sig' using sigaction(),
 * with an empty signal mask and SA_RESTART set.
 *
 * A NULL 'dispatch' restores the default disposition; it is mapped to
 * SIG_DFL explicitly rather than relying on SIG_DFL being a null pointer.
 *
 * Returns TRUE on success, FALSE (after logging) on failure.
 */
gboolean
crm_signal(int sig, void (*dispatch) (int sig))
{
    struct sigaction sa;

    memset(&sa, 0, sizeof(struct sigaction));

    if (sigemptyset(&sa.sa_mask) < 0) {
        crm_perror(LOG_ERR, "Call to sigemptyset failed");
        return FALSE;
    }

    sa.sa_handler = dispatch ? dispatch : SIG_DFL;
    sa.sa_flags = SA_RESTART;

    /* The previous disposition is not needed, so do not ask for it */
    if (sigaction(sig, &sa, NULL) < 0) {
        crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
        return FALSE;
    }

    return TRUE;
}
gboolean
mainloop_add_signal(int sig, void (*dispatch) (int sig))
{
GSource *source = NULL;
int priority = G_PRIORITY_HIGH - 1;
if (sig == SIGTERM) {
/* TERM is higher priority than other signals,
* signals are higher priority than other ipc.
* Yes, minus: smaller is "higher"
*/
priority--;
}
if (sig >= NSIG || sig < 0) {
crm_err("Signal %d is out of range", sig);
return FALSE;
} else if (crm_signals[sig] != NULL
&& crm_signals[sig]->handler == dispatch) {
crm_trace("Signal handler for %d is already installed", sig);
return TRUE;
} else if (crm_signals[sig] != NULL) {
crm_err("Different signal handler for %d is already installed", sig);
return FALSE;
}
CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
CRM_ASSERT(crm_signals[sig] != NULL);
crm_signals[sig]->handler = dispatch;
crm_signals[sig]->signal = sig;
if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
crm_signal_t *tmp = crm_signals[sig];
crm_signals[sig] = NULL;
mainloop_destroy_trigger((crm_trigger_t *) tmp);
return FALSE;
}
#if 0
/* If we want signals to interrupt mainloop's poll(), instead of waiting for
* the timeout, then we should call siginterrupt() below
*
* For now, just enforce a low timeout
*/
if (siginterrupt(sig, 1) < 0) {
crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
}
#endif
return TRUE;
}
/*
 * Uninstall the handler for 'sig' and drop its mainloop source, if any.
 *
 * Returns FALSE when 'sig' is out of range or the handler could not be
 * uninstalled, TRUE otherwise.
 */
gboolean
mainloop_destroy_signal(int sig)
{
    crm_signal_t *entry = NULL;

    if (sig < 0 || sig >= NSIG) {
        crm_err("Signal %d is out of range", sig);
        return FALSE;
    }

    if (crm_signal(sig, NULL) == FALSE) {
        crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
        return FALSE;
    }

    entry = crm_signals[sig];
    if (entry == NULL) {
        return TRUE;
    }

    crm_signals[sig] = NULL;
    mainloop_destroy_trigger((crm_trigger_t *) entry);
    return TRUE;
}
/* Array of struct gio_to_qb_poll entries, looked up by file descriptor;
 * created on first use by mainloop_add_ipc_server()
 */
static qb_array_t *gio_map = NULL;
/*
* libqb...
*/
/* Adaptor that lets libqb's poll callbacks be driven by a GLib I/O watch */
struct gio_to_qb_poll {
/* QB_TRUE while this slot is registered with mainloop */
int32_t is_used;
/* GIOChannel wrapping the descriptor; NULL once the watch is destroyed */
GIOChannel *channel;
/* Id returned by g_io_add_watch_full(); 0 once the watch is destroyed */
guint source;
/* Event mask the watch was registered with */
int32_t events;
/* Opaque argument handed back to 'fn' */
void * data;
/* libqb dispatch function called when the descriptor has an event */
qb_ipcs_dispatch_fn_t fn;
/* Priority requested by libqb (stored only) */
enum qb_loop_priority p;
};
/*
 * Peek at the reference count of an adaptor's GIOChannel (used for tracing).
 * Returns 0 when there is no adaptor or no channel.
 */
static int
gio_adapter_refcount(struct gio_to_qb_poll *adaptor)
{
    int *count = NULL;

    if (adaptor == NULL || adaptor->channel == NULL) {
        return 0;
    }

    /* This is evil
     * Looking at the giochannel header file, ref_count is the first member of channel
     * So cheat...
     */
    count = (void *) adaptor->channel;
    return *count;
}
/*
 * GLib watch callback that forwards descriptor events to the libqb dispatch
 * function stored in the adaptor.
 *
 * Returns TRUE (keep the watch) only when the libqb function returned 0.
 */
static gboolean
gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data)
{
    struct gio_to_qb_poll *entry = (struct gio_to_qb_poll *) data;
    gint fd = g_io_channel_unix_get_fd(gio);

    crm_trace("%p.%d %d (ref=%d)", data, fd, condition, gio_adapter_refcount(entry));

    if ((condition & G_IO_NVAL) != 0) {
        crm_trace("Marking failed adaptor %p unused", entry);
        entry->is_used = QB_FALSE;
    }

    return (entry->fn(fd, condition, entry->data) == 0);
}
/* Destroy notifier registered with g_io_add_watch_full(): resets the adaptor
 * slot so it can be reused for another descriptor
 */
static void
-gio_poll_destroy(gpointer data)
+gio_poll_destroy(gpointer data)
{
/* adaptor->source is valid but about to be destroyed (ref_count == 0) in gmain.c
* adaptor->channel will still have ref_count > 0... should be == 1
*/
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
crm_trace("Destroying adaptor %p channel %p (ref=%d)", adaptor, adaptor->channel, gio_adapter_refcount(adaptor));
adaptor->is_used = QB_FALSE;
adaptor->channel = NULL;
adaptor->source = 0;
}
/* libqb "dispatch_add" hook: start watching 'fd' from the GLib mainloop.
 *
 * Looks up the adaptor slot for 'fd' in gio_map, wraps the descriptor in a
 * GIOChannel and installs gio_read_socket() as the watch callback.
 *
 * Returns 0 on success, the qb_array_index() error if the lookup fails,
 * -EEXIST if the slot is still in use, -ENOMEM if no channel could be
 * created, or -EINVAL if the watch could not be added.
 */
static int32_t
gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
struct gio_to_qb_poll *adaptor;
GIOChannel *channel;
int32_t res = 0;
res = qb_array_index(gio_map, fd, (void**)&adaptor);
if (res < 0) {
crm_err("Array lookup failed for fd=%d: %d", fd, res);
return res;
}
crm_trace("Adding fd=%d to mainloop as adapater %p", fd, adaptor);
if (adaptor->is_used) {
crm_err("Adapter for descriptor %d is still in-use", fd);
return -EEXIST;
}
/* channel is created with ref_count = 1 */
channel = g_io_channel_unix_new(fd);
if (!channel) {
crm_err("No memory left to add fd=%d", fd);
return -ENOMEM;
}
/* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
evts |= (G_IO_HUP|G_IO_NVAL|G_IO_ERR);
adaptor->channel = channel;
adaptor->fn = fn;
adaptor->events = evts;
adaptor->data = data;
adaptor->p = p;
adaptor->is_used = QB_TRUE;
/* NOTE(review): the watch is always added at G_PRIORITY_DEFAULT; 'p' is only stored */
adaptor->source = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_poll_destroy);
/* Now that mainloop holds a reference to adaptor->channel,
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
*
* This means that adaptor->channel will be free'd by:
* g_main_context_dispatch()
* -> g_source_destroy_internal()
* -> g_source_callback_unref()
* shortly after gio_poll_destroy() completes
*/
- g_io_channel_unref(adaptor->channel);
+ g_io_channel_unref(adaptor->channel);
crm_trace("Added to mainloop with gsource id=%d, ref=%d", adaptor->source, gio_adapter_refcount(adaptor));
if(adaptor->source > 0) {
return 0;
}
-
+
return -EINVAL;
}
/* libqb "dispatch_mod" hook: nothing is changed; success (0) is always reported */
static int32_t
gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
                      void *data, qb_ipcs_dispatch_fn_t fn)
{
    return 0;
}
/*
 * libqb "dispatch_del" hook: mark the adaptor slot for 'fd' as unused.
 * Always returns 0, even when no slot could be found.
 */
static int32_t
gio_poll_dispatch_del(int32_t fd)
{
    struct gio_to_qb_poll *entry;

    crm_trace("Looking for fd=%d", fd);

    if (qb_array_index(gio_map, fd, (void **) &entry) != 0) {
        return 0;
    }

    crm_trace("Marking adaptor %p unused (ref=%d)", entry, gio_adapter_refcount(entry));
    entry->is_used = QB_FALSE;
    return 0;
}
/* Poll handlers handed to libqb by mainloop_add_ipc_server() so that IPC
 * descriptors are serviced through the GLib mainloop; no job_add hook is set
 */
struct qb_ipcs_poll_handlers gio_poll_funcs = {
.job_add = NULL,
.dispatch_add = gio_poll_dispatch_add,
.dispatch_mod = gio_poll_dispatch_mod,
.dispatch_del = gio_poll_dispatch_del,
};
/*
 * Decide which libqb IPC transport to use.
 *
 * A recognised value of the PCMK_ipc_type environment variable
 * ("shared-mem", "socket", "posix", "sysv") overrides the request.
 * Otherwise QB_IPC_NATIVE is mapped to shared memory and any other request
 * is returned unchanged.
 */
static enum qb_ipc_type
pick_ipc_type(enum qb_ipc_type requested)
{
    const char *override = getenv("PCMK_ipc_type");

    if (override != NULL) {
        if (strcmp("shared-mem", override) == 0) {
            return QB_IPC_SHM;
        }
        if (strcmp("socket", override) == 0) {
            return QB_IPC_SOCKET;
        }
        if (strcmp("posix", override) == 0) {
            return QB_IPC_POSIX_MQ;
        }
        if (strcmp("sysv", override) == 0) {
            return QB_IPC_SYSV_MQ;
        }
    }

    if (requested == QB_IPC_NATIVE) {
        /* We prefer shared memory because the server never blocks on
         * send.  If part of a message fits into the socket, libqb
         * needs to block until the remainder can be sent also.
         * Otherwise the client will wait forever for the remaining
         * bytes.
         */
        return QB_IPC_SHM;
    }

    return requested;
}
/* Create a libqb IPC server called 'name' and run it from the GLib mainloop.
 *
 * The transport is chosen by pick_ipc_type(type). Returns the new server, or
 * NULL (after logging) if qb_ipcs_run() fails.
 *
 * NOTE(review): the results of qb_array_create_2() and qb_ipcs_create() are
 * not checked before use, and 'server' is not destroyed on the
 * qb_ipcs_run() failure path - confirm whether that is intentional.
 */
qb_ipcs_service_t *mainloop_add_ipc_server(
- const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
+ const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
{
int rc = 0;
qb_ipcs_service_t* server = NULL;
/* The fd-to-adaptor map is shared by all servers and created on first use */
if(gio_map == NULL) {
gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
}
crm_client_init();
server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
rc = qb_ipcs_run(server);
if (rc < 0) {
crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
return NULL;
}
return server;
}
/* Destroy an IPC server created by mainloop_add_ipc_server(); NULL is ignored */
-void mainloop_del_ipc_server(qb_ipcs_service_t *server)
+void mainloop_del_ipc_server(qb_ipcs_service_t *server)
{
if(server) {
qb_ipcs_destroy(server);
}
}
/* A descriptor (plain fd or IPC connection) watched by the GLib mainloop */
struct mainloop_io_s
{
/* Heap copy of the client name; used in log messages */
char *name;
/* Opaque argument passed to the dispatch and destroy callbacks */
void *userdata;
/* Id returned by g_io_add_watch_full() */
guint source;
/* IPC connection, or NULL for a plain file descriptor client */
crm_ipc_t *ipc;
/* GIOChannel wrapping the descriptor */
GIOChannel *channel;
/* Called with each message read from 'ipc'; a negative result drops the connection */
int (*dispatch_fn_ipc)(const char *buffer, ssize_t length, gpointer userdata);
/* Called when a plain descriptor is readable; a negative result drops the connection */
int (*dispatch_fn_io) (gpointer userdata);
/* Called from mainloop_gio_destroy() before the client is freed */
void (*destroy_fn) (gpointer userdata);
};
/* Peek at the reference count of a client's GIOChannel (used for tracing).
 * Returns 0 when there is no client or no channel.
 */
static int
-mainloop_gio_refcount(mainloop_io_t *client)
+mainloop_gio_refcount(mainloop_io_t *client)
{
/* This is evil
* Looking at the giochannel header file, ref_count is the first member of channel
* So cheat...
*/
if(client && client->channel) {
int *ref = (void*)client->channel;
return *ref;
}
return 0;
}
/*
 * GLib I/O watch callback shared by IPC and plain file-descriptor clients.
 *
 * gio:       channel the event occurred on (not used directly)
 * condition: bitmask of GIOCondition flags reported for the descriptor
 * data:      the mainloop_io_t registered with the watch
 *
 * Returns TRUE to keep the watch installed, FALSE to have it removed.
 */
static gboolean
mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
{
    gboolean keep = TRUE;
    mainloop_io_t *client = data;

    if(condition & G_IO_IN) {
        if(client->ipc) {
            long rc = 0;
            int max = 10; /* upper bound on messages handled per invocation */

            do {
                rc = crm_ipc_read(client->ipc);
                if(rc <= 0) {
                    crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
                              client->name, client, pcmk_strerror(rc), rc);

                } else if(client->dispatch_fn_ipc) {
                    const char *buffer = crm_ipc_buffer(client->ipc);

                    /* rc is a long and needs %ld; 'condition' previously had
                     * no conversion specifier at all */
                    crm_trace("New message from %s[%p] = %ld (I/O condition=%d)",
                              client->name, client, rc, condition);
                    if(client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
                        crm_trace("Connection to %s no longer required", client->name);
                        keep = FALSE;
                    }
                }

            } while(keep && rc > 0 && --max > 0);

        } else {
            crm_trace("New message from %s[%p]", client->name, client);
            if(client->dispatch_fn_io) {
                if(client->dispatch_fn_io(client->userdata) < 0) {
                    crm_trace("Connection to %s no longer required", client->name);
                    keep = FALSE;
                }
            }
        }
    }

    if(client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
        crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
        keep = FALSE;

    } else if(condition & (G_IO_HUP|G_IO_NVAL|G_IO_ERR)) {
        crm_trace("The connection %s[%p] has been closed (I/O condition=%d, refcount=%d)",
                  client->name, client, condition, mainloop_gio_refcount(client));
        keep = FALSE;

    } else if((condition & G_IO_IN) == 0) {
/*
#define GLIB_SYSDEF_POLLIN =1
#define GLIB_SYSDEF_POLLPRI =2
#define GLIB_SYSDEF_POLLOUT =4
#define GLIB_SYSDEF_POLLERR =8
#define GLIB_SYSDEF_POLLHUP =16
#define GLIB_SYSDEF_POLLNVAL =32
typedef enum
{
G_IO_IN GLIB_SYSDEF_POLLIN,
G_IO_OUT GLIB_SYSDEF_POLLOUT,
G_IO_PRI GLIB_SYSDEF_POLLPRI,
G_IO_ERR GLIB_SYSDEF_POLLERR,
G_IO_HUP GLIB_SYSDEF_POLLHUP,
G_IO_NVAL GLIB_SYSDEF_POLLNVAL
} GIOCondition;
A bitwise combination representing a condition to watch for on an event source.
G_IO_IN There is data to read.
G_IO_OUT Data can be written (without blocking).
G_IO_PRI There is urgent data to read.
G_IO_ERR Error condition.
G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
- G_IO_NVAL Invalid request. The file descriptor is not open.
+ G_IO_NVAL Invalid request. The file descriptor is not open.
*/
        crm_err("Strange condition: %d", condition);
    }

    /* keep == FALSE results in mainloop_gio_destroy() being called
     * just before the source is removed from mainloop
     */
    return keep;
}
/* Destroy notifier for watches created by mainloop_add_fd(): closes any IPC
 * connection, runs the client's destroy callback, then frees the client
 */
static void
mainloop_gio_destroy(gpointer c)
{
mainloop_io_t *client = c;
/* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
* client->channel will still have ref_count > 0... should be == 1
*/
crm_trace("Destroying client %s[%p] %d", client->name, c, mainloop_gio_refcount(client));
/* Close first so the destroy callback sees a connection that is already closed */
if(client->ipc) {
crm_ipc_close(client->ipc);
}
if(client->destroy_fn) {
client->destroy_fn(client->userdata);
}
-
+
/* The IPC object is only destroyed after the callback has run */
if(client->ipc) {
crm_ipc_destroy(client->ipc);
}
crm_trace("Destroyed client %s[%p] %d", client->name, c, mainloop_gio_refcount(client));
free(client->name);
memset(client, 0, sizeof(mainloop_io_t)); /* A bit of pointless paranoia */
free(client);
}
/* Connect to the IPC server 'name' and watch the connection from mainloop.
 *
 * Returns the new client, or NULL if the connection could not be created or
 * established (the connection object is closed and destroyed in that case).
 *
 * NOTE(review): mainloop_add_fd() returns NULL unless fd > 0, in which case
 * 'client->ipc = conn' below would dereference NULL; 'callbacks' is also
 * dereferenced without a NULL check.
 */
mainloop_io_t *
mainloop_add_ipc_client(
- const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
+ const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
{
mainloop_io_t *client = NULL;
crm_ipc_t *conn = crm_ipc_new(name, max_size);
if(conn && crm_ipc_connect(conn)) {
int32_t fd = crm_ipc_get_fd(conn);
/* No fd callbacks are passed; the IPC callbacks are installed on the client directly below */
client = mainloop_add_fd(name, priority, fd, userdata, NULL);
client->ipc = conn;
client->destroy_fn = callbacks->destroy;
client->dispatch_fn_ipc = callbacks->dispatch;
}
if(conn && client == NULL) {
crm_trace("Connection to %s failed", name);
crm_ipc_close(conn);
crm_ipc_destroy(conn);
}
-
+
return client;
}
/* Stop watching an IPC client; the work is delegated to mainloop_del_fd() */
void
mainloop_del_ipc_client(mainloop_io_t *client)
{
    mainloop_del_fd(client);
}
/* Return the IPC connection behind a mainloop client, or NULL when 'client' is NULL */
crm_ipc_t *
mainloop_get_ipc_client(mainloop_io_t *client)
{
    return (client != NULL) ? client->ipc : NULL;
}
/* Watch file descriptor 'fd' from mainloop for input, hang-up and error events.
 *
 * 'callbacks' may be NULL, in which case no dispatch or destroy function is
 * installed. Returns the new client, or NULL when fd is not greater than 0.
 *
 * NOTE(review): the calloc() and strdup() results are used without a check.
 */
mainloop_io_t *
mainloop_add_fd(
- const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
+ const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
{
mainloop_io_t *client = NULL;
if(fd > 0) {
- client = calloc(1, sizeof(mainloop_io_t));
+ client = calloc(1, sizeof(mainloop_io_t));
client->name = strdup(name);
client->userdata = userdata;
if(callbacks) {
client->destroy_fn = callbacks->destroy;
client->dispatch_fn_io = callbacks->dispatch;
}
client->channel = g_io_channel_unix_new(fd);
client->source = g_io_add_watch_full(
client->channel, priority, (G_IO_IN|G_IO_HUP|G_IO_NVAL|G_IO_ERR),
mainloop_gio_callback, client, mainloop_gio_destroy);
/* Now that mainloop holds a reference to client->channel,
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
*
* This means that client->channel will be free'd by:
* g_main_context_dispatch() or g_source_remove()
* -> g_source_destroy_internal()
* -> g_source_callback_unref()
* shortly after mainloop_gio_destroy() completes
*/
g_io_channel_unref(client->channel);
crm_trace("Added connection %d for %s[%p].%d %d", client->source, client->name, client, fd, mainloop_gio_refcount(client));
}
return client;
}
/*
 * Stop watching a descriptor client. NULL is ignored. Removing the source
 * causes mainloop_gio_destroy() to run, which frees the client.
 */
void
mainloop_del_fd(mainloop_io_t *client)
{
    if (client == NULL) {
        return;
    }

    crm_trace("Removing client %s[%p] %d", client->name, client, mainloop_gio_refcount(client));

    if (client->source) {
        /* Results in mainloop_gio_destroy() being called just
         * before the source is removed from mainloop
         */
        g_source_remove(client->source);
    }
}
/* Accessor: process ID of a tracked child */
pid_t
mainloop_get_child_pid(mainloop_child_t *child)
{
    return child->pid;
}
/* Accessor: the child's 'timeout' flag (set once its timeout timer has fired) */
int
mainloop_get_child_timeout(mainloop_child_t *child)
{
    return child->timeout;
}
/* Accessor: the caller-supplied private data pointer of a tracked child */
void *
mainloop_get_child_userdata(mainloop_child_t *child)
{
    return child->privatedata;
}
/* Forget the caller-supplied private data pointer (the data itself is not freed) */
void
mainloop_clear_child_userdata(mainloop_child_t *child)
{
    child->privatedata = NULL;
}
/*
 * Timer callback for a child that has exceeded its timeout.
 *
 * First expiry: flag the child as timed out, send SIGKILL and arm a
 * 5-second follow-up timer. Second expiry: log that the process will not
 * die. Always returns FALSE so the expired timer is not repeated.
 */
static gboolean
child_timeout_callback(gpointer p)
{
    mainloop_child_t *proc = p;

    proc->timerid = 0;

    if (proc->timeout) {
        crm_crit("%s process (PID %d) will not die!", proc->desc, (int)proc->pid);
        return FALSE;
    }

    proc->timeout = TRUE;
    crm_warn("%s process (PID %d) timed out", proc->desc, (int)proc->pid);

    if (kill(proc->pid, SIGKILL) < 0) {
        if (errno == ESRCH) {
            /* Nothing left to do */
            return FALSE;
        }
        crm_perror(LOG_ERR, "kill(%d, KILL) failed", proc->pid);
    }

    proc->timerid = g_timeout_add(5000, child_timeout_callback, proc);
    return FALSE;
}
/* Cancel any pending timeout timer and free a tracked-child record */
static void
mainloop_child_destroy(mainloop_child_t *child)
{
    if (child->timerid != 0) {
        crm_trace("Removing timer %d", child->timerid);
        g_source_remove(child->timerid);
        child->timerid = 0;
    }

    free(child->desc);
    g_free(child);
}
/*
 * GLib child-watch callback: decode the wait status, notify the registered
 * callback and release the tracking record.
 */
static void
child_death_dispatch(GPid pid, gint status, gpointer user_data)
{
    mainloop_child_t *proc = user_data;
    int exitcode = 0;
    int signo = 0;

    crm_trace("Managed process %d exited: %p", pid, proc);

    if (WIFEXITED(status)) {
        exitcode = WEXITSTATUS(status);
        crm_trace("Managed process %d (%s) exited with rc=%d", pid,
                  proc->desc, exitcode);

    } else if (WIFSIGNALED(status)) {
        signo = WTERMSIG(status);
        crm_trace("Managed process %d (%s) exited with signal=%d", pid,
                  proc->desc, signo);
    }
#ifdef WCOREDUMP
    if (WCOREDUMP(status)) {
        crm_err("Managed process %d (%s) dumped core", pid, proc->desc);
    }
#endif

    if (proc->callback != NULL) {
        proc->callback(proc, status, signo, exitcode);
    }

    crm_trace("Removed process entry for %d", pid);
    mainloop_child_destroy(proc);
}
/*
 * Create/Log a new tracked process
 * To track a process group, use -pid
 *
 * pid:         process to watch
 * timeout:     if non-zero, delay passed to g_timeout_add() before
 *              child_timeout_callback() runs
 * desc:        description used in log messages (copied)
 * privatedata: opaque pointer kept for the caller
 * callback:    invoked when the process exits
 */
void
mainloop_add_child(pid_t pid, int timeout, const char *desc, void * privatedata,
                   void (*callback)(mainloop_child_t *p, int status, int signo, int exitcode))
{
    mainloop_child_t *proc = g_new(mainloop_child_t, 1);

    proc->pid = pid;
    proc->desc = strdup(desc);
    proc->privatedata = privatedata;
    proc->callback = callback;
    proc->timeout = FALSE;
    proc->timerid = 0;

    if (timeout) {
        proc->timerid = g_timeout_add(timeout, child_timeout_callback, proc);
    }

    proc->watchid = g_child_watch_add(pid, child_death_dispatch, proc);
}
diff --git a/lib/lrmd/lrmd_client.c b/lib/lrmd/lrmd_client.c
index 1f739d418b..6417875e9f 100644
--- a/lib/lrmd/lrmd_client.c
+++ b/lib/lrmd/lrmd_client.c
@@ -1,1898 +1,1899 @@
/*
* Copyright (c) 2012 David Vossel <dvossel@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
- *
+ *
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
- *
+ *
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#include <crm_internal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <glib.h>
#include <dirent.h>
#include <crm/crm.h>
#include <crm/lrmd.h>
#include <crm/services.h>
#include <crm/common/mainloop.h>
+#include <crm/common/ipcs.h>
#include <crm/msg_xml.h>
#include <crm/stonith-ng.h>
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include <gnutls/gnutls.h>
#endif
#include <sys/socket.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <netdb.h>
CRM_TRACE_INIT_DATA(lrmd);
/* File-scope stonith API handle; starts out unset */
static stonith_t *stonith_api = NULL;
/* Forward declarations of static functions used before their definitions */
static int lrmd_api_disconnect(lrmd_t *lrmd);
static int lrmd_api_is_connected(lrmd_t *lrmd);
#ifdef HAVE_GNUTLS_GNUTLS_H
/* TLS-only declarations (compiled only when GnuTLS headers are available) */
#define LRMD_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */
gnutls_psk_client_credentials_t psk_cred_s;
int lrmd_tls_set_key(gnutls_datum_t *key, const char *location);
static void lrmd_tls_disconnect(lrmd_t *lrmd);
static int global_remote_msg_id = 0;
int lrmd_tls_send_msg(crm_remote_t *session, xmlNode *msg, uint32_t id, const char *msg_type);
static void lrmd_tls_connection_destroy(gpointer userdata);
#endif
/* Per-connection private state, reached through lrmd->private */
typedef struct lrmd_private_s {
enum client_type type;
char *token;
mainloop_io_t *source;
/* IPC parameters */
crm_ipc_t *ipc;
/* Remote session; its tls_session member is what lrmd_tls_connected() tests */
crm_remote_t *remote;
/* Extra TLS parameters */
/* Copied into every event delivered by lrmd_dispatch_internal() */
char *remote_nodename;
#ifdef HAVE_GNUTLS_GNUTLS_H
char *server;
int port;
gnutls_psk_client_credentials_t psk_cred_c;
int sock;
/* xmlNode notifications queued for later delivery; drained by lrmd_tls_dispatch() */
GList *pending_notify;
crm_trigger_t *process_notify;
#endif
/* Client callback invoked for each event; events are dropped while it is NULL */
lrmd_event_callback callback;
} lrmd_private_t;
/*
 * Append a copy of 'value' to the end of a singly linked list.
 * Returns the list head (the new node when the list was empty).
 */
static lrmd_list_t *
lrmd_list_add(lrmd_list_t * head, const char *value)
{
    lrmd_list_t *node = calloc(1, sizeof(lrmd_list_t));
    lrmd_list_t *tail = head;

    node->val = strdup(value);

    if (tail == NULL) {
        return node;
    }

    while (tail->next) {
        tail = tail->next;
    }
    tail->next = node;

    return head;
}
/* Free every node of a list together with the string each node holds */
void
lrmd_list_freeall(lrmd_list_t * head)
{
    while (head != NULL) {
        lrmd_list_t *next = head->next;

        free((char *)head->val);
        free(head);
        head = next;
    }
}
/*
 * Append a copy of the key/value pair to the end of a singly linked list.
 * Returns the list head (the new node when the list was empty).
 */
lrmd_key_value_t *
lrmd_key_value_add(lrmd_key_value_t * head, const char *key, const char *value)
{
    lrmd_key_value_t *node = calloc(1, sizeof(lrmd_key_value_t));
    lrmd_key_value_t *tail = head;

    node->key = strdup(key);
    node->value = strdup(value);

    if (tail == NULL) {
        return node;
    }

    while (tail->next) {
        tail = tail->next;
    }
    tail->next = node;

    return head;
}
/* Free every node of a key/value list together with its key and value strings */
void
lrmd_key_value_freeall(lrmd_key_value_t * head)
{
    while (head != NULL) {
        lrmd_key_value_t *next = head->next;

        free(head->key);
        free(head->value);
        free(head);
        head = next;
    }
}
/* g_hash_table_foreach() helper: store copies of key and value in the table passed as user_data */
static void
dup_attr(gpointer key, gpointer value, gpointer user_data)
{
    char *key_copy = strdup(key);
    char *value_copy = strdup(value);

    g_hash_table_replace(user_data, key_copy, value_copy);
}
/*
 * Make a deep copy of an event: scalar fields are copied verbatim, strings
 * are duplicated, and the parameter table (if any) is rebuilt with copied
 * keys and values.
 */
lrmd_event_data_t *
lrmd_copy_event(lrmd_event_data_t * event)
{
    lrmd_event_data_t *dup = calloc(1, sizeof(lrmd_event_data_t));

    /* This will get all the int values.
     * we just have to be careful not to leave any
     * dangling pointers to strings. */
    memcpy(dup, event, sizeof(lrmd_event_data_t));

    /* After the memcpy each pointer equals the source's, so a NULL source
     * string is already NULL in the copy and only non-NULL ones need work */
    if (event->rsc_id) {
        dup->rsc_id = strdup(event->rsc_id);
    }
    if (event->op_type) {
        dup->op_type = strdup(event->op_type);
    }
    if (event->user_data) {
        dup->user_data = strdup(event->user_data);
    }
    if (event->output) {
        dup->output = strdup(event->output);
    }
    if (event->remote_nodename) {
        dup->remote_nodename = strdup(event->remote_nodename);
    }

    if (event->params) {
        dup->params = g_hash_table_new_full(crm_str_hash,
                                            g_str_equal, g_hash_destroy_str, g_hash_destroy_str);

        if (dup->params != NULL) {
            g_hash_table_foreach(event->params, dup_attr, dup->params);
        }
    }

    return dup;
}
/* Release an event and everything it owns; NULL is ignored */
void
lrmd_free_event(lrmd_event_data_t * event)
{
    if (event == NULL) {
        return;
    }

    /* The string members are declared const, hence the casts */
    free((char *)event->rsc_id);
    free((char *)event->op_type);
    free((char *)event->user_data);
    free((char *)event->output);
    free((char *)event->remote_nodename);

    if (event->params != NULL) {
        g_hash_table_destroy(event->params);
    }

    free(event);
}
/*
 * Translate a notification message into an lrmd_event_data_t and hand it to
 * the client's callback.
 *
 * Messages with an unrecognised operation are ignored, as are all messages
 * while no callback is registered. Always returns 1.
 */
static int
lrmd_dispatch_internal(lrmd_t *lrmd, xmlNode *msg)
{
    lrmd_private_t *priv = lrmd->private;
    lrmd_event_data_t event = { 0, };
    const char *op = NULL;

    if (priv->callback == NULL) {
        /* no callback set */
        crm_trace("notify event received but client has not set callback");
        return 1;
    }

    /* Fields common to every event type */
    event.remote_nodename = priv->remote_nodename;
    op = crm_element_value(msg, F_LRMD_OPERATION);
    crm_element_value_int(msg, F_LRMD_CALLID, &event.call_id);
    event.rsc_id = crm_element_value(msg, F_LRMD_RSC_ID);

    if (crm_str_eq(op, LRMD_OP_RSC_REG, TRUE)) {
        event.type = lrmd_event_register;

    } else if (crm_str_eq(op, LRMD_OP_RSC_UNREG, TRUE)) {
        event.type = lrmd_event_unregister;

    } else if (crm_str_eq(op, LRMD_OP_RSC_EXEC, TRUE)) {
        /* Execution results carry the full set of timing and status fields */
        crm_element_value_int(msg, F_LRMD_TIMEOUT, &event.timeout);
        crm_element_value_int(msg, F_LRMD_RSC_INTERVAL, &event.interval);
        crm_element_value_int(msg, F_LRMD_RSC_START_DELAY, &event.start_delay);
        crm_element_value_int(msg, F_LRMD_EXEC_RC, (int *)&event.rc);
        crm_element_value_int(msg, F_LRMD_OP_STATUS, &event.op_status);
        crm_element_value_int(msg, F_LRMD_RSC_DELETED, &event.rsc_deleted);

        crm_element_value_int(msg, F_LRMD_RSC_RUN_TIME, (int *)&event.t_run);
        crm_element_value_int(msg, F_LRMD_RSC_RCCHANGE_TIME, (int *)&event.t_rcchange);
        crm_element_value_int(msg, F_LRMD_RSC_EXEC_TIME, (int *)&event.exec_time);
        crm_element_value_int(msg, F_LRMD_RSC_QUEUE_TIME, (int *)&event.queue_time);

        event.op_type = crm_element_value(msg, F_LRMD_RSC_ACTION);
        event.user_data = crm_element_value(msg, F_LRMD_RSC_USERDATA_STR);
        event.output = crm_element_value(msg, F_LRMD_RSC_OUTPUT);
        event.type = lrmd_event_exec_complete;

        event.params = xml2list(msg);

    } else if (crm_str_eq(op, LRMD_OP_POKE, TRUE)) {
        event.type = lrmd_event_poke;

    } else {
        return 1;
    }

    crm_trace("op %s notify event received", op);
    priv->callback(&event);

    /* The parameter table is the only member allocated by this function */
    if (event.params != NULL) {
        g_hash_table_destroy(event.params);
    }

    return 1;
}
/*
 * IPC dispatch hook: parse the received buffer as XML and forward it to
 * lrmd_dispatch_internal().
 *
 * Returns 1 when no client callback is registered; otherwise returns whatever
 * lrmd_dispatch_internal() returned.  The `length` argument is not used.
 */
static int
lrmd_ipc_dispatch(const char *buffer, ssize_t length, gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *priv = lrmd->private;
    int rc = 1;

    /* Without a callback there is nobody to notify, so skip the parse */
    if (priv->callback) {
        xmlNode *parsed = string2xml(buffer);

        rc = lrmd_dispatch_internal(lrmd, parsed);
        free_xml(parsed);
    }
    return rc;
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * Element destructor with a gpointer signature (passed to g_list_free_full()
 * for the pending-notify list): releases one queued XML message.
 */
static void
lrmd_free_xml(gpointer userdata)
{
    xmlNode *queued = userdata;

    free_xml(queued);
}
/*
 * Report whether a TLS session object currently exists for this connection.
 * Returns TRUE when native->remote->tls_session is set, FALSE otherwise.
 */
static int
lrmd_tls_connected(lrmd_t *lrmd)
{
    lrmd_private_t *priv = lrmd->private;

    return (priv->remote->tls_session != NULL) ? TRUE : FALSE;
}
/*
 * Dispatch handler for the TLS transport (used both as the fd dispatch
 * callback and as the notify trigger callback).
 *
 * 1. Delivers and then frees every notification queued in pending_notify.
 * 2. Reads whatever is available on the connection and dispatches each
 *    complete message found in the receive buffer.
 * 3. Tears the connection down if a disconnect was detected.
 *
 * Returns 0 if the session is not connected or was lost, 1 otherwise.
 */
static int
lrmd_tls_dispatch(gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *priv = lrmd->private;
    xmlNode *msg = NULL;
    int ready = 0;
    int lost = 0;

    if (lrmd_tls_connected(lrmd) == FALSE) {
        crm_trace("tls dispatch triggered after disconnect");
        return 0;
    }

    crm_trace("tls_dispatch triggered");

    /* First check if there are any pending notifies to process that came
     * while we were waiting for replies earlier. */
    if (priv->pending_notify) {
        GList *item = NULL;

        crm_trace("Processing pending notifies");
        item = priv->pending_notify;
        while (item != NULL) {
            lrmd_dispatch_internal(lrmd, item->data);
            item = item->next;
        }
        g_list_free_full(priv->pending_notify, lrmd_free_xml);
        priv->pending_notify = NULL;
    }

    /* Next read the current buffer and see if there are any messages to handle. */
    ready = crm_remote_ready(priv->remote, 0);
    if (ready < 0) {
        lost = 1;

    } else {
        if (ready > 0) {
            crm_remote_recv(priv->remote, -1, &lost);
        }
        /* ready == 0: nothing new to read, but full messages may already
         * be sitting in the buffer. */
        msg = crm_remote_parse_buffer(priv->remote);
    }

    for (; msg != NULL; msg = crm_remote_parse_buffer(priv->remote)) {
        lrmd_dispatch_internal(lrmd, msg);
        free_xml(msg);
    }

    if (lost) {
        crm_info("Server disconnected while reading remote server msg.");
        lrmd_tls_disconnect(lrmd);
        return 0;
    }
    return 1;
}
#endif
/*
 * Not used with mainloop.
 *
 * Check whether the connection has something for lrmd_dispatch() to process.
 * For IPC this is the result of crm_ipc_ready().  For TLS it is 1 when
 * notifications are queued or the receive buffer already contains a message
 * terminator, otherwise the result of crm_remote_ready().  Returns 0 for an
 * unsupported connection type.
 *
 * The `timeout` argument is not consulted by this implementation.
 */
int lrmd_poll(lrmd_t *lrmd, int timeout)
{
    lrmd_private_t *priv = lrmd->private;
    int rc = 0;

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            rc = crm_ipc_ready(priv->ipc);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            if (priv->pending_notify
                || (priv->remote->buffer
                    && strstr(priv->remote->buffer, REMOTE_MSG_TERMINATOR))) {
                /* Work is already available without touching the socket */
                rc = 1;
            } else {
                rc = crm_remote_ready(priv->remote, 0);
            }
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
            break;
    }

    return rc;
}
/*
 * Not used with mainloop.
 *
 * Process everything currently readable on the connection.  For IPC, each
 * successfully read message is handed to lrmd_ipc_dispatch(); for TLS the
 * work is delegated to lrmd_tls_dispatch().
 *
 * Returns FALSE (after logging) if the connection is no longer up once
 * processing finishes, TRUE otherwise.
 */
bool
lrmd_dispatch(lrmd_t * lrmd)
{
    lrmd_private_t *priv = NULL;

    CRM_ASSERT(lrmd != NULL);
    priv = lrmd->private;

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            while (crm_ipc_ready(priv->ipc)) {
                const char *payload = NULL;

                if (crm_ipc_read(priv->ipc) <= 0) {
                    /* Nothing usable was read; poll again */
                    continue;
                }
                payload = crm_ipc_buffer(priv->ipc);
                lrmd_ipc_dispatch(payload, strlen(payload), lrmd);
            }
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            lrmd_tls_dispatch(lrmd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
    }

    if (lrmd_api_is_connected(lrmd) != FALSE) {
        return TRUE;
    }

    crm_err("Connection closed");
    return FALSE;
}
/*
 * Build the "lrmd_command" request envelope.
 *
 * token   - callback token identifying this client; must not be NULL
 * op      - operation name stored in F_LRMD_OPERATION
 * data    - optional payload, attached under F_LRMD_CALLDATA when non-NULL
 * options - call options stored in F_LRMD_CALLOPTS
 *
 * Returns a newly created XML node that the caller must release with
 * free_xml(), or NULL if the token is missing or the node cannot be created.
 */
static xmlNode *
lrmd_create_op(const char *token, const char *op, xmlNode * data, enum lrmd_call_options options)
{
    xmlNode *op_msg = NULL;

    /* Validate the token before allocating anything, so that a missing
     * token cannot leak the freshly created node. */
    CRM_CHECK(token != NULL, return NULL);

    op_msg = create_xml_node(NULL, "lrmd_command");
    CRM_CHECK(op_msg != NULL, return NULL);

    crm_xml_add(op_msg, F_XML_TAGNAME, "lrmd_command");
    crm_xml_add(op_msg, F_TYPE, T_LRMD);
    crm_xml_add(op_msg, F_LRMD_CALLBACK_TOKEN, token);
    crm_xml_add(op_msg, F_LRMD_OPERATION, op);

    crm_trace("Sending call options: %.8lx, %d", (long)options, options);
    crm_xml_add_int(op_msg, F_LRMD_CALLOPTS, options);

    if (data != NULL) {
        add_message_xml(op_msg, F_LRMD_CALLDATA, data);
    }

    return op_msg;
}
/*
 * Destroy hook for the IPC connection: forget the connection handles and
 * tell the client (if it registered a callback) that it was disconnected.
 */
static void
lrmd_ipc_connection_destroy(gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *priv = lrmd->private;
    lrmd_event_data_t event = { 0, };

    crm_info("IPC connection destroyed");

    /* Prevent these from being cleaned up in lrmd_api_disconnect() */
    priv->source = NULL;
    priv->ipc = NULL;

    if (priv->callback == NULL) {
        return;
    }

    event.remote_nodename = priv->remote_nodename;
    event.type = lrmd_event_disconnect;
    priv->callback(&event);
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * Destroy hook for the TLS connection.
 *
 * Releases every per-connection resource that is still set (TLS session,
 * PSK credentials, socket, notify trigger, queued notifications and the
 * receive buffer), resets the corresponding fields, and finally delivers an
 * lrmd_event_disconnect event to the client callback if one is registered.
 */
static void
lrmd_tls_connection_destroy(gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *priv = lrmd->private;

    crm_info("TLS connection destroyed");

    if (priv->remote->tls_session) {
        gnutls_bye(*priv->remote->tls_session, GNUTLS_SHUT_RDWR);
        gnutls_deinit(*priv->remote->tls_session);
        gnutls_free(priv->remote->tls_session);
        priv->remote->tls_session = NULL;
    }

    if (priv->psk_cred_c) {
        gnutls_psk_free_client_credentials(priv->psk_cred_c);
        priv->psk_cred_c = NULL;
    }

    if (priv->sock) {
        close(priv->sock);
        priv->sock = 0;
    }

    if (priv->process_notify) {
        mainloop_destroy_trigger(priv->process_notify);
        priv->process_notify = NULL;
    }

    if (priv->pending_notify) {
        g_list_free_full(priv->pending_notify, lrmd_free_xml);
        priv->pending_notify = NULL;
    }

    free(priv->remote->buffer);
    priv->remote->buffer = NULL;
    priv->source = 0;

    if (priv->callback) {
        lrmd_event_data_t event = { 0, };

        event.type = lrmd_event_disconnect;
        event.remote_nodename = priv->remote_nodename;
        priv->callback(&event);
    }
}
/*
 * Stamp `msg` with a remote message id and type, then send it over the
 * given remote session.
 *
 * Returns the result of crm_remote_send(); a negative result is also logged.
 */
int
lrmd_tls_send_msg(crm_remote_t *session, xmlNode *msg, uint32_t id, const char *msg_type)
{
    int sent;

    crm_xml_add_int(msg, F_LRMD_REMOTE_MSG_ID, id);
    crm_xml_add(msg, F_LRMD_REMOTE_MSG_TYPE, msg_type);

    sent = crm_remote_send(session, msg);
    if (sent < 0) {
        crm_err("Failed to send remote lrmd tls msg, rc = %d" , sent);
    }
    return sent;
}
/*
 * Wait for the reply to a specific request on the TLS connection.
 *
 * lrmd              - connection to read from
 * total_timeout     - overall budget in ms; values <= 0 select 10 seconds
 * expected_reply_id - remote message id the reply must carry
 * disconnected      - set by crm_remote_recv() when the peer went away
 *
 * Messages of type "notify" that arrive in the meantime are queued on
 * pending_notify (and the notify trigger is set, if one exists) so they can
 * be dispatched later.  Messages with a missing/unexpected type or a
 * different id are logged and discarded.
 *
 * Returns the reply (caller frees with free_xml()), or NULL on timeout,
 * disconnect, or when a read did not yield a complete message.
 */
static xmlNode *
lrmd_tls_recv_reply(lrmd_t *lrmd, int total_timeout, int expected_reply_id, int *disconnected)
{
    lrmd_private_t *native = lrmd->private;
    xmlNode *xml = NULL;
    time_t start = time(NULL);
    const char *msg_type = NULL;
    int reply_id = 0;
    int remaining_timeout = 0;

    /* A timeout of 0 here makes no sense.  We have to wait a period of time
     * for the response to come back.  If -1 or 0, default to 10 seconds. */
    if (total_timeout <= 0) {
        total_timeout = 10000;
    }

    while (!xml) {

        xml = crm_remote_parse_buffer(native->remote);
        if (!xml) {
            /* Read some more off the tls buffer if we still have time left.
             * The budget is always measured against the original total:
             * `start` is fixed, so subtracting the elapsed time from an
             * already-reduced value would count earlier waits twice. */
            remaining_timeout = total_timeout - (int)((time(NULL) - start) * 1000);
            if (remaining_timeout <= 0) {
                return NULL;
            }

            crm_remote_recv(native->remote, remaining_timeout, disconnected);
            xml = crm_remote_parse_buffer(native->remote);
            if (!xml || *disconnected) {
                /* Do not leak a message parsed just before the disconnect
                 * was noticed; free_xml() accepts NULL. */
                free_xml(xml);
                return NULL;
            }
        }

        CRM_ASSERT(xml != NULL);

        crm_element_value_int(xml, F_LRMD_REMOTE_MSG_ID, &reply_id);
        msg_type = crm_element_value(xml, F_LRMD_REMOTE_MSG_TYPE);

        if (!msg_type) {
            crm_err("Empty msg type received while waiting for reply");
            free_xml(xml);
            xml = NULL;

        } else if (safe_str_eq(msg_type, "notify")) {
            /* got a notify while waiting for reply, trigger the notify to be processed later */
            crm_info("queueing notify");
            native->pending_notify = g_list_append(native->pending_notify, xml);
            if (native->process_notify) {
                crm_info("notify trigger set.");
                mainloop_set_trigger(native->process_notify);
            }
            /* Ownership moved to the pending list; keep waiting */
            xml = NULL;

        } else if (safe_str_neq(msg_type, "reply")) {
            /* msg isn't a reply, make some noise */
            crm_err("Expected a reply, got %s", msg_type);
            free_xml(xml);
            xml = NULL;

        } else if (reply_id != expected_reply_id) {
            crm_err("Got outdated reply, expected id %d got id %d", expected_reply_id, reply_id);
            free_xml(xml);
            xml = NULL;
        }
    }

    /* Anything still buffered is left for the dispatch trigger to handle */
    if (native->remote->buffer && native->process_notify) {
        mainloop_set_trigger(native->process_notify);
    }

    return xml;
}
/*
 * Send `msg` as a "request" over the TLS connection, tagging it with the
 * next value of global_remote_msg_id (which is reset to 1 whenever the
 * increment leaves it at or below zero).
 *
 * Returns pcmk_ok on success.  If the send does not report a positive
 * result the connection is torn down and -ENOTCONN is returned.
 */
static int
lrmd_tls_send(lrmd_t *lrmd, xmlNode *msg)
{
    lrmd_private_t *priv = lrmd->private;
    int sent = 0;

    global_remote_msg_id++;
    if (global_remote_msg_id <= 0) {
        global_remote_msg_id = 1;
    }

    sent = lrmd_tls_send_msg(priv->remote, msg, global_remote_msg_id, "request");
    if (sent > 0) {
        return pcmk_ok;
    }

    crm_err("Remote lrmd send failed, disconnecting");
    lrmd_tls_disconnect(lrmd);
    return -ENOTCONN;
}
/*
 * Send a request over TLS and wait for the matching reply.
 *
 * Returns -1 if no TLS session exists, the (negative) send error if the
 * send fails, -ENOTCONN if the server disconnected while waiting (the
 * connection is then torn down), -ECOMM if no reply arrived, and otherwise
 * the result of lrmd_tls_send().
 *
 * When `reply` is non-NULL it receives the reply (possibly NULL) and the
 * caller owns it; otherwise the reply is freed here.
 */
static int
lrmd_tls_send_recv(lrmd_t *lrmd, xmlNode *msg, int timeout, xmlNode **reply)
{
    xmlNode *answer = NULL;
    int lost = 0;
    int rc = 0;

    if (lrmd_tls_connected(lrmd) == FALSE) {
        return -1;
    }

    rc = lrmd_tls_send(lrmd, msg);
    if (rc < 0) {
        return rc;
    }

    answer = lrmd_tls_recv_reply(lrmd, timeout, global_remote_msg_id, &lost);

    if (lost) {
        crm_err("Remote lrmd server disconnected while waiting for reply with id %d. ", global_remote_msg_id);
        lrmd_tls_disconnect(lrmd);
        rc = -ENOTCONN;

    } else if (answer == NULL) {
        crm_err("Remote lrmd never received reply for request id %d. timeout: %dms ", global_remote_msg_id, timeout);
        rc = -ECOMM;
    }

    if (reply == NULL) {
        free_xml(answer);
    } else {
        *reply = answer;
    }

    return rc;
}
#endif
/*
 * Send `msg` and wait for a reply, using whichever transport this
 * connection was created with.
 *
 * Returns the transport's result, or -1 for an unsupported connection type.
 */
static int
lrmd_send_xml(lrmd_t *lrmd, xmlNode *msg, int timeout, xmlNode **reply)
{
    lrmd_private_t *priv = lrmd->private;

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            return crm_ipc_send(priv->ipc, msg, crm_ipc_client_response, timeout, reply);

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            return lrmd_tls_send_recv(lrmd, msg, timeout, reply);
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
            return -1;
    }
}
/*
 * Send `msg` without waiting for a reply, using whichever transport this
 * connection was created with.
 *
 * Returns the transport's result, or -1 for an unsupported connection type.
 */
static int
lrmd_send_xml_no_reply(lrmd_t *lrmd, xmlNode *msg)
{
    lrmd_private_t *priv = lrmd->private;

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            return crm_ipc_send(priv->ipc, msg, crm_ipc_client_none, 0, NULL);

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            return lrmd_tls_send(lrmd, msg);
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
            return -1;
    }
}
/*
 * Report whether the underlying transport is currently connected.
 * Returns the transport-specific answer, or 0 (after logging) for an
 * unsupported connection type.
 */
static int
lrmd_api_is_connected(lrmd_t *lrmd)
{
    lrmd_private_t *priv = lrmd->private;
    int connected = 0;

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            connected = crm_ipc_connected(priv->ipc);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            connected = lrmd_tls_connected(lrmd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
            break;
    }

    return connected;
}
/*
 * Build an lrmd request for `op` and send it.
 *
 * lrmd         - connection to use
 * op           - operation name; NULL yields -EINVAL
 * data         - optional payload attached to the request
 * output_data  - if non-NULL, receives ownership of the reply on success
 * timeout      - ms. defaults to 1000 if set to 0
 * options      - call options copied into the request
 * expect_reply - TRUE to wait for and evaluate a reply
 *                (TODO we need to reduce usage of this boolean)
 *
 * Returns -ENOTCONN when not connected, -EINVAL when the request cannot be
 * built, -ECOMM when sending fails, -ENOMSG when the reply carries no
 * F_LRMD_RC value, and otherwise the F_LRMD_RC value from the reply (or the
 * raw send result when no reply is expected).
 */
static int
lrmd_send_command(lrmd_t * lrmd,
                  const char *op,
                  xmlNode * data,
                  xmlNode ** output_data,
                  int timeout,
                  enum lrmd_call_options options,
                  gboolean expect_reply)
{
    lrmd_private_t *priv = lrmd->private;
    xmlNode *request = NULL;
    xmlNode *answer = NULL;
    int call_id = -1;
    int rc = pcmk_ok;

    if (!lrmd_api_is_connected(lrmd)) {
        return -ENOTCONN;
    }

    if (op == NULL) {
        crm_err("No operation specified");
        return -EINVAL;
    }

    CRM_CHECK(priv->token != NULL,;);
    crm_trace("sending %s op to lrmd", op);

    request = lrmd_create_op(priv->token, op, data, options);
    if (request == NULL) {
        return -EINVAL;
    }
    crm_xml_add_int(request, F_LRMD_TIMEOUT, timeout);

    if (!expect_reply) {
        /* Fire and forget: the send result is the final answer */
        rc = lrmd_send_xml_no_reply(lrmd, request);

    } else {
        rc = lrmd_send_xml(lrmd, request, timeout, &answer);

        if (rc < 0) {
            crm_perror(LOG_ERR, "Couldn't perform %s operation (timeout=%d): %d", op, timeout, rc);
            rc = -ECOMM;

        } else {
            rc = pcmk_ok;
            crm_element_value_int(answer, F_LRMD_CALLID, &call_id);
            crm_trace("%s op reply received", op);

            if (crm_element_value_int(answer, F_LRMD_RC, &rc) != 0) {
                rc = -ENOMSG;

            } else {
                crm_log_xml_trace(answer, "Reply");
                if (output_data) {
                    *output_data = answer;
                    answer = NULL;      /* Prevent subsequent free */
                }
            }
        }
    }

    if (lrmd_api_is_connected(lrmd) == FALSE) {
        crm_err("LRMD disconnected");
    }

    free_xml(request);
    free_xml(answer);
    return rc;
}
/*
 * Send an LRMD_OP_POKE request without waiting for a reply.
 * Returns the result of lrmd_send_command().
 */
static int
lrmd_api_poke_connection(lrmd_t *lrmd)
{
    xmlNode *payload = create_xml_node(NULL, F_LRMD_RSC);
    int rc = 0;

    crm_xml_add(payload, F_LRMD_ORIGIN, __FUNCTION__);

    rc = lrmd_send_command(lrmd, LRMD_OP_POKE, payload, NULL, 0, 0, FALSE);

    free_xml(payload);
    return rc;
}
/*
 * Register this client with the lrmd under `name` and store the client id
 * from the reply as the connection token.
 *
 * Returns pcmk_ok on success, -ECOMM if the registration message could not
 * be exchanged, and -EPROTO if the reply is missing, is not a registration
 * reply, or carries no client id.  On any failure the connection is closed
 * via lrmd_api_disconnect().
 */
static int
lrmd_handshake(lrmd_t *lrmd, const char *name)
{
    lrmd_private_t *priv = lrmd->private;
    xmlNode *answer = NULL;
    xmlNode *hello = create_xml_node(NULL, "lrmd_command");
    int rc = pcmk_ok;

    crm_xml_add(hello, F_TYPE, T_LRMD);
    crm_xml_add(hello, F_LRMD_OPERATION, CRM_OP_REGISTER);
    crm_xml_add(hello, F_LRMD_CLIENTNAME, name);

    rc = lrmd_send_xml(lrmd, hello, -1, &answer);

    if (rc < 0) {
        crm_perror(LOG_DEBUG, "Couldn't complete registration with the lrmd API: %d", rc);
        rc = -ECOMM;

    } else if (answer == NULL) {
        crm_err("Did not receive registration reply");
        rc = -EPROTO;

    } else {
        const char *reply_op = crm_element_value(answer, F_LRMD_OPERATION);
        const char *client_id = crm_element_value(answer, F_LRMD_CLIENTID);

        if (safe_str_neq(reply_op, CRM_OP_REGISTER)) {
            crm_err("Invalid registration message: %s", reply_op);
            crm_log_xml_err(answer, "Bad reply");
            rc = -EPROTO;

        } else if (client_id == NULL) {
            crm_err("No registration token provided");
            crm_log_xml_err(answer, "Bad reply");
            rc = -EPROTO;

        } else {
            crm_trace("Obtained registration token: %s", client_id);
            priv->token = strdup(client_id);
            rc = pcmk_ok;
        }
    }

    free_xml(answer);
    free_xml(hello);

    if (rc != pcmk_ok) {
        lrmd_api_disconnect(lrmd);
    }
    return rc;
}
/*
 * Open the IPC connection to the local lrmd.
 *
 * When `fd` is non-NULL the connection is created without mainloop
 * integration and its file descriptor is stored in *fd.  Otherwise the
 * connection is registered with the mainloop using lrmd_ipc_dispatch() and
 * lrmd_ipc_connection_destroy() as callbacks.
 *
 * Returns pcmk_ok on success, -ENOTCONN if no connection could be made.
 */
static int
lrmd_ipc_connect(lrmd_t * lrmd, int *fd)
{
    static struct ipc_client_callbacks lrmd_callbacks = {
        .dispatch = lrmd_ipc_dispatch,
        .destroy = lrmd_ipc_connection_destroy
    };
    lrmd_private_t *priv = lrmd->private;
    int rc = pcmk_ok;

    crm_info("Connecting to lrmd");

    if (fd == NULL) {
        /* Mainloop-driven connection */
        priv->source = mainloop_add_ipc_client("lrmd", G_PRIORITY_HIGH, 0, lrmd, &lrmd_callbacks);
        priv->ipc = mainloop_get_ipc_client(priv->source);

    } else {
        /* No mainloop */
        priv->ipc = crm_ipc_new("lrmd", 0);
        if (priv->ipc != NULL) {
            if (crm_ipc_connect(priv->ipc)) {
                *fd = crm_ipc_get_fd(priv->ipc);
            } else {
                rc = -ENOTCONN;
            }
        }
    }

    if (priv->ipc == NULL) {
        crm_debug("Could not connect to the LRMD API");
        rc = -ENOTCONN;
    }

    return rc;
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * Load the remote-connection key into `key`.
 *
 * A copy of the most recently loaded key is cached for 60 seconds; while the
 * cache is valid it is returned without touching the file system (note that
 * the cache is not keyed on `location`).  Otherwise the whole file at
 * `location` is read.
 *
 * On success key->data is a gnutls_malloc()ed buffer of key->size bytes that
 * the caller owns.  Returns 0 on success and -1 if the file cannot be opened
 * or read, is empty, or memory cannot be allocated.
 */
int lrmd_tls_set_key(gnutls_datum_t *key, const char *location)
{
    FILE *stream;
    int read_len = 256;
    int cur_len = 0;
    int buf_len = read_len;
    int read_failed = 0;
    static char *key_cache = NULL;
    static size_t key_cache_len = 0;
    static time_t key_cache_updated;

    if (key_cache) {
        time_t now = time(NULL);

        if ((now - key_cache_updated) < 60) {
            key->data = gnutls_malloc(key_cache_len + 1);
            if (key->data == NULL) {
                key->size = 0;
                return -1;
            }
            key->size = key_cache_len;
            memcpy(key->data, key_cache, key_cache_len);

            crm_debug("using cached LRMD key");
            return 0;
        } else {
            key_cache_len = 0;
            key_cache_updated = 0;
            free(key_cache);
            key_cache = NULL;
            crm_debug("clearing lrmd key cache");
        }
    }

    stream = fopen(location, "r");
    if (!stream) {
        return -1;
    }

    key->data = gnutls_malloc(read_len);
    if (key->data == NULL) {
        key->size = 0;
        fclose(stream);
        return -1;
    }

    for (;;) {
        /* fgetc() returns int so that EOF is distinguishable from every
         * byte value; stopping on any EOF also ends the loop on a read
         * error, where feof() would never become true. */
        int next = fgetc(stream);

        if (next == EOF) {
            break;
        }

        if (cur_len == buf_len) {
            void *grown = NULL;

            buf_len = cur_len + read_len;
            /* Keep the old pointer until the resize is known to have worked */
            grown = gnutls_realloc(key->data, buf_len);
            if (grown == NULL) {
                gnutls_free(key->data);
                key->data = NULL;
                key->size = 0;
                fclose(stream);
                return -1;
            }
            key->data = grown;
        }

        key->data[cur_len] = (unsigned char) next;
        cur_len++;
    }

    read_failed = ferror(stream);
    fclose(stream);

    key->size = cur_len;
    if (read_failed || !cur_len) {
        /* A truncated or empty key is useless; report failure instead */
        gnutls_free(key->data);
        key->data = 0;
        key->size = 0;
        return -1;
    }

    if (!key_cache) {
        key_cache = calloc(1, key->size + 1);
        if (key_cache) {
            /* Caching is best effort: skip it if memory is short */
            memcpy(key_cache, key->data, key->size);
            key_cache_len = key->size;
            key_cache_updated = time(NULL);
        }
    }

    return 0;
}
/*
 * PSK client-credentials callback (installed with
 * gnutls_psk_set_client_credentials_function()).
 *
 * Loads the key from DEFAULT_REMOTE_KEY_LOCATION, falling back to
 * ALT_REMOTE_KEY_LOCATION, and supplies DEFAULT_REMOTE_USERNAME as the user
 * name in a gnutls_malloc()ed buffer.
 *
 * Returns 0 on success, -1 if no key could be loaded or memory for the user
 * name could not be allocated.
 */
static int
lrmd_tls_key_cb(gnutls_session_t session, char **username, gnutls_datum_t *key)
{
    int rc = 0;

    if (lrmd_tls_set_key(key, DEFAULT_REMOTE_KEY_LOCATION)) {
        rc = lrmd_tls_set_key(key, ALT_REMOTE_KEY_LOCATION);
    }
    if (rc) {
        crm_err("No lrmd remote key found");
        return -1;
    }

    *username = gnutls_malloc(strlen(DEFAULT_REMOTE_USERNAME) + 1);
    if (*username == NULL) {
        /* Failing here means the key will not be used; release it */
        crm_err("Could not allocate memory for lrmd remote username");
        gnutls_free(key->data);
        key->data = NULL;
        key->size = 0;
        return -1;
    }
    strcpy(*username, DEFAULT_REMOTE_USERNAME);

    return rc;
}
/*
 * Call gnutls_global_init() the first time this function runs; later calls
 * do nothing.
 */
static void
lrmd_gnutls_global_init(void)
{
    static int already_done = 0;

    if (already_done) {
        return;
    }
    gnutls_global_init();
    already_done = 1;
}
#endif
/*
 * Deliver the outcome of an asynchronous connection attempt to the client
 * as an lrmd_event_connect event carrying `rc`.  Does nothing when no
 * callback is registered.
 */
static void
report_async_connection_result(lrmd_t *lrmd, int rc)
{
    lrmd_private_t *priv = lrmd->private;
    lrmd_event_data_t event = { 0, };

    if (priv->callback == NULL) {
        return;
    }

    event.connection_rc = rc;
    event.remote_nodename = priv->remote_nodename;
    event.type = lrmd_event_connect;
    priv->callback(&event);
}
#ifdef HAVE_GNUTLS_GNUTLS_H
static void
lrmd_tcp_connect_cb(void *userdata, int sock)
{
lrmd_t *lrmd = userdata;
lrmd_private_t *native = lrmd->private;
char name[256] = { 0, };
static struct mainloop_fd_callbacks lrmd_tls_callbacks =
{
.dispatch = lrmd_tls_dispatch,
.destroy = lrmd_tls_connection_destroy,
};
int rc = sock;
if (rc < 0) {
lrmd_tls_connection_destroy(lrmd);
crm_info("remote lrmd connect to %s at port %d failed", native->server, native->port);
report_async_connection_result(lrmd, rc);
return;
}
/* TODO continue with tls stuff now that tcp connect passed. make this async as well soon
* to avoid all blocking code in the client. */
native->sock = sock;
gnutls_psk_allocate_client_credentials(&native->psk_cred_c);
gnutls_psk_set_client_credentials_function(native->psk_cred_c, lrmd_tls_key_cb);
native->remote->tls_session = create_psk_tls_session(sock, GNUTLS_CLIENT, native->psk_cred_c);
if (crm_initiate_client_tls_handshake(native->remote, LRMD_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
crm_warn("Client tls handshake failed for server %s:%d. Disconnecting", native->server, native->port);
gnutls_deinit(*native->remote->tls_session);
gnutls_free(native->remote->tls_session);
native->remote->tls_session = NULL;
lrmd_tls_connection_destroy(lrmd);
report_async_connection_result(lrmd, -1);
return;
}
crm_info("Remote lrmd client TLS connection established with server %s:%d", native->server, native->port);
snprintf(name, 128, "remote-lrmd-%s:%d", native->server, native->port);
native->process_notify = mainloop_add_trigger(G_PRIORITY_HIGH, lrmd_tls_dispatch, lrmd);
native->source = mainloop_add_fd(name, G_PRIORITY_HIGH, native->sock, lrmd, &lrmd_tls_callbacks);
rc = lrmd_handshake(lrmd, name);
report_async_connection_result(lrmd, rc);
return;
}
/*
 * Start an asynchronous TCP connect to the remote lrmd; lrmd_tcp_connect_cb()
 * is registered as the completion callback.  `timeout` is in milliseconds.
 * Returns the result of crm_remote_tcp_connect_async().
 */
static int
lrmd_tls_connect_async(lrmd_t *lrmd, int timeout /*ms*/)
{
    lrmd_private_t *priv = lrmd->private;

    lrmd_gnutls_global_init();

    return crm_remote_tcp_connect_async(priv->server, priv->port, timeout, lrmd, lrmd_tcp_connect_cb);
}
/*
 * Synchronously connect to the remote lrmd over TCP and establish a PSK TLS
 * session.
 *
 * When `fd` is non-NULL the socket is returned through *fd and nothing is
 * registered with the mainloop; otherwise the socket and the notify trigger
 * are added to the mainloop.
 *
 * Returns pcmk_ok on success, -ENOTCONN if the TCP connect fails and -1 if
 * the TLS handshake fails.
 */
static int
lrmd_tls_connect(lrmd_t *lrmd, int *fd)
{
    static struct mainloop_fd_callbacks lrmd_tls_callbacks =
    {
        .dispatch = lrmd_tls_dispatch,
        .destroy = lrmd_tls_connection_destroy,
    };

    lrmd_private_t *native = lrmd->private;
    int sock;

    lrmd_gnutls_global_init();

    sock = crm_remote_tcp_connect(native->server, native->port);
    if (sock <= 0) {
        crm_warn("Could not establish remote lrmd connection to %s", native->server);
        lrmd_tls_connection_destroy(lrmd);
        return -ENOTCONN;
    }

    native->sock = sock;
    gnutls_psk_allocate_client_credentials(&native->psk_cred_c);
    gnutls_psk_set_client_credentials_function(native->psk_cred_c, lrmd_tls_key_cb);
    native->remote->tls_session = create_psk_tls_session(sock, GNUTLS_CLIENT, native->psk_cred_c);

    if (crm_initiate_client_tls_handshake(native->remote, LRMD_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
        crm_err("Session creation for %s:%d failed", native->server, native->port);
        /* The session never became usable, so skip gnutls_bye() and release
         * it here before the generic teardown runs. */
        gnutls_deinit(*native->remote->tls_session);
        gnutls_free(native->remote->tls_session);
        native->remote->tls_session = NULL;
        lrmd_tls_connection_destroy(lrmd);
        return -1;
    }

    crm_info("Remote lrmd client TLS connection established with server %s:%d", native->server, native->port);

    if (fd) {
        *fd = sock;
    } else {
        char name[256] = { 0, };

        /* Bound the write by the real buffer size rather than a magic number */
        snprintf(name, sizeof(name), "remote-lrmd-%s:%d", native->server, native->port);

        native->process_notify = mainloop_add_trigger(G_PRIORITY_HIGH, lrmd_tls_dispatch, lrmd);
        native->source = mainloop_add_fd(name, G_PRIORITY_HIGH, native->sock, lrmd, &lrmd_tls_callbacks);
    }
    return pcmk_ok;
}
#endif
/*
 * Connect to the lrmd using the transport configured for this object and,
 * if that succeeds, register as client `name`.
 *
 * `fd`, when non-NULL, is passed to the transport-specific connect function.
 * Returns pcmk_ok on success; -ENOTCONN for an unsupported connection type;
 * otherwise the error from the connect or handshake step.
 */
static int
lrmd_api_connect(lrmd_t * lrmd, const char *name, int *fd)
{
    lrmd_private_t *priv = lrmd->private;
    int rc = -ENOTCONN;

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            rc = lrmd_ipc_connect(lrmd, fd);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            rc = lrmd_tls_connect(lrmd, fd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
    }

    if (rc != pcmk_ok) {
        return rc;
    }
    return lrmd_handshake(lrmd, name);
}
/*
 * Connect without blocking the caller; the outcome is delivered through the
 * client callback as an lrmd_event_connect event.
 *
 * A callback must already be registered, otherwise -1 is returned.  For IPC
 * the connection is made synchronously and only a successful result is
 * reported through the callback (a failure is just returned).  For TLS the
 * connect is started asynchronously and an immediate failure is reported
 * through the callback as well as returned.
 */
static int
lrmd_api_connect_async(lrmd_t * lrmd, const char *name, int timeout)
{
    lrmd_private_t *priv = lrmd->private;
    int rc = 0;

    if (priv->callback == NULL) {
        crm_err("Async connect not possible, no lrmd client callback set.");
        return -1;
    }

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            /* fake async connection with ipc.  it should be fast
             * enough that we gain very little from async */
            rc = lrmd_api_connect(lrmd, name, NULL);
            if (rc == 0) {
                report_async_connection_result(lrmd, rc);
            }
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            rc = lrmd_tls_connect_async(lrmd, timeout);
            if (rc != 0) {
                /* connection failed, report rc now */
                report_async_connection_result(lrmd, rc);
            }
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
    }

    return rc;
}
/*
 * Close the IPC connection.  A mainloop-attached connection is removed via
 * the mainloop; a stand-alone connection is closed and destroyed directly.
 * Does nothing when no connection exists.
 */
static void
lrmd_ipc_disconnect(lrmd_t *lrmd)
{
    lrmd_private_t *priv = lrmd->private;
    crm_ipc_t *standalone = NULL;

    if (priv->source != NULL) {
        /* Attached to mainloop */
        mainloop_del_ipc_client(priv->source);
        priv->source = NULL;
        priv->ipc = NULL;
        return;
    }

    if (priv->ipc) {
        /* Not attached to mainloop: detach the handle first, then close */
        standalone = priv->ipc;
        priv->ipc = NULL;
        crm_ipc_close(standalone);
        crm_ipc_destroy(standalone);
    }
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * Shut down the TLS connection: end and free the TLS session, detach the
 * socket from the mainloop (or close it directly when it was never
 * attached), and drop any notifications still waiting to be dispatched.
 */
static void
lrmd_tls_disconnect(lrmd_t *lrmd)
{
    lrmd_private_t *native = lrmd->private;

    if (native->remote->tls_session) {
        gnutls_bye(*native->remote->tls_session, GNUTLS_SHUT_RDWR);
        gnutls_deinit(*native->remote->tls_session);
        gnutls_free(native->remote->tls_session);
        native->remote->tls_session = 0;
    }

    if (native->source != NULL) {
        /* Attached to mainloop */
        mainloop_del_ipc_client(native->source);
        native->source = NULL;

    } else if (native->sock) {
        close(native->sock);
        /* Forget the descriptor so that a later teardown (for example
         * lrmd_tls_connection_destroy()) does not close it a second time. */
        native->sock = 0;
    }

    if (native->pending_notify) {
        g_list_free_full(native->pending_notify, lrmd_free_xml);
        native->pending_notify = NULL;
    }
}
#endif
/*
 * Disconnect from the lrmd using the transport-specific routine and discard
 * the registration token.  Always returns 0.
 */
static int
lrmd_api_disconnect(lrmd_t *lrmd)
{
    lrmd_private_t *priv = lrmd->private;

    crm_info("Disconnecting from lrmd service");

    switch (priv->type) {
        case CRM_CLIENT_IPC:
            lrmd_ipc_disconnect(lrmd);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            lrmd_tls_disconnect(lrmd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", priv->type);
    }

    /* The token belongs to the connection that was just closed */
    free(priv->token);
    priv->token = NULL;

    return 0;
}
/*
 * Ask the lrmd to register a resource.
 *
 * rsc_id, class and type are mandatory; provider is additionally required
 * when class is "ocf".  Returns -EINVAL if a required argument is missing,
 * otherwise the result of the LRMD_OP_RSC_REG request.
 */
static int
lrmd_api_register_rsc(lrmd_t * lrmd,
                      const char *rsc_id,
                      const char *class,
                      const char *provider, const char *type, enum lrmd_call_options options)
{
    xmlNode *request = NULL;
    int rc = pcmk_ok;

    if (!class || !type || !rsc_id
        || (safe_str_eq(class, "ocf") && !provider)) {
        return -EINVAL;
    }

    request = create_xml_node(NULL, F_LRMD_RSC);
    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);
    crm_xml_add(request, F_LRMD_CLASS, class);
    crm_xml_add(request, F_LRMD_PROVIDER, provider);
    crm_xml_add(request, F_LRMD_TYPE, type);

    rc = lrmd_send_command(lrmd, LRMD_OP_RSC_REG, request, NULL, 0, options, TRUE);

    free_xml(request);
    return rc;
}
/*
 * Ask the lrmd to unregister the resource `rsc_id`.
 * Returns the result of the LRMD_OP_RSC_UNREG request.
 */
static int
lrmd_api_unregister_rsc(lrmd_t * lrmd, const char *rsc_id, enum lrmd_call_options options)
{
    xmlNode *request = create_xml_node(NULL, F_LRMD_RSC);
    int rc = pcmk_ok;

    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);

    rc = lrmd_send_command(lrmd, LRMD_OP_RSC_UNREG, request, NULL, 0, options, TRUE);

    free_xml(request);
    return rc;
}
/*
 * Create a deep copy of a resource description.
 *
 * Every string field that is set in `rsc_info` is duplicated.  The result
 * must be released with lrmd_free_rsc_info().
 *
 * Returns NULL if `rsc_info` is NULL or the copy cannot be allocated.
 */
lrmd_rsc_info_t *
lrmd_copy_rsc_info(lrmd_rsc_info_t * rsc_info)
{
    lrmd_rsc_info_t *copy = NULL;

    /* Mirror lrmd_free_rsc_info(), which also tolerates NULL */
    if (rsc_info == NULL) {
        return NULL;
    }

    copy = calloc(1, sizeof(lrmd_rsc_info_t));
    if (copy == NULL) {
        return NULL;
    }

    /* strdup(NULL) is undefined, so only duplicate fields that are set;
     * calloc() already left the others NULL. */
    if (rsc_info->id) {
        copy->id = strdup(rsc_info->id);
    }
    if (rsc_info->type) {
        copy->type = strdup(rsc_info->type);
    }
    if (rsc_info->class) {
        copy->class = strdup(rsc_info->class);
    }
    if (rsc_info->provider) {
        copy->provider = strdup(rsc_info->provider);
    }

    return copy;
}
/*
 * Release a resource description and all of its string fields.
 * Passing NULL is allowed and does nothing.
 */
void
lrmd_free_rsc_info(lrmd_rsc_info_t * rsc_info)
{
    if (rsc_info != NULL) {
        free(rsc_info->id);
        free(rsc_info->type);
        free(rsc_info->class);
        free(rsc_info->provider);
        free(rsc_info);
    }
}
/*
 * Query the lrmd for the class/provider/type of resource `rsc_id`.
 *
 * Returns a newly allocated description (release with lrmd_free_rsc_info()),
 * or NULL if no reply data was obtained, the reply lacks a class or type, or
 * the class is "ocf" and no provider was given.
 */
static lrmd_rsc_info_t *
lrmd_api_get_rsc_info(lrmd_t * lrmd, const char *rsc_id, enum lrmd_call_options options)
{
    lrmd_rsc_info_t *result = NULL;
    xmlNode *request = create_xml_node(NULL, F_LRMD_RSC);
    xmlNode *answer = NULL;
    const char *class = NULL;
    const char *provider = NULL;
    const char *type = NULL;

    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);
    lrmd_send_command(lrmd, LRMD_OP_RSC_INFO, request, &answer, 0, options, TRUE);
    free_xml(request);

    if (answer == NULL) {
        return NULL;
    }

    class = crm_element_value(answer, F_LRMD_CLASS);
    provider = crm_element_value(answer, F_LRMD_PROVIDER);
    type = crm_element_value(answer, F_LRMD_TYPE);

    /* Class and type are mandatory; "ocf" resources also need a provider */
    if (class && type && (provider || !safe_str_eq(class, "ocf"))) {
        result = calloc(1, sizeof(lrmd_rsc_info_t));
        result->id = strdup(rsc_id);
        result->class = strdup(class);
        if (provider) {
            result->provider = strdup(provider);
        }
        result->type = strdup(type);
    }

    free_xml(answer);
    return result;
}
/*
 * Register (or, with NULL, clear) the function that receives lrmd events
 * for this connection.
 */
static void
lrmd_api_set_callback(lrmd_t * lrmd, lrmd_event_callback callback)
{
    lrmd_private_t *priv = lrmd->private;

    priv->callback = callback;
}
/*
 * Fetch metadata for a stonith agent through the stonith API.
 * Returns pcmk_ok if *output was filled in, -EIO if it is NULL afterwards.
 */
static int
stonith_get_metadata(const char *provider, const char *type, char **output)
{
    stonith_api->cmds->metadata(stonith_api, st_opt_sync_call, type, provider, output, 0);

    return (*output == NULL) ? -EIO : pcmk_ok;
}
static int
lsb_get_metadata(const char *type, char **output)
{
#define lsb_metadata_template \
"<?xml version=\"1.0\"?>\n"\
"<!DOCTYPE resource-agent SYSTEM \"ra-api-1.dtd\">\n"\
"<resource-agent name=\"%s\" version=\"0.1\">\n"\
" <version>1.0</version>\n"\
" <longdesc lang=\"en\">\n"\
" %s"\
" </longdesc>\n"\
" <shortdesc lang=\"en\">%s</shortdesc>\n"\
" <parameters>\n"\
" </parameters>\n"\
" <actions>\n"\
" <action name=\"start\" timeout=\"15\" />\n"\
" <action name=\"stop\" timeout=\"15\" />\n"\
" <action name=\"status\" timeout=\"15\" />\n"\
" <action name=\"restart\" timeout=\"15\" />\n"\
" <action name=\"force-reload\" timeout=\"15\" />\n"\
" <action name=\"monitor\" timeout=\"15\" interval=\"15\" />\n"\
" <action name=\"meta-data\" timeout=\"5\" />\n"\
" </actions>\n"\
" <special tag=\"LSB\">\n"\
" <Provides>%s</Provides>\n"\
" <Required-Start>%s</Required-Start>\n"\
" <Required-Stop>%s</Required-Stop>\n"\
" <Should-Start>%s</Should-Start>\n"\
" <Should-Stop>%s</Should-Stop>\n"\
" <Default-Start>%s</Default-Start>\n"\
" <Default-Stop>%s</Default-Stop>\n"\
" </special>\n"\
"</resource-agent>\n"
#define LSB_INITSCRIPT_INFOBEGIN_TAG "### BEGIN INIT INFO"
#define LSB_INITSCRIPT_INFOEND_TAG "### END INIT INFO"
#define PROVIDES "# Provides:"
#define REQ_START "# Required-Start:"
#define REQ_STOP "# Required-Stop:"
#define SHLD_START "# Should-Start:"
#define SHLD_STOP "# Should-Stop:"
#define DFLT_START "# Default-Start:"
#define DFLT_STOP "# Default-Stop:"
#define SHORT_DSCR "# Short-Description:"
#define DESCRIPTION "# Description:"
#define lsb_meta_helper_free_value(m) \
if ((m) != NULL) { \
xmlFree(m); \
(m) = NULL; \
}
#define lsb_meta_helper_get_value(buffer, ptr, keyword) \
if (!ptr && !strncasecmp(buffer, keyword, strlen(keyword))) { \
(ptr) = (char *)xmlEncodeEntitiesReentrant(NULL, BAD_CAST buffer+strlen(keyword)); \
continue; \
}
char ra_pathname[PATH_MAX] = { 0, };
FILE *fp;
GString *meta_data = NULL;
char buffer[1024];
char *provides = NULL;
char *req_start = NULL;
char *req_stop = NULL;
char *shld_start = NULL;
char *shld_stop = NULL;
char *dflt_start = NULL;
char *dflt_stop = NULL;
char *s_dscrpt = NULL;
char *xml_l_dscrpt = NULL;
GString *l_dscrpt = NULL;
snprintf(ra_pathname, sizeof(ra_pathname), "%s%s%s",
type[0] == '/' ? "" : LSB_ROOT_DIR, type[0] == '/' ? "" : "/", type);
if (!(fp = fopen(ra_pathname, "r"))) {
return -EIO;
}
/* Enter into the lsb-compliant comment block */
while (fgets(buffer, sizeof(buffer), fp)) {
/* Now suppose each of the following eight arguments contain only one line */
lsb_meta_helper_get_value(buffer, provides, PROVIDES)
lsb_meta_helper_get_value(buffer, req_start, REQ_START)
lsb_meta_helper_get_value(buffer, req_stop, REQ_STOP)
lsb_meta_helper_get_value(buffer, shld_start, SHLD_START)
lsb_meta_helper_get_value(buffer, shld_stop, SHLD_STOP)
lsb_meta_helper_get_value(buffer, dflt_start, DFLT_START)
lsb_meta_helper_get_value(buffer, dflt_stop, DFLT_STOP)
lsb_meta_helper_get_value(buffer, s_dscrpt, SHORT_DSCR)
/* Long description may cross multiple lines */
if ((l_dscrpt == NULL) && (0 == strncasecmp(buffer, DESCRIPTION, strlen(DESCRIPTION)))) {
l_dscrpt = g_string_new(buffer + strlen(DESCRIPTION));
/* Between # and keyword, more than one space, or a tab character,
* indicates the continuation line. Extracted from LSB init script standard */
while (fgets(buffer, sizeof(buffer), fp)) {
if (!strncmp(buffer, "# ", 3) || !strncmp(buffer, "#\t", 2)) {
buffer[0] = ' ';
l_dscrpt = g_string_append(l_dscrpt, buffer);
} else {
fputs(buffer, fp);
break; /* Long description ends */
}
}
continue;
}
if (l_dscrpt) {
xml_l_dscrpt = (char *)xmlEncodeEntitiesReentrant(NULL, BAD_CAST(l_dscrpt->str));
}
if (!strncasecmp(buffer, LSB_INITSCRIPT_INFOEND_TAG, strlen(LSB_INITSCRIPT_INFOEND_TAG))) {
/* Get to the out border of LSB comment block */
break;
}
if (buffer[0] != '#') {
break; /* Out of comment block in the beginning */
}
}
fclose(fp);
meta_data = g_string_new("");
g_string_sprintf(meta_data, lsb_metadata_template, type,
(xml_l_dscrpt == NULL) ? type : xml_l_dscrpt,
(s_dscrpt == NULL) ? type : s_dscrpt, (provides == NULL) ? "" : provides,
(req_start == NULL) ? "" : req_start, (req_stop == NULL) ? "" : req_stop,
(shld_start == NULL) ? "" : shld_start, (shld_stop == NULL) ? "" : shld_stop,
(dflt_start == NULL) ? "" : dflt_start, (dflt_stop == NULL) ? "" : dflt_stop);
lsb_meta_helper_free_value(xml_l_dscrpt);
lsb_meta_helper_free_value(s_dscrpt);
lsb_meta_helper_free_value(provides);
lsb_meta_helper_free_value(req_start);
lsb_meta_helper_free_value(req_stop);
lsb_meta_helper_free_value(shld_start);
lsb_meta_helper_free_value(shld_stop);
lsb_meta_helper_free_value(dflt_start);
lsb_meta_helper_free_value(dflt_stop);
if (l_dscrpt) {
g_string_free(l_dscrpt, TRUE);
}
*output = strdup(meta_data->str);
g_string_free(meta_data, TRUE);
return pcmk_ok;
}
#if SUPPORT_NAGIOS
/*
 * Load the metadata for a nagios plugin from
 * NAGIOS_METADATA_DIR/<type>.xml.
 *
 * On success *output is a newly allocated, NUL-terminated copy of the file.
 * Returns pcmk_ok on success, -ENOMEM if memory cannot be allocated, and
 * -EIO if the file cannot be opened, is empty, or cannot be read in full
 * (*output is NULL in the latter two cases).
 */
static int
nagios_get_metadata(const char *type, char **output)
{
    int rc = pcmk_ok;
    FILE *file_strm = NULL;
    int start = 0, length = 0, read_len = 0;
    char *metadata_file = NULL;
    int len = 36;               /* slack for the separator, ".xml" and the NUL */

    len += strlen(NAGIOS_METADATA_DIR);
    len += strlen(type);
    metadata_file = calloc(1, len);
    CRM_CHECK(metadata_file != NULL, return -ENOMEM);

    snprintf(metadata_file, len, "%s/%s.xml", NAGIOS_METADATA_DIR, type);
    file_strm = fopen(metadata_file, "r");
    if (file_strm == NULL) {
        crm_err("Metadata file %s does not exist", metadata_file);
        free(metadata_file);
        return -EIO;
    }

    /* see how big the file is */
    start = ftell(file_strm);
    fseek(file_strm, 0L, SEEK_END);
    length = ftell(file_strm);
    /* Return to the saved position: the offset is `start`, the origin is
     * SEEK_SET (the saved offset must not be passed as the origin). */
    fseek(file_strm, start, SEEK_SET);

    CRM_ASSERT(length >= 0);
    CRM_ASSERT(start == ftell(file_strm));

    if (length <= 0) {
        crm_info("%s was not valid", metadata_file);
        /* NOTE(review): this frees whatever the caller passed in *output;
         * presumably callers initialise it to NULL - confirm. */
        free(*output);
        *output = NULL;
        rc = -EIO;

    } else {
        crm_trace("Reading %d bytes from file", length);
        *output = calloc(1, (length + 1));
        if (*output == NULL) {
            rc = -ENOMEM;

        } else {
            read_len = fread(*output, 1, length, file_strm);
            if (read_len != length) {
                crm_err("Calculated and read bytes differ: %d vs. %d", length, read_len);
                free(*output);
                *output = NULL;
                rc = -EIO;
            }
        }
    }

    fclose(file_strm);
    free(metadata_file);
    return rc;
}
#endif
/*
 * Obtain metadata by synchronously running the agent's "meta-data" action
 * (5000 ms timeout) and copying what it wrote to stdout.
 *
 * Returns pcmk_ok with *output set to a newly allocated string, or -EIO if
 * the action fails or produces no output.
 */
static int
generic_get_metadata(const char *standard, const char *provider, const char *type, char **output)
{
    int rc = pcmk_ok;
    svc_action_t *action = resources_action_create(type, standard, provider, type,
                                                   "meta-data", 0, 5000, NULL);

    if (services_action_sync(action) && action->stdout_data) {
        *output = strdup(action->stdout_data);

    } else {
        crm_err("Failed to retrieve meta-data for %s:%s:%s", standard, provider, type);
        rc = -EIO;
    }

    services_action_free(action);
    return rc;
}
/*
 * Retrieve agent metadata, choosing the lookup method from the resource
 * class: "stonith", "lsb" and (when built with SUPPORT_NAGIOS) "nagios" have
 * dedicated handlers; every other class goes through generic_get_metadata().
 *
 * Returns -EINVAL if class or type is NULL, otherwise the handler's result.
 * The `lrmd` and `options` arguments are not used.
 */
static int
lrmd_api_get_metadata(lrmd_t * lrmd,
                      const char *class,
                      const char *provider,
                      const char *type, char **output, enum lrmd_call_options options)
{
    if (class == NULL || type == NULL) {
        return -EINVAL;
    }

    if (safe_str_eq(class, "stonith")) {
        return stonith_get_metadata(provider, type, output);
    }
    if (safe_str_eq(class, "lsb")) {
        return lsb_get_metadata(type, output);
    }
#if SUPPORT_NAGIOS
    if (safe_str_eq(class, "nagios")) {
        return nagios_get_metadata(type, output);
    }
#endif

    return generic_get_metadata(class, provider, type, output);
}
/*
 * Build an LRMD_OP_RSC_EXEC request for rsc_id/action and send it.
 *
 * interval, timeout and start_delay are in milliseconds.  Every entry of
 * params is copied into the request's attribute node, and the params list is
 * always freed before returning (the caller gives up ownership).
 *
 * Returns the result of lrmd_send_command().
 */
static int
lrmd_api_exec(lrmd_t * lrmd, const char *rsc_id, const char *action, const char *userdata, int interval,        /* ms */
              int timeout,      /* ms */
              int start_delay,  /* ms */
              enum lrmd_call_options options, lrmd_key_value_t * params)
{
    int result = pcmk_ok;
    lrmd_key_value_t *param = params;
    xmlNode *request = create_xml_node(NULL, F_LRMD_RSC);
    xmlNode *attrs = create_xml_node(request, XML_TAG_ATTRS);

    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);
    crm_xml_add(request, F_LRMD_RSC_ACTION, action);
    crm_xml_add(request, F_LRMD_RSC_USERDATA_STR, userdata);
    crm_xml_add_int(request, F_LRMD_RSC_INTERVAL, interval);
    crm_xml_add_int(request, F_LRMD_TIMEOUT, timeout);
    crm_xml_add_int(request, F_LRMD_RSC_START_DELAY, start_delay);

    /* Copy each key/value parameter into the attributes child node */
    while (param != NULL) {
        hash2field((gpointer) param->key, (gpointer) param->value, attrs);
        param = param->next;
    }

    result = lrmd_send_command(lrmd, LRMD_OP_RSC_EXEC, request, NULL, timeout, options, TRUE);

    free_xml(request);
    lrmd_key_value_freeall(params);
    return result;
}
/*
 * Send an LRMD_OP_RSC_CANCEL request identifying an operation by resource id,
 * action name and interval.
 *
 * Returns the result of lrmd_send_command().
 */
static int
lrmd_api_cancel(lrmd_t * lrmd, const char *rsc_id, const char *action, int interval)
{
    int result;
    xmlNode *request = create_xml_node(NULL, F_LRMD_RSC);

    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ACTION, action);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);
    crm_xml_add_int(request, F_LRMD_RSC_INTERVAL, interval);

    result = lrmd_send_command(lrmd, LRMD_OP_RSC_CANCEL, request, NULL, 0, 0, TRUE);

    free_xml(request);
    return result;
}
/*
 * Count the agents reported by the stonith API and, when resources is not
 * NULL, append each agent's name to *resources.
 *
 * Returns the number of agents found.
 */
static int
list_stonith_agents(lrmd_list_t ** resources)
{
    int count = 0;
    stonith_key_value_t *agents = NULL;
    stonith_key_value_t *entry = NULL;

    stonith_api->cmds->list_agents(stonith_api, st_opt_sync_call, NULL, &agents, 0);

    entry = agents;
    while (entry != NULL) {
        count++;
        if (resources != NULL) {
            *resources = lrmd_list_add(*resources, entry->value);
        }
        entry = entry->next;
    }

    stonith_key_value_freeall(agents, 1, 0);
    return count;
}
/*
 * Append the names of the available agents to *resources.
 *
 * class "stonith" lists only stonith agents.  Any other class is listed via
 * resources_list_agents(class, provider); when class is NULL the stonith
 * agents are appended as well.
 *
 * Returns the number of agents added, or -EPROTONOSUPPORT if there were none.
 */
static int
lrmd_api_list_agents(lrmd_t * lrmd, lrmd_list_t ** resources, const char *class,
                     const char *provider)
{
    int count = 0;

    if (safe_str_eq(class, "stonith")) {
        count += list_stonith_agents(resources);

    } else {
        GList *agents = resources_list_agents(class, provider);
        GListPtr item = agents;

        while (item != NULL) {
            *resources = lrmd_list_add(*resources, (const char *)item->data);
            count++;
            item = item->next;
        }
        g_list_free_full(agents, free);

        if (class == NULL) {
            count += list_stonith_agents(resources);
        }
    }

    if (count != 0) {
        return count;
    }

    crm_notice("No agents found for class %s", class);
    return -EPROTONOSUPPORT;
}
/*
 * Check whether the agent list for class/provider contains the named agent.
 *
 * Returns 1 if resources_list_agents(class, provider) includes agent,
 * 0 otherwise.
 */
static int
does_provider_have_agent(const char *agent, const char *provider, const char *class)
{
    int found = 0;
    GList *agents = NULL;
    GListPtr gIter2 = NULL;

    agents = resources_list_agents(class, provider);
    for (gIter2 = agents; gIter2 != NULL; gIter2 = gIter2->next) {
        if (safe_str_eq(agent, gIter2->data)) {
            found = 1;
            break;              /* one match is enough, no need to scan the rest */
        }
    }
    g_list_free_full(agents, free);

    return found;
}
/*
 * Append OCF provider names to *providers.
 *
 * With agent == NULL every provider is listed; otherwise only the providers
 * for which does_provider_have_agent() reports the agent.
 *
 * Returns the number of providers added.
 */
static int
lrmd_api_list_ocf_providers(lrmd_t * lrmd, const char *agent, lrmd_list_t ** providers)
{
    int count = pcmk_ok;
    GList *all_providers = resources_list_providers("ocf");
    GListPtr item = NULL;

    for (item = all_providers; item != NULL; item = item->next) {
        const char *name = item->data;

        if (agent != NULL && !does_provider_have_agent(agent, name, "ocf")) {
            continue;
        }
        *providers = lrmd_list_add(*providers, name);
        count++;
    }

    g_list_free_full(all_providers, free);
    return count;
}
/*
 * Append the names of the supported resource standards to *supported.
 *
 * The list from resources_list_standards() is copied first; "stonith" is
 * added on top when at least one stonith agent is available.
 *
 * Returns the number of entries added.
 */
static int
lrmd_api_list_standards(lrmd_t * lrmd, lrmd_list_t ** supported)
{
    int count = 0;
    GList *standards = resources_list_standards();
    GListPtr item = standards;

    while (item != NULL) {
        *supported = lrmd_list_add(*supported, (const char *)item->data);
        count++;
        item = item->next;
    }

    if (list_stonith_agents(NULL) > 0) {
        *supported = lrmd_list_add(*supported, "stonith");
        count++;
    }

    g_list_free_full(standards, free);
    return count;
}
lrmd_t *
lrmd_api_new(void)
{
lrmd_t *new_lrmd = NULL;
lrmd_private_t *pvt = NULL;
new_lrmd = calloc(1, sizeof(lrmd_t));
pvt = calloc(1, sizeof(lrmd_private_t));
pvt->remote = calloc(1, sizeof(crm_remote_t));
new_lrmd->cmds = calloc(1, sizeof(lrmd_api_operations_t));
pvt->type = CRM_CLIENT_IPC;
new_lrmd->private = pvt;
new_lrmd->cmds->connect = lrmd_api_connect;
new_lrmd->cmds->connect_async = lrmd_api_connect_async;
new_lrmd->cmds->is_connected = lrmd_api_is_connected;
new_lrmd->cmds->poke_connection = lrmd_api_poke_connection;
new_lrmd->cmds->disconnect = lrmd_api_disconnect;
new_lrmd->cmds->register_rsc = lrmd_api_register_rsc;
new_lrmd->cmds->unregister_rsc = lrmd_api_unregister_rsc;
new_lrmd->cmds->get_rsc_info = lrmd_api_get_rsc_info;
new_lrmd->cmds->set_callback = lrmd_api_set_callback;
new_lrmd->cmds->get_metadata = lrmd_api_get_metadata;
new_lrmd->cmds->exec = lrmd_api_exec;
new_lrmd->cmds->cancel = lrmd_api_cancel;
new_lrmd->cmds->list_agents = lrmd_api_list_agents;
new_lrmd->cmds->list_ocf_providers = lrmd_api_list_ocf_providers;
new_lrmd->cmds->list_standards = lrmd_api_list_standards;
if (!stonith_api) {
stonith_api = stonith_api_new();
}
return new_lrmd;
}
/*
 * Create an lrmd client object that connects to a remote lrmd over TLS.
 *
 * At least one of nodename and server must be given; whichever is missing
 * defaults to the other.  A port of 0 selects DEFAULT_REMOTE_PORT.
 *
 * Returns the new object, or NULL if both names are missing, allocation
 * fails, or the build has no GnuTLS support.
 */
lrmd_t *
lrmd_remote_api_new(const char *nodename, const char *server, int port)
{
#ifdef HAVE_GNUTLS_GNUTLS_H
    lrmd_t *new_lrmd = NULL;
    lrmd_private_t *native = NULL;

    /* Validate before allocating so the early return cannot leak the object */
    if (!nodename && !server) {
        return NULL;
    }

    new_lrmd = lrmd_api_new();
    if (new_lrmd == NULL) {
        return NULL;
    }
    native = new_lrmd->private;

    native->type = CRM_CLIENT_TLS;
    native->remote_nodename = nodename ? strdup(nodename) : strdup(server);
    native->server = server ? strdup(server) : strdup(nodename);
    native->port = port ? port : DEFAULT_REMOTE_PORT;
    return new_lrmd;
#else
    crm_err("GNUTLS is not enabled for this build, remote LRMD client can not be created");
    return NULL;
#endif
}
/*
 * Disconnect an lrmd client object and release it together with its
 * operations table and private data.  A NULL lrmd is ignored.
 */
void
lrmd_api_delete(lrmd_t * lrmd)
{
    lrmd_private_t *native = NULL;

    if (lrmd == NULL) {
        return;
    }

    lrmd->cmds->disconnect(lrmd);       /* no-op if already disconnected */
    free(lrmd->cmds);

    native = lrmd->private;
    if (native != NULL) {
#ifdef HAVE_GNUTLS_GNUTLS_H
        free(native->server);
#endif
        free(native->remote_nodename);
        free(native->remote);
        free(native);
    }

    free(lrmd);
}
diff --git a/lrmd/lrmd_private.h b/lrmd/lrmd_private.h
index 2642387ae4..1f94d47fa4 100644
--- a/lrmd/lrmd_private.h
+++ b/lrmd/lrmd_private.h
@@ -1,94 +1,94 @@
/*
* Copyright (c) 2012 David Vossel <dvossel@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
- *
+ *
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
- *
+ *
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#ifndef LRMD_PVT__H
# define LRMD_PVT__H
# include <glib.h>
-# include <crm/common/ipc.h>
+# include <crm/common/ipcs.h>
# include <crm/lrmd.h>
# include <crm/stonith-ng.h>
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include <gnutls/gnutls.h>
#endif
GHashTable *rsc_list;
/* State kept by the lrmd for one resource.
 * NOTE(review): presumably these are the values stored in rsc_list - confirm. */
typedef struct lrmd_rsc_s {
/* Identification of the resource and of the agent that manages it
 * (presumably agent class / provider / type - confirm against the users) */
char *rsc_id;
char *class;
char *provider;
char *type;
/* NOTE(review): looks like a set of call option flags - confirm */
int call_opts;
/* NEVER dereference this pointer,
* It simply exists as a switch to let us know
* when the currently active operation has completed */
void *active;
/* Operations in this list
* have not been executed yet. */
GList *pending_ops;
/* Operations in this list are recurring operations
* that have been handed off from the pending ops list. */
GList *recurring_ops;
/* NOTE(review): meaning not visible in this header - confirm against the users */
int stonith_started;
/* Mainloop trigger associated with this resource */
crm_trigger_t *work;
} lrmd_rsc_t;
#ifdef HAVE_GNUTLS_GNUTLS_H
/* in remote_tls.c */
int lrmd_init_remote_tls_server(int port);
void lrmd_tls_server_destroy(void);
/* Hidden in lrmd client lib */
extern int lrmd_tls_send_msg(crm_remote_t *session, xmlNode *msg, uint32_t id, const char *msg_type);
extern int lrmd_tls_set_key(gnutls_datum_t *key, const char *location);
#endif
int lrmd_server_send_reply(crm_client_t *client, uint32_t id, xmlNode *reply);
int lrmd_server_send_notify(crm_client_t *client, xmlNode *msg);
void process_lrmd_message(crm_client_t * client, uint32_t id, xmlNode * request);
void free_rsc(gpointer data);
void lrmd_shutdown(int nsig);
void client_disconnect_cleanup(const char *client_id);
/*!
- * \brief Don't worry about freeing this connection. It is
+ * \brief Don't worry about freeing this connection. It is
* taken care of after mainloop exits by the main() function.
*/
stonith_t *get_stonith_connection(void);
/*!
* \brief This is a callback that tells the lrmd
* the current stonith connection has gone away. This allows
* us to timeout any pending stonith commands
*/
void stonith_connection_failed(void);
#endif
diff --git a/mcp/pacemaker.c b/mcp/pacemaker.c
index 1a1465a3e1..975321a54e 100644
--- a/mcp/pacemaker.c
+++ b/mcp/pacemaker.c
@@ -1,1005 +1,1005 @@
-/*
+/*
* Copyright (C) 2010 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <pacemaker.h>
#include <pwd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <crm/msg_xml.h>
-#include <crm/common/ipc.h>
+#include <crm/common/ipcs.h>
#include <crm/common/mainloop.h>
#include <crm/cluster.h>
#include <dirent.h>
#include <ctype.h>
gboolean fatal_error = FALSE;
GMainLoop *mainloop = NULL;
GHashTable *peers = NULL;
#define PCMK_PROCESS_CHECK_INTERVAL 5
char *local_name = NULL;
uint32_t local_nodeid = 0;
crm_trigger_t *shutdown_trigger = NULL;
const char *pid_file = "/var/run/pacemaker.pid";
/* *INDENT-OFF* */
/* One bit per process type.  get_process_list() ORs the flags of all running
 * children together to form the process list sent to clients and peers.
 * Note that crm_proc_none is 0x1 rather than 0. */
enum crm_proc_flag {
crm_proc_none = 0x00000001,
crm_proc_plugin = 0x00000002,
crm_proc_lrmd = 0x00000010,
crm_proc_cib = 0x00000100,
crm_proc_crmd = 0x00000200,
crm_proc_attrd = 0x00001000,
crm_proc_stonithd = 0x00002000,
crm_proc_pe = 0x00010000,
crm_proc_te = 0x00020000,
crm_proc_mgmtd = 0x00040000,
crm_proc_stonith_ng = 0x00100000,
};
/* *INDENT-ON* */
/* Description and runtime state of one child daemon managed by this process */
typedef struct pcmk_child_s {
/* Process id of the running child; 0 when it is not running */
int pid;
/* The child's crm_proc_flag bit */
long flag;
/* Start-up order; children are stopped in reverse order.
 * Entries with a value below 1 are neither started nor stopped. */
int start_seq;
/* Number of times the child has exited; compared against MAX_RESPAWN */
int respawn_count;
/* Whether the child should be restarted after it exits */
gboolean respawn;
/* Name used in log messages and when matching existing processes */
const char *name;
/* User name to run the child as; NULL leaves the user unchanged */
const char *uid;
/* Path of the executable; NULL means there is nothing to start or stop */
const char *command;
/* TRUE when the process was found already running at start-up and is
 * being tracked by polling rather than as a forked child */
gboolean active_before_startup;
} pcmk_child_t;
/* Index into the array below */
#define pcmk_child_crmd 4
#define pcmk_child_mgmtd 8
/* *INDENT-OFF* */
/* Table of managed children.  Columns follow pcmk_child_t field order:
 *   pid, flag, start_seq, respawn_count, respawn, name, uid, command
 * (active_before_startup is left zero-initialized).
 * The crmd and mgmtd rows must stay at indexes pcmk_child_crmd (4) and
 * pcmk_child_mgmtd (8). */
static pcmk_child_t pcmk_children[] = {
{ 0, crm_proc_none, 0, 0, FALSE, "none", NULL, NULL },
{ 0, crm_proc_plugin, 0, 0, FALSE, "ais", NULL, NULL },
{ 0, crm_proc_lrmd, 3, 0, TRUE, "lrmd", NULL, CRM_DAEMON_DIR"/lrmd" },
{ 0, crm_proc_cib, 1, 0, TRUE, "cib", CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib" },
{ 0, crm_proc_crmd, 6, 0, TRUE, "crmd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd" },
{ 0, crm_proc_attrd, 4, 0, TRUE, "attrd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd" },
{ 0, crm_proc_stonithd, 0, 0, TRUE, "stonithd", NULL, NULL },
{ 0, crm_proc_pe, 5, 0, TRUE, "pengine", CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine" },
{ 0, crm_proc_mgmtd, 0, 0, TRUE, "mgmtd", NULL, HB_DAEMON_DIR"/mgmtd" },
{ 0, crm_proc_stonith_ng, 2, 0, TRUE, "stonith-ng", NULL, CRM_DAEMON_DIR"/stonithd" },
};
/* *INDENT-ON* */
static gboolean start_child(pcmk_child_t * child);
static gboolean check_active_before_startup_processes(gpointer user_data);
/*
 * Select the user the crmd child is started as: no user switch (uid NULL)
 * when enable is set, CRM_DAEMON_USER otherwise.
 */
void
enable_crmd_as_root(gboolean enable)
{
    pcmk_children[pcmk_child_crmd].uid = enable ? NULL : CRM_DAEMON_USER;
}
/*
 * Enable or disable the mgmtd child by giving it start sequence 7 or 0
 * (children with a start sequence of 0 are never started).
 */
void
enable_mgmtd(gboolean enable)
{
    pcmk_children[pcmk_child_mgmtd].start_seq = enable ? 7 : 0;
}
/*
 * Build the bitmask of active processes: crm_proc_plugin plus the flag of
 * every child whose pid is non-zero.
 */
static uint32_t
get_process_list(void)
{
    int idx = 0;
    uint32_t mask = crm_proc_plugin;

    for (idx = 0; idx < SIZEOF(pcmk_children); idx++) {
        if (pcmk_children[idx].pid == 0) {
            continue;
        }
        mask |= pcmk_children[idx].flag;
    }

    return mask;
}
/* Record that a child is gone: clear its pid and tracking flag, publish the
 * new process list, count the exit against MAX_RESPAWN, and then either
 * continue a shutdown that is in progress or respawn the child. */
static void
pcmk_process_exit(pcmk_child_t *child)
{
child->pid = 0;
child->active_before_startup = FALSE;
/* Broadcast the fact that one of our processes died ASAP
- *
+ *
* Try to get some logging of the cause out first though
* because we're probably about to get fenced
*
* Potentially do this only if respawn_count > N
* to allow for local recovery
*/
update_node_processes(local_nodeid, NULL, get_process_list());
child->respawn_count += 1;
/* Too many exits: stop respawning this child */
if (child->respawn_count > MAX_RESPAWN) {
crm_err("Child respawn count exceeded by %s", child->name);
child->respawn = FALSE;
}
/* A non-NULL shutdown_trigger means a shutdown was requested: poke the
 * shutdown worker instead of respawning */
if (shutdown_trigger) {
mainloop_set_trigger(shutdown_trigger);
update_node_processes(local_nodeid, NULL, get_process_list());
} else if (child->respawn) {
crm_notice("Respawning failed child process: %s", child->name);
start_child(child);
}
}
/* Child-watch callback (registered in start_child) run when a forked child
 * terminates.  Logs how the child ended, initiates a shutdown when it exited
 * with code 100, and then hands over to pcmk_process_exit().
 * user_data is the child's pcmk_child_t entry. */
-static void pcmk_child_exit(GPid pid, gint status, gpointer user_data)
+static void pcmk_child_exit(GPid pid, gint status, gpointer user_data)
{
int exitcode = 0;
pcmk_child_t *child = user_data;
if(WIFSIGNALED(status)) {
int signo = WTERMSIG(status);
int core = WCOREDUMP(status);
crm_notice("Child process %s terminated with signal %d (pid=%d, core=%d)",
child->name, signo, child->pid, core);
} else if(WIFEXITED(status)) {
exitcode = WEXITSTATUS(status);
do_crm_log(exitcode == 0 ? LOG_INFO : LOG_ERR,
"Child process %s exited (pid=%d, rc=%d)", child->name, child->pid, exitcode);
}
/* Exit code 100 is the child's request not to be respawned; it is also
 * what start_child() exits with when exec fails */
if (exitcode == 100) {
crm_warn("Pacemaker child process %s no longer wishes to be respawned. "
"Shutting ourselves down.", child->name);
child->respawn = FALSE;
fatal_error = TRUE;
pcmk_shutdown(15);
}
pcmk_process_exit(child);
}
/*
 * Send a signal to a running child.
 *
 * A signal of 0 means SIGTERM.  Children without a command or without a
 * positive pid are skipped.  The outcome of kill() is only logged.
 *
 * Always returns TRUE.
 */
static gboolean
stop_child(pcmk_child_t * child, int signal)
{
    int signo = (signal == 0) ? SIGTERM : signal;

    if (child->command == NULL) {
        crm_debug("Nothing to do for child \"%s\"", child->name);
        return TRUE;
    }

    if (child->pid <= 0) {
        crm_trace("Client %s not running", child->name);
        return TRUE;
    }

    errno = 0;
    if (kill(child->pid, signo) != 0) {
        crm_perror(LOG_ERR, "Stopping %s: Could not send -%d to process %d failed",
                   child->name, signo, child->pid);
    } else {
        crm_notice("Stopping %s: Sent -%d to process %d", child->name, signo, child->pid);
    }

    return TRUE;
}
static char *opts_default[] = { NULL, NULL };
static char *opts_vgrind[] = { NULL, NULL, NULL, NULL, NULL };
/*
 * Fork and exec one child daemon.
 *
 * The child runs in its own session, optionally under valgrind/callgrind
 * (selected through the PCMK_valgrind_enabled / PCMK_callgrind_enabled
 * environment variables, which hold either a boolean or a list containing the
 * child's name), as child->uid when one is set, with every inherited file
 * descriptor closed and stdin/stdout/stderr attached to /dev/null.
 *
 * Returns TRUE when there is nothing to start or the child was forked, and
 * FALSE when the configured user cannot be resolved (nothing is forked in
 * that case).  The forked child never returns from this function: it either
 * execs or exits with code 100.
 */
static gboolean
start_child(pcmk_child_t * child)
{
    int lpc = 0;
    uid_t uid = 0;
    struct rlimit oflimits;
    gboolean use_valgrind = FALSE;
    gboolean use_callgrind = FALSE;
    const char *devnull = "/dev/null";
    const char *env_valgrind = getenv("PCMK_valgrind_enabled");
    const char *env_callgrind = getenv("PCMK_callgrind_enabled");

    child->active_before_startup = FALSE;

    if (child->command == NULL) {
        crm_info("Nothing to do for child \"%s\"", child->name);
        return TRUE;
    }

    if (env_callgrind != NULL && crm_is_true(env_callgrind)) {
        use_callgrind = TRUE;
        use_valgrind = TRUE;

    } else if (env_callgrind != NULL && strstr(env_callgrind, child->name)) {
        use_callgrind = TRUE;
        use_valgrind = TRUE;

    } else if (env_valgrind != NULL && crm_is_true(env_valgrind)) {
        use_valgrind = TRUE;

    } else if (env_valgrind != NULL && strstr(env_valgrind, child->name)) {
        use_valgrind = TRUE;
    }

    if (use_valgrind && strlen(VALGRIND_BIN) == 0) {
        crm_warn("Cannot enable valgrind for %s:"
                 " The location of the valgrind binary is unknown", child->name);
        use_valgrind = FALSE;
    }

    /* Resolve the user before forking.  Returning from this function inside
     * the forked child would leave a second copy of this daemon running the
     * parent's code, so every failure that can be detected up front is
     * handled here, in the parent. */
    if (child->uid && crm_user_lookup(child->uid, &uid, NULL) < 0) {
        crm_err("Invalid uid (%s) specified for %s", child->uid, child->name);
        return FALSE;
    }

    child->pid = fork();
    CRM_ASSERT(child->pid != -1);

    if (child->pid > 0) {
        /* parent */
        g_child_watch_add(child->pid, pcmk_child_exit, child);

        crm_info("Forked child %d for process %s%s", child->pid, child->name,
                 use_valgrind ? " (valgrind enabled: " VALGRIND_BIN ")" : "");
        update_node_processes(local_nodeid, NULL, get_process_list());
        return TRUE;
    }

    /* child: from here on we must exec or exit, never return */

    /* Start a new session */
    (void)setsid();

    /* Setup the two alternate arg arrays */
    opts_vgrind[0] = strdup(VALGRIND_BIN);
    if (use_callgrind) {
        opts_vgrind[1] = strdup("--tool=callgrind");
        opts_vgrind[2] = strdup("--callgrind-out-file=" CRM_STATE_DIR "/callgrind.out.%p");
        opts_vgrind[3] = strdup(child->command);
        opts_vgrind[4] = NULL;
    } else {
        opts_vgrind[1] = strdup(child->command);
        opts_vgrind[2] = NULL;
        opts_vgrind[3] = NULL;
        opts_vgrind[4] = NULL;
    }
    opts_default[0] = strdup(child->command);

    /* The group is deliberately not changed for now - doing so prevents
     * connection to the cluster */

    if (uid && setuid(uid) < 0) {
        crm_perror(LOG_ERR, "Could not set user to %d (%s)", uid, child->uid);
    }

    /* Close all open file descriptors */
    getrlimit(RLIMIT_NOFILE, &oflimits);
    for (lpc = 0; lpc < oflimits.rlim_cur; lpc++) {
        close(lpc);
    }

    (void)open(devnull, O_RDONLY);      /* Stdin:  fd 0 */
    (void)open(devnull, O_WRONLY);      /* Stdout: fd 1 */
    (void)open(devnull, O_WRONLY);      /* Stderr: fd 2 */

    if (use_valgrind) {
        (void)execvp(VALGRIND_BIN, opts_vgrind);
    } else {
        (void)execvp(child->command, opts_default);
    }

    /* Only reached when exec failed; 100 tells the parent not to respawn */
    crm_perror(LOG_ERR, "FATAL: Cannot exec %s", child->command);
    crm_exit(100);

    return TRUE;                /* never reached */
}
/*
 * Timer callback that forces a child which is still running to terminate.
 * data is the child's pcmk_child_t entry.  Always returns FALSE.
 */
static gboolean
escalate_shutdown(gpointer data)
{
    pcmk_child_t *child = data;

    if (child->pid == 0) {
        return FALSE;
    }

    /* Use SIGSEGV instead of SIGKILL to create a core so we can see what it was up to */
    crm_err("Child %s not terminating in a timely manner, forcing", child->name);
    stop_child(child, SIGSEGV);
    return FALSE;
}
/* Trigger callback that stops the children in reverse start order.
 *
 * 'phase' persists between invocations and holds the start_seq currently
 * being stopped.  While a child of the current phase is still running the
 * function returns early; it is invoked again when pcmk_process_exit() sets
 * shutdown_trigger.  Once every phase is done the mainloop is quit. */
static gboolean
pcmk_shutdown_worker(gpointer user_data)
{
static int phase = 0;
static time_t next_log = 0;
static int max = SIZEOF(pcmk_children);
int lpc = 0;
/* First invocation: start from the highest possible sequence number */
if (phase == 0) {
crm_notice("Shuting down Pacemaker");
phase = max;
/* Add a second, more frequent, check to speed up shutdown */
g_timeout_add_seconds(5, check_active_before_startup_processes, NULL);
}
for (; phase > 0; phase--) {
/* dont stop anything with start_seq < 1 */
for (lpc = max - 1; lpc >= 0; lpc--) {
pcmk_child_t *child = &(pcmk_children[lpc]);
if (phase != child->start_seq) {
continue;
}
if (child->pid) {
time_t now = time(NULL);
/* respawn still set: first visit, so send SIGTERM and disable respawning */
if (child->respawn) {
next_log = now + 30;
child->respawn = FALSE;
stop_child(child, SIGTERM);
/* Children stopped after the crmd get a 3 minute deadline before escalation */
if (phase < pcmk_children[pcmk_child_crmd].start_seq) {
g_timeout_add(180000 /* 3m */ , escalate_shutdown, child);
}
} else if (now >= next_log) {
next_log = now + 30;
crm_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...",
child->name, child->pid, child->start_seq);
}
/* Wait for this child to exit before going any further */
return TRUE;
}
/* cleanup */
crm_debug("%s confirmed stopped", child->name);
child->pid = 0;
}
}
/* send_cluster_id(); */
crm_notice("Shutdown complete");
g_main_loop_quit(mainloop);
if(fatal_error) {
crm_notice("Attempting to inhibit respawning after fatal error");
crm_exit(100);
}
-
+
return TRUE;
}
/*
 * Request a shutdown: create the shutdown trigger on first use and fire it,
 * which schedules pcmk_shutdown_worker().  nsig is not used.
 */
void
pcmk_shutdown(int nsig)
{
    if (!shutdown_trigger) {
        shutdown_trigger = mainloop_add_trigger(G_PRIORITY_HIGH, pcmk_shutdown_worker, NULL);
    }

    mainloop_set_trigger(shutdown_trigger);
}
/*
 * Create a directory and any missing parent directories (like "mkdir -p").
 *
 * Each component of path_c is created with the given mode; components that
 * already exist are accepted.  The first failure is logged once and stops
 * the walk.
 */
static void
build_path(const char *path_c, mode_t mode)
{
    int offset = 1, len = 0;
    char *path = NULL;

    CRM_CHECK(path_c != NULL, return);
    path = strdup(path_c);
    CRM_CHECK(path != NULL, return);

    /* Start at 1 so that a leading '/' is not treated as a component */
    for (len = strlen(path); offset < len; offset++) {
        if (path[offset] != '/') {
            continue;
        }

        /* Temporarily terminate the string to create the parent so far */
        path[offset] = 0;
        if (mkdir(path, mode) < 0 && errno != EEXIST) {
            crm_perror(LOG_ERR, "Could not create directory '%s'", path);
            /* Nothing below can succeed; don't retry and re-log the same path */
            goto done;
        }
        path[offset] = '/';
    }

    if (mkdir(path, mode) < 0 && errno != EEXIST) {
        crm_perror(LOG_ERR, "Could not create directory '%s'", path);
    }

  done:
    free(path);
}
/*
 * IPC connection_accept handler: create the client record for a new
 * connection.  Returns 0 on success, -EIO if the record cannot be created.
 */
static int32_t
pcmk_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    crm_trace("Connection %p", c);

    return (crm_client_new(c, uid, gid) != NULL) ? 0 : -EIO;
}
/*
 * IPC connection_created handler: only traces the new connection.
 */
static void
pcmk_ipc_created(qb_ipcs_connection_t *c)
{
    crm_trace("Connection %p", c);
}
/* Exit code means? */
/* IPC msg_process handler: decode a client message, acknowledge it when the
 * client asked for a response, then either start a shutdown (CRM_OP_QUIT) or
 * push the current process list to all clients.  Always returns 0. */
static int32_t
pcmk_ipc_dispatch(qb_ipcs_connection_t *qbc, void *data, size_t size)
{
uint32_t id = 0;
uint32_t flags = 0;
const char *task = NULL;
crm_client_t *c = crm_client_get(qbc);
xmlNode *msg = crm_ipcs_recv(c, data, size, &id, &flags);
/* The ack is sent before the message is checked, so it goes out even when
 * the payload could not be decoded */
if(flags & crm_ipc_client_response) {
crm_ipcs_send_ack(c, id, "ack", __FUNCTION__, __LINE__);
}
if (msg == NULL) {
return 0;
}
task = crm_element_value(msg, F_CRM_TASK);
if(crm_str_eq(task, CRM_OP_QUIT, TRUE)) {
/* Time to quit */
crm_notice("Shutting down in responce to ticket %s (%s)",
crm_element_value(msg, F_CRM_REFERENCE),
crm_element_value(msg, F_CRM_ORIGIN));
pcmk_shutdown(15);
} else {
/* Just send to everyone */
update_process_clients();
}
-
+
free_xml(msg);
return 0;
}
/* Error code means? */
/* IPC connection_closed handler: look up the client record belonging to the
 * connection and destroy it.  Always returns 0. */
static int32_t
-pcmk_ipc_closed(qb_ipcs_connection_t *c)
+pcmk_ipc_closed(qb_ipcs_connection_t *c)
{
crm_client_t *client = crm_client_get(c);
crm_trace("Connection %p", c);
crm_client_destroy(client);
return 0;
}
/* IPC connection_destroyed handler: only traces the connection. */
static void
-pcmk_ipc_destroy(qb_ipcs_connection_t *c)
+pcmk_ipc_destroy(qb_ipcs_connection_t *c)
{
crm_trace("Connection %p", c);
}
/* Handler table passed to mainloop_add_ipc_server() in main() */
-struct qb_ipcs_service_handlers ipc_callbacks =
+struct qb_ipcs_service_handlers ipc_callbacks =
{
.connection_accept = pcmk_ipc_accept,
.connection_created = pcmk_ipc_created,
.msg_process = pcmk_ipc_dispatch,
.connection_closed = pcmk_ipc_closed,
.connection_destroyed = pcmk_ipc_destroy
};
/*
 * Hash-table iterator used by update_process_clients(): sends the update
 * passed as data to the client stored as the entry's value.
 */
static void
ghash_send_proc_details(gpointer key, gpointer value, gpointer data)
{
    crm_ipcs_send(value, 0, data, TRUE);
}
/*
 * Hash-table iterator used by update_process_clients(): adds a "node" child
 * carrying the peer's id, uname and process mask to the XML element passed
 * as user_data.
 */
static void
peer_loop_fn(gpointer key, gpointer value, gpointer user_data)
{
    pcmk_peer_t *peer = value;
    xmlNode *parent = user_data;
    xmlNode *entry = create_xml_node(parent, "node");

    crm_xml_add_int(entry, "id", peer->id);
    crm_xml_add(entry, "uname", peer->uname);
    crm_xml_add_int(entry, "processes", peer->processes);
}
/*
 * Send every connected IPC client a "nodes" document describing all known
 * peers and their process lists.
 */
void
update_process_clients(void)
{
    xmlNode *nodes = create_xml_node(NULL, "nodes");

    crm_trace("Sending process list to %d children", crm_hash_table_size(client_connections));

    /* One <node> child per known peer ... */
    g_hash_table_foreach(peers, peer_loop_fn, nodes);

    /* ... delivered to each connected client */
    g_hash_table_foreach(client_connections, ghash_send_proc_details, nodes);

    free_xml(nodes);
}
/*
 * Broadcast the local process list to the other nodes as a small
 * "<node .../>" text message via send_cpg_message().
 *
 * Nothing is sent if the message cannot be formatted or does not fit in the
 * buffer, since the reported length would then exceed the buffer.
 */
void
update_process_peers(void)
{
    char buffer[1024];
    struct iovec iov;
    int rc = 0;

    memset(buffer, 0, sizeof(buffer));

    if (local_name) {
        rc = snprintf(buffer, sizeof(buffer), "<node uname=\"%s\" proclist=\"%u\"/>",
                      local_name, get_process_list());
    } else {
        rc = snprintf(buffer, sizeof(buffer), "<node proclist=\"%u\"/>", get_process_list());
    }

    /* snprintf() returns the length it *wanted* to write; using that as the
     * message length after truncation would read past the end of buffer */
    if (rc < 0 || (size_t) rc >= sizeof(buffer)) {
        crm_err("Could not format the local process list update (rc=%d), not notifying peers", rc);
        return;
    }

    iov.iov_base = buffer;
    iov.iov_len = rc + 1;       /* include the terminating NUL */

    crm_trace("Sending %s", buffer);
    send_cpg_message(&iov);
}
/*
 * Record the name and/or process list of a node in the peers table, creating
 * the entry on first sight.
 *
 * id    - node id used as the table key
 * uname - node name, or NULL to leave the name untouched
 * procs - process bitmask, or 0 to leave the process list untouched
 *
 * When something changed for the local node, the new state is pushed to the
 * IPC clients and to the other peers.
 *
 * Returns TRUE if the entry was created or modified.
 */
gboolean
update_node_processes(uint32_t id, const char *uname, uint32_t procs)
{
    gboolean changed = FALSE;
    pcmk_peer_t *node = g_hash_table_lookup(peers, GUINT_TO_POINTER(id));

    if (node == NULL) {
        changed = TRUE;

        node = calloc(1, sizeof(pcmk_peer_t));
        CRM_ASSERT(node != NULL);       /* check before the first dereference */

        node->id = id;
        g_hash_table_insert(peers, GUINT_TO_POINTER(id), node);
    }

    if (uname != NULL) {
        if (node->uname == NULL || safe_str_eq(node->uname, uname) == FALSE) {
            int lpc, len = strlen(uname);

            /* Only mention the previous name when there was one */
            crm_notice("%p Node %u now known as %s%s%s", node, id, uname,
                       node->uname ? ", was: " : "", node->uname ? node->uname : "");
            free(node->uname);
            node->uname = strdup(uname);
            changed = TRUE;

            for (lpc = 0; lpc < len; lpc++) {
                if (uname[lpc] >= 'A' && uname[lpc] <= 'Z') {
                    crm_warn("Node names with capitals are discouraged, consider changing '%s' to something else", uname);
                    break;
                }
            }
        }

    } else {
        crm_trace("Empty uname for node %u", id);
    }

    if (procs != 0) {
        if (procs != node->processes) {
            crm_debug("Node %s now has process list: %.32x (was %.32x)",
                      node->uname, procs, node->processes);
            node->processes = procs;
            changed = TRUE;

        } else {
            crm_trace("Node %s still has process list: %.32x", node->uname, procs);
        }
    }

    if (changed && id == local_nodeid) {
        update_process_clients();
        update_process_peers();
    }
    return changed;
}
/* *INDENT-OFF* */
/* Command line options, registered with crm_set_options() in main().
 * The fourth field of each entry is the character that main() switches on;
 * the fifth is the help text.  The table ends with an entry whose name is NULL. */
static struct crm_option long_options[] = {
/* Top-level Options */
{"help", 0, 0, '?', "\tThis text"},
{"version", 0, 0, '$', "\tVersion information" },
{"verbose", 0, 0, 'V', "\tIncrease debug output"},
{"shutdown", 0, 0, 'S', "\tInstruct Pacemaker to shutdown on this machine"},
{"features", 0, 0, 'F', "\tDisplay the full version and list of features Pacemaker was built with"},
{"-spacer-", 1, 0, '-', "\nAdditional Options:"},
{"foreground", 0, 0, 'f', "\tRun in the foreground instead of as a daemon"},
{"pid-file", 1, 0, 'p', "\t(Advanced) Daemon pid file location"},
{NULL, 0, 0, 0}
};
/* *INDENT-ON* */
/*
 * Change the ownership of path to uid/gid; a failure is logged as a warning
 * and otherwise ignored.
 */
static void
mcp_chown(const char *path, uid_t uid, gid_t gid)
{
    if (chown(path, uid, gid) >= 0) {
        return;
    }

    crm_warn("Cannot change the ownership of %s to user %s and gid %d: %s",
             path, CRM_DAEMON_USER, gid, pcmk_strerror(errno));
}
/*
 * Recurring timer callback that polls the processes which were already
 * running when this daemon started (they are not our forked children, so no
 * child-watch exists for them).  A process that has gone away is reported
 * and handed to pcmk_process_exit().
 *
 * Returns TRUE (keep the timer) while at least one such process is still
 * alive, FALSE otherwise.
 */
static gboolean
check_active_before_startup_processes(gpointer user_data)
{
    int start_seq = 1, lpc = 0;
    static int max = SIZEOF(pcmk_children);
    gboolean keep_tracking = FALSE;

    for (start_seq = 1; start_seq < max; start_seq++) {
        for (lpc = 0; lpc < max; lpc++) {
            pcmk_child_t *child = &(pcmk_children[lpc]);

            /* Skip anything we track as a real child process, and anything
             * that belongs to a different start sequence */
            if (child->active_before_startup == FALSE || start_seq != child->start_seq) {
                continue;
            }

            if (crm_pid_active(child->pid) == 1) {
                /* at least one of the processes found at startup
                 * is still going, so keep this recurring timer around */
                keep_tracking = TRUE;

            } else {
                crm_notice("Process %s terminated (pid=%d)", child->name, child->pid);
                pcmk_process_exit(child);
            }
        }
    }

    return keep_tracking;
}
/*
 * Scan /proc for processes whose name matches one of our children and adopt
 * them: their pid is recorded and they are flagged active_before_startup so
 * that init_children_processes() does not start a second copy.
 *
 * When at least one process was adopted, a recurring timer is installed to
 * poll for their termination.
 */
static void
find_and_track_existing_processes(void)
{
    DIR *dp;
    struct dirent *entry;
    struct stat statbuf;
    int start_tracker = 0;

    dp = opendir("/proc");
    if (!dp) {
        /* no proc directory to search through */
        crm_notice("Can not read /proc directory to track existing components");
        return;
    }

    while ((entry = readdir(dp)) != NULL) {
        char procpath[128];
        char value[64];
        char key[16];
        FILE *file;
        int pid;
        int max = SIZEOF(pcmk_children);
        int i;
        int rc;

        /* Process directories start with a digit.  The cast keeps isdigit()
         * defined for bytes that are negative as a plain char. */
        if (!isdigit((unsigned char)entry->d_name[0])) {
            continue;
        }

        rc = snprintf(procpath, sizeof(procpath), "/proc/%s", entry->d_name);
        if (rc < 0 || (size_t) rc >= sizeof(procpath)) {
            continue;
        }
        if (lstat(procpath, &statbuf) || !S_ISDIR(statbuf.st_mode)) {
            continue;
        }

        rc = snprintf(procpath, sizeof(procpath), "/proc/%s/status", entry->d_name);
        if (rc < 0 || (size_t) rc >= sizeof(procpath)) {
            continue;
        }

        file = fopen(procpath, "r");
        if (!file) {
            continue;
        }
        /* Read the first two whitespace-separated tokens of the status file;
         * the second one is compared against the child names below */
        if (fscanf(file, "%15s%63s", key, value) != 2) {
            fclose(file);
            continue;
        }
        fclose(file);

        pid = atoi(entry->d_name);
        if (pid <= 0) {
            continue;
        }

        for (i = 0; i < max; i++) {
            const char *name = pcmk_children[i].name;

            if (pcmk_children[i].start_seq == 0) {
                continue;
            }
            /* The stonith-ng child runs the "stonithd" binary */
            if (pcmk_children[i].flag == crm_proc_stonith_ng) {
                name = "stonithd";
            }
            if (safe_str_eq(name, value)) {
                if (crm_pid_active(pid) != 1) {
                    continue;
                }
                crm_notice("Tracking existing %s process (pid=%d)", value, pid);
                pcmk_children[i].pid = pid;
                pcmk_children[i].active_before_startup = TRUE;
                start_tracker = 1;
            }
        }
    }

    if (start_tracker) {
        g_timeout_add_seconds(PCMK_PROCESS_CHECK_INTERVAL, check_active_before_startup_processes, NULL);
    }
    closedir(dp);
}
static void
init_children_processes(void) {
int start_seq = 1, lpc = 0;
static int max = SIZEOF(pcmk_children);
/* start any children that have not been detected */
for (start_seq = 1; start_seq < max; start_seq++) {
/* dont start anything with start_seq < 1 */
for (lpc = 0; lpc < max; lpc++) {
if (pcmk_children[lpc].pid) {
/* we are already tracking it */
continue;
}
if (start_seq == pcmk_children[lpc].start_seq) {
start_child(&(pcmk_children[lpc]));
}
}
}
}
/* Entry point of the Pacemaker master control process.
 *
 * Parses the command line, checks over IPC for an instance that is already
 * running (asking it to quit for --shutdown, refusing to start otherwise),
 * reads the cluster configuration, raises the core file limit, prepares the
 * runtime directories, starts the IPC server, connects to Corosync's CFG and
 * CPG services, adopts or starts the child daemons and finally runs the
 * mainloop until shutdown. */
int
main(int argc, char **argv)
{
int rc;
int flag;
int argerr = 0;
int option_index = 0;
gboolean shutdown = FALSE;
uid_t pcmk_uid = 0;
gid_t pcmk_gid = 0;
struct rlimit cores;
crm_ipc_t *old_instance = NULL;
qb_ipcs_service_t *ipcs = NULL;
const char *facility = daemon_option("logfacility");
/* NOTE(review): facility is passed to setenv() unchecked - confirm that
 * daemon_option() cannot return NULL here */
setenv("LC_ALL", "C", 1);
setenv("HA_LOGFACILITY", facility, 1);
setenv("HA_LOGD", "no", 1);
set_daemon_option("mcp", "true");
set_daemon_option("use_logd", "off");
crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE);
crm_set_options(NULL, "mode [options]", long_options, "Start/Stop Pacemaker\n");
/* Restore the original facility so that read_config() does the right thing */
set_daemon_option("logfacility", facility);
/* Command line parsing; the case labels match the table in long_options */
while (1) {
flag = crm_get_option(argc, argv, &option_index);
if (flag == -1)
break;
switch (flag) {
case 'V':
crm_bump_log_level(argc, argv);
break;
case 'f':
/* Legacy */
break;
case 'p':
pid_file = optarg;
break;
case '$':
case '?':
crm_help(flag, EX_OK);
break;
case 'S':
shutdown = TRUE;
break;
case 'F':
printf("Pacemaker %s (Build: %s)\n Supporting: %s\n", VERSION, BUILD_VERSION,
CRM_FEATURES);
crm_exit(0);
default:
printf("Argument code 0%o (%c) is not (?yet?) supported\n", flag, flag);
++argerr;
break;
}
}
if (optind < argc) {
printf("non-option ARGV-elements: ");
while (optind < argc)
printf("%s ", argv[optind++]);
printf("\n");
}
if (argerr) {
crm_help('?', EX_USAGE);
}
/* Try to connect to an instance that is already running */
crm_debug("Checking for old instances of %s", CRM_SYSTEM_MCP);
old_instance = crm_ipc_new(CRM_SYSTEM_MCP, 0);
crm_ipc_connect(old_instance);
-
+
if(shutdown) {
/* --shutdown: keep asking the running instance to quit, every two
 * seconds, until the connection goes away */
crm_debug("Terminating previous instance");
while (crm_ipc_connected(old_instance)) {
xmlNode *cmd = create_request(CRM_OP_QUIT, NULL, NULL, CRM_SYSTEM_MCP, CRM_SYSTEM_MCP, NULL);
crm_debug(".");
crm_ipc_send(old_instance, cmd, 0, 0, NULL);
free_xml(cmd);
sleep(2);
}
crm_ipc_close(old_instance);
crm_ipc_destroy(old_instance);
crm_exit(0);
} else if(crm_ipc_connected(old_instance)) {
/* Normal start but another instance answered: refuse to start */
crm_ipc_close(old_instance);
crm_ipc_destroy(old_instance);
crm_err("Pacemaker is already active, aborting startup");
crm_exit(100);
}
crm_ipc_close(old_instance);
crm_ipc_destroy(old_instance);
if (read_config() == FALSE) {
crm_notice("Could not obtain corosync config data, exiting");
crm_exit(1);
}
crm_notice("Starting Pacemaker %s (Build: %s): %s",
VERSION, BUILD_VERSION, CRM_FEATURES);
mainloop = g_main_new(FALSE);
/* Raise the soft core file limit to the hard limit */
rc = getrlimit(RLIMIT_CORE, &cores);
if (rc < 0) {
crm_perror(LOG_ERR, "Cannot determine current maximum core size.");
} else {
if (cores.rlim_max == 0 && geteuid() == 0) {
cores.rlim_max = RLIM_INFINITY;
} else {
crm_info("Maximum core file size is: %lu", (unsigned long)cores.rlim_max);
}
cores.rlim_cur = cores.rlim_max;
rc = setrlimit(RLIMIT_CORE, &cores);
if (rc < 0) {
crm_perror(LOG_ERR,
"Core file generation will remain disabled."
" Core files are an important diagnositic tool,"
" please consider enabling them by default.");
}
#if 0
/* system() is not thread-safe, can't call from here
* Actually, its a pretty hacky way to try and achieve this anyway
*/
if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) {
crm_perror(LOG_ERR, "Could not enable /proc/sys/kernel/core_uses_pid");
}
#endif
}
if (crm_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) {
crm_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER);
crm_exit(1);
}
/* Create the runtime directories and hand them to the cluster user */
mkdir(CRM_STATE_DIR, 0750);
mcp_chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid);
/* Used by stonithd */
build_path(HA_STATE_DIR "/heartbeat", 0755);
mcp_chown(HA_STATE_DIR "/heartbeat", pcmk_uid, pcmk_gid);
/* Used by RAs - Leave owned by root */
build_path(CRM_RSCTMP_DIR, 0755);
/* Used to store core files in */
build_path(CRM_CORE_DIR, 0755);
mcp_chown(CRM_CORE_DIR, pcmk_uid, pcmk_gid);
/* Used to store blackbox dumps in */
build_path(CRM_BLACKBOX_DIR, 0755);
mcp_chown(CRM_BLACKBOX_DIR, pcmk_uid, pcmk_gid);
/* Used to store policy engine inputs in */
build_path(PE_STATE_DIR, 0755);
mcp_chown(PE_STATE_DIR, pcmk_uid, pcmk_gid);
/* Used to store the cluster configuration */
build_path(CRM_CONFIG_DIR, 0755);
mcp_chown(CRM_CONFIG_DIR, pcmk_uid, pcmk_gid);
/* Peer table keyed directly by node id (see update_node_processes) */
peers = g_hash_table_new(g_direct_hash, g_direct_equal);
ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, &ipc_callbacks);
if (ipcs == NULL) {
crm_err("Couldn't start IPC server");
crm_exit(1);
}
if (cluster_connect_cfg(&local_nodeid) == FALSE) {
crm_err("Couldn't connect to Corosync's CFG service");
crm_exit(1);
}
if (cluster_connect_cpg() == FALSE) {
crm_err("Couldn't connect to Corosync's CPG service");
crm_exit(1);
}
local_name = get_local_node_name();
update_node_processes(local_nodeid, local_name, get_process_list());
mainloop_add_signal(SIGTERM, pcmk_shutdown);
mainloop_add_signal(SIGINT, pcmk_shutdown);
/* Adopt daemons that are already running, then start the missing ones */
find_and_track_existing_processes();
init_children_processes();
crm_info("Starting mainloop");
g_main_run(mainloop);
/* The mainloop has exited: tear everything down */
if(ipcs) {
crm_trace("Closing IPC server");
mainloop_del_ipc_server(ipcs);
ipcs = NULL;
}
g_main_destroy(mainloop);
cluster_disconnect_cpg();
cluster_disconnect_cfg();
crm_info("Exiting %s", crm_system_name);
crm_exit(0);
}
diff --git a/pengine/main.c b/pengine/main.c
index 2f5f7cdc4e..2178b92119 100644
--- a/pengine/main.c
+++ b/pengine/main.c
@@ -1,193 +1,193 @@
-/*
+/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
- *
+ *
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <crm_internal.h>
#include <crm/crm.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
-#include <crm/common/ipc.h>
+#include <crm/common/ipcs.h>
#include <crm/common/mainloop.h>
#include <crm/pengine/internal.h>
#include <crm/msg_xml.h>
#if HAVE_LIBXML2
# include <libxml/parser.h>
#endif
/* Short-option letters. NOTE(review): not referenced in the visible part of
 * this file; main() passes NULL as the first argument of crm_set_options(). */
#define OPTARGS "hVc"
/* Main loop handle; created and run by main() */
GMainLoop *mainloop = NULL;
/* IPC server handle; set by main(), removed by pengine_shutdown() */
qb_ipcs_service_t *ipcs = NULL;
/* Handler that main() registers for SIGTERM */
void pengine_shutdown(int nsig);
/*
 * Connection-accept callback (see .connection_accept in ipc_callbacks).
 *
 * Traces the connection pointer and asks crm_client_new() to create the
 * client record for it.
 *
 * Returns 0 when crm_client_new() yields a non-NULL client, -EIO otherwise.
 */
static int32_t
pe_ipc_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
    int32_t rc = 0;

    crm_trace("Connection %p", c);

    if (crm_client_new(c, uid, gid) == NULL) {
        rc = -EIO;
    }

    return rc;
}
/*
 * Connection-created callback (see .connection_created in ipc_callbacks).
 * Only emits a trace message containing the connection pointer.
 */
static void pe_ipc_created(qb_ipcs_connection_t *c)
{
    crm_trace("Connection %p", c);
}
/* Declared here for pe_ipc_dispatch(); no definition appears in the visible
 * part of this file */
gboolean process_pe_message(xmlNode * msg, xmlNode * xml_data, crm_client_t* sender);
/*
 * Message callback (see .msg_process in ipc_callbacks).
 *
 * Looks up the client for the connection, decodes the raw buffer with
 * crm_ipcs_recv(), sends an "ack" when the crm_ipc_client_response flag is
 * set, and, if a message was obtained, passes it together with the result of
 * get_message_xml(msg, F_CRM_DATA) to process_pe_message() before freeing it.
 *
 * Always returns 0; the return value of process_pe_message() is ignored.
 */
static int32_t
pe_ipc_dispatch(qb_ipcs_connection_t *qbc, void *data, size_t size)
{
uint32_t id = 0;
uint32_t flags = 0;
crm_client_t *c = crm_client_get(qbc);
xmlNode *msg = crm_ipcs_recv(c, data, size, &id, &flags);
/* The ack is sent before msg is checked, so it goes out even if msg is NULL */
if(flags & crm_ipc_client_response) {
crm_ipcs_send_ack(c, id, "ack", __FUNCTION__, __LINE__);
}
if (msg != NULL) {
/* NOTE(review): this local shadows the 'data' parameter of the function */
xmlNode *data = get_message_xml(msg, F_CRM_DATA);
-
+
process_pe_message(msg, data, c);
/* msg is released here regardless of what process_pe_message() returned */
free_xml(msg);
}
return 0;
}
/*
 * Connection-closed callback (see .connection_closed in ipc_callbacks).
 *
 * Fetches the client record for the connection, traces the connection
 * pointer and destroys the client record. Always returns 0.
 *
 * Open question from the original author: what would an error code mean here?
 */
static int32_t
-pe_ipc_closed(qb_ipcs_connection_t *c)
+pe_ipc_closed(qb_ipcs_connection_t *c)
{
crm_client_t *client = crm_client_get(c);
crm_trace("Connection %p", c);
crm_client_destroy(client);
return 0;
}
/*
 * Connection-destroyed callback (see .connection_destroyed in ipc_callbacks).
 * Only emits a trace message; the client record is destroyed in
 * pe_ipc_closed(), not here.
 */
static void
-pe_ipc_destroy(qb_ipcs_connection_t *c)
+pe_ipc_destroy(qb_ipcs_connection_t *c)
{
crm_trace("Connection %p", c);
}
/*
 * Handler table passed to mainloop_add_ipc_server() in main(); wires each
 * connection event to the pe_ipc_* function defined above.
 */
-struct qb_ipcs_service_handlers ipc_callbacks =
+struct qb_ipcs_service_handlers ipc_callbacks =
{
.connection_accept = pe_ipc_accept,
.connection_created = pe_ipc_created,
.msg_process = pe_ipc_dispatch,
.connection_closed = pe_ipc_closed,
.connection_destroyed = pe_ipc_destroy
};
/* *INDENT-OFF* */
/*
 * Command-line option table handed to crm_set_options() in main().
 * The list ends with an all-zero entry.
 * NOTE(review): "help" is listed with '?', while main() tests for 'h';
 * confirm how crm_get_option() reports --help.
 */
static struct crm_option long_options[] = {
/* Top-level Options */
{"help", 0, 0, '?', "\tThis text"},
{"verbose", 0, 0, 'V', "\tIncrease debug output"},
/* Terminating entry */
{0, 0, 0, 0}
};
/* *INDENT-ON* */
int
main(int argc, char **argv)
{
int flag;
int index = 0;
int argerr = 0;
crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE);
crm_set_options(NULL, "[options]",
long_options, "Daemon for calculating the cluster's response to events");
mainloop_add_signal(SIGTERM, pengine_shutdown);
while (1) {
flag = crm_get_option(argc, argv, &index);
if (flag == -1)
break;
switch (flag) {
case 'V':
crm_bump_log_level(argc, argv);
break;
case 'h': /* Help message */
crm_help('?', EX_OK);
break;
default:
++argerr;
break;
}
}
if (argc - optind == 1 && safe_str_eq("metadata", argv[optind])) {
pe_metadata();
return 0;
}
if (optind > argc) {
++argerr;
}
if (argerr) {
crm_help('?', EX_USAGE);
}
if (crm_is_writable(PE_STATE_DIR, NULL, CRM_DAEMON_USER, CRM_DAEMON_GROUP, FALSE) == FALSE) {
crm_err("Bad permissions on " PE_STATE_DIR ". Terminating");
fprintf(stderr, "ERROR: Bad permissions on " PE_STATE_DIR ". See logs for details\n");
fflush(stderr);
return 100;
}
crm_debug("Init server comms");
ipcs = mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_SHM, &ipc_callbacks);
if (ipcs == NULL) {
crm_err("Failed to create IPC server: shutting down and inhibiting respawn");
crm_exit(100);
}
/* Create the mainloop and run it... */
crm_info("Starting %s", crm_system_name);
mainloop = g_main_new(FALSE);
g_main_run(mainloop);
crm_info("Exiting %s", crm_system_name);
return crm_exit(0);
}
/*
 * Handler that main() registers for SIGTERM: removes the IPC server held in
 * the global 'ipcs' and then calls crm_exit(EX_OK).
 * The signal number argument is not used.
 */
void pengine_shutdown(int nsig)
{
    mainloop_del_ipc_server(ipcs);
    crm_exit(EX_OK);
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Jul 8, 6:16 PM (12 h, 12 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2002557
Default Alt Text
(237 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment