Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3155475
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
38 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 798d075..d25132d 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -1,405 +1,433 @@
/*
* Copyright (C) 2006-2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>,
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCS_H_DEFINED
#define QB_IPCS_H_DEFINED
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
#include <stdlib.h>
#include <sys/uio.h>
#include <qb/qbipc_common.h>
#include <qb/qbhdb.h>
#include <qb/qbloop.h>
/**
* @file qbipcs.h
*
* Server IPC API.
*
* @example ipcserver.c
*/
/**
 * Request-rate settings accepted by qb_ipcs_request_rate_limit().
 *
 * In lib/ipcs.c each value selects a poll priority for the service and a
 * flow-control level that is applied to every connection of the service.
 */
enum qb_ipcs_rate_limit {
QB_IPCS_RATE_FAST, /* poll priority QB_LOOP_HIGH, flow control off */
QB_IPCS_RATE_NORMAL, /* poll priority QB_LOOP_MED, flow control off */
QB_IPCS_RATE_SLOW, /* poll priority QB_LOOP_LOW, flow control off */
QB_IPCS_RATE_OFF, /* poll priority QB_LOOP_LOW, flow control level 1 */
QB_IPCS_RATE_OFF_2, /* poll priority QB_LOOP_LOW, flow control level 2 */
};
struct qb_ipcs_connection;
typedef struct qb_ipcs_connection qb_ipcs_connection_t;
struct qb_ipcs_service;
typedef struct qb_ipcs_service qb_ipcs_service_t;
/**
 * Per-service connection counters, as returned by qb_ipcs_stats_get().
 */
struct qb_ipcs_stats {
uint32_t active_connections; /* decremented when an established connection is disconnected */
uint32_t closed_connections; /* incremented each time a connection is disconnected */
};
/**
 * Per-connection statistics filled in by qb_ipcs_connection_stats_get().
 *
 * That getter is documented as deprecated in favour of
 * qb_ipcs_connection_stats_get_2(), which returns
 * struct qb_ipcs_connection_stats_2 (same fields plus event_q_length).
 */
struct qb_ipcs_connection_stats {
int32_t client_pid;
uint64_t requests;
uint64_t responses;
uint64_t events;
uint64_t send_retries;
uint64_t recv_retries;
int32_t flow_control_state;
uint64_t flow_control_count;
};
/**
 * Per-connection statistics returned (allocated) by
 * qb_ipcs_connection_stats_get_2(); also embedded in each server-side
 * connection object.
 */
struct qb_ipcs_connection_stats_2 {
int32_t client_pid; /* NOTE(review): presumably the peer's pid; not set in the code visible here */
uint64_t requests; /* requests handed to the msg_process callback */
uint64_t responses; /* responses successfully sent */
uint64_t events; /* events successfully sent */
uint64_t send_retries; /* sends that failed with -EAGAIN or -ETIMEDOUT */
uint64_t recv_retries; /* receives that failed with -EAGAIN or -ETIMEDOUT */
int32_t flow_control_state; /* last flow-control level applied to the connection */
uint64_t flow_control_count; /* number of times the flow-control level changed */
uint32_t event_q_length; /* NOTE(review): presumably the pending event queue depth; not set in the code visible here */
};
/**
 * Callback invoked for activity on a polled file descriptor.
 * qb_ipcs_dispatch_connection_request() and
 * qb_ipcs_dispatch_service_request() have this signature.
 */
typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents,
void *data);
/**
 * Application hook used to start polling @fd for @events at priority @p;
 * @fn is to be called with @data.
 */
typedef int32_t (*qb_ipcs_dispatch_add_fn)(enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_ipcs_dispatch_fn_t fn);
/**
 * Application hook used to change the priority/events of an fd that is
 * already being polled. The library calls it when a connection's poll
 * mask or the service's poll priority changes.
 */
typedef int32_t (*qb_ipcs_dispatch_mod_fn)(enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_ipcs_dispatch_fn_t fn);
/**
 * Application hook used to stop polling @fd.
 */
typedef int32_t (*qb_ipcs_dispatch_del_fn)(int32_t fd);
/**
 * Application hook used to schedule a one-off job. The library uses it
 * to re-run qb_ipcs_disconnect() when the connection_closed callback
 * returns non-zero.
 */
typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p,
void *data,
qb_loop_job_dispatch_fn dispatch_fn);
/**
 * Main-loop integration callbacks, installed with
 * qb_ipcs_poll_handlers_set().
 *
 * qb_ipcs_run() returns -EINVAL unless dispatch_add, dispatch_mod and
 * dispatch_del are all set. job_add is not checked there.
 */
struct qb_ipcs_poll_handlers {
qb_ipcs_job_add_fn job_add;
qb_ipcs_dispatch_add_fn dispatch_add;
qb_ipcs_dispatch_mod_fn dispatch_mod;
qb_ipcs_dispatch_del_fn dispatch_del;
};
/**
* This callback is to check whether you want to accept a new connection.
*
* The type of checks you should do are authentication, service availability
* or process resource constraints.
* @return 0 to accept or -errno to indicate a failure (sent back to the client)
*
* @note you can call qb_ipcs_connection_auth_set() within this function.
*/
typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c,
uid_t uid, gid_t gid);
/**
* This is called after a new connection has been created.
*/
typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c);
/**
* This is called after a connection has been disconnected.
*
* @note if you return anything but 0 this function will be
* repeatedly called (until 0 is returned).
*/
typedef int32_t (*qb_ipcs_connection_closed_fn) (qb_ipcs_connection_t *c);
/**
* This is called just before a connection is freed.
*/
typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c);
/**
* This is the message processing callback.
* It is called with the message data.
*/
typedef int32_t (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c,
void *data, size_t size);
/**
 * Service-level callbacks passed to qb_ipcs_create(), which copies each
 * pointer into the service.
 *
 * In lib/ipcs.c connection_closed and connection_destroyed are checked
 * for NULL before being called; msg_process is called unconditionally
 * for every received request.
 */
struct qb_ipcs_service_handlers {
qb_ipcs_connection_accept_fn connection_accept;
qb_ipcs_connection_created_fn connection_created;
qb_ipcs_msg_process_fn msg_process;
qb_ipcs_connection_closed_fn connection_closed;
qb_ipcs_connection_destroyed_fn connection_destroyed;
};
/**
* Create a new IPC server.
*
* @param name for clients to connect to.
* @param service_id an integer to associate with the service
* @param type transport type.
* @param handlers callbacks.
* @return the new service instance.
*/
qb_ipcs_service_t* qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers);
/**
* Increase the reference counter on the service object.
*
* @param s service instance
*/
void qb_ipcs_ref(qb_ipcs_service_t *s);
/**
* Decrease the reference counter on the service object.
*
* @param s service instance
*/
void qb_ipcs_unref(qb_ipcs_service_t *s);
/**
* Set your poll callbacks.
*
* @param s service instance
* @param handlers the handlers that you want ipcs to use.
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s,
struct qb_ipcs_poll_handlers *handlers);
+/**
+ * Associate a "user" pointer with this service.
+ *
+ * @param s service instance
+ * @param context the pointer to associate with this service.
+ * @see qb_ipcs_service_context_get()
+ */
+void qb_ipcs_service_context_set(qb_ipcs_service_t* s,
+ void *context);
+
+/**
+ * Get the context (set previously)
+ *
+ * @param s service instance
+ * @return the context
+ * @see qb_ipcs_service_context_set()
+ */
+void *qb_ipcs_service_context_get(qb_ipcs_service_t* s);
+
/**
* run the new IPC server.
* @param s service instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_run(qb_ipcs_service_t* s);
/**
* Destroy the IPC server.
*
* @param s service instance to destroy
*/
void qb_ipcs_destroy(qb_ipcs_service_t* s);
/**
* Limit the incoming request rate.
* @param s service instance
* @param rl the new rate
*/
void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s,
enum qb_ipcs_rate_limit rl);
/**
* Send a response to an incoming request.
*
* @param c connection instance
* @param data the message to send
* @param size the size of the message
* @return size sent or -errno for errors
*
* @note the data must include a qb_ipc_response_header at
* the top of the message. The client will read the size field
* to determine how much to recv.
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data,
size_t size);
/**
* Send a response to an incoming request (iovec variant).
*
* @param c connection instance
* @param iov the iovec struct that points to the message to send
* @param iov_len the number of iovecs.
* @return size sent or -errno for errors
*
* @note the iov[0] must be a qb_ipc_response_header. The client will
* read the size field to determine how much to recv.
*/
ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c,
const struct iovec * iov, size_t iov_len);
/**
* Send an asynchronous event message to the client.
*
* @param c connection instance
* @param data the message to send
* @param size the size of the message
* @return size sent or -errno for errors
*
* @note the data must include a qb_ipc_response_header at
* the top of the message. The client will read the size field
* to determine how much to recv.
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data,
size_t size);
/**
* Send an asynchronous event message to the client (iovec variant).
*
* @param c connection instance
* @param iov the iovec struct that points to the message to send
* @param iov_len the number of iovecs.
* @return size sent or -errno for errors
*
* @note the iov[0] must be a qb_ipc_response_header. The client will
* read the size field to determine how much to recv.
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov,
size_t iov_len);
/**
* Increment the connection's reference counter.
*
* @param c connection instance
*/
void qb_ipcs_connection_ref(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
*
* @param c connection instance
*/
void qb_ipcs_connection_unref(qb_ipcs_connection_t *c);
/**
* Disconnect from this client.
*
* @param c connection instance
*/
void qb_ipcs_disconnect(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service.
* (as passed into qb_ipcs_create())
*
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
/**
* Associate a "user" pointer with this connection.
*
* @param context the pointer to associate with this connection.
* @param c connection instance
* @see qb_ipcs_context_get()
*/
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
/**
* Get the context (set previously)
*
* @param c connection instance
* @return the context
* @see qb_ipcs_context_set()
*/
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
+/**
+ * Get the context previously set on the service backing this connection
+ *
+ * @param c connection instance
+ * @return the context
+ * @see qb_ipcs_service_context_set
+ */
+void *qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c);
+
/**
* Get the connection statistics.
*
* @deprecated from v0.13.0 onwards, use qb_ipcs_connection_stats_get_2
* @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
struct qb_ipcs_connection_stats* stats,
int32_t clear_after_read);
/**
* Get (and allocate) the connection statistics.
*
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @retval NULL if no memory or invalid connection
* @retval allocated statistics structure (user must free it).
*/
struct qb_ipcs_connection_stats_2*
qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
int32_t clear_after_read);
/**
* Get the service statistics.
*
* @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param pt service instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_stats_get(qb_ipcs_service_t* pt,
struct qb_ipcs_stats* stats,
int32_t clear_after_read);
/**
* Get the first connection.
*
* @note call qb_ipcs_connection_unref() after using the connection.
*
* @param pt service instance
* @return first connection
*/
qb_ipcs_connection_t * qb_ipcs_connection_first_get(qb_ipcs_service_t* pt);
/**
* Get the next connection.
*
* @note call qb_ipcs_connection_unref() after using the connection.
*
* @param pt service instance
* @param current current connection
* @return next connection
*/
qb_ipcs_connection_t * qb_ipcs_connection_next_get(qb_ipcs_service_t* pt,
qb_ipcs_connection_t *current);
/**
* Set the permissions on the shared memory files so that both processes can
* read and write to them.
*
* @param conn connection instance
* @param uid the user id to set.
* @param gid the group id to set.
* @param mode the mode to set.
*
* @see chmod() chown()
* @note this must be called within the qb_ipcs_connection_accept_fn()
* callback.
*/
void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *conn, uid_t uid,
gid_t gid, mode_t mode);
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCS_H_DEFINED */
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 50714c5..45489f5 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,205 +1,207 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include "os_base.h"
#include <dirent.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
#define QB_IPC_MAX_WAIT_MS 2000
/*
Client Server
SEND CONN REQ ->
ACCEPT & CREATE queues
or DENY
<- SEND ACCEPT(with details)/DENY
*/
/*
 * Connection request message ("SEND CONN REQ" in the diagram above):
 * a request header followed by a maximum message size.
 * The struct is aligned to 8 bytes.
 */
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr;
uint32_t max_msg_size;
} __attribute__ ((aligned(8)));
/*
 * Request message carrying a connection handle stored as an intptr_t.
 * NOTE(review): presumably used to attach the event channel to an
 * existing connection; the code using it is not visible here.
 * The struct is aligned to 8 bytes.
 */
struct qb_ipc_event_connection_request {
struct qb_ipc_request_header hdr;
intptr_t connection;
} __attribute__ ((aligned(8)));
/*
 * Connection response message ("SEND ACCEPT(with details)/DENY" in the
 * diagram above). Besides the response header it carries the connection
 * type, the maximum message size, a connection handle and three
 * PATH_MAX-sized names, one each for the request, response and event
 * channels. The struct is aligned to 8 bytes.
 */
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr;
int32_t connection_type;
uint32_t max_msg_size;
intptr_t connection;
char request[PATH_MAX];
char response[PATH_MAX];
char event[PATH_MAX];
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
/*
 * One unidirectional channel of a connection (setup, request, response
 * or event). The union member in use depends on the transport:
 * "us" holds socket state, "shm" holds a ring buffer.
 */
struct qb_ipc_one_way {
size_t max_msg_size; /* largest message accepted on this channel */
enum qb_ipc_type type; /* transport type of this channel */
union {
struct {
int32_t sock;
char *sock_name;
void* shared_data;
char shared_file_name[NAME_MAX];
} us;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
/*
 * Client-side transport operations table: receive, send (flat buffer and
 * iovec), disconnect, and a flow-control query (fc_get).
 */
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
int32_t (*fc_get)(struct qb_ipc_one_way *one_way);
};
/*
 * Client-side connection state: the service name, the four one-way
 * channels (setup, request, response, event), the transport operations
 * table, a receive buffer and an opaque user context pointer.
 */
struct qb_ipcc_connection {
char name[NAME_MAX];
int32_t needs_sock_for_poll;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
struct qb_ipc_request_header *receive_buf;
uint32_t fc_enable_max;
int32_t is_connected;
void * context;
};
int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
int32_t ms_timeout, int32_t events);
void qb_ipcc_us_sock_close(int32_t sock);
int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
/*
 * Server-side transport operations table, stored in each service.
 *
 * As used in lib/ipcs.c: peek and reclaim are optional and only used when
 * both are set (otherwise recv is used); q_len_get is optional; send,
 * sendv, recv, disconnect and fc_set are called without a NULL check.
 */
struct qb_ipcs_funcs {
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*peek)(struct qb_ipc_one_way *one_way, void **data_out, int32_t timeout);
void (*reclaim)(struct qb_ipc_one_way *one_way);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
void (*fc_set)(struct qb_ipc_one_way *one_way, int32_t fc_enable);
ssize_t (*q_len_get)(struct qb_ipc_one_way *one_way);
};
/*
 * Server-side service object, created by qb_ipcs_create() and
 * reference counted through qb_ipcs_ref()/qb_ipcs_unref().
 */
struct qb_ipcs_service {
enum qb_ipc_type type; /* QB_IPC_NATIVE is resolved to SHM or SOCKET at creation */
char name[NAME_MAX];
int32_t service_id; /* caller-supplied id, see qb_ipcs_service_id_get() */
int32_t ref_count; /* starts at 1; each connection holds one more */
pid_t pid; /* getpid() at creation time */
int32_t needs_sock_for_poll;
int32_t server_sock;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
enum qb_loop_priority poll_priority; /* QB_LOOP_MED by default, changed by qb_ipcs_request_rate_limit() */
struct qb_list_head connections; /* list of this service's connections */
struct qb_list_head list; /* link in the file-global qb_ipc_services list */
struct qb_ipcs_stats stats;
+
+ void *context; /* opaque user pointer, see qb_ipcs_service_context_set() */
};
/*
 * Lifecycle state of a server-side connection, as handled by
 * qb_ipcs_disconnect().
 */
enum qb_ipcs_connection_state {
QB_IPCS_CONNECTION_INACTIVE, /* initial state after allocation; also set when an ACTIVE connection is disconnected */
QB_IPCS_CONNECTION_ACTIVE, /* treated as an incomplete connection on disconnect */
QB_IPCS_CONNECTION_ESTABLISHED, /* moves to SHUTTING_DOWN on disconnect */
QB_IPCS_CONNECTION_SHUTTING_DOWN, /* connection_closed callback pending or being retried */
};
#define CONNECTION_DESCRIPTION (16)
/*
 * Ownership and mode triple; the fields mirror the uid/gid/mode
 * parameters of qb_ipcs_connection_auth_set().
 */
struct qb_ipcs_connection_auth {
uid_t uid;
gid_t gid;
mode_t mode;
};
/*
 * Server-side connection object, allocated by qb_ipcs_connection_alloc()
 * and reference counted through
 * qb_ipcs_connection_ref()/qb_ipcs_connection_unref().
 */
struct qb_ipcs_connection {
enum qb_ipcs_connection_state state;
int32_t refcount; /* starts at 1 */
pid_t pid; /* initialised to 0 */
uid_t euid; /* initialised to -1 */
gid_t egid; /* initialised to -1 */
struct qb_ipcs_connection_auth auth;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service; /* owning service; a service reference is held for it */
struct qb_list_head list; /* link in service->connections */
struct qb_ipc_request_header *receive_buf; /* receive buffer for transports without peek/reclaim; freed with the connection */
void *context; /* opaque user pointer */
int32_t fc_enabled; /* current flow-control level (0 == off) */
int32_t poll_events; /* poll mask handed to the dispatch_mod callback */
int32_t outstanding_notifiers; /* event notification bytes still to be written to the setup socket */
char description[CONNECTION_DESCRIPTION]; /* short label used in log messages */
struct qb_ipcs_connection_stats_2 stats;
};
void qb_ipcs_us_init(struct qb_ipcs_service *s);
void qb_ipcs_shm_init(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
int32_t qb_ipc_us_sock_error_is_disconnected(int err);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 4365f03..65868e1 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -1,886 +1,908 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "util_int.h"
#include "ipc_int.h"
#include <qb/qbdefs.h>
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
int32_t fc_enable);
static int32_t
new_event_notification(struct qb_ipcs_connection * c);
static QB_LIST_DECLARE(qb_ipc_services);
/*
 * Create a new IPC service object.
 *
 * @param name       name clients connect to (copied, truncated to NAME_MAX)
 * @param service_id caller-chosen id stored on the service
 * @param type       transport; QB_IPC_NATIVE resolves to QB_IPC_SHM, or to
 *                   QB_IPC_SOCKET when built with DISABLE_IPC_SHM
 * @param handlers   service callbacks (copied into the service)
 * @return the new service holding one reference, or NULL when name or
 *         handlers is NULL or the allocation fails
 */
qb_ipcs_service_t *
qb_ipcs_create(const char *name,
	       int32_t service_id,
	       enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers)
{
	struct qb_ipcs_service *s;

	/* Both pointers are dereferenced below; refuse them up front rather
	 * than crashing after the allocation. */
	if (name == NULL || handlers == NULL) {
		return NULL;
	}

	s = calloc(1, sizeof(struct qb_ipcs_service));
	if (s == NULL) {
		return NULL;
	}

	if (type == QB_IPC_NATIVE) {
#ifdef DISABLE_IPC_SHM
		s->type = QB_IPC_SOCKET;
#else
		s->type = QB_IPC_SHM;
#endif /* DISABLE_IPC_SHM */
	} else {
		s->type = type;
	}

	s->pid = getpid();
	s->needs_sock_for_poll = QB_FALSE;
	s->poll_priority = QB_LOOP_MED;

	/* the creator owns the first reference */
	s->ref_count = 1;
	s->service_id = service_id;
	(void)strlcpy(s->name, name, NAME_MAX);

	s->serv_fns.connection_accept = handlers->connection_accept;
	s->serv_fns.connection_created = handlers->connection_created;
	s->serv_fns.msg_process = handlers->msg_process;
	s->serv_fns.connection_closed = handlers->connection_closed;
	s->serv_fns.connection_destroyed = handlers->connection_destroyed;

	qb_list_init(&s->connections);
	qb_list_init(&s->list);
	/* register the service in the file-global list */
	qb_list_add(&s->list, &qb_ipc_services);

	return s;
}
/*
 * Install the application's main-loop callbacks on the service.
 *
 * @param s        service instance
 * @param handlers callbacks to copy into the service
 */
void
qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
			  struct qb_ipcs_poll_handlers *handlers)
{
	struct qb_ipcs_poll_handlers *dst = &s->poll_fns;

	dst->dispatch_del = handlers->dispatch_del;
	dst->dispatch_mod = handlers->dispatch_mod;
	dst->dispatch_add = handlers->dispatch_add;
	dst->job_add = handlers->job_add;
}
+void
+qb_ipcs_service_context_set(qb_ipcs_service_t* s,
+ void *context)
+{
+ s->context = context; /* store the caller's opaque pointer on the service; it is not dereferenced here */
+}
+
+void *
+qb_ipcs_service_context_get(qb_ipcs_service_t* s)
+{
+ return s->context; /* whatever was last stored; the service is calloc()ed, so this is zero-filled until set */
+}
+
/*
 * Start serving: initialise the transport for the service's type and
 * publish the service.
 *
 * @param s service instance
 * @return 0 (or the non-negative result of publishing) on success;
 *         -EINVAL when a dispatch callback is missing or the type is
 *         unknown; -ENOTSUP for unsupported transports; otherwise the
 *         negative result of publishing.
 *
 * On every failure after the callback check, the service reference is
 * dropped with qb_ipcs_unref().
 */
int32_t
qb_ipcs_run(struct qb_ipcs_service *s)
{
	int32_t rc = 0;

	if (!s->poll_fns.dispatch_add ||
	    !s->poll_fns.dispatch_mod ||
	    !s->poll_fns.dispatch_del) {
		return -EINVAL;
	}

	if (s->type == QB_IPC_SOCKET) {
		qb_ipcs_us_init(s);
	} else if (s->type == QB_IPC_SHM) {
#ifdef DISABLE_IPC_SHM
		rc = -ENOTSUP;
#else
		qb_ipcs_shm_init(s);
#endif /* DISABLE_IPC_SHM */
	} else if (s->type == QB_IPC_POSIX_MQ || s->type == QB_IPC_SYSV_MQ) {
		rc = -ENOTSUP;
	} else {
		rc = -EINVAL;
	}

	if (rc >= 0) {
		rc = qb_ipcs_us_publish(s);
		if (rc >= 0) {
			return rc;
		}
		/* publishing failed: undo whatever was published */
		(void)qb_ipcs_us_withdraw(s);
	}

	qb_ipcs_unref(s);
	return rc;
}
static int32_t
_modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
{
qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod;
if (c->service->type == QB_IPC_SOCKET) {
return disp_mod(c->service->poll_priority,
c->event.u.us.sock,
c->poll_events, c,
qb_ipcs_dispatch_connection_request);
} else {
return disp_mod(c->service->poll_priority,
c->setup.u.us.sock,
c->poll_events, c,
qb_ipcs_dispatch_connection_request);
}
return -EINVAL;
}
void
qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
enum qb_ipcs_rate_limit rl)
{
struct qb_ipcs_connection *c;
enum qb_loop_priority old_p = s->poll_priority;
struct qb_list_head *pos;
struct qb_list_head *n;
switch (rl) {
case QB_IPCS_RATE_FAST:
s->poll_priority = QB_LOOP_HIGH;
break;
case QB_IPCS_RATE_SLOW:
case QB_IPCS_RATE_OFF:
case QB_IPCS_RATE_OFF_2:
s->poll_priority = QB_LOOP_LOW;
break;
default:
case QB_IPCS_RATE_NORMAL:
s->poll_priority = QB_LOOP_MED;
break;
}
qb_list_for_each_safe(pos, n, &s->connections) {
c = qb_list_entry(pos, struct qb_ipcs_connection, list);
qb_ipcs_connection_ref(c);
if (rl == QB_IPCS_RATE_OFF) {
qb_ipcs_flowcontrol_set(c, 1);
} else if (rl == QB_IPCS_RATE_OFF_2) {
qb_ipcs_flowcontrol_set(c, 2);
} else {
qb_ipcs_flowcontrol_set(c, QB_FALSE);
}
if (old_p != s->poll_priority) {
(void)_modify_dispatch_descriptor_(c);
}
qb_ipcs_connection_unref(c);
}
}
/*
 * Take an additional reference on the service by incrementing its
 * ref_count through qb_atomic_int_inc().
 */
void
qb_ipcs_ref(struct qb_ipcs_service *s)
{
qb_atomic_int_inc(&s->ref_count);
}
/*
 * Drop one reference on the service and free it when the count reaches
 * zero. Asserts that the count is positive on entry.
 *
 * NOTE(review): qb_ipcs_create() links s->list into the file-global
 * qb_ipc_services list, but nothing visible here unlinks it before the
 * free() below. Confirm it is removed elsewhere; otherwise the global
 * list is left pointing at freed memory.
 */
void
qb_ipcs_unref(struct qb_ipcs_service *s)
{
int32_t free_it;
assert(s->ref_count > 0);
free_it = qb_atomic_int_dec_and_test(&s->ref_count);
if (free_it) {
qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
free(s);
}
}
/*
 * Shut the service down: disconnect every connection, withdraw the
 * published service and drop the caller's reference.
 * A NULL service is ignored.
 *
 * @param s service instance to destroy
 */
void
qb_ipcs_destroy(struct qb_ipcs_service *s)
{
	struct qb_list_head *iter;
	struct qb_list_head *next;

	if (!s) {
		return;
	}

	/* the _safe iterator is needed: disconnecting may unlink the entry */
	qb_list_for_each_safe(iter, next, &s->connections) {
		struct qb_ipcs_connection *conn =
		    qb_list_entry(iter, struct qb_ipcs_connection, list);

		if (conn != NULL) {
			qb_ipcs_disconnect(conn);
		}
	}

	(void)qb_ipcs_us_withdraw(s);
	qb_ipcs_unref(s);
}
/*
* connection API
*/
/*
 * Pick the socket-backed channel that can be polled for event
 * writability: the setup channel when the service needs a socket for
 * polling, the event channel when it is itself a socket, otherwise NULL.
 */
static struct qb_ipc_one_way *
_event_sock_one_way_get(struct qb_ipcs_connection * c)
{
	struct qb_ipc_one_way *chosen = NULL;

	if (c->service->needs_sock_for_poll) {
		chosen = &c->setup;
	} else if (c->event.type == QB_IPC_SOCKET) {
		chosen = &c->event;
	}
	return chosen;
}
/*
 * Pick the socket-backed channel that can be polled for response
 * writability: the setup channel when the service needs a socket for
 * polling, the response channel when it is itself a socket, otherwise
 * NULL.
 */
static struct qb_ipc_one_way *
_response_sock_one_way_get(struct qb_ipcs_connection * c)
{
	struct qb_ipc_one_way *chosen = NULL;

	if (c->service->needs_sock_for_poll) {
		chosen = &c->setup;
	} else if (c->response.type == QB_IPC_SOCKET) {
		chosen = &c->response;
	}
	return chosen;
}
/*
 * Send a response on the connection's response channel.
 *
 * @param c    connection instance
 * @param data message to send
 * @param size size of the message
 * @return bytes sent or -errno; -EINVAL when c is NULL
 *
 * A full-size send bumps the responses counter. On -EAGAIN/-ETIMEDOUT the
 * send_retries counter is bumped and, when a pollable socket exists, a
 * zero-timeout POLLOUT readiness check is made whose error (if any)
 * replaces the result.
 */
ssize_t
qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
		      size_t size)
{
	ssize_t sent;
	struct qb_ipc_one_way *sock_ow;

	if (c == NULL) {
		return -EINVAL;
	}

	/* keep the connection alive for the duration of the call */
	qb_ipcs_connection_ref(c);

	sent = c->service->funcs.send(&c->response, data, size);
	if (sent == size) {
		c->stats.responses++;
	} else if (sent == -EAGAIN || sent == -ETIMEDOUT) {
		sock_ow = _response_sock_one_way_get(c);
		if (sock_ow != NULL) {
			ssize_t ready = qb_ipc_us_ready(sock_ow, &c->setup, 0, POLLOUT);

			if (ready < 0) {
				sent = ready;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return sent;
}
/*
 * Send a response, given as an iovec array, on the connection's response
 * channel.
 *
 * @param c       connection instance
 * @param iov     iovec array describing the message
 * @param iov_len number of iovecs
 * @return bytes sent or -errno; -EINVAL when c is NULL
 *
 * Any positive result bumps the responses counter. On -EAGAIN/-ETIMEDOUT
 * the send_retries counter is bumped and, when a pollable socket exists,
 * a zero-timeout POLLOUT readiness check is made whose error (if any)
 * replaces the result.
 */
ssize_t
qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov,
		       size_t iov_len)
{
	ssize_t sent;
	struct qb_ipc_one_way *sock_ow;

	if (c == NULL) {
		return -EINVAL;
	}

	/* keep the connection alive for the duration of the call */
	qb_ipcs_connection_ref(c);

	sent = c->service->funcs.sendv(&c->response, iov, iov_len);
	if (sent > 0) {
		c->stats.responses++;
	} else if (sent == -EAGAIN || sent == -ETIMEDOUT) {
		sock_ow = _response_sock_one_way_get(c);
		if (sock_ow != NULL) {
			ssize_t ready = qb_ipc_us_ready(sock_ow, &c->setup, 0, POLLOUT);

			if (ready < 0) {
				sent = ready;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return sent;
}
/*
 * Try to flush the event notifications that could not be written to the
 * setup socket earlier.
 *
 * One byte is written per outstanding notification (the bytes are taken
 * from receive_buf). Once none remain, POLLOUT is removed from the
 * connection's poll mask and the descriptor is re-registered.
 *
 * @return the result of the send, or 0 when nothing was outstanding
 */
static int32_t
resend_event_notifications(struct qb_ipcs_connection *c)
{
	ssize_t sent = 0;

	if (c->outstanding_notifiers > 0) {
		sent = qb_ipc_us_send(&c->setup, c->receive_buf,
				      c->outstanding_notifiers);
		if (sent > 0) {
			c->outstanding_notifiers -= sent;
		}
	}

	assert(c->outstanding_notifiers >= 0);

	if (c->outstanding_notifiers == 0) {
		/* nothing left to write: stop asking for POLLOUT */
		c->poll_events = POLLIN | POLLPRI | POLLNVAL;
		(void)_modify_dispatch_descriptor_(c);
	}
	return sent;
}
/*
 * Tell the client that a new event is available by writing one byte to
 * the setup socket. Only done for services that need a socket for
 * polling.
 *
 * If earlier notifications are still queued, this one is queued behind
 * them. If the write would block (-EAGAIN), the notification is queued
 * and POLLOUT is added to the poll mask so it can be retried later.
 *
 * @return 0 when nothing had to be written, otherwise the send result
 */
static int32_t
new_event_notification(struct qb_ipcs_connection * c)
{
	ssize_t sent;

	if (!c->service->needs_sock_for_poll) {
		return 0;
	}

	assert(c->outstanding_notifiers >= 0);

	if (c->outstanding_notifiers > 0) {
		/* keep ordering: queue behind the ones already waiting */
		c->outstanding_notifiers++;
		return 0;
	}

	sent = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1);
	if (sent == -EAGAIN) {
		/*
		 * notify the client later, when we can.
		 */
		c->outstanding_notifiers++;
		c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL;
		(void)_modify_dispatch_descriptor_(c);
	}
	return sent;
}
/*
 * Send an asynchronous event message to the client.
 *
 * @param c    connection instance
 * @param data message to send
 * @param size size of the message
 * @return bytes sent or -errno: -EINVAL when c is NULL, -EMSGSIZE when
 *         size exceeds the event channel's max_msg_size
 *
 * A full-size send bumps the events counter and notifies the client; a
 * notification failure other than -EAGAIN is logged and returned.
 * On -EAGAIN/-ETIMEDOUT from the send, send_retries is bumped and, when a
 * pollable socket exists, a zero-timeout POLLOUT readiness check is made
 * whose error (if any) replaces the result.
 */
ssize_t
qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size)
{
	ssize_t res;
	ssize_t resn;

	if (c == NULL) {
		return -EINVAL;
	}
	/*
	 * Validate the size before taking a reference: returning after
	 * qb_ipcs_connection_ref() without a matching unref would leak a
	 * reference and keep the connection from ever being freed.
	 */
	if (size > c->event.max_msg_size) {
		return -EMSGSIZE;
	}

	qb_ipcs_connection_ref(c);

	res = c->service->funcs.send(&c->event, data, size);
	if (res == size) {
		c->stats.events++;
		resn = new_event_notification(c);
		if (resn < 0 && resn != -EAGAIN) {
			errno = -resn;
			qb_util_perror(LOG_WARNING,
				       "new_event_notification (%s)",
				       c->description);
			res = resn;
		}
	} else if (res == -EAGAIN || res == -ETIMEDOUT) {
		struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);

		if (ow) {
			resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
			if (resn < 0) {
				res = resn;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return res;
}
/*
 * Send an asynchronous event message, given as an iovec array, to the
 * client.
 *
 * @param c       connection instance
 * @param iov     iovec array describing the message
 * @param iov_len number of iovecs
 * @return bytes sent or -errno; -EINVAL when c is NULL
 *
 * Any positive result bumps the events counter and notifies the client; a
 * notification failure other than -EAGAIN is logged and returned.
 * On -EAGAIN/-ETIMEDOUT from the send, send_retries is bumped and, when a
 * pollable socket exists, a zero-timeout POLLOUT readiness check is made
 * whose error (if any) replaces the result.
 */
ssize_t
qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
		    const struct iovec * iov, size_t iov_len)
{
	ssize_t sent;
	ssize_t aux;
	struct qb_ipc_one_way *sock_ow;

	if (c == NULL) {
		return -EINVAL;
	}

	/* keep the connection alive for the duration of the call */
	qb_ipcs_connection_ref(c);

	sent = c->service->funcs.sendv(&c->event, iov, iov_len);
	if (sent > 0) {
		c->stats.events++;
		aux = new_event_notification(c);
		if (aux < 0 && aux != -EAGAIN) {
			errno = -aux;
			qb_util_perror(LOG_WARNING,
				       "new_event_notification (%s)",
				       c->description);
			sent = aux;
		}
	} else if (sent == -EAGAIN || sent == -ETIMEDOUT) {
		sock_ow = _event_sock_one_way_get(c);
		if (sock_ow != NULL) {
			aux = qb_ipc_us_ready(sock_ow, &c->setup, 0, POLLOUT);
			if (aux < 0) {
				sent = aux;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return sent;
}
/*
 * Get the first connection of the service.
 *
 * @param s service instance
 * @return the first connection with an extra reference taken (release it
 *         with qb_ipcs_connection_unref()), or NULL when the service has
 *         no connections
 */
qb_ipcs_connection_t *
qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
{
	struct qb_ipcs_connection *head = NULL;

	if (!qb_list_empty(&s->connections)) {
		head = qb_list_first_entry(&s->connections,
					   struct qb_ipcs_connection, list);
		qb_ipcs_connection_ref(head);
	}
	return head;
}
qb_ipcs_connection_t *
qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
struct qb_ipcs_connection * current)
{
struct qb_ipcs_connection *c;
if (current == NULL ||
qb_list_is_last(¤t->list, &s->connections)) {
return NULL;
}
c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection,
list);
qb_ipcs_connection_ref(c);
return c;
}
/*
 * Get the service id (as passed to qb_ipcs_create()) of the service this
 * connection belongs to.
 *
 * @param c connection instance
 * @return the service id, or -EINVAL when c is NULL
 */
int32_t
qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
{
	return (c != NULL) ? c->service->service_id : -EINVAL;
}
/*
 * Allocate and initialise a connection object for the service.
 *
 * The connection starts INACTIVE with one reference, unknown credentials
 * (-1), no flow control, and all four channels typed like the service.
 * A reference on the service is taken on behalf of the connection.
 *
 * @param s service the connection belongs to
 * @return the new connection, or NULL when allocation fails
 */
struct qb_ipcs_connection *
qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
	struct qb_ipcs_connection *conn;

	conn = calloc(1, sizeof(struct qb_ipcs_connection));
	if (!conn) {
		return NULL;
	}

	conn->state = QB_IPCS_CONNECTION_INACTIVE;
	conn->refcount = 1;

	conn->pid = 0;
	conn->euid = -1;
	conn->egid = -1;

	conn->receive_buf = NULL;
	conn->context = NULL;
	conn->fc_enabled = QB_FALSE;
	conn->poll_events = POLLIN | POLLPRI | POLLNVAL;

	conn->setup.type = s->type;
	conn->request.type = s->type;
	conn->response.type = s->type;
	conn->event.type = s->type;

	(void)strlcpy(conn->description, "not set yet", CONNECTION_DESCRIPTION);

	/* the connection points at its service, so it must hold a reference */
	qb_ipcs_ref(s);
	conn->service = s;

	qb_list_init(&conn->list);

	return conn;
}
/*
 * Take an additional reference on the connection.
 * A NULL connection is ignored.
 *
 * @param c connection instance
 */
void
qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
{
	if (c == NULL) {
		return;
	}
	qb_atomic_int_inc(&c->refcount);
}
/*
 * Drop one reference on the connection; a NULL connection is ignored.
 *
 * A refcount below 1 on entry is logged and trips assert(0).
 * When the count reaches zero the connection is torn down in this order:
 * it is unlinked from its list, the service's connection_destroyed
 * callback (if set) is invoked, the transport's disconnect hook runs, the
 * service reference taken in qb_ipcs_connection_alloc() is dropped, and
 * the receive buffer and the connection itself are freed.
 */
void
qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
{
int32_t free_it;
if (c == NULL) {
return;
}
if (c->refcount < 1) {
/* more unrefs than refs: report and abort */
qb_util_log(LOG_ERR, "ref:%d state:%d (%s)",
c->refcount, c->state, c->description);
assert(0);
}
free_it = qb_atomic_int_dec_and_test(&c->refcount);
if (free_it) {
qb_list_del(&c->list);
/* let the application release its per-connection state first */
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
}
c->service->funcs.disconnect(c);
/* c->service must not be used after this: it may have been freed */
qb_ipcs_unref(c->service);
free(c->receive_buf);
free(c);
}
}
/*
 * Disconnect from a client, driven by the connection's state.
 * A NULL connection is ignored.
 *
 * - ACTIVE: run the transport's disconnect hook, go back to INACTIVE,
 *   count the connection as closed and return.
 * - ESTABLISHED: run the transport's disconnect hook, move to
 *   SHUTTING_DOWN and update the active/closed counters, then fall into
 *   the SHUTTING_DOWN handling below.
 * - SHUTTING_DOWN: call the connection_closed callback (if set). When it
 *   returns 0 (or is not set) a reference is dropped; otherwise this
 *   function is scheduled to run again as a low-priority job.
 * - any other state: nothing is done beyond the debug log.
 *
 * NOTE(review): poll_fns.job_add is called without a NULL check and
 * qb_ipcs_run() does not require it to be set - confirm callers always
 * provide it when connection_closed can return non-zero.
 */
void
qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
int32_t res = 0;
qb_loop_job_dispatch_fn rerun_job;
if (c == NULL) {
return;
}
qb_util_log(LOG_DEBUG, "%s(%s) state:%d",
__func__, c->description, c->state);
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->service->funcs.disconnect(c);
c->state = QB_IPCS_CONNECTION_INACTIVE;
c->service->stats.closed_connections++;
/* return early as it's an incomplete connection:
 * the connection_closed callback is not invoked for it.
 */
return;
}
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
c->service->funcs.disconnect(c);
c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
c->service->stats.active_connections--;
c->service->stats.closed_connections++;
}
if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
res = 0;
if (c->service->serv_fns.connection_closed) {
res = c->service->serv_fns.connection_closed(c);
}
if (res == 0) {
qb_ipcs_connection_unref(c);
} else {
/* OK, so they want the connection_closed
 * function re-run: schedule this function again
 * with the same connection as its argument */
rerun_job =
(qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
res = c->service->poll_fns.job_add(QB_LOOP_LOW,
c, rerun_job);
if (res != 0) {
/* last ditch attempt to cleanup:
 * the job could not be queued, so drop the reference now */
qb_ipcs_connection_unref(c);
}
}
}
}
/*
 * Apply a flow-control level to the connection's request channel.
 *
 * Nothing happens when c is NULL or the level is unchanged. Otherwise the
 * transport's fc_set hook is called and the connection's state and
 * statistics are updated.
 *
 * @param c         connection instance
 * @param fc_enable new flow-control level (0 == off)
 */
static void
qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
{
	if (c == NULL || c->fc_enabled == fc_enable) {
		return;
	}

	c->service->funcs.fc_set(&c->request, fc_enable);
	c->fc_enabled = fc_enable;
	c->stats.flow_control_state = fc_enable;
	c->stats.flow_control_count++;
}
/*
 * Receive one request from the connection and hand it to the service's
 * msg_process callback.
 *
 * @param c          connection to read from
 * @param ms_timeout timeout passed to the transport's peek/recv hook
 * @return the received size (> 0) when the message was processed;
 *         -ENOBUFS when msg_process returned a negative value;
 *         -ESHUTDOWN when the client disconnected (zero-size read or a
 *         QB_IPC_MSG_DISCONNECT message);
 *         otherwise the negative result of the receive.
 *
 * Transports that provide both peek and reclaim are read in place and the
 * message is reclaimed after processing; other transports are read into
 * the connection's receive_buf.
 */
static int32_t
_process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
{
int32_t res = 0;
ssize_t size;
struct qb_ipc_request_header *hdr;
/* hold the connection while the request is being handled */
qb_ipcs_connection_ref(c);
if (c->service->funcs.peek && c->service->funcs.reclaim) {
size = c->service->funcs.peek(&c->request, (void **)&hdr,
ms_timeout);
} else {
hdr = c->receive_buf;
size = c->service->funcs.recv(&c->request,
hdr,
c->request.max_msg_size,
ms_timeout);
}
if (size < 0) {
if (size != -EAGAIN && size != -ETIMEDOUT) {
/* NOTE(review): errno is not set from size here, unlike the other
 * qb_util_perror() call sites in this file - confirm the logged
 * error text is the intended one */
qb_util_perror(LOG_DEBUG,
"recv from client connection failed (%s)",
c->description);
} else {
c->stats.recv_retries++;
}
res = size;
/* nothing was peeked, so skip the reclaim below */
goto cleanup;
} else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)",
c->description);
qb_ipcs_disconnect(c);
/* NOTE(review): with c set to NULL the reclaim is skipped and the
 * unref at cleanup is a no-op, so the reference taken at the top of
 * this function is not released on this path - confirm intended */
c = NULL;
res = -ESHUTDOWN;
} else {
c->stats.requests++;
res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
/* 0 == good, negative == backoff */
if (res < 0) {
res = -ENOBUFS;
} else {
res = size;
}
}
if (c && c->service->funcs.peek && c->service->funcs.reclaim) {
/* release the message that was peeked above */
c->service->funcs.reclaim(&c->request);
}
cleanup:
qb_ipcs_connection_unref(c);
return res;
}
/* Value handed to _process_request_() as its ms_timeout argument. */
#define IPC_REQUEST_TIMEOUT 10
/*
 * Cap applied to the queue length in _request_q_len_get() for services whose
 * poll priority is neither QB_LOOP_MED nor QB_LOOP_LOW; also the size of the
 * scratch buffer in qb_ipcs_dispatch_connection_request().
 */
#define MAX_RECV_MSGS 50

/*
 * Dispatch callback that processes a single request on a connection.
 *
 * fd and revents are not used; data is the struct qb_ipcs_connection to
 * service.  A positive result from _process_request_() is reported as 0,
 * while zero and negative results are returned unchanged.
 */
int32_t
qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data)
{
	struct qb_ipcs_connection *conn = (struct qb_ipcs_connection *)data;
	int32_t rc = _process_request_(conn, IPC_REQUEST_TIMEOUT);

	return (rc > 0) ? 0 : rc;
}
/*
 * Work out how many requests the dispatcher should try to handle in one pass.
 *
 * When the transport has no q_len_get hook the answer is always 1.
 * Otherwise the hook's result is returned as-is when it is zero or negative,
 * and is limited according to the service's poll priority when positive:
 *   QB_LOOP_MED -> at most 5
 *   QB_LOOP_LOW -> exactly 1
 *   any other   -> at most MAX_RECV_MSGS
 */
static ssize_t
_request_q_len_get(struct qb_ipcs_connection *c)
{
	ssize_t pending;

	if (!c->service->funcs.q_len_get) {
		return 1;
	}

	pending = c->service->funcs.q_len_get(&c->request);
	if (pending <= 0) {
		return pending;
	}

	switch (c->service->poll_priority) {
	case QB_LOOP_MED:
		pending = QB_MIN(pending, 5);
		break;
	case QB_LOOP_LOW:
		pending = 1;
		break;
	default:
		pending = QB_MIN(pending, MAX_RECV_MSGS);
		break;
	}
	return pending;
}
/*
 * Dispatch callback for a single client connection.
 *
 * fd      - descriptor the callback was invoked for (only used in a log
 *           message)
 * revents - poll event bits; POLLNVAL, POLLHUP, POLLOUT and POLLIN are
 *           examined
 * data    - the struct qb_ipcs_connection being serviced
 *
 * Returns -EINVAL for POLLNVAL, -ESHUTDOWN when the peer hung up or the
 * setup socket reports a disconnect, 0 when there was nothing to do, when
 * requests were processed, or when the last request ended with
 * -EAGAIN/-ETIMEDOUT/-ENOBUFS, and otherwise the negative result of the last
 * _process_request_() call.
 */
int32_t
qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
{
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
/* scratch buffer for bytes read off the setup socket; contents are unused */
char bytes[MAX_RECV_MSGS];
int32_t res;
int32_t res2;
int32_t recvd = 0;
ssize_t avail;
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
return -EINVAL;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
}
if (revents & POLLOUT) {
/* failures other than -EAGAIN are logged but do not abort the dispatch */
res = resend_event_notifications(c);
if (res < 0 && res != -EAGAIN) {
errno = -res;
qb_util_perror(LOG_WARNING,
"resend_event_notifications (%s)",
c->description);
}
/* nothing to read: the callback was invoked for writability only */
if ((revents & POLLIN) == 0) {
return 0;
}
}
/* no requests are read while flow control is enabled on this connection */
if (c->fc_enabled) {
return 0;
}
avail = _request_q_len_get(c);
/*
 * Readable socket but an empty request queue: read one byte from the setup
 * socket to find out whether the peer has gone away.
 */
if (c->service->needs_sock_for_poll && avail == 0) {
res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0);
if (qb_ipc_us_sock_error_is_disconnected(res2)) {
errno = -res2;
qb_util_perror(LOG_WARNING, "conn (%s) disconnected",
c->description);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
} else {
qb_util_log(LOG_WARNING,
"conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)",
c->description, fd, res2);
return 0;
}
}
/*
 * Process at least one request, and keep going while the queue estimate is
 * positive, the previous request succeeded and flow control stays off.
 * recvd also counts requests that ended in -ENOBUFS or -EINVAL.
 */
do {
res = _process_request_(c, IPC_REQUEST_TIMEOUT);
if (res > 0 || res == -ENOBUFS || res == -EINVAL) {
recvd++;
}
if (res > 0) {
avail--;
}
} while (avail > 0 && res > 0 && !c->fc_enabled);
/*
 * Read recvd bytes from the setup socket.
 * NOTE(review): presumably one notification byte per handled request, with
 * -1 meaning "wait indefinitely" - confirm against qb_ipc_us_recv().
 * NOTE(review): c is dereferenced here even when _process_request_() has
 * just called qb_ipcs_disconnect(c); this relies on the connection still
 * being referenced at this point - confirm.
 */
if (c->service->needs_sock_for_poll && recvd > 0) {
res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
if (res2 < 0) {
errno = -res2;
qb_util_perror(LOG_ERR,
"error receiving from setup sock (%s)",
c->description);
}
}
/* a positive byte count is not an error: collapse it to 0 */
res = QB_MIN(0, res);
/* transient conditions are reported to the caller as success */
if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
res = 0;
}
if (res != 0) {
if (res != -ENOTCONN) {
/*
* Abnormal state (ENOTCONN is normal shutdown).
*/
errno = -res;
qb_util_perror(LOG_ERR, "request returned error (%s)",
c->description);
}
/* any remaining error drops a reference on the connection */
qb_ipcs_connection_unref(c);
}
return res;
}
/*
 * Attach an opaque, caller-supplied context pointer to a connection.
 * A NULL connection is silently ignored.
 */
void
qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
{
	if (c != NULL) {
		c->context = context;
	}
}
/*
 * Return the context pointer stored on a connection, or NULL when the
 * connection itself is NULL.
 */
void *
qb_ipcs_context_get(struct qb_ipcs_connection *c)
{
	return (c != NULL) ? c->context : NULL;
}
+/*
+ * Return the context pointer stored on the service that a connection
+ * belongs to.  Yields NULL when the connection is NULL or has no service
+ * attached.
+ */
+void *
+qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c)
+{
+	if (c != NULL && c->service != NULL) {
+		return c->service->context;
+	}
+	return NULL;
+}
+
/*
 * Copy a connection's statistics into a caller-supplied buffer.
 *
 * c                - connection to query
 * stats            - destination; receives
 *                    sizeof(struct qb_ipcs_connection_stats) bytes taken
 *                    from the start of the connection's statistics
 * clear_after_read - when non-zero the connection's statistics are zeroed
 *                    after the copy and client_pid is restored from c->pid
 *
 * Returns 0 on success, or -EINVAL when c or stats is NULL.
 */
int32_t
qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
			     struct qb_ipcs_connection_stats * stats,
			     int32_t clear_after_read)
{
	/* memcpy() to a NULL destination is undefined, so reject it up front */
	if (c == NULL || stats == NULL) {
		return -EINVAL;
	}
	memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
	if (clear_after_read) {
		/* the reset covers the extended (_2) statistics layout */
		memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
		c->stats.client_pid = c->pid;
	}
	return 0;
}
/*
 * Build a heap-allocated snapshot of a connection's extended statistics.
 *
 * c                - connection to query
 * clear_after_read - when non-zero the connection's statistics are zeroed
 *                    after the snapshot is taken and client_pid is restored
 *                    from c->pid
 *
 * The snapshot's event_q_length is filled in from the transport's q_len_get
 * hook applied to the event channel, or set to 0 when there is no such hook.
 *
 * Returns the snapshot, or NULL when c is NULL (errno is set to EINVAL) or
 * when calloc() fails.  The snapshot is allocated with calloc() and is not
 * freed here, so the caller is expected to release it.
 */
struct qb_ipcs_connection_stats_2*
qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
			       int32_t clear_after_read)
{
	struct qb_ipcs_connection_stats_2 *snapshot;

	if (c == NULL) {
		errno = EINVAL;
		return NULL;
	}

	snapshot = calloc(1, sizeof(*snapshot));
	if (snapshot == NULL) {
		return NULL;
	}

	memcpy(snapshot, &c->stats, sizeof(*snapshot));
	if (!c->service->funcs.q_len_get) {
		snapshot->event_q_length = 0;
	} else {
		snapshot->event_q_length =
			c->service->funcs.q_len_get(&c->event);
	}

	if (clear_after_read) {
		memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
		c->stats.client_pid = c->pid;
	}
	return snapshot;
}
/*
 * Copy a service's statistics into a caller-supplied buffer.
 *
 * s                - service to query
 * stats            - destination; receives sizeof(struct qb_ipcs_stats) bytes
 * clear_after_read - when non-zero the service's statistics are zeroed after
 *                    the copy
 *
 * Returns 0 on success, or -EINVAL when s or stats is NULL.
 */
int32_t
qb_ipcs_stats_get(struct qb_ipcs_service * s,
		  struct qb_ipcs_stats * stats, int32_t clear_after_read)
{
	/* memcpy() to a NULL destination is undefined, so reject it up front */
	if (s == NULL || stats == NULL) {
		return -EINVAL;
	}
	memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
	if (clear_after_read) {
		memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
	}
	return 0;
}
/*
 * Record the uid, gid and mode in a connection's auth settings.
 * A NULL connection is silently ignored.
 */
void
qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid,
			    gid_t gid, mode_t mode)
{
	if (c == NULL) {
		return;
	}

	c->auth.uid = uid;
	c->auth.gid = gid;
	c->auth.mode = mode;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Feb 26, 10:57 PM (12 h, 19 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1465850
Default Alt Text
(38 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment