Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 798d075..d25132d 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -1,405 +1,433 @@
/*
* Copyright (C) 2006-2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>,
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCS_H_DEFINED
#define QB_IPCS_H_DEFINED
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
#include <stdlib.h>
#include <sys/uio.h>
#include <qb/qbipc_common.h>
#include <qb/qbhdb.h>
#include <qb/qbloop.h>
/**
* @file qbipcs.h
*
* Server IPC API.
*
* @example ipcserver.c
*/
/**
* Request rate settings accepted by qb_ipcs_request_rate_limit().
*
* Each value selects the poll priority used for the service's connections;
* the two OFF values additionally enable flow control on every connection.
*/
enum qb_ipcs_rate_limit {
QB_IPCS_RATE_FAST, /* poll priority QB_LOOP_HIGH, flow control off */
QB_IPCS_RATE_NORMAL, /* poll priority QB_LOOP_MED, flow control off */
QB_IPCS_RATE_SLOW, /* poll priority QB_LOOP_LOW, flow control off */
QB_IPCS_RATE_OFF, /* poll priority QB_LOOP_LOW, flow control set to 1 */
QB_IPCS_RATE_OFF_2, /* poll priority QB_LOOP_LOW, flow control set to 2 */
};
struct qb_ipcs_connection;
typedef struct qb_ipcs_connection qb_ipcs_connection_t;
struct qb_ipcs_service;
typedef struct qb_ipcs_service qb_ipcs_service_t;
/**
* Per-service counters, read with qb_ipcs_stats_get().
*/
struct qb_ipcs_stats {
uint32_t active_connections; /* decremented when an established connection is disconnected */
uint32_t closed_connections; /* incremented each time a connection is disconnected */
};
/**
* Per-connection counters filled in by qb_ipcs_connection_stats_get().
*
* That getter is documented as deprecated from v0.13.0 onwards in favour of
* qb_ipcs_connection_stats_get_2() and struct qb_ipcs_connection_stats_2.
*/
struct qb_ipcs_connection_stats {
int32_t client_pid;
uint64_t requests;
uint64_t responses;
uint64_t events;
uint64_t send_retries;
uint64_t recv_retries;
int32_t flow_control_state;
uint64_t flow_control_count;
};
/**
* Per-connection counters returned (allocated) by
* qb_ipcs_connection_stats_get_2().
*
* Same leading fields as struct qb_ipcs_connection_stats, plus
* event_q_length.
*/
struct qb_ipcs_connection_stats_2 {
int32_t client_pid; /* presumably the pid of the connected client -- TODO confirm */
uint64_t requests; /* requests received and handed to msg_process */
uint64_t responses; /* responses successfully sent */
uint64_t events; /* events successfully sent */
uint64_t send_retries; /* sends that returned -EAGAIN or -ETIMEDOUT */
uint64_t recv_retries; /* receives that returned -EAGAIN or -ETIMEDOUT */
int32_t flow_control_state; /* last flow control value applied to the connection */
uint64_t flow_control_count; /* number of times the flow control value changed */
uint32_t event_q_length; /* presumably the current event queue depth -- TODO confirm */
};
typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents,
void *data);
typedef int32_t (*qb_ipcs_dispatch_add_fn)(enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_mod_fn)(enum qb_loop_priority p,
int32_t fd,
int32_t events,
void *data,
qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_del_fn)(int32_t fd);
typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p,
void *data,
qb_loop_job_dispatch_fn dispatch_fn);
/**
* Main-loop integration callbacks, installed with
* qb_ipcs_poll_handlers_set().
*
* qb_ipcs_run() fails with -EINVAL unless dispatch_add, dispatch_mod and
* dispatch_del are all set.
*/
struct qb_ipcs_poll_handlers {
qb_ipcs_job_add_fn job_add; /* used to re-schedule a disconnect when connection_closed returns non-zero */
qb_ipcs_dispatch_add_fn dispatch_add; /* required by qb_ipcs_run() */
qb_ipcs_dispatch_mod_fn dispatch_mod; /* required; called when a connection's poll events or priority change */
qb_ipcs_dispatch_del_fn dispatch_del; /* required by qb_ipcs_run() */
};
/**
* This callback is to check whether you want to accept a new connection.
*
* The types of checks you should do are authentication, service availability
* or process resource constraints.
* @return 0 to accept or -errno to indicate a failure (sent back to the client)
*
* @note you can call qb_ipcs_connection_auth_set() within this function.
*/
typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c,
uid_t uid, gid_t gid);
/**
* This is called after a new connection has been created.
*/
typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c);
/**
* This is called after a connection has been disconnected.
*
* @note if you return anything but 0 this function will be
* called repeatedly (until 0 is returned).
*/
typedef int32_t (*qb_ipcs_connection_closed_fn) (qb_ipcs_connection_t *c);
/**
* This is called just before a connection is freed.
*/
typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c);
/**
* This is the message processing callback.
* It is called with the message data.
*/
typedef int32_t (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c,
void *data, size_t size);
/**
* Service callbacks passed to qb_ipcs_create(), which copies each pointer
* into the service instance.
*/
struct qb_ipcs_service_handlers {
qb_ipcs_connection_accept_fn connection_accept;
qb_ipcs_connection_created_fn connection_created;
qb_ipcs_msg_process_fn msg_process; /* invoked for every received request without a NULL check */
qb_ipcs_connection_closed_fn connection_closed; /* optional: skipped when NULL */
qb_ipcs_connection_destroyed_fn connection_destroyed; /* optional: skipped when NULL */
};
/**
* Create a new IPC server.
*
* @param name for clients to connect to.
* @param service_id an integer to associate with the service
* @param type transport type.
* @param handlers callbacks.
* @return the new service instance.
*/
qb_ipcs_service_t* qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers);
/**
* Increase the reference counter on the service object.
*
* @param s service instance
*/
void qb_ipcs_ref(qb_ipcs_service_t *s);
/**
* Decrease the reference counter on the service object.
*
* @param s service instance
*/
void qb_ipcs_unref(qb_ipcs_service_t *s);
/**
* Set your poll callbacks.
*
* @param s service instance
* @param handlers the handlers that you want ipcs to use.
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s,
struct qb_ipcs_poll_handlers *handlers);
+/**
+ * Associate a "user" pointer with this service.
+ *
+ * @param s service instance
+ * @param context the pointer to associate with this service.
+ * @see qb_ipcs_service_context_get()
+ */
+void qb_ipcs_service_context_set(qb_ipcs_service_t* s,
+ void *context);
+
+/**
+ * Get the context (set previously)
+ *
+ * @param s service instance
+ * @return the context
+ * @see qb_ipcs_service_context_set()
+ */
+void *qb_ipcs_service_context_get(qb_ipcs_service_t* s);
+
/**
* run the new IPC server.
* @param s service instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_run(qb_ipcs_service_t* s);
/**
* Destroy the IPC server.
*
* @param s service instance to destroy
*/
void qb_ipcs_destroy(qb_ipcs_service_t* s);
/**
* Limit the incoming request rate.
* @param s service instance
* @param rl the new rate
*/
void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s,
enum qb_ipcs_rate_limit rl);
/**
* Send a response to an incoming request.
*
* @param c connection instance
* @param data the message to send
* @param size the size of the message
* @return size sent or -errno for errors
*
* @note the data must include a qb_ipc_response_header at
* the top of the message. The client will read the size field
* to determine how much to recv.
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data,
size_t size);
/**
* Send a response to an incoming request (scatter/gather variant).
*
* @param c connection instance
* @param iov the iovec struct that points to the message to send
* @param iov_len the number of iovecs.
* @return size sent or -errno for errors
*
* @note the iov[0] must be a qb_ipc_response_header. The client will
* read the size field to determine how much to recv.
*/
ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c,
const struct iovec * iov, size_t iov_len);
/**
* Send an asynchronous event message to the client.
*
* @param c connection instance
* @param data the message to send
* @param size the size of the message
* @return size sent or -errno for errors
*
* @note the data must include a qb_ipc_response_header at
* the top of the message. The client will read the size field
* to determine how much to recv.
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data,
size_t size);
/**
* Send an asynchronous event message to the client (scatter/gather variant).
*
* @param c connection instance
* @param iov the iovec struct that points to the message to send
* @param iov_len the number of iovecs.
* @return size sent or -errno for errors
*
* @note the iov[0] must be a qb_ipc_response_header. The client will
* read the size field to determine how much to recv.
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov,
size_t iov_len);
/**
* Increment the connection's reference counter.
*
* @param c connection instance
*/
void qb_ipcs_connection_ref(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
*
* @param c connection instance
*/
void qb_ipcs_connection_unref(qb_ipcs_connection_t *c);
/**
* Disconnect from this client.
*
* @param c connection instance
*/
void qb_ipcs_disconnect(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service.
* (as passed into qb_ipcs_create())
*
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
/**
* Associate a "user" pointer with this connection.
*
* @param context the pointer to associate with this connection.
* @param c connection instance
* @see qb_ipcs_context_get()
*/
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
/**
* Get the context (set previously)
*
* @param c connection instance
* @return the context
* @see qb_ipcs_context_set()
*/
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
+/**
+ * Get the context previously set on the service backing this connection
+ *
+ * @param c connection instance
+ * @return the context
+ * @see qb_ipcs_service_context_set
+ */
+void *qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c);
+
/**
* Get the connection statistics.
*
* @deprecated from v0.13.0 onwards, use qb_ipcs_connection_stats_get_2
* @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
struct qb_ipcs_connection_stats* stats,
int32_t clear_after_read);
/**
* Get (and allocate) the connection statistics.
*
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @retval NULL if no memory or invalid connection
* @retval allocated statistics structure (user must free it).
*/
struct qb_ipcs_connection_stats_2*
qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
int32_t clear_after_read);
/**
* Get the service statistics.
*
* @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param pt service instance
* @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_stats_get(qb_ipcs_service_t* pt,
struct qb_ipcs_stats* stats,
int32_t clear_after_read);
/**
* Get the first connection.
*
* @note call qb_ipcs_connection_unref() after using the connection.
*
* @param pt service instance
* @return first connection
*/
qb_ipcs_connection_t * qb_ipcs_connection_first_get(qb_ipcs_service_t* pt);
/**
* Get the next connection.
*
* @note call qb_ipcs_connection_unref() after using the connection.
*
* @param pt service instance
* @param current current connection
* @return next connection
*/
qb_ipcs_connection_t * qb_ipcs_connection_next_get(qb_ipcs_service_t* pt,
qb_ipcs_connection_t *current);
/**
* Set the ownership and permissions on the shared memory files so that both
* processes can read and write to them.
*
* @param conn connection instance
* @param uid the user id to set.
* @param gid the group id to set.
* @param mode the mode to set.
*
* @see chmod() chown()
* @note this must be called within the qb_ipcs_connection_accept_fn()
* callback.
*/
void qb_ipcs_connection_auth_set(qb_ipcs_connection_t *conn, uid_t uid,
gid_t gid, mode_t mode);
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCS_H_DEFINED */
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 50714c5..45489f5 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,205 +1,207 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include "os_base.h"
#include <dirent.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
#define QB_IPC_MAX_WAIT_MS 2000
/*
 * Connection handshake:
 *
 *   Client                        Server
 *   SEND CONN REQ ->
 *                                 ACCEPT & CREATE queues
 *                                 or DENY
 *                              <- SEND ACCEPT(with details)/DENY
 */
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr;
uint32_t max_msg_size;
} __attribute__ ((aligned(8)));
struct qb_ipc_event_connection_request {
struct qb_ipc_request_header hdr;
intptr_t connection;
} __attribute__ ((aligned(8)));
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr;
int32_t connection_type;
uint32_t max_msg_size;
intptr_t connection;
char request[PATH_MAX];
char response[PATH_MAX];
char event[PATH_MAX];
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
struct qb_ipc_one_way {
size_t max_msg_size;
enum qb_ipc_type type;
union {
struct {
int32_t sock;
char *sock_name;
void* shared_data;
char shared_file_name[NAME_MAX];
} us;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
int32_t (*fc_get)(struct qb_ipc_one_way *one_way);
};
struct qb_ipcc_connection {
char name[NAME_MAX];
int32_t needs_sock_for_poll;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
struct qb_ipc_request_header *receive_buf;
uint32_t fc_enable_max;
int32_t is_connected;
void * context;
};
int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
int32_t ms_timeout, int32_t events);
void qb_ipcc_us_sock_close(int32_t sock);
int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
/*
 * Transport operations table stored in each service (qb_ipcs_service.funcs).
 */
struct qb_ipcs_funcs {
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
/* peek and reclaim are optional; request processing uses them only when both are set, otherwise recv */
ssize_t (*peek)(struct qb_ipc_one_way *one_way, void **data_out, int32_t timeout);
void (*reclaim)(struct qb_ipc_one_way *one_way);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
void (*fc_set)(struct qb_ipc_one_way *one_way, int32_t fc_enable);
/* optional; when NULL the request queue length is treated as 1 */
ssize_t (*q_len_get)(struct qb_ipc_one_way *one_way);
};
/*
 * Server-side state of one IPC service, allocated by qb_ipcs_create().
 */
struct qb_ipcs_service {
enum qb_ipc_type type; /* transport in use; QB_IPC_NATIVE is resolved to SHM or SOCKET at creation */
char name[NAME_MAX]; /* copy of the name passed to qb_ipcs_create() */
int32_t service_id;
int32_t ref_count; /* starts at 1; each connection holds one reference */
pid_t pid; /* getpid() at creation time */
int32_t needs_sock_for_poll;
int32_t server_sock;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
enum qb_loop_priority poll_priority; /* QB_LOOP_MED by default; changed by qb_ipcs_request_rate_limit() */
struct qb_list_head connections; /* list of struct qb_ipcs_connection (linked via their list member) */
struct qb_list_head list; /* link in the file-scope list of all services */
struct qb_ipcs_stats stats;
+
+ void *context; /* user pointer, see qb_ipcs_service_context_set()/qb_ipcs_service_context_get() */
};
/*
 * Connection life-cycle states, as driven by qb_ipcs_connection_alloc() and
 * qb_ipcs_disconnect().
 */
enum qb_ipcs_connection_state {
QB_IPCS_CONNECTION_INACTIVE, /* state of a freshly allocated connection; also set when an ACTIVE one is disconnected */
QB_IPCS_CONNECTION_ACTIVE, /* treated by qb_ipcs_disconnect() as an incomplete connection */
QB_IPCS_CONNECTION_ESTABLISHED, /* counted in active_connections until disconnected */
QB_IPCS_CONNECTION_SHUTTING_DOWN, /* disconnected; waiting for connection_closed to return 0 */
};
#define CONNECTION_DESCRIPTION (16)
struct qb_ipcs_connection_auth {
uid_t uid;
gid_t gid;
mode_t mode;
};
/*
 * Server-side state of one client connection, allocated by
 * qb_ipcs_connection_alloc().
 */
struct qb_ipcs_connection {
enum qb_ipcs_connection_state state;
int32_t refcount; /* starts at 1; the connection is freed when it reaches 0 */
pid_t pid; /* 0 until set */
uid_t euid; /* -1 until set */
gid_t egid; /* -1 until set */
struct qb_ipcs_connection_auth auth;
struct qb_ipc_one_way setup;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service; /* owning service; a service reference is held for the connection's lifetime */
struct qb_list_head list; /* link in service->connections */
struct qb_ipc_request_header *receive_buf; /* receive buffer used when the transport has no peek/reclaim; freed with the connection */
void *context; /* user pointer, NULL until set */
int32_t fc_enabled; /* current flow control value (0 == off) */
int32_t poll_events; /* poll event mask currently requested for this connection */
int32_t outstanding_notifiers; /* event notification bytes still to be sent on the setup socket */
char description[CONNECTION_DESCRIPTION]; /* label used in log messages; "not set yet" initially */
struct qb_ipcs_connection_stats_2 stats;
};
void qb_ipcs_us_init(struct qb_ipcs_service *s);
void qb_ipcs_shm_init(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
int32_t qb_ipc_us_sock_error_is_disconnected(int err);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 4365f03..65868e1 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -1,886 +1,908 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "util_int.h"
#include "ipc_int.h"
#include <qb/qbdefs.h>
#include <qb/qbatomic.h>
#include <qb/qbipcs.h>
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
int32_t fc_enable);
static int32_t
new_event_notification(struct qb_ipcs_connection * c);
static QB_LIST_DECLARE(qb_ipc_services);
/*
 * Allocate a service instance, fill in its defaults, copy the caller's
 * handlers and link it into the file-scope list of services.
 *
 * Returns the new service (reference count 1) or NULL when allocation fails.
 * name and handlers are dereferenced without a NULL check.
 */
qb_ipcs_service_t *
qb_ipcs_create(const char *name,
	       int32_t service_id,
	       enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers)
{
	struct qb_ipcs_service *svc = calloc(1, sizeof(*svc));

	if (svc == NULL) {
		return NULL;
	}

	/* QB_IPC_NATIVE maps to SHM unless SHM support is compiled out. */
	svc->type = type;
	if (type == QB_IPC_NATIVE) {
#ifdef DISABLE_IPC_SHM
		svc->type = QB_IPC_SOCKET;
#else
		svc->type = QB_IPC_SHM;
#endif /* DISABLE_IPC_SHM */
	}

	svc->service_id = service_id;
	svc->ref_count = 1;
	svc->pid = getpid();
	svc->poll_priority = QB_LOOP_MED;
	svc->needs_sock_for_poll = QB_FALSE;
	(void)strlcpy(svc->name, name, NAME_MAX);

	/* Copies all five callbacks in one go. */
	svc->serv_fns = *handlers;

	qb_list_init(&svc->connections);
	qb_list_init(&svc->list);
	qb_list_add(&svc->list, &qb_ipc_services);

	return svc;
}
/*
 * Install the main-loop callbacks (job_add, dispatch_add, dispatch_mod,
 * dispatch_del) on the service by copying the caller's table.
 */
void
qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
			  struct qb_ipcs_poll_handlers *handlers)
{
	s->poll_fns = *handlers;
}
+void
+qb_ipcs_service_context_set(qb_ipcs_service_t* s,
+ void *context)
+{
+ s->context = context; /* stores the caller's pointer as-is; s is not NULL-checked */
+}
+
+void *
+qb_ipcs_service_context_get(qb_ipcs_service_t* s)
+{
+ return s->context; /* NULL unless a context was set (the service is calloc'ed); s is not NULL-checked */
+}
+
/*
 * Initialise the transport selected for the service and publish it.
 *
 * Returns -EINVAL when any of the dispatch_add/mod/del poll handlers is
 * missing (the service is left untouched in that case). On any later failure
 * the service reference is dropped before the negative errno is returned; a
 * failed publish is withdrawn first. Otherwise returns the result of
 * qb_ipcs_us_publish().
 */
int32_t
qb_ipcs_run(struct qb_ipcs_service *s)
{
	int32_t rc = 0;

	if (!s->poll_fns.dispatch_add ||
	    !s->poll_fns.dispatch_mod ||
	    !s->poll_fns.dispatch_del) {
		return -EINVAL;
	}

	if (s->type == QB_IPC_SOCKET) {
		qb_ipcs_us_init(s);
	} else if (s->type == QB_IPC_SHM) {
#ifdef DISABLE_IPC_SHM
		rc = -ENOTSUP;
#else
		qb_ipcs_shm_init(s);
#endif /* DISABLE_IPC_SHM */
	} else if (s->type == QB_IPC_POSIX_MQ || s->type == QB_IPC_SYSV_MQ) {
		rc = -ENOTSUP;
	} else {
		rc = -EINVAL;
	}

	if (rc < 0) {
		qb_ipcs_unref(s);
		return rc;
	}

	rc = qb_ipcs_us_publish(s);
	if (rc < 0) {
		(void)qb_ipcs_us_withdraw(s);
		qb_ipcs_unref(s);
	}
	return rc;
}
static int32_t
_modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
{
qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod;
if (c->service->type == QB_IPC_SOCKET) {
return disp_mod(c->service->poll_priority,
c->event.u.us.sock,
c->poll_events, c,
qb_ipcs_dispatch_connection_request);
} else {
return disp_mod(c->service->poll_priority,
c->setup.u.us.sock,
c->poll_events, c,
qb_ipcs_dispatch_connection_request);
}
return -EINVAL;
}
void
qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
enum qb_ipcs_rate_limit rl)
{
struct qb_ipcs_connection *c;
enum qb_loop_priority old_p = s->poll_priority;
struct qb_list_head *pos;
struct qb_list_head *n;
switch (rl) {
case QB_IPCS_RATE_FAST:
s->poll_priority = QB_LOOP_HIGH;
break;
case QB_IPCS_RATE_SLOW:
case QB_IPCS_RATE_OFF:
case QB_IPCS_RATE_OFF_2:
s->poll_priority = QB_LOOP_LOW;
break;
default:
case QB_IPCS_RATE_NORMAL:
s->poll_priority = QB_LOOP_MED;
break;
}
qb_list_for_each_safe(pos, n, &s->connections) {
c = qb_list_entry(pos, struct qb_ipcs_connection, list);
qb_ipcs_connection_ref(c);
if (rl == QB_IPCS_RATE_OFF) {
qb_ipcs_flowcontrol_set(c, 1);
} else if (rl == QB_IPCS_RATE_OFF_2) {
qb_ipcs_flowcontrol_set(c, 2);
} else {
qb_ipcs_flowcontrol_set(c, QB_FALSE);
}
if (old_p != s->poll_priority) {
(void)_modify_dispatch_descriptor_(c);
}
qb_ipcs_connection_unref(c);
}
}
/* Take one more reference on the service (atomic increment). */
void qb_ipcs_ref(struct qb_ipcs_service *s)
{
	qb_atomic_int_inc(&s->ref_count);
}
/*
 * Drop one reference on the service; the service is freed when the last
 * reference goes away. Asserts that at least one reference is held.
 */
void
qb_ipcs_unref(struct qb_ipcs_service *s)
{
	assert(s->ref_count > 0);

	if (!qb_atomic_int_dec_and_test(&s->ref_count)) {
		return;
	}

	qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
	free(s);
}
/*
 * Disconnect every connection of the service, withdraw the service and drop
 * the caller's reference. A NULL service is ignored.
 */
void
qb_ipcs_destroy(struct qb_ipcs_service *s)
{
	struct qb_list_head *iter;
	struct qb_list_head *next;

	if (s == NULL) {
		return;
	}

	/* "safe" iteration: disconnecting may unlink the current entry */
	qb_list_for_each_safe(iter, next, &s->connections) {
		struct qb_ipcs_connection *conn =
		    qb_list_entry(iter, struct qb_ipcs_connection, list);

		if (conn != NULL) {
			qb_ipcs_disconnect(conn);
		}
	}

	(void)qb_ipcs_us_withdraw(s);
	qb_ipcs_unref(s);
}
/*
* connection API
*/
/*
 * Pick the socket-backed channel to poll for event writability: the setup
 * channel when the service needs a socket for polling, the event channel
 * when that channel is itself a socket, otherwise NULL.
 */
static struct qb_ipc_one_way *
_event_sock_one_way_get(struct qb_ipcs_connection * c)
{
	if (c->service->needs_sock_for_poll) {
		return &c->setup;
	}
	return (c->event.type == QB_IPC_SOCKET) ? &c->event : NULL;
}
/*
 * Pick the socket-backed channel to poll for response writability: the setup
 * channel when the service needs a socket for polling, the response channel
 * when that channel is itself a socket, otherwise NULL.
 */
static struct qb_ipc_one_way *
_response_sock_one_way_get(struct qb_ipcs_connection * c)
{
	if (c->service->needs_sock_for_poll) {
		return &c->setup;
	}
	return (c->response.type == QB_IPC_SOCKET) ? &c->response : NULL;
}
/*
 * Send a response buffer on the connection's response channel.
 *
 * Returns -EINVAL for a NULL connection, otherwise the transport's send
 * result. A full-size send bumps the responses counter. On -EAGAIN or
 * -ETIMEDOUT the retry counter is bumped and, when a socket channel exists,
 * its readiness check may replace the result with its own error.
 */
ssize_t
qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
		      size_t size)
{
	ssize_t sent;

	if (c == NULL) {
		return -EINVAL;
	}

	qb_ipcs_connection_ref(c);

	sent = c->service->funcs.send(&c->response, data, size);
	if (sent == size) {
		c->stats.responses++;
	} else if (sent == -EAGAIN || sent == -ETIMEDOUT) {
		struct qb_ipc_one_way *sock_ow = _response_sock_one_way_get(c);

		if (sock_ow != NULL) {
			ssize_t ready = qb_ipc_us_ready(sock_ow, &c->setup, 0,
							POLLOUT);
			if (ready < 0) {
				sent = ready;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return sent;
}
/*
 * Send a response described by an iovec array on the response channel.
 *
 * Returns -EINVAL for a NULL connection, otherwise the transport's sendv
 * result. Any positive result bumps the responses counter. On -EAGAIN or
 * -ETIMEDOUT the retry counter is bumped and, when a socket channel exists,
 * its readiness check may replace the result with its own error.
 */
ssize_t
qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov,
		       size_t iov_len)
{
	ssize_t sent;

	if (c == NULL) {
		return -EINVAL;
	}

	qb_ipcs_connection_ref(c);

	sent = c->service->funcs.sendv(&c->response, iov, iov_len);
	if (sent > 0) {
		c->stats.responses++;
	} else if (sent == -EAGAIN || sent == -ETIMEDOUT) {
		struct qb_ipc_one_way *sock_ow = _response_sock_one_way_get(c);

		if (sock_ow != NULL) {
			ssize_t ready = qb_ipc_us_ready(sock_ow, &c->setup, 0,
							POLLOUT);
			if (ready < 0) {
				sent = ready;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return sent;
}
/*
 * Try to flush the event notifications that could not be sent earlier.
 *
 * One byte per outstanding notification is written to the setup socket
 * (the byte values come from receive_buf). Once nothing is outstanding the
 * connection stops polling for POLLOUT. Returns the send result, or 0 when
 * nothing was outstanding.
 */
static int32_t
resend_event_notifications(struct qb_ipcs_connection *c)
{
	ssize_t sent = 0;

	if (c->outstanding_notifiers > 0) {
		sent = qb_ipc_us_send(&c->setup, c->receive_buf,
				      c->outstanding_notifiers);
		if (sent > 0) {
			c->outstanding_notifiers -= sent;
		}
	}
	assert(c->outstanding_notifiers >= 0);

	if (c->outstanding_notifiers == 0) {
		c->poll_events = POLLIN | POLLPRI | POLLNVAL;
		(void)_modify_dispatch_descriptor_(c);
	}
	return sent;
}
/*
 * Tell the client that a new event is available, for services that need a
 * socket for polling (a no-op returning 0 otherwise).
 *
 * When notifications are already queued the new one is simply counted.
 * Otherwise one byte is written to the setup socket; if that would block,
 * the notification is queued and POLLOUT is added to the poll mask so it
 * can be retried later. Returns 0 or the result of the socket send.
 */
static int32_t
new_event_notification(struct qb_ipcs_connection * c)
{
	ssize_t rc;

	if (!c->service->needs_sock_for_poll) {
		return 0;
	}

	assert(c->outstanding_notifiers >= 0);
	if (c->outstanding_notifiers > 0) {
		c->outstanding_notifiers++;
		return 0;
	}

	rc = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1);
	if (rc == -EAGAIN) {
		/* notify the client later, when we can. */
		c->outstanding_notifiers++;
		c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL;
		(void)_modify_dispatch_descriptor_(c);
	}
	return rc;
}
/*
 * Send an event buffer on the connection's event channel and notify the
 * client that it is available.
 *
 * Returns -EINVAL for a NULL connection and -EMSGSIZE when size exceeds the
 * event channel's max_msg_size; otherwise the transport's send result. A
 * full-size send bumps the events counter and triggers a notification; a
 * notification failure other than -EAGAIN is logged and returned instead.
 * On -EAGAIN or -ETIMEDOUT from the send, the retry counter is bumped and,
 * when a socket channel exists, its readiness check may replace the result
 * with its own error.
 */
ssize_t
qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size)
{
	ssize_t res;
	ssize_t resn;

	if (c == NULL) {
		return -EINVAL;
	}
	/*
	 * Validate the size before taking a reference: returning after
	 * qb_ipcs_connection_ref() without a matching unref would leak a
	 * reference and keep the connection from ever being freed.
	 */
	if (size > c->event.max_msg_size) {
		return -EMSGSIZE;
	}

	qb_ipcs_connection_ref(c);

	res = c->service->funcs.send(&c->event, data, size);
	if (res == size) {
		c->stats.events++;
		resn = new_event_notification(c);
		if (resn < 0 && resn != -EAGAIN) {
			errno = -resn;
			qb_util_perror(LOG_WARNING,
				       "new_event_notification (%s)",
				       c->description);
			res = resn;
		}
	} else if (res == -EAGAIN || res == -ETIMEDOUT) {
		struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);

		if (ow) {
			resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
			if (resn < 0) {
				res = resn;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return res;
}
/*
 * Send an event described by an iovec array on the event channel and notify
 * the client that it is available.
 *
 * Returns -EINVAL for a NULL connection, otherwise the transport's sendv
 * result. Any positive result bumps the events counter and triggers a
 * notification; a notification failure other than -EAGAIN is logged and
 * returned instead. On -EAGAIN or -ETIMEDOUT from the send, the retry
 * counter is bumped and, when a socket channel exists, its readiness check
 * may replace the result with its own error.
 */
ssize_t
qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
		    const struct iovec * iov, size_t iov_len)
{
	ssize_t sent;
	ssize_t aux;

	if (c == NULL) {
		return -EINVAL;
	}

	qb_ipcs_connection_ref(c);

	sent = c->service->funcs.sendv(&c->event, iov, iov_len);
	if (sent > 0) {
		c->stats.events++;
		aux = new_event_notification(c);
		if (aux < 0 && aux != -EAGAIN) {
			errno = -aux;
			qb_util_perror(LOG_WARNING,
				       "new_event_notification (%s)",
				       c->description);
			sent = aux;
		}
	} else if (sent == -EAGAIN || sent == -ETIMEDOUT) {
		struct qb_ipc_one_way *sock_ow = _event_sock_one_way_get(c);

		if (sock_ow != NULL) {
			aux = qb_ipc_us_ready(sock_ow, &c->setup, 0, POLLOUT);
			if (aux < 0) {
				sent = aux;
			}
		}
		c->stats.send_retries++;
	}

	qb_ipcs_connection_unref(c);
	return sent;
}
/*
 * Return the first connection of the service with an extra reference taken
 * for the caller, or NULL when the service has no connections.
 */
qb_ipcs_connection_t *
qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
{
	struct qb_ipcs_connection *first = NULL;

	if (!qb_list_empty(&s->connections)) {
		first = qb_list_first_entry(&s->connections,
					    struct qb_ipcs_connection, list);
		qb_ipcs_connection_ref(first);
	}
	return first;
}
/*
 * Return the connection that follows current in the service's list, with an
 * extra reference taken for the caller. Returns NULL when current is NULL or
 * is the last connection.
 */
qb_ipcs_connection_t *
qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
			    struct qb_ipcs_connection * current)
{
	struct qb_ipcs_connection *following = NULL;

	if (current != NULL &&
	    !qb_list_is_last(&current->list, &s->connections)) {
		/* the "first entry" after current's own link is its successor */
		following = qb_list_first_entry(&current->list,
						struct qb_ipcs_connection,
						list);
		qb_ipcs_connection_ref(following);
	}
	return following;
}
/*
 * Return the service id of the service owning the connection, or -EINVAL
 * for a NULL connection.
 */
int32_t
qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
{
	return (c == NULL) ? -EINVAL : c->service->service_id;
}
/*
 * Allocate a connection for the given service and set its initial state.
 *
 * The new connection starts INACTIVE with one reference, unset credentials,
 * all four channels using the service's transport type, and it holds a
 * reference on the service. Returns NULL when allocation fails.
 */
struct qb_ipcs_connection *
qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
	struct qb_ipcs_connection *conn = calloc(1, sizeof(*conn));

	if (conn == NULL) {
		return NULL;
	}

	conn->state = QB_IPCS_CONNECTION_INACTIVE;
	conn->refcount = 1;

	conn->pid = 0;
	conn->euid = -1;
	conn->egid = -1;

	conn->receive_buf = NULL;
	conn->context = NULL;
	conn->fc_enabled = QB_FALSE;
	conn->poll_events = POLLIN | POLLPRI | POLLNVAL;

	conn->setup.type = s->type;
	conn->request.type = s->type;
	conn->response.type = s->type;
	conn->event.type = s->type;

	(void)strlcpy(conn->description, "not set yet",
		      CONNECTION_DESCRIPTION);

	/* the connection points at its service, so it keeps it alive */
	qb_ipcs_ref(s);
	conn->service = s;

	qb_list_init(&conn->list);
	return conn;
}
/* Take one more reference on the connection; a NULL connection is ignored. */
void
qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
{
	if (c == NULL) {
		return;
	}
	qb_atomic_int_inc(&c->refcount);
}
/*
 * Drop one reference on the connection; a NULL connection is ignored.
 *
 * Dropping a reference that is not held is logged and trips an assert. When
 * the last reference goes away the connection is unlinked from its list,
 * the connection_destroyed callback (if any) runs, the transport is
 * disconnected, the service reference is released and the memory is freed.
 */
void
qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
{
	if (c == NULL) {
		return;
	}

	if (c->refcount < 1) {
		qb_util_log(LOG_ERR, "ref:%d state:%d (%s)",
			    c->refcount, c->state, c->description);
		assert(0);
	}

	if (!qb_atomic_int_dec_and_test(&c->refcount)) {
		return;
	}

	/* last reference gone: tear the connection down */
	qb_list_del(&c->list);
	if (c->service->serv_fns.connection_destroyed) {
		c->service->serv_fns.connection_destroyed(c);
	}
	c->service->funcs.disconnect(c);
	qb_ipcs_unref(c->service);
	free(c->receive_buf);
	free(c);
}
void
qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
int32_t res = 0;
qb_loop_job_dispatch_fn rerun_job;
if (c == NULL) {
return;
}
qb_util_log(LOG_DEBUG, "%s(%s) state:%d",
__func__, c->description, c->state);
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->service->funcs.disconnect(c);
c->state = QB_IPCS_CONNECTION_INACTIVE;
c->service->stats.closed_connections++;
/* return early as it's an incomplete connection.
*/
return;
}
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
c->service->funcs.disconnect(c);
c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
c->service->stats.active_connections--;
c->service->stats.closed_connections++;
}
if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
res = 0;
if (c->service->serv_fns.connection_closed) {
res = c->service->serv_fns.connection_closed(c);
}
if (res == 0) {
qb_ipcs_connection_unref(c);
} else {
/* OK, so they want the connection_closed
* function re-run */
rerun_job =
(qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
res = c->service->poll_fns.job_add(QB_LOOP_LOW,
c, rerun_job);
if (res != 0) {
/* last ditch attempt to cleanup */
qb_ipcs_connection_unref(c);
}
}
}
}
/*
 * Apply a flow control value to the connection's request channel.
 *
 * Nothing happens for a NULL connection or when the value is unchanged;
 * otherwise the transport is told, the value is recorded and the flow
 * control statistics are updated.
 */
static void
qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
{
	if (c == NULL || c->fc_enabled == fc_enable) {
		return;
	}

	c->service->funcs.fc_set(&c->request, fc_enable);
	c->fc_enabled = fc_enable;
	c->stats.flow_control_state = fc_enable;
	c->stats.flow_control_count++;
}
/*
 * Receive one request from the connection and hand it to the service's
 * msg_process callback.
 *
 * Returns the received size (> 0) when the message was processed,
 * -ENOBUFS when msg_process returned a negative value, -ESHUTDOWN when the
 * client disconnected (empty read or QB_IPC_MSG_DISCONNECT), or the negative
 * receive error otherwise.
 */
static int32_t
_process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
{
int32_t res = 0;
ssize_t size;
struct qb_ipc_request_header *hdr;
/* keep the connection alive while the request is handled */
qb_ipcs_connection_ref(c);
/* use peek + reclaim when the transport provides both, otherwise recv into receive_buf */
if (c->service->funcs.peek && c->service->funcs.reclaim) {
size = c->service->funcs.peek(&c->request, (void **)&hdr,
ms_timeout);
} else {
hdr = c->receive_buf;
size = c->service->funcs.recv(&c->request,
hdr,
c->request.max_msg_size,
ms_timeout);
}
if (size < 0) {
/* -EAGAIN / -ETIMEDOUT only count as a retry; other errors are logged */
if (size != -EAGAIN && size != -ETIMEDOUT) {
qb_util_perror(LOG_DEBUG,
"recv from client connection failed (%s)",
c->description);
} else {
c->stats.recv_retries++;
}
res = size;
goto cleanup;
} else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)",
c->description);
qb_ipcs_disconnect(c);
/*
 * NOTE(review): with c set to NULL the unref at cleanup is a no-op, so the
 * reference taken at the top of this function is not released on this path.
 * qb_ipcs_dispatch_connection_request() still dereferences the connection
 * after this function returns, so confirm the intended ownership before
 * changing this.
 */
c = NULL;
res = -ESHUTDOWN;
} else {
c->stats.requests++;
res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
/* 0 == good, negative == backoff */
if (res < 0) {
res = -ENOBUFS;
} else {
res = size;
}
}
/* release the peeked message (skipped after a disconnect, where c is NULL) */
if (c && c->service->funcs.peek && c->service->funcs.reclaim) {
c->service->funcs.reclaim(&c->request);
}
cleanup:
qb_ipcs_connection_unref(c);
return res;
}
#define IPC_REQUEST_TIMEOUT 10
#define MAX_RECV_MSGS 50
/*
 * Poll callback that processes a single request for the connection passed
 * in data (fd and revents are not used).
 *
 * Returns 0 when a request was processed, otherwise the zero or negative
 * result of the request processing.
 */
int32_t
qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data)
{
	struct qb_ipcs_connection *conn = (struct qb_ipcs_connection *)data;
	int32_t rc = _process_request_(conn, IPC_REQUEST_TIMEOUT);

	return (rc > 0) ? 0 : rc;
}
/*
 * Work out how many requests should be processed for c in one dispatch.
 *
 * Without a q_len_get hook the answer is always 1. With one, a
 * non-positive queue length is returned as-is; a positive length is
 * limited according to the service's poll priority:
 *   QB_LOOP_MED -> at most 5
 *   QB_LOOP_LOW -> exactly 1
 *   otherwise   -> at most MAX_RECV_MSGS
 */
static ssize_t
_request_q_len_get(struct qb_ipcs_connection *c)
{
	ssize_t pending;

	if (!c->service->funcs.q_len_get) {
		return 1;
	}

	pending = c->service->funcs.q_len_get(&c->request);
	if (pending <= 0) {
		return pending;
	}

	if (c->service->poll_priority == QB_LOOP_MED) {
		pending = QB_MIN(pending, 5);
		return pending;
	}
	if (c->service->poll_priority == QB_LOOP_LOW) {
		return 1;
	}
	pending = QB_MIN(pending, MAX_RECV_MSGS);
	return pending;
}
/*
 * Poll callback for a client connection (passed in data).
 *
 * Handles the poll events in revents in this order: POLLNVAL, POLLHUP,
 * POLLOUT, then incoming requests. Requests are processed in a batch
 * whose size comes from _request_q_len_get().
 *
 * Returns 0 when there was nothing to do or the batch ended normally
 * (including -EAGAIN / -ETIMEDOUT / -ENOBUFS from the last request),
 * -EINVAL on POLLNVAL, -ESHUTDOWN when the peer hung up or was found
 * disconnected, or the negative result of the last processed request.
 */
int32_t
qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
{
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
/* Scratch space for bytes read from the setup socket; contents unused. */
char bytes[MAX_RECV_MSGS];
int32_t res;
int32_t res2;
/* Number of processed requests to be matched by bytes read from the
 * setup socket afterwards. */
int32_t recvd = 0;
ssize_t avail;
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
return -EINVAL;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
}
if (revents & POLLOUT) {
/* Retry pending event notifications; a failure other than -EAGAIN is
 * only logged, it does not abort the dispatch. */
res = resend_event_notifications(c);
if (res < 0 && res != -EAGAIN) {
errno = -res;
qb_util_perror(LOG_WARNING,
"resend_event_notifications (%s)",
c->description);
}
/* Nothing to read: done. */
if ((revents & POLLIN) == 0) {
return 0;
}
}
/* While flow control is on, no requests are taken from the client. */
if (c->fc_enabled) {
return 0;
}
avail = _request_q_len_get(c);
if (c->service->needs_sock_for_poll && avail == 0) {
/* Woken up with an empty queue: read one byte from the setup socket
 * to tell a disconnected peer apart from a spurious wakeup. */
res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0);
if (qb_ipc_us_sock_error_is_disconnected(res2)) {
errno = -res2;
qb_util_perror(LOG_WARNING, "conn (%s) disconnected",
c->description);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
} else {
qb_util_log(LOG_WARNING,
"conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)",
c->description, fd, res2);
return 0;
}
}
/* Process requests until the batch is used up, a request does not
 * return a positive size, or flow control gets enabled. */
do {
res = _process_request_(c, IPC_REQUEST_TIMEOUT);
/* -ENOBUFS and -EINVAL are counted as well as successful requests. */
if (res > 0 || res == -ENOBUFS || res == -EINVAL) {
recvd++;
}
if (res > 0) {
avail--;
}
} while (avail > 0 && res > 0 && !c->fc_enabled);
if (c->service->needs_sock_for_poll && recvd > 0) {
/* Read recvd bytes from the setup socket - presumably one byte per
 * request was written there by the client to wake the poll; TODO
 * confirm against the client side. */
res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
if (res2 < 0) {
errno = -res2;
qb_util_perror(LOG_ERR,
"error receiving from setup sock (%s)",
c->description);
}
}
/* A positive res is a message size, not an error: clamp it to 0. */
res = QB_MIN(0, res);
/* Retry / back-off conditions are not reported as errors either. */
if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
res = 0;
}
if (res != 0) {
if (res != -ENOTCONN) {
/*
 * Abnormal state (ENOTCONN is normal shutdown).
 */
errno = -res;
qb_util_perror(LOG_ERR, "request returned error (%s)",
c->description);
}
/* Any remaining error drops a reference on the connection. */
qb_ipcs_connection_unref(c);
}
return res;
}
/*
 * Attach an opaque context pointer to a connection.
 * A NULL connection is silently ignored.
 */
void
qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
{
	if (c != NULL) {
		c->context = context;
	}
}
/*
 * Fetch the context pointer stored on a connection.
 * Returns NULL when c itself is NULL.
 */
void *
qb_ipcs_context_get(struct qb_ipcs_connection *c)
{
	return (c == NULL) ? NULL : c->context;
}
+void *
+qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c)
+{ /* Return the context pointer stored on the service that connection c belongs to. */
+ if (c == NULL || c->service == NULL) { /* no connection, or connection without a service */
+ return NULL;
+ }
+ return c->service->context;
+}
+
/*
 * Copy a connection's statistics into a caller-supplied structure.
 *
 * c                 connection to query
 * stats             destination; sizeof(struct qb_ipcs_connection_stats)
 *                   bytes are written to it
 * clear_after_read  when non-zero, the connection's counters are zeroed
 *                   after the copy and client_pid is restored from c->pid
 *
 * Returns 0 on success, or -EINVAL when c or stats is NULL.
 */
int32_t
qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
			     struct qb_ipcs_connection_stats * stats,
			     int32_t clear_after_read)
{
	/* A NULL destination would be dereferenced by memcpy below. */
	if (c == NULL || stats == NULL) {
		return -EINVAL;
	}
	memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
	if (clear_after_read) {
		/* Same size as the original reset, which clears the whole
		 * qb_ipcs_connection_stats_2 extent, not only the part copied
		 * out above. */
		memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
		c->stats.client_pid = c->pid;
	}
	return 0;
}
/*
 * Build a snapshot of a connection's statistics.
 *
 * The snapshot is allocated with calloc() and handed back to the caller;
 * this function keeps no reference to it. Besides a copy of c->stats it
 * carries event_q_length, taken from the service's q_len_get hook on the
 * event channel (0 when the hook is absent).
 *
 * When clear_after_read is non-zero the connection's counters are zeroed
 * after the snapshot is taken and client_pid is restored from c->pid.
 *
 * Returns NULL with errno set to EINVAL when c is NULL, or NULL when the
 * allocation fails.
 */
struct qb_ipcs_connection_stats_2*
qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
			       int32_t clear_after_read)
{
	struct qb_ipcs_connection_stats_2 *snapshot;

	if (c == NULL) {
		errno = EINVAL;
		return NULL;
	}

	snapshot = calloc(1, sizeof(*snapshot));
	if (snapshot == NULL) {
		return NULL;
	}

	memcpy(snapshot, &c->stats, sizeof(*snapshot));
	snapshot->event_q_length = c->service->funcs.q_len_get
		? c->service->funcs.q_len_get(&c->event)
		: 0;

	if (clear_after_read) {
		memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
		c->stats.client_pid = c->pid;
	}

	return snapshot;
}
/*
 * Copy a service's statistics into a caller-supplied structure.
 *
 * s                 service to query
 * stats             destination for the copy
 * clear_after_read  when non-zero, the service's counters are zeroed
 *                   after the copy
 *
 * Returns 0 on success, or -EINVAL when s or stats is NULL.
 */
int32_t
qb_ipcs_stats_get(struct qb_ipcs_service * s,
		  struct qb_ipcs_stats * stats, int32_t clear_after_read)
{
	/* A NULL destination would be dereferenced by memcpy below. */
	if (s == NULL || stats == NULL) {
		return -EINVAL;
	}
	memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
	if (clear_after_read) {
		memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
	}
	return 0;
}
/*
 * Record the uid, gid and mode in a connection's auth settings.
 * A NULL connection is silently ignored.
 */
void
qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid,
			    gid_t gid, mode_t mode)
{
	if (!c) {
		return;
	}

	c->auth.uid = uid;
	c->auth.gid = gid;
	c->auth.mode = mode;
}

File Metadata

Mime Type
text/x-diff
Expires
Wed, Feb 26, 10:57 PM (9 h, 12 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1465850
Default Alt Text
(38 KB)

Event Timeline