Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4511790
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
18 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/crm/common/ipc.c b/lib/crm/common/ipc.c
index 90603628c3..8428a56fa9 100644
--- a/lib/crm/common/ipc.c
+++ b/lib/crm/common/ipc.c
@@ -1,718 +1,718 @@
/*
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <crm_internal.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <crm/crm.h>
#include <clplumbing/ipc.h>
#include <clplumbing/Gmain_timeout.h>
#include <clplumbing/cl_log.h>
#include <clplumbing/cl_signal.h>
#include <clplumbing/lsb_exitcodes.h>
#include <clplumbing/uids.h>
#include <clplumbing/realtime.h>
#include <clplumbing/GSource.h>
#include <clplumbing/cl_poll.h>
#include <ha_msg.h>
#include <crm/msg_xml.h>
#include <crm/common/ipc.h>
#include <crm/common/cluster.h>
/*
 * Read one message from an IPC channel and return it converted to XML.
 *
 * NOTE(review): this block is captured as a diff hunk; the lines that begin
 * with '-' and '+' are the pre- and post-patch forms of the same statement.
 *
 * ch      - channel to read from
 * timeout - handed to msgfromIPC_timeout() when HAVE_MSGFROMIPC_TIMEOUT is
 *           defined (presumably seconds, going by the "%ds" log text --
 *           confirm); otherwise timeouts are unsupported and a non-zero
 *           value only triggers a one-time error log
 *
 * Returns the result of convert_ha_message(), or NULL when no message could
 * be obtained (timeout, broken connection, other IPC error, empty reply).
 */
xmlNode *xmlfromIPC(IPC_Channel *ch, int timeout)
{
xmlNode *xml = NULL;
HA_Message *msg = NULL;
#if HAVE_MSGFROMIPC_TIMEOUT
- int ipc_rc = 0;
+ int ipc_rc = IPC_OK;
msg = msgfromIPC_timeout(ch, MSG_ALLOWINTR, timeout, &ipc_rc);
/* Each failure class gets its own log level before bailing out */
if(ipc_rc == IPC_TIMEOUT) {
crm_warn("No message received in the required interval (%ds)", timeout);
return NULL;
} else if(ipc_rc == IPC_BROKEN) {
crm_debug("Peer disconnected");
return NULL;
} else if(ipc_rc != IPC_OK) {
crm_err("msgfromIPC_timeout failed: rc=%d", ipc_rc);
return NULL;
} else if(msg == NULL) {
crm_err("Empty reply from msgfromIPC_timeout");
return NULL;
}
#else
/* Complain about the missing timeout support only once per process */
static gboolean do_show_error = TRUE;
if(timeout && do_show_error) {
crm_err("Timeouts are not supported by the current heartbeat libraries");
do_show_error = FALSE;
}
msg = msgfromIPC_noauth(ch);
if(msg == NULL) {
- crm_err("Empty reply from msgfromIPC_noauth");
+ crm_debug("Empty reply from msgfromIPC_noauth");
return NULL;
}
#endif
/* The HA_Message is only an intermediate form; release it after conversion */
xml = convert_ha_message(NULL, msg, __FUNCTION__);
crm_msg_del(msg);
return xml;
}
/*
 * Convert an XML message to its IPC wire form and send it on a channel.
 *
 * m  - message to send (must not be NULL)
 * ch - channel to send on (must not be NULL)
 *
 * Returns HA_OK when the channel's send operation reports IPC_OK and
 * HA_FAIL otherwise.  errno is set to EINVAL for NULL arguments.
 */
static int xml2ipcchan(xmlNode *m, IPC_Channel *ch)
{
    HA_Message *ha_form = NULL;
    IPC_Message *wire_form = NULL;

    if (m == NULL || ch == NULL) {
        cl_log(LOG_ERR, "Invalid msg2ipcchan argument");
        errno = EINVAL;
        return HA_FAIL;
    }

    /* The HA_Message is only needed to build the IPC message, so it is
     * released right after the conversion attempt, successful or not. */
    ha_form = convert_xml_message(m);
    wire_form = hamsg2ipcmsg(ha_form, ch);
    crm_msg_del(ha_form);

    if (wire_form == NULL) {
        cl_log(LOG_ERR, "hamsg2ipcmsg() failure");
        return HA_FAIL;
    }

    if (ch->ops->send(ch, wire_form) == IPC_OK) {
        return HA_OK;
    }

    /* Record why the send failed, but only while the channel still
     * claims to be connected. */
    if (ch->ch_status == IPC_CONNECT) {
        snprintf(ch->failreason,MAXFAILREASON,
                 "send failed,farside_pid=%d, sendq length=%ld(max is %ld)",
                 ch->farside_pid, (long)ch->send_queue->current_qlen,
                 (long)ch->send_queue->max_qlen);
    }

    /* The unsent message is released here via its msg_done callback */
    wire_form->msg_done(wire_form);
    return HA_FAIL;
}
/*
 * Send an XML message to the far side of an IPC channel.
 *
 * The message is NOT freed here: msg is still read by the final
 * crm_log_xml() call and stays owned by the caller.
 *
 * ipc_client - channel to send on
 * msg        - message to send
 *
 * Failures are logged at LOG_ERR when the channel's conntype is IPC_CLIENT
 * and at LOG_WARNING otherwise.
 *
 * Returns TRUE when the message was handed to the channel, FALSE when msg
 * or ipc_client is NULL, the channel is not connected, or the send failed.
 */
gboolean
send_ipc_message(IPC_Channel *ipc_client, xmlNode *msg)
{
    gboolean all_is_good = TRUE;
    int fail_level = LOG_WARNING;

    if(ipc_client != NULL && ipc_client->conntype == IPC_CLIENT) {
        fail_level = LOG_ERR;
    }

    if (msg == NULL) {
        crm_err("cant send NULL message");
        all_is_good = FALSE;

    } else if (ipc_client == NULL) {
        crm_err("cant send message without an IPC Channel");
        all_is_good = FALSE;

    } else if(ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) {
        do_crm_log(fail_level, "IPC Channel to %d is not connected",
                   (int)ipc_client->farside_pid);
        all_is_good = FALSE;
    }

    if(all_is_good && xml2ipcchan(msg, ipc_client) != HA_OK) {
        do_crm_log(fail_level, "Could not send IPC message to %d",
                   (int)ipc_client->farside_pid);
        all_is_good = FALSE;

        /* Work out why the send failed: lost peer or full queue */
        if(ipc_client->ops->get_chan_status(ipc_client) != IPC_CONNECT) {
            do_crm_log(fail_level,
                       "IPC Channel to %d is no longer connected",
                       (int)ipc_client->farside_pid);

        } else if(ipc_client->conntype == IPC_CLIENT) {
            if(ipc_client->send_queue->current_qlen >= ipc_client->send_queue->max_qlen) {
                /* farside_pid is cast like every other use in this
                 * function so the argument matches the %d conversion */
                crm_err("Send queue to %d (size=%d) full.",
                        (int)ipc_client->farside_pid,
                        (int)ipc_client->send_queue->max_qlen);
            }
        }
    }
    /* ipc_client->ops->resume_io(ipc_client); */

    crm_log_xml(all_is_good?LOG_MSG:LOG_WARNING,"IPC[outbound]",msg);

    return all_is_good;
}
/*
 * Destroy callback that deliberately does nothing.
 *
 * init_client_ipc_comms() passes it to G_main_add_IPC_Channel() as the
 * destroy notifier; user_data is ignored.
 */
void
default_ipc_connection_destroy(gpointer user_data)
{
    /* intentionally empty */
}
/*
 * Create the listening (wait) channel for a named IPC socket and attach it
 * to the main loop at low priority.
 *
 * channel_name               - socket name, placed under CRM_SOCK_DIR; also
 *                              passed to the callbacks as user data
 * channel_client_connect     - invoked for each new client connection
 * channel_connection_destroy - destroy notifier for the wait connection
 *
 * Returns 0 on success and 1 on failure (missing name, socket path too
 * long for the SOCKET_LEN buffer, or no wait channel).
 */
int
init_server_ipc_comms(
    char *channel_name,
    gboolean (*channel_client_connect)(IPC_Channel *newclient,gpointer user_data),
    void (*channel_connection_destroy)(gpointer user_data))
{
    /* the clients wait channel is the other source of events.
     * This source delivers the clients connection events.
     * listen to this source at a relatively lower priority.
     */
    char commpath[SOCKET_LEN];
    IPC_WaitConnection *wait_ch = NULL;
    int written = 0;

    if (channel_name == NULL) {
        crm_err("Cannot set up IPC server: no channel name specified");
        return 1;
    }

    /* Bounded formatting: a long channel name must not overflow commpath */
    written = snprintf(commpath, sizeof(commpath),
                       CRM_SOCK_DIR "/%s", channel_name);
    if (written < 0 || written >= (int)sizeof(commpath)) {
        crm_err("Socket path for IPC channel %s does not fit in %d bytes",
                channel_name, (int)sizeof(commpath));
        return 1;
    }

    wait_ch = wait_channel_init(commpath);
    if (wait_ch == NULL) {
        return 1;
    }

    G_main_add_IPC_WaitConnection(
        G_PRIORITY_LOW, wait_ch, NULL, FALSE,
        channel_client_connect, channel_name,
        channel_connection_destroy);

    crm_debug_3("Listening on: %s", commpath);

    return 0;
}
/*
 * Connect to a named IPC channel and, when a dispatch function is given,
 * register the channel with the main loop at high priority.
 *
 * channel_name - name handed to init_client_ipc_comms_nodispatch()
 * dispatch     - callback for incoming data; when NULL the channel is
 *                created but not added to the main loop
 * client_data  - user data for dispatch; when it is NULL and ch is
 *                non-NULL, the channel itself is used instead
 * ch           - optional out-parameter receiving the channel (NULL when
 *                the connection could not be set up)
 *
 * Returns the main loop source, or NULL when the connection failed or no
 * dispatch function was supplied.
 */
GCHSource*
init_client_ipc_comms(const char *channel_name,
                      gboolean (*dispatch)(
                          IPC_Channel* source_data, gpointer user_data),
                      void *client_data, IPC_Channel **ch)
{
    void *dispatch_arg = client_data;
    IPC_Channel *channel = init_client_ipc_comms_nodispatch(channel_name);

    if (ch != NULL) {
        *ch = channel;
        if (dispatch_arg == NULL) {
            dispatch_arg = channel;
        }
    }

    if (channel == NULL) {
        crm_warn("Setup of client connection failed,"
                 " not adding channel to mainloop");
        return NULL;
    }

    if (dispatch == NULL) {
        crm_warn("No dispatch method specified..."
                 "maybe you meant init_client_ipc_comms_nodispatch()?");
        return NULL;
    }

    crm_debug_3("Adding dispatch method to channel");
    return G_main_add_IPC_Channel(
        G_PRIORITY_HIGH, channel, FALSE, dispatch, dispatch_arg,
        default_ipc_connection_destroy);
}
/*
 * Open a client connection to the named IPC socket under CRM_SOCK_DIR
 * without registering it with the main loop.
 *
 * channel_name - socket name; a NULL name is rejected
 *
 * On success the channel's send and receive queue lengths are set to 512
 * and should_send_block is enabled.
 *
 * Returns the connected channel, or NULL when the name is missing, the
 * channel could not be constructed, or the connection could not be
 * initiated.
 */
IPC_Channel *
init_client_ipc_comms_nodispatch(const char *channel_name)
{
    IPC_Channel *ch;
    GHashTable *attrs;
    static char path[] = IPC_PATH_ATTR;

    char *commpath = NULL;
    int local_socket_len = 2; /* 2 = '/' + '\0' */

    if (channel_name == NULL) {
        /* strlen(NULL) below would be undefined behaviour */
        crm_err("Cannot connect: no IPC channel name specified");
        return NULL;
    }

    local_socket_len += strlen(channel_name);
    local_socket_len += strlen(CRM_SOCK_DIR);

    crm_malloc0(commpath, local_socket_len);
    if (commpath == NULL) {
        return NULL;
    }

    /* snprintf always terminates within the size computed above */
    snprintf(commpath, local_socket_len, CRM_SOCK_DIR "/%s", channel_name);
    crm_debug("Attempting to talk on: %s", commpath);

    /* The attribute table is only needed while constructing the channel */
    attrs = g_hash_table_new(g_str_hash,g_str_equal);
    g_hash_table_insert(attrs, path, commpath);

    ch = ipc_channel_constructor(IPC_ANYTYPE, attrs);
    g_hash_table_destroy(attrs);

    if (ch == NULL) {
        crm_err("Could not access channel on: %s", commpath);
        crm_free(commpath);
        return NULL;

    } else if (ch->ops->initiate_connection(ch) != IPC_OK) {
        crm_debug("Could not init comms on: %s", commpath);
        ch->ops->destroy(ch);
        crm_free(commpath);
        return NULL;
    }

    ch->ops->set_recv_qlen(ch, 512);
    ch->ops->set_send_qlen(ch, 512);
    ch->should_send_block = TRUE;

    crm_debug_3("Processing of %s complete", commpath);

    crm_free(commpath);
    return ch;
}
/*
 * Construct the wait (listening) connection for a socket path.
 *
 * daemonsocket - path of the socket to listen on
 *
 * The process umask is cleared while the connection is constructed and
 * restored afterwards.  If construction fails the process exits with
 * status 1, so a returned pointer is never NULL.
 */
IPC_WaitConnection *
wait_channel_init(char daemonsocket[])
{
    char attr_key[] = IPC_PATH_ATTR;
    mode_t saved_mask;
    IPC_WaitConnection *listener = NULL;
    GHashTable *settings = g_hash_table_new(g_str_hash,g_str_equal);

    g_hash_table_insert(settings, attr_key, daemonsocket);

    saved_mask = umask(0);
    listener = ipc_wait_conn_constructor(IPC_ANYTYPE, settings);
    if (listener == NULL) {
        cl_perror("Can't create wait channel of type %s",
                  IPC_ANYTYPE);
        exit(1);
    }
    umask(saved_mask);

    g_hash_table_destroy(settings);
    return listener;
}
/*
 * Timing state used by subsystem_msg_dispatch() to measure how long each
 * message handler call takes: start and stop timestamps from
 * time_longclock(), and their difference.
 */
longclock_t ipc_call_start = 0;
longclock_t ipc_call_stop = 0;
longclock_t ipc_call_diff = 0;
/*
 * Main loop dispatch callback: read pending messages from an IPC channel
 * and pass each valid one to a handler.
 *
 * sender    - channel with pending input
 * user_data - the handler, called as handler(msg, data, sender); a FALSE
 *             result is logged as a misdirected message
 *
 * A message is handled when its task is CRM_OP_HELLO, or when both
 * F_CRM_SYS_TO and F_CRM_TASK are present; anything else is logged and
 * dropped.  When ipc_call_diff_max_ms is non-zero, handler calls that take
 * longer than that many milliseconds are reported.
 *
 * Returns TRUE.  If the channel is no longer in IPC_CONNECT state once the
 * loop ends, the process exits with status 1 instead of returning.
 */
gboolean
subsystem_msg_dispatch(IPC_Channel *sender, void *user_data)
{
    int lpc = 0;
    xmlNode *msg = NULL;
    xmlNode *data = NULL;
    const char *sys_to;
    const char *sys_to_label;
    const char *task;
    gboolean (*process_function)
        (xmlNode *msg, xmlNode *data, IPC_Channel *sender) = NULL;

    while(IPC_ISRCONN(sender)) {
        gboolean process = FALSE;
        if(sender->ops->is_message_pending(sender) == 0) {
            break;
        }

        msg = xmlfromIPC(sender, 0);
        if (msg == NULL) {
            break;
        }

        lpc++;
        crm_log_xml(LOG_MSG, __FUNCTION__, msg);

        sys_to = crm_element_value(msg, F_CRM_SYS_TO);
        task = crm_element_value(msg, F_CRM_TASK);

        if(safe_str_eq(task, CRM_OP_HELLO)) {
            process = TRUE;

        } else if(sys_to == NULL) {
            crm_err("Value of %s was NULL!!", F_CRM_SYS_TO);

        } else if(task == NULL) {
            crm_err("Value of %s was NULL!!", F_CRM_TASK);

        } else {
            process = TRUE;
        }

        if(process == FALSE) {
            free_xml(msg); msg = NULL;
            continue;
        }

        /* HELLO messages are accepted without a destination, so sys_to
         * may be NULL here; never hand a NULL pointer to a %s conversion */
        sys_to_label = (sys_to != NULL) ? sys_to : "(null)";

        data = get_message_xml(msg, F_CRM_DATA);
        process_function = user_data;
        if(ipc_call_diff_max_ms > 0) {
            ipc_call_start = time_longclock();
        }
        if(FALSE == process_function(msg, data, sender)) {
            crm_warn("Received a message destined for %s"
                     " by mistake", sys_to_label);
        }
        if(ipc_call_diff_max_ms > 0) {
            unsigned int ipc_call_diff_ms = 0;
            ipc_call_stop = time_longclock();
            ipc_call_diff = sub_longclock(
                ipc_call_stop, ipc_call_start);
            ipc_call_diff_ms = longclockto_ms(ipc_call_diff);
            if(ipc_call_diff_ms > ipc_call_diff_max_ms) {
                /* %u: the duration is an unsigned int */
                crm_err("%s took %ums to complete",
                        sys_to_label, ipc_call_diff_ms);
            }
        }

        free_xml(msg); msg = NULL;

        /* NOTE(review): on a healthy (IPC_CONNECT) channel this stops
         * after a single handled message, so further draining only
         * happens in other readable states.  Looks like deliberate
         * one-message-per-dispatch behaviour -- confirm. */
        if(sender->ch_status == IPC_CONNECT) {
            break;
        }
    }

    crm_debug_2("Processed %d messages", lpc);
    if (sender->ch_status != IPC_CONNECT) {
        crm_err("The server %d has left us: Shutting down...NOW",
                (int)sender->farside_pid);

        exit(1); /* shutdown properly later */
    }
    return TRUE;
}
/*
 * Report whether a channel has nothing queued in either direction.
 *
 * Returns TRUE for a NULL channel, or when both the send and the receive
 * queue have a current length of zero; FALSE otherwise.
 */
gboolean
is_ipc_empty(IPC_Channel *ch)
{
    if (ch == NULL) {
        return TRUE;
    }
    if (ch->send_queue->current_qlen != 0) {
        return FALSE;
    }
    if (ch->recv_queue->current_qlen != 0) {
        return FALSE;
    }
    return TRUE;
}
/*
 * Build a CRM_OP_HELLO request describing this client and send it on an
 * IPC channel.
 *
 * All four string arguments must be non-NULL and non-empty; otherwise an
 * error is logged and nothing is sent.  The XML built here is freed before
 * returning, and the result of the send is not reported to the caller.
 */
void
send_hello_message(IPC_Channel *ipc_client,
                   const char *uuid,
                   const char *client_name,
                   const char *major_version,
                   const char *minor_version)
{
    xmlNode *options = NULL;
    xmlNode *request = NULL;
    gboolean have_all_fields = TRUE;

    if (uuid == NULL || uuid[0] == '\0') {
        have_all_fields = FALSE;
    } else if (client_name == NULL || client_name[0] == '\0') {
        have_all_fields = FALSE;
    } else if (major_version == NULL || major_version[0] == '\0') {
        have_all_fields = FALSE;
    } else if (minor_version == NULL || minor_version[0] == '\0') {
        have_all_fields = FALSE;
    }

    if (have_all_fields == FALSE) {
        crm_err("Missing fields, Hello message will not be valid.");
        return;
    }

    /* Payload: the client's identity and protocol version */
    options = create_xml_node(NULL, XML_TAG_OPTIONS);
    crm_xml_add(options, "major_version", major_version);
    crm_xml_add(options, "minor_version", minor_version);
    crm_xml_add(options, "client_name", client_name);
    crm_xml_add(options, "client_uuid", uuid);

    crm_debug_4("creating hello message");
    request = create_request(
        CRM_OP_HELLO, options, NULL, NULL, client_name, uuid);

    send_ipc_message(ipc_client, request);
    crm_debug_4("hello message sent");

    free_xml(options);
    free_xml(request);
}
/*
 * Extract the client identity fields from a hello message.
 *
 * hello - message to inspect
 * uuid, client_name, major_version, minor_version - out-parameters; each is
 *         reset to NULL first and, on success, receives a copy made with
 *         crm_strdup()
 *
 * Returns TRUE when all four fields are present and non-empty.  Returns
 * FALSE for a NULL message, or (after logging the first offending field)
 * when a field is missing or empty; the outputs are then left NULL.
 */
gboolean
process_hello_message(xmlNode *hello,
                      char **uuid,
                      char **client_name,
                      char **major_version,
                      char **minor_version)
{
    enum { HELLO_FIELD_COUNT = 4 };

    /* XML attribute to read, and the label used when it is missing */
    static const char *const attr_names[HELLO_FIELD_COUNT] = {
        "client_uuid", "client_name", "major_version", "minor_version"
    };
    static const char *const field_labels[HELLO_FIELD_COUNT] = {
        "uuid", "client name", "major version", "minor version"
    };

    const char *found[HELLO_FIELD_COUNT];
    char **outputs[HELLO_FIELD_COUNT];
    int idx = 0;

    outputs[0] = uuid;
    outputs[1] = client_name;
    outputs[2] = major_version;
    outputs[3] = minor_version;

    for (idx = 0; idx < HELLO_FIELD_COUNT; idx++) {
        *outputs[idx] = NULL;
    }

    if (hello == NULL) {
        return FALSE;
    }

    for (idx = 0; idx < HELLO_FIELD_COUNT; idx++) {
        found[idx] = crm_element_value(hello, attr_names[idx]);
    }

    /* Validate everything before copying so a failure leaves all outputs NULL */
    for (idx = 0; idx < HELLO_FIELD_COUNT; idx++) {
        if (found[idx] == NULL || found[idx][0] == '\0') {
            crm_err("Hello message was not valid (field %s not found)",
                    field_labels[idx]);
            return FALSE;
        }
    }

    for (idx = 0; idx < HELLO_FIELD_COUNT; idx++) {
        *outputs[idx] = crm_strdup(found[idx]);
    }

    crm_debug_3("Hello message ok");
    return TRUE;
}
/*
 * Build a CRM request message.
 *
 * task      - operation being requested (F_CRM_TASK)
 * msg_data  - optional payload, attached under F_CRM_DATA
 * host_to   - optional destination host; ignored when NULL or empty
 * sys_to    - destination subsystem
 * sys_from  - sending subsystem
 * uuid_from - optional sender id; when set, the sender field is
 *             generate_hash_key(sys_from, uuid_from), otherwise sys_from
 * origin    - stored under F_CRM_ORIGIN
 *
 * An error is logged when neither uuid_from nor sys_from is given, but the
 * request is still built.  Returns the new request.
 */
xmlNode *
create_request_adv(const char *task, xmlNode *msg_data,
                   const char *host_to, const char *sys_to,
                   const char *sys_from, const char *uuid_from,
                   const char *origin)
{
    xmlNode *req = NULL;
    char *sender_id = NULL;
    char *msg_ref = generateReference(task, sys_from);

    if (uuid_from != NULL) {
        sender_id = generate_hash_key(sys_from, uuid_from);

    } else if (sys_from != NULL) {
        sender_id = crm_strdup(sys_from);

    } else {
        crm_err("No sys from specified");
    }

    /* host_from will get set for us if necessary by CRMd when routed */
    req = create_xml_node(NULL, __FUNCTION__);
    crm_xml_add(req, F_CRM_ORIGIN, origin);
    crm_xml_add(req, F_TYPE, T_CRM);
    crm_xml_add(req, F_CRM_VERSION, CRM_FEATURE_SET);
    crm_xml_add(req, F_CRM_MSG_TYPE, XML_ATTR_REQUEST);
    crm_xml_add(req, XML_ATTR_REFERENCE, msg_ref);
    crm_xml_add(req, F_CRM_TASK, task);
    crm_xml_add(req, F_CRM_SYS_TO, sys_to);
    crm_xml_add(req, F_CRM_SYS_FROM, sender_id);

    /* HOSTTO will be ignored if it is to the DC anyway. */
    if (host_to != NULL && host_to[0] != '\0') {
        crm_xml_add(req, F_CRM_HOST_TO, host_to);
    }

    if (msg_data != NULL) {
        add_message_xml(req, F_CRM_DATA, msg_data);
    }

    /* The temporary strings are released once they have been added */
    crm_free(msg_ref);
    crm_free(sender_id);

    return req;
}
/*
 * Wrap a copy of an XML message in a newly allocated ha_msg_input_t.
 *
 * The msg member holds the copy of orig; the xml member is whatever
 * get_message_xml() returns for F_CRM_DATA in that copy.  Release the
 * result with delete_ha_msg_input().
 */
ha_msg_input_t *
new_ha_msg_input(xmlNode *orig)
{
    xmlNode *duplicate = NULL;
    ha_msg_input_t *input_copy = NULL;

    crm_malloc0(input_copy, sizeof(ha_msg_input_t));

    duplicate = copy_xml(orig);
    input_copy->msg = duplicate;
    input_copy->xml = get_message_xml(duplicate, F_CRM_DATA);

    return input_copy;
}
/*
 * Wrap a copy of an XML message received over IPC in a newly allocated
 * ha_msg_input_t.
 *
 * The msg member holds the copy of orig; the xml member is whatever
 * get_message_xml() returns for F_CRM_DATA in that copy.  Release the
 * result with delete_ha_msg_input().
 */
ha_msg_input_t *
new_ipc_msg_input(xmlNode *orig)
{
    xmlNode *duplicate = NULL;
    ha_msg_input_t *input_copy = NULL;

    crm_malloc0(input_copy, sizeof(ha_msg_input_t));

    duplicate = copy_xml(orig);
    input_copy->msg = duplicate;
    input_copy->xml = get_message_xml(duplicate, F_CRM_DATA);

    return input_copy;
}
/*
 * Release an ha_msg_input_t: free its msg XML, then the wrapper itself.
 * A NULL argument is ignored.
 */
void
delete_ha_msg_input(ha_msg_input_t *orig)
{
    if (orig != NULL) {
        free_xml(orig->msg);
        crm_free(orig);
    }
}
/*
 * Check that a CRM message is addressed to this subsystem and has the
 * expected shape.
 *
 * msg      - message to check
 * sys      - name of this subsystem; when NULL the destination is not
 *            compared
 * uuid     - optional; when set, the destination is compared against
 *            generate_hash_key(sys, uuid) rather than sys
 * msg_type - optional expected F_CRM_MSG_TYPE value
 *
 * Comparisons are case-insensitive.  Returns msg itself when it has a
 * destination matching this subsystem, a message type matching msg_type,
 * and a reference; returns NULL otherwise (including for a NULL msg).
 */
xmlNode *
validate_crm_message(
    xmlNode *msg, const char *sys, const char *uuid, const char *msg_type)
{
    const char *sender = NULL;
    const char *destination = NULL;
    const char *actual_type = NULL;
    const char *reference = NULL;
    const char *expected_dest = sys;
    char *hashed_dest = NULL;
    gboolean acceptable = TRUE;

    if (msg == NULL) {
        return NULL;
    }

    /* The sender is looked up but not currently used by any check */
    sender = crm_element_value(msg, F_CRM_SYS_FROM);
    destination = crm_element_value(msg, F_CRM_SYS_TO);
    actual_type = crm_element_value(msg, F_CRM_MSG_TYPE);
    reference = crm_element_value(msg, XML_ATTR_REFERENCE);

    if (uuid != NULL) {
        hashed_dest = generate_hash_key(sys, uuid);
        expected_dest = hashed_dest;
    }

    if (destination == NULL) {
        crm_info("No sub-system defined.");
        acceptable = FALSE;

    } else if (expected_dest != NULL
               && strcasecmp(destination, expected_dest) != 0) {
        crm_debug_3("The message is not for this sub-system (%s != %s).",
                    destination, expected_dest);
        acceptable = FALSE;
    }

    crm_free(hashed_dest);

    /* A missing type ends validation immediately */
    if (actual_type == NULL) {
        crm_info("No message type defined.");
        return NULL;
    }

    if (msg_type != NULL && strcasecmp(msg_type, actual_type) != 0) {
        crm_info("Expecting a (%s) message but received a (%s).",
                 msg_type, actual_type);
        acceptable = FALSE;
    }

    if (reference == NULL) {
        crm_info("No message crm_msg_reference defined.");
        acceptable = FALSE;
    }

    if (acceptable) {
        return msg;
    }
    return NULL;
}
/*
 * Build the reply to a CRM request.
 *
 * original_request  - message being answered; its reference and task are
 *                     copied and its from/to subsystems are swapped
 * xml_response_data - optional payload, attached under F_CRM_DATA (the
 *                     comment on the original code says a copy is added)
 * origin            - stored under F_CRM_ORIGIN
 *
 * Returns the new reply.  A request without a message type is logged and
 * asserted on; NULL is returned if the assertion does not stop the process.
 */
xmlNode *
create_reply_adv(xmlNode *original_request,
                 xmlNode *xml_response_data, const char *origin)
{
    xmlNode *response = NULL;

    const char *req_host = crm_element_value(original_request, F_CRM_HOST_FROM);
    const char *req_from = crm_element_value(original_request, F_CRM_SYS_FROM);
    const char *req_to = crm_element_value(original_request, F_CRM_SYS_TO);
    const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE);
    const char *req_task = crm_element_value(original_request, F_CRM_TASK);
    const char *req_ref = crm_element_value(
        original_request, XML_ATTR_REFERENCE);

    if (type == NULL) {
        crm_err("Cannot create new_message,"
                " no message type in original message");
        CRM_ASSERT(type != NULL);
        return NULL;
    }

    response = create_xml_node(NULL, __FUNCTION__);
    crm_xml_add(response, F_CRM_ORIGIN, origin);
    crm_xml_add(response, F_TYPE, T_CRM);
    crm_xml_add(response, F_CRM_VERSION, CRM_FEATURE_SET);
    crm_xml_add(response, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE);
    crm_xml_add(response, XML_ATTR_REFERENCE, req_ref);
    crm_xml_add(response, F_CRM_TASK, req_task);

    /* since this is a reply, we reverse the from and to */
    crm_xml_add(response, F_CRM_SYS_TO, req_from);
    crm_xml_add(response, F_CRM_SYS_FROM, req_to);

    /* HOSTTO will be ignored if it is to the DC anyway. */
    if (req_host != NULL && req_host[0] != '\0') {
        crm_xml_add(response, F_CRM_HOST_TO, req_host);
    }

    if (xml_response_data != NULL) {
        add_message_xml(response, F_CRM_DATA, xml_response_data);
    }

    return response;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Jun 25, 2:31 AM (14 h, 28 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1951884
Default Alt Text
(18 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment