Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F2022287
msg.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
56 KB
Referenced Files
None
Subscribers
None
msg.c
View Options
/*
* Copyright (c) 2005-2006 MontaVista Software, Inc.
* Copyright (c) 2006 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake (sdake@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <arpa/inet.h>
#include "../include/saAis.h"
#include "../include/saMsg.h"
#include "../include/ipc_msg.h"
#include "../include/list.h"
#include "../include/queue.h"
#include "../lcr/lcr_comp.h"
#include "objdb.h"
#include "totem.h"
#include "service.h"
#include "mempool.h"
#include "util.h"
#include "main.h"
#include "flow.h"
#include "tlist.h"
#include "ipc.h"
#include "totempg.h"
#include "logsys.h"
LOGSYS_DECLARE_SUBSYS ("MSG", LOG_INFO);
/*
 * Identifiers of the executive MSG requests. The values are sequential from
 * 0 and are listed in the same order as the handlers in msg_exec_service[].
 * NOTE(review): presumably the value is the index used to pick the handler
 * from msg_exec_service[] - confirm against the dispatcher.
 */
enum msg_exec_message_req_types {
MESSAGE_REQ_EXEC_MSG_QUEUEOPEN = 0,
MESSAGE_REQ_EXEC_MSG_QUEUECLOSE = 1,
MESSAGE_REQ_EXEC_MSG_QUEUESTATUSGET = 2,
MESSAGE_REQ_EXEC_MSG_QUEUEUNLINK = 3,
MESSAGE_REQ_EXEC_MSG_QUEUEGROUPCREATE = 4,
MESSAGE_REQ_EXEC_MSG_QUEUEGROUPINSERT = 5,
MESSAGE_REQ_EXEC_MSG_QUEUEGROUPREMOVE = 6,
MESSAGE_REQ_EXEC_MSG_QUEUEGROUPDELETE = 7,
MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACK = 8,
MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACKSTOP = 9,
MESSAGE_REQ_EXEC_MSG_MESSAGESEND = 10,
MESSAGE_REQ_EXEC_MSG_MESSAGEGET = 11,
MESSAGE_REQ_EXEC_MSG_MESSAGECANCEL = 12,
MESSAGE_REQ_EXEC_MSG_MESSAGESENDRECEIVE = 13,
MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY = 14
};
/*
 * One message stored on a queue.
 */
struct message_entry {
SaTimeT time; /* set from clust_time_now() when the message is enqueued */
SaMsgMessageT message; /* copy of the sent message; message.data is a separately allocated buffer */
struct list_head list; /* link in message_queue.message_list_head */
};
/*
 * A named message queue known to this node.
 */
struct message_queue {
SaNameT name; /* queue name, the key used by queue_find() */
int refcount; /* incremented by the queue open handler, decremented by the queue close handler */
struct list_head list; /* link in the global queue_list_head */
struct list_head message_list_head; /* head of the struct message_entry list */
};
/*
 * A named group of message queues.
 */
struct queue_group {
SaNameT name; /* group name, the key used by queue_group_find() */
struct list_head list; /* link in the global queue_group_list_head */
struct list_head message_queue_head; /* head of the struct queue_group_entry list */
};
/*
 * Membership record linking one message queue into one queue group.
 */
struct queue_group_entry {
struct message_queue *message_queue; /* the member queue (not owned by the entry) */
struct list_head list; /* link in queue_group.message_queue_head */
};
/*
struct queue_cleanup {
struct message_queue *queue;
SaMsgResourceHandleT queue_handle;
struct list_head queue_lock_list_head;
struct list_head list;
};
*/
/* Global list of every struct message_queue (linked through message_queue.list). */
DECLARE_LIST_INIT(queue_list_head);
/* Global list of every struct queue_group (linked through queue_group.list). */
DECLARE_LIST_INIT(queue_group_list_head);
/*
 * Service lifecycle callbacks referenced from msg_service_handler.
 */
static int msg_exec_init_fn (struct objdb_iface_ver0 *objdb);
static int msg_lib_exit_fn (void *conn);
static int msg_lib_init_fn (void *conn);
/*
 * Executive request handlers, registered in msg_exec_service[].
 * Each one takes the request buffer and a node id.
 */
static void message_handler_req_exec_msg_queueopen (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queueclose (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuestatusget (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queueunlink (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuegroupcreate (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuegroupinsert (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuegroupremove (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuegroupdelete (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuegrouptrack (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_queuegrouptrackstop (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_messagesend (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_messageget (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_messagecancel (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_messagesendreceive (
void *message,
unsigned int nodeid);
static void message_handler_req_exec_msg_messagereply (
void *message,
unsigned int nodeid);
/*
 * Library request handlers, registered in msg_lib_service[].
 * Each one takes the library connection and the request buffer.
 */
static void message_handler_req_lib_msg_queueopen (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queueopenasync (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queueclose (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuestatusget (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queueunlink (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuegroupcreate (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuegroupinsert (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuegroupremove (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuegroupdelete (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuegrouptrack (
void *conn,
void *msg);
static void message_handler_req_lib_msg_queuegrouptrackstop (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messagesend (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messagesendasync (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messageget (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messagecancel (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messagesendreceive (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messagereply (
void *conn,
void *msg);
static void message_handler_req_lib_msg_messagereplyasync (
void *conn,
void *msg);
/*
 * Synchronization callbacks. msg_sync_init is only built when TODO is
 * defined and is not registered in msg_service_handler.
 */
#ifdef TODO
static void msg_sync_init (void);
#endif /* TODO */
static void msg_sync_activate (void);
static int msg_sync_process (void);
static void msg_sync_abort(void);
/*
 * NOTE(review): queue_release is declared non-static and is neither defined
 * nor called in the portion of the file reviewed - confirm it is still needed.
 */
void queue_release (struct message_queue *queue);
/*
 * Configuration change callback, registered as .confchg_fn.
 */
static void msg_confchg_fn (
enum totem_configuration_type configuration_type,
unsigned int *member_list, int member_list_entries,
unsigned int *left_list, int left_list_entries,
unsigned int *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id);
/*
 * Per library connection private data. Its size is advertised through
 * msg_service_handler.private_data_size and both lists are initialized
 * in msg_lib_init_fn().
 */
struct msg_pd {
struct list_head queue_list;
struct list_head queue_cleanup_list;
};
/*
 * Library Handler Definition
 *
 * One entry per library request. Each entry names the handler function,
 * the size and id of the matching response, and the flow control setting.
 * The numeric comment on each entry is its index in the array.
 */
struct openais_lib_handler msg_lib_service[] =
{
{ /* 0 */
.lib_handler_fn = message_handler_req_lib_msg_queueopen,
.response_size = sizeof (struct res_lib_msg_queueopen),
.response_id = MESSAGE_RES_MSG_QUEUEOPEN,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 1 */
.lib_handler_fn = message_handler_req_lib_msg_queueopenasync,
.response_size = sizeof (struct res_lib_msg_queueopenasync),
.response_id = MESSAGE_RES_MSG_QUEUEOPENASYNC,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 2 */
.lib_handler_fn = message_handler_req_lib_msg_queueclose,
.response_size = sizeof (struct res_lib_msg_queueclose),
.response_id = MESSAGE_RES_MSG_QUEUECLOSE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 3 */
.lib_handler_fn = message_handler_req_lib_msg_queuestatusget,
.response_size = sizeof (struct res_lib_msg_queuestatusget),
.response_id = MESSAGE_RES_MSG_QUEUESTATUSGET,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 4 */
.lib_handler_fn = message_handler_req_lib_msg_queueunlink,
.response_size = sizeof (struct res_lib_msg_queueunlink),
.response_id = MESSAGE_RES_MSG_QUEUEUNLINK,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 5 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupcreate,
.response_size = sizeof (struct res_lib_msg_queuegroupcreate),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPCREATE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 6 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupinsert,
.response_size = sizeof (struct res_lib_msg_queuegroupinsert),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPINSERT,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 7 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupremove,
.response_size = sizeof (struct res_lib_msg_queuegroupremove),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPREMOVE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 8 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupdelete,
.response_size = sizeof (struct res_lib_msg_queuegroupdelete),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPDELETE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 9 */
.lib_handler_fn = message_handler_req_lib_msg_queuegrouptrack,
.response_size = sizeof (struct res_lib_msg_queuegrouptrack),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPTRACK,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 10 */
.lib_handler_fn = message_handler_req_lib_msg_queuegrouptrackstop,
.response_size = sizeof (struct res_lib_msg_queuegrouptrackstop),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 11 */
.lib_handler_fn = message_handler_req_lib_msg_messagesend,
.response_size = sizeof (struct res_lib_msg_messagesend),
.response_id = MESSAGE_RES_MSG_MESSAGESEND,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 12 */
.lib_handler_fn = message_handler_req_lib_msg_messagesendasync,
.response_size = sizeof (struct res_lib_msg_messagesendasync),
.response_id = MESSAGE_RES_MSG_MESSAGESENDASYNC,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 13 */
.lib_handler_fn = message_handler_req_lib_msg_messageget,
.response_size = sizeof (struct res_lib_msg_messageget),
.response_id = MESSAGE_RES_MSG_MESSAGEGET,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 14 */
.lib_handler_fn = message_handler_req_lib_msg_messagecancel,
.response_size = sizeof (struct res_lib_msg_messagecancel),
.response_id = MESSAGE_RES_MSG_MESSAGECANCEL,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 15 */
.lib_handler_fn = message_handler_req_lib_msg_messagesendreceive,
.response_size = sizeof (struct res_lib_msg_messagesendreceive),
.response_id = MESSAGE_RES_MSG_MESSAGESENDRECEIVE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 16 */
.lib_handler_fn = message_handler_req_lib_msg_messagereply,
.response_size = sizeof (struct res_lib_msg_messagereply),
.response_id = MESSAGE_RES_MSG_MESSAGEREPLY,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 17 */
.lib_handler_fn = message_handler_req_lib_msg_messagereplyasync,
.response_size = sizeof (struct res_lib_msg_messagereplyasync),
.response_id = MESSAGE_RES_MSG_MESSAGEREPLYASYNC,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
};
/*
 * Executive Handler Definition
 *
 * The handlers appear in the same order as the values of
 * enum msg_exec_message_req_types (queueopen = 0 ... messagereply = 14);
 * keep both lists in step when adding a request.
 */
static struct openais_exec_handler msg_exec_service[] = {
{
.exec_handler_fn = message_handler_req_exec_msg_queueopen,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queueclose,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuestatusget,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queueunlink,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuegroupcreate,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuegroupinsert,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuegroupremove,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuegroupdelete,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuegrouptrack,
},
{
.exec_handler_fn = message_handler_req_exec_msg_queuegrouptrackstop,
},
{
.exec_handler_fn = message_handler_req_exec_msg_messagesend,
},
{
.exec_handler_fn = message_handler_req_exec_msg_messageget,
},
{
.exec_handler_fn = message_handler_req_exec_msg_messagecancel,
},
{
.exec_handler_fn = message_handler_req_exec_msg_messagesendreceive,
},
{
.exec_handler_fn = message_handler_req_exec_msg_messagereply
}
};
/*
 * Service descriptor for the message service. It ties together the library
 * and executive handler tables (with their entry counts computed from the
 * array sizes), the per-connection private data size and the lifecycle,
 * configuration change and synchronization callbacks.
 */
struct openais_service_handler msg_service_handler = {
.name = "openais message service B.01.01",
.id = MSG_SERVICE,
.private_data_size = sizeof (struct msg_pd),
.flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
.lib_init_fn = msg_lib_init_fn,
.lib_exit_fn = msg_lib_exit_fn,
.lib_service = msg_lib_service,
.lib_service_count = sizeof (msg_lib_service) / sizeof (struct openais_lib_handler),
.exec_init_fn = msg_exec_init_fn,
.exec_service = msg_exec_service,
.exec_service_count = sizeof (msg_exec_service) / sizeof (struct openais_exec_handler),
.confchg_fn = msg_confchg_fn,
.exec_dump_fn = NULL,
/* msg_sync_init only exists when TODO is defined, so no init callback is registered */
.sync_init = NULL, // TODO msg_sync_init,
.sync_process = msg_sync_process,
.sync_activate = msg_sync_activate,
.sync_abort = msg_sync_abort
};
/*
 * Component registration data: the version 0 service handler interface,
 * the lcr interface description named "openais_msg" and the component
 * that carries it. register_this_component() hands these to lcr.
 */
static struct openais_service_handler *msg_get_handler_ver0 (void);
static struct openais_service_handler_iface_ver0 msg_service_handler_iface = {
.openais_get_service_handler_ver0 = msg_get_handler_ver0
};
static struct lcr_iface openais_msg_ver0[1] = {
{
.name = "openais_msg",
.version = 0,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
/* left NULL here; set at load time through lcr_interfaces_set() */
.interfaces = NULL
}
};
static struct lcr_comp msg_comp_ver0 = {
.iface_count = 1,
.ifaces = openais_msg_ver0
};
/*
 * Accessor published through msg_service_handler_iface: returns the address
 * of the message service descriptor.
 */
static struct openais_service_handler *msg_get_handler_ver0 (void)
{
	struct openais_service_handler *handler = &msg_service_handler;

	return handler;
}
/*
 * Constructor (runs when the object is loaded): attach the service handler
 * interface to the "openais_msg" lcr interface and register the component.
 */
__attribute__ ((constructor))
static void register_this_component (void)
{
	struct lcr_iface *iface = &openais_msg_ver0[0];

	lcr_interfaces_set (iface, &msg_service_handler_iface);
	lcr_component_register (&msg_comp_ver0);
}
/*
 * All data types used for executive messages
 *
 * Every request starts with the common request header followed by the
 * message source. The handlers pass the source to message_source_is_local()
 * to decide whether to reply, and reply on source.conn.
 */
/* Queue open request; async_call selects the async or the normal response. */
struct req_exec_msg_queueopen {
mar_req_header_t header;
mar_message_source_t source;
int async_call;
SaNameT queue_name;
SaInvocationT invocation;
SaMsgQueueHandleT queue_handle;
SaMsgQueueCreationAttributesT creation_attributes;
SaMsgQueueOpenFlagsT openFlags;
SaTimeT timeout;
};
/* Queue close request. */
struct req_exec_msg_queueclose {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
/* Queue status get request. */
struct req_exec_msg_queuestatusget {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
/* Queue unlink request. */
struct req_exec_msg_queueunlink {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
/* Queue group create request. */
struct req_exec_msg_queuegroupcreate {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
/* Request to insert queue_name into queue_group_name. */
struct req_exec_msg_queuegroupinsert {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
SaNameT queue_group_name;
};
/* Request to remove queue_name from queue_group_name. */
struct req_exec_msg_queuegroupremove {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
SaNameT queue_group_name;
};
/* Queue group delete request. */
struct req_exec_msg_queuegroupdelete {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
/* Queue group track request. */
struct req_exec_msg_queuegrouptrack {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
/* Queue group track stop request. */
struct req_exec_msg_queuegrouptrackstop {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
/*
 * Message send request. The handler reads the message payload from the
 * bytes that immediately follow this structure in the request buffer.
 */
struct req_exec_msg_messagesend {
mar_req_header_t header;
mar_message_source_t source;
SaNameT destination;
SaTimeT timeout;
SaMsgMessageT message;
SaInvocationT invocation;
SaMsgAckFlagsT ack_flags;
int async_call;
};
/* Message get request. */
struct req_exec_msg_messageget {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
/* Message cancel request. */
struct req_exec_msg_messagecancel {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
/* Message send/receive request. */
struct req_exec_msg_messagesendreceive {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
/* Message reply request. */
struct req_exec_msg_messagereply {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
int async_call;
};
#ifdef TODO
/*
 * Synchronization init callback. Only built when TODO is defined and not
 * registered in msg_service_handler (.sync_init is NULL); does nothing.
 */
static void msg_sync_init (void)
{
return;
}
#endif /* TODO */
/*
 * Synchronization process callback (registered as .sync_process).
 * There is no state to synchronize yet, so it always returns 0.
 */
static int msg_sync_process (void)
{
	const int result = 0;

	return result;
}
/*
 * Synchronization activate callback (registered as .sync_activate).
 * Intentionally empty.
 */
static void msg_sync_activate (void)
{
}
/*
 * Synchronization abort callback (registered as .sync_abort).
 * Intentionally empty.
 */
static void msg_sync_abort (void)
{
}
/*
 * Configuration change callback (registered as .confchg_fn).
 * All arguments are currently ignored and the function does nothing.
 */
static void msg_confchg_fn (
	enum totem_configuration_type configuration_type,
	unsigned int *member_list, int member_list_entries,
	unsigned int *left_list, int left_list_entries,
	unsigned int *joined_list, int joined_list_entries,
	struct memb_ring_id *ring_id)
{
}
/*
 * Debug helper: log every message stored on 'queue', walking the list from
 * its head, together with the time recorded for each entry.
 *
 * The payload is a byte buffer of message.size bytes that is not NUL
 * terminated, so it is printed with a "%.*s" precision rather than "%s",
 * which could read past the end of the buffer. At most
 * PRINT_MESSAGE_MAX_BYTES bytes of each payload are printed.
 */
static void print_message_list (struct message_queue *queue)
{
	enum { PRINT_MESSAGE_MAX_BYTES = 1024 };
	struct list_head *list;
	struct message_entry *entry;
	int length;

	for (list = queue->message_list_head.next;
		list != &queue->message_list_head;
		list = list->next)
	{
		entry = list_entry (list, struct message_entry, list);

		/* Clamp first so the conversion to int cannot overflow. */
		length = PRINT_MESSAGE_MAX_BYTES;
		if (entry->message.size < PRINT_MESSAGE_MAX_BYTES) {
			length = (int)entry->message.size;
		}

		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%.*s) (%llu)\n",
			length, (char *)(entry->message.data),
			(unsigned long long)(entry->time));
	}
}
/*
 * Look up a message queue by name on the global queue list.
 *
 * Returns the first queue whose name matches 'name' according to
 * name_match(), or a null pointer when there is none.
 */
static struct message_queue *queue_find (SaNameT *name)
{
	struct list_head *pos = queue_list_head.next;
	struct message_queue *candidate;

	while (pos != &queue_list_head) {
		candidate = list_entry (pos, struct message_queue, list);
		if (name_match (name, &candidate->name)) {
			return candidate;
		}
		pos = pos->next;
	}

	return NULL;
}
/*
 * Look up a queue group by name on the global queue group list.
 *
 * Returns the first group whose name matches 'name' according to
 * name_match(), or a null pointer when there is none.
 */
static struct queue_group *queue_group_find (SaNameT *name)
{
	struct list_head *pos = queue_group_list_head.next;
	struct queue_group *candidate;

	while (pos != &queue_group_list_head) {
		candidate = list_entry (pos, struct queue_group, list);
		if (name_match (name, &candidate->name)) {
			return candidate;
		}
		pos = pos->next;
	}

	return NULL;
}
/*
 * Find the membership entry of 'queue' inside 'queue_group'.
 *
 * Entries are compared by queue pointer identity. Returns the entry, or a
 * null pointer when the queue is not a member of the group.
 */
static struct queue_group_entry *queue_group_entry_find (
	struct queue_group *queue_group,
	struct message_queue *queue)
{
	struct list_head *head = &queue_group->message_queue_head;
	struct list_head *pos = head->next;
	struct queue_group_entry *member;

	while (pos != head) {
		member = list_entry (pos, struct queue_group_entry, list);
		if (member->message_queue == queue) {
			return member;
		}
		pos = pos->next;
	}

	return NULL;
}
/*
 * Executive initialization callback (registered as .exec_init_fn).
 * 'objdb' is not used. Nothing is initialized yet; always returns 0.
 */
static int msg_exec_init_fn (struct objdb_iface_ver0 *objdb)
{
	const int result = 0;

	/*
	 * Initialize the saved ring ID.
	 */
	/* saved_ring_id.seq = 0; */
	/* saved_ring_id.rep.s_addr = this_ip->sin_addr.s_addr; */

	return result;
}
/*
 * Library connection teardown callback (registered as .lib_exit_fn).
 *
 * The queue cleanup walk below is only built when COMPILE_OUT is defined;
 * otherwise the function does nothing with 'conn' and returns 0.
 */
static int msg_lib_exit_fn (void *conn)
{
	/*
	 * struct msg_pd *msg_pd = (struct msg_pd *)openais_conn_private_data_get (conn);
	 */
#ifdef COMPILE_OUT
	struct queue_cleanup *queue_cleanup;
	struct list_head *list;

	/*
	 * close all queues opened on this fd
	 */
	while (!list_empty(&conn_info->conn_info_partner->ais_ci.u.libmsg_ci.queue_cleanup_list)) {
		list = conn_info->conn_info_partner->ais_ci.u.libmsg_ci.queue_cleanup_list.next;
		queue_cleanup = list_entry (list, struct queue_cleanup, list);

		if (queue_cleanup->queue->name.length > 0) {
			msg_queue_cleanup_lock_remove (queue_cleanup);
			msg_queue_close (queue_cleanup->queue);
		}

		list_del (&queue_cleanup->list);
		free (queue_cleanup);
	}
#endif /* COMPILE_OUT */

	return 0;
}
/*
 * Library connection setup callback (registered as .lib_init_fn).
 * Initializes both lists in the connection's private data; returns 0.
 */
static int msg_lib_init_fn (void *conn)
{
	struct msg_pd *pd;

	pd = (struct msg_pd *)openais_conn_private_data_get (conn);

	list_init (&pd->queue_list);
	list_init (&pd->queue_cleanup_list);

	return 0;
}
static void message_handler_req_exec_msg_queueopen (
void *message,
unsigned int nodeid)
{
struct req_exec_msg_queueopen *req_exec_msg_queueopen =
(struct req_exec_msg_queueopen *)message;
struct res_lib_msg_queueopen res_lib_msg_queueopen;
struct res_lib_msg_queueopenasync res_lib_msg_queueopenasync;
struct message_queue *queue;
/* struct queue_cleanup *queue_cleanup; */
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueOpen %s\n",
getSaNameT (&req_exec_msg_queueopen->queue_name));
queue = queue_find (&req_exec_msg_queueopen->queue_name);
/*
* If queue doesn't exist, create one
*/
if (queue == 0) {
if ((req_exec_msg_queueopen->openFlags & SA_MSG_QUEUE_CREATE) == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
queue = malloc (sizeof (struct message_queue));
if (queue == 0) {
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
}
memset (queue, 0, sizeof (struct message_queue));
memcpy (&queue->name,
&req_exec_msg_queueopen->queue_name,
sizeof (SaNameT));
list_init (&queue->list);
list_init (&queue->message_list_head);
list_add (&queue->list, &queue_list_head);
queue->refcount = 0;
}
queue->refcount += 1;
#ifdef COMPILE_OUT
/*
* Setup connection information and mark queue as referenced
*/
queue_cleanup = malloc (sizeof (struct queue_cleanup));
if (queue_cleanup == 0) {
free (queue);
error = SA_AIS_ERR_NO_MEMORY;
} else {
list_init (&queue_cleanup->list);
list_init (&queue_cleanup->queue_lock_list_head);
queue_cleanup->queue = queue;
queue_cleanup->queue_handle = req_exec_msg_queueopen->queue_handle;
list_add (
&queue_cleanup->list,
&req_exec_msg_queueopen->source.conn_info->ais_ci.u.libmsg_ci.queue_cleanup_list);
}
queue->refcount += 1;
#endif /* COMPILE_OUT */
/*
* Send error result to MSG library
*/
error_exit:
/*
* If this node was the source of the message, respond to this node
*/
if (message_source_is_local (&req_exec_msg_queueopen->source)) {
/*
* If its an async call respond with the invocation and handle
*/
if (req_exec_msg_queueopen->async_call)
{
res_lib_msg_queueopenasync.header.size =
sizeof (struct res_lib_msg_queueopenasync);
res_lib_msg_queueopenasync.header.id =
MESSAGE_RES_MSG_QUEUEOPENASYNC;
res_lib_msg_queueopenasync.header.error = error;
res_lib_msg_queueopenasync.invocation =
req_exec_msg_queueopen->invocation;
res_lib_msg_queueopenasync.queueHandle =
req_exec_msg_queueopen->queue_handle;
memcpy (&res_lib_msg_queueopenasync.source,
&req_exec_msg_queueopen->source,
sizeof (mar_message_source_t));
openais_conn_send_response (
req_exec_msg_queueopen->source.conn,
&res_lib_msg_queueopenasync,
sizeof (struct res_lib_msg_queueopenasync));
openais_conn_send_response (
openais_conn_partner_get (req_exec_msg_queueopen->source.conn),
&res_lib_msg_queueopenasync,
sizeof (struct res_lib_msg_queueopenasync));
} else {
/*
* otherwise respond with the normal queueopen response
*/
res_lib_msg_queueopen.header.size =
sizeof (struct res_lib_msg_queueopen);
res_lib_msg_queueopen.header.id =
MESSAGE_RES_MSG_QUEUEOPEN;
res_lib_msg_queueopen.header.error = error;
res_lib_msg_queueopen.queueHandle =
req_exec_msg_queueopen->queue_handle;
memcpy (&res_lib_msg_queueopen.source,
&req_exec_msg_queueopen->source,
sizeof (mar_message_source_t));
openais_conn_send_response (
req_exec_msg_queueopen->source.conn,
&res_lib_msg_queueopen,
sizeof (struct res_lib_msg_queueopen));
}
}
}
static void message_handler_req_exec_msg_queueclose (
void *message,
unsigned int nodeid)
{
struct req_exec_msg_queueclose *req_exec_msg_queueclose =
(struct req_exec_msg_queueclose *)message;
struct res_lib_msg_queueclose res_lib_msg_queueclose;
struct message_queue *queue = 0;
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueClose %s\n",
getSaNameT (&req_exec_msg_queueclose->queue_name));
queue = queue_find (&req_exec_msg_queueclose->queue_name);
if (queue == 0) {
goto error_exit;
}
queue->refcount -= 1;
if (queue->refcount == 0) {
/* free queue */
}
error_exit:
if (message_source_is_local(&req_exec_msg_queueclose->source))
{
/* TODO */
/*
* msg_queue_cleanup_remove (
* req_exec_msg_queueclose->source.conn_info,
* req_exec_msg_queueclose->queue_handle);
*/
res_lib_msg_queueclose.header.size =
sizeof (struct res_lib_msg_queueclose);
res_lib_msg_queueclose.header.id =
MESSAGE_RES_MSG_QUEUECLOSE;
res_lib_msg_queueclose.header.error = error;
openais_conn_send_response (
req_exec_msg_queueclose->source.conn,
&res_lib_msg_queueclose,
sizeof (struct res_lib_msg_queueclose));
}
}
/*
 * Executive handler for the queue status get request.
 * Not implemented: the only code is disabled with #if 0, so the request is
 * ignored and this function sends no response.
 */
static void message_handler_req_exec_msg_queuestatusget (
void *message,
unsigned int nodeid)
{
#if 0
struct req_exec_msg_queuestatusget *req_exec_msg_queuestatusget =
(struct req_exec_msg_queuestatusget *)message;
struct res_lib_msg_queuestatusget res_lib_msg_queuestatusget;
#endif
}
/*
 * Executive handler for the queue unlink request.
 * Not implemented: the only code is disabled with #if 0, so the request is
 * ignored and this function sends no response.
 */
static void message_handler_req_exec_msg_queueunlink (
void *message,
unsigned int nodeid)
{
#if 0
struct req_exec_msg_queueunlink *req_exec_msg_queueunlink =
(struct req_exec_msg_queueunlink *)message;
struct res_lib_msg_queueunlink res_lib_msg_queueunlink;
#endif
}
static void message_handler_req_exec_msg_queuegroupcreate (
void *message,
unsigned int nodeid)
{
struct req_exec_msg_queuegroupcreate *req_exec_msg_queuegroupcreate =
(struct req_exec_msg_queuegroupcreate *)message;
struct res_lib_msg_queuegroupcreate res_lib_msg_queuegroupcreate;
struct queue_group *queue_group;
SaAisErrorT error = SA_AIS_OK;
queue_group = queue_group_find (&req_exec_msg_queuegroupcreate->queue_group_name);
if (queue_group == 0) {
queue_group = malloc (sizeof (struct queue_group));
if (queue_group == 0) {
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
}
memset (queue_group, 0, sizeof (struct queue_group));
memcpy (&queue_group->name,
&req_exec_msg_queuegroupcreate->queue_group_name,
sizeof (SaNameT));
list_init (&queue_group->list);
list_init (&queue_group->message_queue_head);
list_add (&queue_group->list, &queue_group_list_head);
} else {
error = SA_AIS_ERR_EXIST;
}
error_exit:
if (message_source_is_local(&req_exec_msg_queuegroupcreate->source)) {
res_lib_msg_queuegroupcreate.header.size =
sizeof (struct res_lib_msg_queuegroupcreate);
res_lib_msg_queuegroupcreate.header.id =
MESSAGE_RES_MSG_QUEUEGROUPCREATE;
res_lib_msg_queuegroupcreate.header.error = error;
openais_conn_send_response (
req_exec_msg_queuegroupcreate->source.conn,
&res_lib_msg_queuegroupcreate,
sizeof (struct res_lib_msg_queuegroupcreate));
}
}
static void message_handler_req_exec_msg_queuegroupinsert (
void *message,
unsigned int nodeid)
{
struct req_exec_msg_queuegroupinsert *req_exec_msg_queuegroupinsert =
(struct req_exec_msg_queuegroupinsert *)message;
struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert;
struct message_queue *queue;
struct queue_group *queue_group;
struct queue_group_entry *queue_group_entry;
SaAisErrorT error = SA_AIS_OK;
queue_group = queue_group_find (&req_exec_msg_queuegroupinsert->queue_group_name);
if (queue_group == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
queue = queue_find (&req_exec_msg_queuegroupinsert->queue_name);
if (queue == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
queue_group_entry = malloc (sizeof (struct queue_group_entry));
if (queue_group_entry == 0) {
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
}
list_init (&queue_group_entry->list);
list_add (&queue_group_entry->list, &queue_group->message_queue_head);
list_add (&queue->list, &queue_list_head);
queue_group_entry->message_queue = queue;
error_exit:
if (message_source_is_local(&req_exec_msg_queuegroupinsert->source)) {
res_lib_msg_queuegroupinsert.header.size =
sizeof (struct res_lib_msg_queuegroupinsert);
res_lib_msg_queuegroupinsert.header.id =
MESSAGE_RES_MSG_QUEUEGROUPINSERT;
res_lib_msg_queuegroupinsert.header.error = error;
openais_conn_send_response (
req_exec_msg_queuegroupinsert->source.conn,
&res_lib_msg_queuegroupinsert,
sizeof (struct res_lib_msg_queuegroupinsert));
}
}
/*
 * Executive handler for saMsgQueueGroupRemove: take a queue out of a queue
 * group. The queue itself is left untouched.
 *
 * message - a struct req_exec_msg_queuegroupremove
 * nodeid  - not used
 *
 * The result is SA_AIS_ERR_NOT_EXIST when the group or the queue is unknown,
 * or when the queue is not a member of the group.
 *
 * A response is sent only when the request originated on this node.
 */
static void message_handler_req_exec_msg_queuegroupremove (
	void *message,
	unsigned int nodeid)
{
	struct req_exec_msg_queuegroupremove *req_exec_msg_queuegroupremove =
		(struct req_exec_msg_queuegroupremove *)message;
	struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove;
	struct queue_group *queue_group;
	struct message_queue *queue;
	struct queue_group_entry *queue_group_entry;
	SaAisErrorT error = SA_AIS_OK;

	queue_group = queue_group_find (&req_exec_msg_queuegroupremove->queue_group_name);
	if (queue_group == NULL) {
		error = SA_AIS_ERR_NOT_EXIST;
		goto error_exit;
	}

	queue = queue_find (&req_exec_msg_queuegroupremove->queue_name);
	if (queue == NULL) {
		error = SA_AIS_ERR_NOT_EXIST;
		goto error_exit;
	}

	queue_group_entry = queue_group_entry_find (queue_group, queue);
	if (queue_group_entry == NULL) {
		error = SA_AIS_ERR_NOT_EXIST;
		goto error_exit;
	}

	/*
	 * The entry was allocated by the group insert handler and nothing else
	 * references it once unlinked, so release it here to avoid a leak.
	 */
	list_del (&queue_group_entry->list);
	free (queue_group_entry);

error_exit:
	if (message_source_is_local(&req_exec_msg_queuegroupremove->source)) {
		res_lib_msg_queuegroupremove.header.size =
			sizeof (struct res_lib_msg_queuegroupremove);
		res_lib_msg_queuegroupremove.header.id =
			MESSAGE_RES_MSG_QUEUEGROUPREMOVE;
		res_lib_msg_queuegroupremove.header.error = error;

		openais_conn_send_response (
			req_exec_msg_queuegroupremove->source.conn,
			&res_lib_msg_queuegroupremove,
			sizeof (struct res_lib_msg_queuegroupremove));
	}
}
static void message_handler_req_exec_msg_queuegroupdelete (
void *message,
unsigned int nodeid)
{
struct req_exec_msg_queuegroupdelete *req_exec_msg_queuegroupdelete =
(struct req_exec_msg_queuegroupdelete *)message;
struct res_lib_msg_queuegroupdelete res_lib_msg_queuegroupdelete;
struct queue_group *queue_group;
SaAisErrorT error = SA_AIS_OK;
queue_group = queue_group_find (&req_exec_msg_queuegroupdelete->queue_group_name);
if (queue_group) {
list_del (&queue_group->list);
free (queue_group);
} else {
error = SA_AIS_ERR_NOT_EXIST;
}
if (message_source_is_local(&req_exec_msg_queuegroupdelete->source)) {
res_lib_msg_queuegroupdelete.header.size =
sizeof (struct res_lib_msg_queuegroupdelete);
res_lib_msg_queuegroupdelete.header.id =
MESSAGE_RES_MSG_QUEUEGROUPDELETE;
res_lib_msg_queuegroupdelete.header.error = error;
openais_conn_send_response (
req_exec_msg_queuegroupdelete->source.conn,
&res_lib_msg_queuegroupdelete,
sizeof (struct res_lib_msg_queuegroupdelete));
}
}
/*
 * Executive handler for the queue group track request.
 * Not implemented: the only code is disabled with #if 0, so the request is
 * ignored and this function sends no response.
 */
static void message_handler_req_exec_msg_queuegrouptrack (
void *message,
unsigned int nodeid)
{
#if 0
struct req_exec_msg_queuegrouptrack *req_exec_msg_queuegrouptrack =
(struct req_exec_msg_queuegrouptrack *)message;
struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
#endif
}
/*
 * Executive handler for the queue group track stop request.
 * Not implemented: the only code is disabled with #if 0, so the request is
 * ignored and this function sends no response.
 */
static void message_handler_req_exec_msg_queuegrouptrackstop (
void *message,
unsigned int nodeid)
{
#if 0
struct req_exec_msg_queuegrouptrackstop *req_exec_msg_queuegrouptrackstop =
(struct req_exec_msg_queuegrouptrackstop *)message;
struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
#endif
}
/*
 * Executive handler for a message send request (sync and async variants).
 *
 * The payload is read from the bytes immediately following the fixed-size
 * request structure.  A new message_entry holding a private copy of the
 * payload is appended to the destination queue's message list.  When the
 * request originated on this node the result is sent back to the library;
 * for async calls the same response is also sent to the partner connection.
 *
 * Errors reported in the response header:
 *   SA_AIS_ERR_NOT_EXIST  - destination queue not found
 *   SA_AIS_ERR_NO_MEMORY  - entry or payload allocation failed
 *
 * message: struct req_exec_msg_messagesend followed by the payload
 *          NOTE(review): message.size is trusted and not checked against
 *          the received length here.
 * nodeid:  not used by this handler
 */
static void message_handler_req_exec_msg_messagesend (
	void *message,
	unsigned int nodeid)
{
	struct req_exec_msg_messagesend *req_exec_msg_messagesend =
		(struct req_exec_msg_messagesend *)message;
	struct res_lib_msg_messagesend res_lib_msg_messagesend;
	struct res_lib_msg_messagesendasync res_lib_msg_messagesendasync;
	struct message_queue *queue;
	struct message_entry *entry;
	SaAisErrorT error = SA_AIS_OK;
	char *data = ((char *)(req_exec_msg_messagesend) +
		sizeof (struct req_exec_msg_messagesend));

	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgMessageSend %s\n",
		getSaNameT (&req_exec_msg_messagesend->destination));

	queue = queue_find (&req_exec_msg_messagesend->destination);
	if (queue == NULL) {
		error = SA_AIS_ERR_NOT_EXIST;
		goto error_exit;
	}

	entry = malloc (sizeof (struct message_entry));
	if (entry == NULL) {
		error = SA_AIS_ERR_NO_MEMORY;
		goto error_exit;
	}
	memset (entry, 0, sizeof (struct message_entry));
	memcpy (&entry->message, &req_exec_msg_messagesend->message,
		sizeof (SaMsgMessageT));

	/* The copied data pointer is meaningless here; replace it with our own buffer. */
	entry->message.data = malloc (entry->message.size);
	if (entry->message.data == NULL) {
		/* The entry was never linked into the queue, so release it here. */
		free (entry);
		error = SA_AIS_ERR_NO_MEMORY;
		goto error_exit;
	}
	memcpy (entry->message.data, (void *)(data), entry->message.size);

	entry->time = clust_time_now();
	list_add_tail (&entry->list, &queue->message_list_head);

	/* DEBUG */
	print_message_list (queue);

error_exit:
	if (message_source_is_local(&req_exec_msg_messagesend->source)) {
		if (req_exec_msg_messagesend->async_call) {
			res_lib_msg_messagesendasync.header.size =
				sizeof (struct res_lib_msg_messagesendasync);
			res_lib_msg_messagesendasync.header.id =
				MESSAGE_RES_MSG_MESSAGESENDASYNC;
			res_lib_msg_messagesendasync.header.error = error;
			res_lib_msg_messagesendasync.invocation =
				req_exec_msg_messagesend->invocation;
			memcpy (&res_lib_msg_messagesendasync.source,
				&req_exec_msg_messagesend->source,
				sizeof (mar_message_source_t));

			/* Response to the requesting connection ... */
			openais_conn_send_response (
				req_exec_msg_messagesend->source.conn,
				&res_lib_msg_messagesendasync,
				sizeof (struct res_lib_msg_messagesendasync));
			/* ... and the same structure to its partner connection. */
			openais_conn_send_response (
				openais_conn_partner_get (req_exec_msg_messagesend->source.conn),
				&res_lib_msg_messagesendasync,
				sizeof (struct res_lib_msg_messagesendasync));
		} else {
			res_lib_msg_messagesend.header.size =
				sizeof (struct res_lib_msg_messagesend);
			res_lib_msg_messagesend.header.id =
				MESSAGE_RES_MSG_MESSAGESEND;
			res_lib_msg_messagesend.header.error = error;
			memcpy (&res_lib_msg_messagesend.source,
				&req_exec_msg_messagesend->source,
				sizeof (mar_message_source_t));

			openais_conn_send_response (
				req_exec_msg_messagesend->source.conn,
				&res_lib_msg_messagesend,
				sizeof (struct res_lib_msg_messagesend));
		}
	}
}
/*
 * Executive handler for a message get request.
 *
 * Removes the first entry from the named queue's message list.  When the
 * request originated on this node, a response header is sent to the library
 * and, on success, the message payload is sent as a second response.
 * The dequeued entry and its payload are released before returning.
 *
 * Errors reported in the response header:
 *   SA_AIS_ERR_NOT_EXIST - queue not found
 *   SA_AIS_ERR_TIMEOUT   - queue has no messages (FIX ME in original code)
 *
 * On error the message descriptor in the response is zeroed and no payload
 * is sent.
 *
 * message: pointer to a struct req_exec_msg_messageget
 * nodeid:  not used by this handler
 */
static void message_handler_req_exec_msg_messageget (
	void *message,
	unsigned int nodeid)
{
	struct req_exec_msg_messageget *req_exec_msg_messageget =
		(struct req_exec_msg_messageget *)message;
	struct res_lib_msg_messageget res_lib_msg_messageget;
	struct message_queue *queue;
	/* Must start as NULL: the error paths below reach error_exit without an entry. */
	struct message_entry *entry = NULL;
	SaAisErrorT error = SA_AIS_OK;

	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgMessageGet %s\n",
		getSaNameT (&req_exec_msg_messageget->queue_name));

	queue = queue_find (&req_exec_msg_messageget->queue_name);
	if (queue == NULL) {
		error = SA_AIS_ERR_NOT_EXIST;
		goto error_exit;
	}

	if (list_empty (&queue->message_list_head)) {
		error = SA_AIS_ERR_TIMEOUT; /* FIX ME */
		goto error_exit;
	}

	/* The list is non-empty, so head.next is a real entry. */
	entry = list_entry (queue->message_list_head.next, struct message_entry, list);
	list_del (&entry->list);

error_exit:
	if (message_source_is_local(&req_exec_msg_messageget->source)) {
		res_lib_msg_messageget.header.size =
			sizeof (struct res_lib_msg_messageget);
		res_lib_msg_messageget.header.id =
			MESSAGE_RES_MSG_MESSAGEGET;
		res_lib_msg_messageget.header.error = error;

		if (entry != NULL) {
			memcpy (&res_lib_msg_messageget.message, &entry->message,
				sizeof (SaMsgMessageT));
		} else {
			/* No message: report size 0 rather than reading through a bad pointer. */
			memset (&res_lib_msg_messageget.message, 0,
				sizeof (SaMsgMessageT));
		}
		memcpy (&res_lib_msg_messageget.source,
			&req_exec_msg_messageget->source,
			sizeof (mar_message_source_t));

		openais_conn_send_response (
			req_exec_msg_messageget->source.conn,
			&res_lib_msg_messageget,
			sizeof (struct res_lib_msg_messageget));

		if (entry != NULL) {
			openais_conn_send_response (
				req_exec_msg_messageget->source.conn,
				res_lib_msg_messageget.message.data,
				res_lib_msg_messageget.message.size);
		}
	}

	/*
	 * The entry is no longer on any list, so nothing else can release it.
	 * openais_conn_send_response() is already handed stack buffers above,
	 * so it cannot hold on to the pointer after it returns.
	 */
	if (entry != NULL) {
		free (entry->message.data);
		free (entry);
	}
}
/*
 * Executive handler for a message cancel request.
 *
 * Not implemented: the request is dropped, no state is changed and no
 * response is sent from here.
 *
 * message: request buffer (would be a struct req_exec_msg_messagecancel)
 * nodeid:  not used
 */
static void message_handler_req_exec_msg_messagecancel (
	void *message,
	unsigned int nodeid)
{
	/*
	 * TODO: decode struct req_exec_msg_messagecancel from message and
	 * reply with a struct res_lib_msg_messagecancel.
	 */
	(void)message;
	(void)nodeid;
}
/*
 * Executive handler for a message send/receive request.
 *
 * Not implemented: the request is dropped, no state is changed and no
 * response is sent from here.
 *
 * message: request buffer (would be a struct req_exec_msg_messagesendreceive)
 * nodeid:  not used
 */
static void message_handler_req_exec_msg_messagesendreceive (
	void *message,
	unsigned int nodeid)
{
	/*
	 * TODO: decode struct req_exec_msg_messagesendreceive from message and
	 * reply with a struct res_lib_msg_messagesendreceive.
	 */
	(void)message;
	(void)nodeid;
}
/*
 * Executive handler for a message reply request.
 *
 * Not implemented: the request is dropped, no state is changed and no
 * response is sent from here.
 *
 * message: request buffer (would be a struct req_exec_msg_messagereply)
 * nodeid:  not used
 */
static void message_handler_req_exec_msg_messagereply (
	void *message,
	unsigned int nodeid)
{
	/*
	 * TODO: decode struct req_exec_msg_messagereply from message and
	 * reply with a struct res_lib_msg_messagereply.
	 */
	(void)message;
	(void)nodeid;
}
/*
 * Library handler for saMsgQueueOpen.
 *
 * Builds a req_exec_msg_queueopen request from the library request, tags it
 * with the originating connection and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queueopen
 */
static void message_handler_req_lib_msg_queueopen (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queueopen *req_lib_msg_queueopen =
		(struct req_lib_msg_queueopen *)msg;
	struct req_exec_msg_queueopen req_exec_msg_queueopen;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueOpen %s\n",
		getSaNameT (&req_lib_msg_queueopen->queueName));

	req_exec_msg_queueopen.header.size =
		sizeof (struct req_exec_msg_queueopen);
	req_exec_msg_queueopen.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEOPEN);

	message_source_set (&req_exec_msg_queueopen.source, conn);

	memcpy (&req_exec_msg_queueopen.queue_name,
		&req_lib_msg_queueopen->queueName, sizeof (SaNameT));
	memcpy (&req_exec_msg_queueopen.creation_attributes,
		&req_lib_msg_queueopen->creationAttributes,
		sizeof (SaMsgQueueCreationAttributesT));

	/* Synchronous variant: no invocation, caller-supplied timeout. */
	req_exec_msg_queueopen.async_call = 0;
	req_exec_msg_queueopen.invocation = 0;
	req_exec_msg_queueopen.queue_handle = req_lib_msg_queueopen->queueHandle;
	req_exec_msg_queueopen.openFlags = req_lib_msg_queueopen->openFlags;
	req_exec_msg_queueopen.timeout = req_lib_msg_queueopen->timeout;

	iovec.iov_base = (char *)&req_exec_msg_queueopen;
	iovec.iov_len = sizeof (req_exec_msg_queueopen);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueOpenAsync.
 *
 * Builds the same req_exec_msg_queueopen request as the synchronous handler,
 * but marks it as an async call, carries the caller's invocation and uses
 * SA_TIME_END as the timeout.  The request is passed to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queueopen
 */
static void message_handler_req_lib_msg_queueopenasync (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queueopen *req_lib_msg_queueopen =
		(struct req_lib_msg_queueopen *)msg;
	struct req_exec_msg_queueopen req_exec_msg_queueopen;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueOpenAsync %s\n",
		getSaNameT (&req_lib_msg_queueopen->queueName));

	req_exec_msg_queueopen.header.size =
		sizeof (struct req_exec_msg_queueopen);
	req_exec_msg_queueopen.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEOPEN);

	message_source_set (&req_exec_msg_queueopen.source, conn);

	memcpy (&req_exec_msg_queueopen.queue_name,
		&req_lib_msg_queueopen->queueName, sizeof (SaNameT));
	memcpy (&req_exec_msg_queueopen.creation_attributes,
		&req_lib_msg_queueopen->creationAttributes,
		sizeof (SaMsgQueueCreationAttributesT));

	req_exec_msg_queueopen.async_call = 1;
	req_exec_msg_queueopen.invocation = req_lib_msg_queueopen->invocation;
	req_exec_msg_queueopen.queue_handle = req_lib_msg_queueopen->queueHandle;
	req_exec_msg_queueopen.openFlags = req_lib_msg_queueopen->openFlags;
	req_exec_msg_queueopen.timeout = SA_TIME_END;

	iovec.iov_base = (char *)&req_exec_msg_queueopen;
	iovec.iov_len = sizeof (req_exec_msg_queueopen);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueClose.
 *
 * Builds a req_exec_msg_queueclose request carrying the queue name and the
 * originating connection, and passes it to totempg_groups_mcast_joined()
 * with the TOTEMPG_AGREED guarantee.  No response is sent to the library
 * from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queueclose
 */
static void message_handler_req_lib_msg_queueclose (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queueclose *req_lib_msg_queueclose =
		(struct req_lib_msg_queueclose *)msg;
	struct req_exec_msg_queueclose req_exec_msg_queueclose;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueClose %s\n",
		getSaNameT (&req_lib_msg_queueclose->queueName));

	req_exec_msg_queueclose.header.size =
		sizeof (struct req_exec_msg_queueclose);
	req_exec_msg_queueclose.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUECLOSE);

	message_source_set (&req_exec_msg_queueclose.source, conn);

	memcpy (&req_exec_msg_queueclose.queue_name,
		&req_lib_msg_queueclose->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queueclose;
	iovec.iov_len = sizeof (req_exec_msg_queueclose);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueStatusGet.
 *
 * Builds a req_exec_msg_queuestatusget request carrying the queue name and
 * the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuestatusget
 */
static void message_handler_req_lib_msg_queuestatusget (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuestatusget *req_lib_msg_queuestatusget =
		(struct req_lib_msg_queuestatusget *)msg;
	struct req_exec_msg_queuestatusget req_exec_msg_queuestatusget;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueStatusGet %s\n",
		getSaNameT (&req_lib_msg_queuestatusget->queueName));

	req_exec_msg_queuestatusget.header.size =
		sizeof (struct req_exec_msg_queuestatusget);
	req_exec_msg_queuestatusget.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUESTATUSGET);

	message_source_set (&req_exec_msg_queuestatusget.source, conn);

	memcpy (&req_exec_msg_queuestatusget.queue_name,
		&req_lib_msg_queuestatusget->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuestatusget;
	iovec.iov_len = sizeof (req_exec_msg_queuestatusget);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueUnlink.
 *
 * Builds a req_exec_msg_queueunlink request carrying the queue name and the
 * originating connection, and passes it to totempg_groups_mcast_joined()
 * with the TOTEMPG_AGREED guarantee.  No response is sent to the library
 * from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queueunlink
 */
static void message_handler_req_lib_msg_queueunlink (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queueunlink *req_lib_msg_queueunlink =
		(struct req_lib_msg_queueunlink *)msg;
	struct req_exec_msg_queueunlink req_exec_msg_queueunlink;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueUnlink %s\n",
		getSaNameT (&req_lib_msg_queueunlink->queueName));

	req_exec_msg_queueunlink.header.size =
		sizeof (struct req_exec_msg_queueunlink);
	req_exec_msg_queueunlink.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEUNLINK);

	message_source_set (&req_exec_msg_queueunlink.source, conn);

	memcpy (&req_exec_msg_queueunlink.queue_name,
		&req_lib_msg_queueunlink->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queueunlink;
	iovec.iov_len = sizeof (req_exec_msg_queueunlink);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueGroupCreate.
 *
 * Builds a req_exec_msg_queuegroupcreate request carrying the queue group
 * name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuegroupcreate
 */
static void message_handler_req_lib_msg_queuegroupcreate (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuegroupcreate *req_lib_msg_queuegroupcreate =
		(struct req_lib_msg_queuegroupcreate *)msg;
	struct req_exec_msg_queuegroupcreate req_exec_msg_queuegroupcreate;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueGroupCreate %s\n",
		getSaNameT (&req_lib_msg_queuegroupcreate->queueGroupName));

	req_exec_msg_queuegroupcreate.header.size =
		sizeof (struct req_exec_msg_queuegroupcreate);
	req_exec_msg_queuegroupcreate.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEGROUPCREATE);

	message_source_set (&req_exec_msg_queuegroupcreate.source, conn);

	memcpy (&req_exec_msg_queuegroupcreate.queue_group_name,
		&req_lib_msg_queuegroupcreate->queueGroupName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuegroupcreate;
	iovec.iov_len = sizeof (req_exec_msg_queuegroupcreate);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueGroupInsert.
 *
 * Builds a req_exec_msg_queuegroupinsert request carrying the queue name,
 * the queue group name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuegroupinsert
 */
static void message_handler_req_lib_msg_queuegroupinsert (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuegroupinsert *req_lib_msg_queuegroupinsert =
		(struct req_lib_msg_queuegroupinsert *)msg;
	struct req_exec_msg_queuegroupinsert req_exec_msg_queuegroupinsert;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueGroupInsert %s\n",
		getSaNameT (&req_lib_msg_queuegroupinsert->queueGroupName));

	req_exec_msg_queuegroupinsert.header.size =
		sizeof (struct req_exec_msg_queuegroupinsert);
	req_exec_msg_queuegroupinsert.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEGROUPINSERT);

	message_source_set (&req_exec_msg_queuegroupinsert.source, conn);

	memcpy (&req_exec_msg_queuegroupinsert.queue_name,
		&req_lib_msg_queuegroupinsert->queueName, sizeof (SaNameT));
	memcpy (&req_exec_msg_queuegroupinsert.queue_group_name,
		&req_lib_msg_queuegroupinsert->queueGroupName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuegroupinsert;
	iovec.iov_len = sizeof (req_exec_msg_queuegroupinsert);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueGroupRemove.
 *
 * Builds a req_exec_msg_queuegroupremove request carrying the queue name,
 * the queue group name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuegroupremove
 */
static void message_handler_req_lib_msg_queuegroupremove (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuegroupremove *req_lib_msg_queuegroupremove =
		(struct req_lib_msg_queuegroupremove *)msg;
	struct req_exec_msg_queuegroupremove req_exec_msg_queuegroupremove;
	struct iovec iovec;
	int res;

	req_exec_msg_queuegroupremove.header.size =
		sizeof (struct req_exec_msg_queuegroupremove);
	req_exec_msg_queuegroupremove.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEGROUPREMOVE);

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueGroupRemove %s\n",
		getSaNameT (&req_lib_msg_queuegroupremove->queueGroupName));

	message_source_set (&req_exec_msg_queuegroupremove.source, conn);

	memcpy (&req_exec_msg_queuegroupremove.queue_name,
		&req_lib_msg_queuegroupremove->queueName, sizeof (SaNameT));
	memcpy (&req_exec_msg_queuegroupremove.queue_group_name,
		&req_lib_msg_queuegroupremove->queueGroupName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuegroupremove;
	iovec.iov_len = sizeof (req_exec_msg_queuegroupremove);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueGroupDelete.
 *
 * Builds a req_exec_msg_queuegroupdelete request carrying the queue group
 * name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuegroupdelete
 */
static void message_handler_req_lib_msg_queuegroupdelete (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuegroupdelete *req_lib_msg_queuegroupdelete =
		(struct req_lib_msg_queuegroupdelete *)msg;
	struct req_exec_msg_queuegroupdelete req_exec_msg_queuegroupdelete;
	struct iovec iovec;
	int res;

	req_exec_msg_queuegroupdelete.header.size =
		sizeof (struct req_exec_msg_queuegroupdelete);
	req_exec_msg_queuegroupdelete.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEGROUPDELETE);

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueGroupDelete %s\n",
		getSaNameT (&req_lib_msg_queuegroupdelete->queueGroupName));

	message_source_set (&req_exec_msg_queuegroupdelete.source, conn);

	memcpy (&req_exec_msg_queuegroupdelete.queue_group_name,
		&req_lib_msg_queuegroupdelete->queueGroupName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuegroupdelete;
	iovec.iov_len = sizeof (req_exec_msg_queuegroupdelete);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueGroupTrack.
 *
 * Builds a req_exec_msg_queuegrouptrack request carrying the queue group
 * name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuegrouptrack
 */
static void message_handler_req_lib_msg_queuegrouptrack (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuegrouptrack *req_lib_msg_queuegrouptrack =
		(struct req_lib_msg_queuegrouptrack *)msg;
	struct req_exec_msg_queuegrouptrack req_exec_msg_queuegrouptrack;
	struct iovec iovec;
	int res;

	req_exec_msg_queuegrouptrack.header.size =
		sizeof (struct req_exec_msg_queuegrouptrack);
	req_exec_msg_queuegrouptrack.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACK);

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueGroupTrack %s\n",
		getSaNameT (&req_lib_msg_queuegrouptrack->queueGroupName));

	message_source_set (&req_exec_msg_queuegrouptrack.source, conn);

	memcpy (&req_exec_msg_queuegrouptrack.queue_group_name,
		&req_lib_msg_queuegrouptrack->queueGroupName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuegrouptrack;
	iovec.iov_len = sizeof (req_exec_msg_queuegrouptrack);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgQueueGroupTrackStop.
 *
 * Builds a req_exec_msg_queuegrouptrackstop request carrying the queue group
 * name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_queuegrouptrackstop
 */
static void message_handler_req_lib_msg_queuegrouptrackstop (
	void *conn,
	void *msg)
{
	struct req_lib_msg_queuegrouptrackstop *req_lib_msg_queuegrouptrackstop =
		(struct req_lib_msg_queuegrouptrackstop *)msg;
	struct req_exec_msg_queuegrouptrackstop req_exec_msg_queuegrouptrackstop;
	struct iovec iovec;
	int res;

	req_exec_msg_queuegrouptrackstop.header.size =
		sizeof (struct req_exec_msg_queuegrouptrackstop);
	req_exec_msg_queuegrouptrackstop.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACKSTOP);

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueGroupTrackStop %s\n",
		getSaNameT (&req_lib_msg_queuegrouptrackstop->queueGroupName));

	message_source_set (&req_exec_msg_queuegrouptrackstop.source, conn);

	memcpy (&req_exec_msg_queuegrouptrackstop.queue_group_name,
		&req_lib_msg_queuegrouptrackstop->queueGroupName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_queuegrouptrackstop;
	iovec.iov_len = sizeof (req_exec_msg_queuegrouptrackstop);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageSend.
 *
 * Builds a req_exec_msg_messagesend request (sync variant: async_call and
 * invocation are 0, timeout taken from the caller) and multicasts it with
 * the TOTEMPG_AGREED guarantee.  Any bytes that follow the fixed-size
 * library request are forwarded as a second iovec and added to the exec
 * header size.  No response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  struct req_lib_msg_messagesend, optionally followed by payload;
 *       header.size is taken to be the total length of both
 *       NOTE(review): header.size smaller than the fixed request size is
 *       not rejected here.
 */
static void message_handler_req_lib_msg_messagesend (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messagesend *req_lib_msg_messagesend =
		(struct req_lib_msg_messagesend *)msg;
	struct req_exec_msg_messagesend req_exec_msg_messagesend;
	struct iovec iovecs[2];
	int iov_count;
	int res;

	req_exec_msg_messagesend.header.size =
		sizeof (struct req_exec_msg_messagesend);
	req_exec_msg_messagesend.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGESEND);

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageSend %s\n",
		getSaNameT (&req_lib_msg_messagesend->destination));

	message_source_set (&req_exec_msg_messagesend.source, conn);

	memcpy (&req_exec_msg_messagesend.destination,
		&req_lib_msg_messagesend->destination, sizeof (SaNameT));
	memcpy (&req_exec_msg_messagesend.message,
		&req_lib_msg_messagesend->message, sizeof (SaMsgMessageT));

	req_exec_msg_messagesend.async_call = 0;
	req_exec_msg_messagesend.invocation = 0;
	req_exec_msg_messagesend.ack_flags = req_lib_msg_messagesend->ackFlags;
	req_exec_msg_messagesend.timeout = req_lib_msg_messagesend->timeout;

	iovecs[0].iov_base = (char *)&req_exec_msg_messagesend;
	iovecs[0].iov_len = sizeof (req_exec_msg_messagesend);
	iovecs[1].iov_base = ((char *)req_lib_msg_messagesend) +
		sizeof (struct req_lib_msg_messagesend);
	iovecs[1].iov_len = req_lib_msg_messagesend->header.size -
		sizeof (struct req_lib_msg_messagesend);

	req_exec_msg_messagesend.header.size += iovecs[1].iov_len;

	/* Skip the payload iovec entirely when there is no payload. */
	iov_count = (iovecs[1].iov_len > 0) ? 2 : 1;

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, iovecs,
		iov_count, TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageSendAsync.
 *
 * Builds a req_exec_msg_messagesend request (async variant: async_call is 1,
 * the caller's invocation is carried, timeout is SA_TIME_END) and multicasts
 * it with the TOTEMPG_AGREED guarantee.  Any bytes that follow the
 * fixed-size library request are forwarded as a second iovec and added to
 * the exec header size.  No response is sent to the library from this
 * function.
 *
 * conn: connection the library request arrived on
 * msg:  struct req_lib_msg_messagesend, optionally followed by payload;
 *       header.size is taken to be the total length of both
 *       NOTE(review): header.size smaller than the fixed request size is
 *       not rejected here.
 */
static void message_handler_req_lib_msg_messagesendasync (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messagesend *req_lib_msg_messagesend =
		(struct req_lib_msg_messagesend *)msg;
	struct req_exec_msg_messagesend req_exec_msg_messagesend;
	struct iovec iovecs[2];
	int iov_count;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageSendAsync %s\n",
		getSaNameT (&req_lib_msg_messagesend->destination));

	req_exec_msg_messagesend.header.size =
		sizeof (struct req_exec_msg_messagesend);
	req_exec_msg_messagesend.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGESEND);

	message_source_set (&req_exec_msg_messagesend.source, conn);

	memcpy (&req_exec_msg_messagesend.destination,
		&req_lib_msg_messagesend->destination, sizeof (SaNameT));
	memcpy (&req_exec_msg_messagesend.message,
		&req_lib_msg_messagesend->message, sizeof (SaMsgMessageT));

	req_exec_msg_messagesend.async_call = 1;
	req_exec_msg_messagesend.invocation = req_lib_msg_messagesend->invocation;
	req_exec_msg_messagesend.ack_flags = req_lib_msg_messagesend->ackFlags;
	req_exec_msg_messagesend.timeout = SA_TIME_END;

	iovecs[0].iov_base = (char *)&req_exec_msg_messagesend;
	iovecs[0].iov_len = sizeof (req_exec_msg_messagesend);
	iovecs[1].iov_base = ((char *)req_lib_msg_messagesend) +
		sizeof (struct req_lib_msg_messagesend);
	iovecs[1].iov_len = req_lib_msg_messagesend->header.size -
		sizeof (struct req_lib_msg_messagesend);

	req_exec_msg_messagesend.header.size += iovecs[1].iov_len;

	/*
	 * Skip the payload iovec when there is no payload, matching the
	 * synchronous handler (the empty case used to pass 2 iovecs here).
	 */
	iov_count = (iovecs[1].iov_len > 0) ? 2 : 1;

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, iovecs,
		iov_count, TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageGet.
 *
 * Builds a req_exec_msg_messageget request carrying the queue name and the
 * originating connection, and passes it to totempg_groups_mcast_joined()
 * with the TOTEMPG_AGREED guarantee.  No response is sent to the library
 * from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_messageget
 */
static void message_handler_req_lib_msg_messageget (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messageget *req_lib_msg_messageget =
		(struct req_lib_msg_messageget *)msg;
	struct req_exec_msg_messageget req_exec_msg_messageget;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageGet %s\n",
		getSaNameT (&req_lib_msg_messageget->queueName));

	req_exec_msg_messageget.header.size =
		sizeof (struct req_exec_msg_messageget);
	req_exec_msg_messageget.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGEGET);

	message_source_set (&req_exec_msg_messageget.source, conn);

	memcpy (&req_exec_msg_messageget.queue_name,
		&req_lib_msg_messageget->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_messageget;
	iovec.iov_len = sizeof (req_exec_msg_messageget);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageCancel.
 *
 * Builds a req_exec_msg_messagecancel request carrying the queue name and
 * the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_messagecancel
 */
static void message_handler_req_lib_msg_messagecancel (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messagecancel *req_lib_msg_messagecancel =
		(struct req_lib_msg_messagecancel *)msg;
	struct req_exec_msg_messagecancel req_exec_msg_messagecancel;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageCancel %s\n",
		getSaNameT (&req_lib_msg_messagecancel->queueName));

	req_exec_msg_messagecancel.header.size =
		sizeof (struct req_exec_msg_messagecancel);
	req_exec_msg_messagecancel.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGECANCEL);

	message_source_set (&req_exec_msg_messagecancel.source, conn);

	memcpy (&req_exec_msg_messagecancel.queue_name,
		&req_lib_msg_messagecancel->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_messagecancel;
	iovec.iov_len = sizeof (req_exec_msg_messagecancel);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageSendReceive.
 *
 * Builds a req_exec_msg_messagesendreceive request carrying the queue name
 * and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_messagesendreceive
 */
static void message_handler_req_lib_msg_messagesendreceive (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messagesendreceive *req_lib_msg_messagesendreceive =
		(struct req_lib_msg_messagesendreceive *)msg;
	struct req_exec_msg_messagesendreceive req_exec_msg_messagesendreceive;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageSendReceive %s\n",
		getSaNameT (&req_lib_msg_messagesendreceive->queueName));

	req_exec_msg_messagesendreceive.header.size =
		sizeof (struct req_exec_msg_messagesendreceive);
	req_exec_msg_messagesendreceive.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGESENDRECEIVE);

	message_source_set (&req_exec_msg_messagesendreceive.source, conn);

	memcpy (&req_exec_msg_messagesendreceive.queue_name,
		&req_lib_msg_messagesendreceive->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_messagesendreceive;
	iovec.iov_len = sizeof (req_exec_msg_messagesendreceive);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageReply.
 *
 * Builds a req_exec_msg_messagereply request (async_call = 0) carrying the
 * queue name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_messagereply
 */
static void message_handler_req_lib_msg_messagereply (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messagereply *req_lib_msg_messagereply =
		(struct req_lib_msg_messagereply *)msg;
	struct req_exec_msg_messagereply req_exec_msg_messagereply;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageReply %s\n",
		getSaNameT (&req_lib_msg_messagereply->queueName));

	req_exec_msg_messagereply.header.size =
		sizeof (struct req_exec_msg_messagereply);
	req_exec_msg_messagereply.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY);
	req_exec_msg_messagereply.async_call = 0;

	message_source_set (&req_exec_msg_messagereply.source, conn);

	memcpy (&req_exec_msg_messagereply.queue_name,
		&req_lib_msg_messagereply->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_messagereply;
	iovec.iov_len = sizeof (req_exec_msg_messagereply);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
/*
 * Library handler for saMsgMessageReplyAsync.
 *
 * Builds a req_exec_msg_messagereply request (async_call = 1) carrying the
 * queue name and the originating connection, and passes it to
 * totempg_groups_mcast_joined() with the TOTEMPG_AGREED guarantee.  No
 * response is sent to the library from this function.
 *
 * conn: connection the library request arrived on
 * msg:  pointer to a struct req_lib_msg_messagereply
 */
static void message_handler_req_lib_msg_messagereplyasync (
	void *conn,
	void *msg)
{
	struct req_lib_msg_messagereply *req_lib_msg_messagereply =
		(struct req_lib_msg_messagereply *)msg;
	struct req_exec_msg_messagereply req_exec_msg_messagereply;
	struct iovec iovec;
	int res;

	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageReplyAsync %s\n",
		getSaNameT (&req_lib_msg_messagereply->queueName));

	req_exec_msg_messagereply.header.size =
		sizeof (struct req_exec_msg_messagereply);
	req_exec_msg_messagereply.header.id =
		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY);
	req_exec_msg_messagereply.async_call = 1;

	message_source_set (&req_exec_msg_messagereply.source, conn);

	memcpy (&req_exec_msg_messagereply.queue_name,
		&req_lib_msg_messagereply->queueName, sizeof (SaNameT));

	iovec.iov_base = (char *)&req_exec_msg_messagereply;
	iovec.iov_len = sizeof (req_exec_msg_messagereply);

	/*
	 * The multicast must not live inside assert(): with NDEBUG defined the
	 * whole expression would be compiled out and nothing would be sent.
	 */
	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
		TOTEMPG_AGREED);
	assert (res == 0);
	(void)res;
}
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Mon, Dec 23, 10:55 PM (1 d, 19 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1129271
Default Alt Text
msg.c (56 KB)
Attached To
Mode
rC Corosync
Attached
Detach File
Event Timeline
Log In to Comment