Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/docs/Makefile.am b/docs/Makefile.am
index 6dfd264..a2005eb 100644
--- a/docs/Makefile.am
+++ b/docs/Makefile.am
@@ -1,46 +1,46 @@
# Copyright (C) 2010 Red Hat, Inc.
#
# Authors: Angus Salkeld <asalkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
MAINTAINERCLEANFILES = Makefile.in
EXTRA_DIST = man.dox html.dox
if HAVE_DOXYGEN
inc_dir = $(top_srcdir)/include/qb
dependant_headers = $(wildcard $(inc_dir)/qb*.h)
dist_man_MANS = man3/qbipcc.h.3 man3/qbipcs.h.3 \
man3/qbhdb.h.3 man3/qbipc_common.h.3 man3/qblist.h.3 \
- man3/qbpoll.h.3 man3/qbtimer.h.3 man3/qbutil.h.3
+ man3/qbloop.h.3 man3/qbtimer.h.3 man3/qbutil.h.3
$(dist_man_MANS): man.dox $(dependant_headers)
@mkdir -p man3
@doxygen man.dox
doxygen: html.dox
@mkdir -p html
@doxygen html.dox
else
doxygen:
@echo WARNING: no doxygen to build man pages!
endif
clean-generic:
rm -rf html man3
diff --git a/include/qb/Makefile.am b/include/qb/Makefile.am
index 28a3a74..4cc1b3e 100644
--- a/include/qb/Makefile.am
+++ b/include/qb/Makefile.am
@@ -1,26 +1,27 @@
# Copyright (c) 2010 Red Hat, Inc.
#
# Authors: Angus Salkeld <asalkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
#
MAINTAINERCLEANFILES = Makefile.in
instdir = $(includedir)/qb/
-inst_HEADERS = qbhdb.h qblist.h qbpoll.h qbtimer.h \
- qbipcc.h qbipcs.h qbipc_common.h \
- qbrb.h qbutil.h qbdefs.h
+inst_HEADERS = qbhdb.h qblist.h qbdefs.h qbtimer.h \
+ qbloop.h qbrb.h qbutil.h \
+ qbipcc.h qbipcs.h qbipc_common.h
+
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 727234d..6d98471 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -1,155 +1,155 @@
/*
* Copyright (C) 2006-2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>,
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCS_H_DEFINED
#define QB_IPCS_H_DEFINED
#include <stdlib.h>
#include <qb/qbipc_common.h>
#include <qb/qbhdb.h>
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
struct qb_ipcs_connection;
typedef struct qb_ipcs_connection qb_ipcs_connection_t;
typedef qb_handle_t qb_ipcs_service_pt;
typedef int32_t (*qb_ipcs_dispatch_fn_t) (qb_ipcs_service_pt s, int32_t fd, int32_t revents,
void *data);
typedef int32_t (*qb_ipcs_dispatch_add_fn)(qb_ipcs_service_pt s, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_rm_fn)(qb_ipcs_service_pt s, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn);
struct qb_ipcs_poll_handlers {
qb_ipcs_dispatch_add_fn dispatch_add;
qb_ipcs_dispatch_rm_fn dispatch_rm;
};
/**
* This callback is to check wether you want to accept a new connection.
*
* The type of checks you should do are authentication, service availabilty
* or process resource constraints.
* @return 0 to accept or -errno to indicate a failure (sent back to the client)
*/
typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c, uid_t uid, gid_t gid);
/**
* This is called after a new connection has been created.
*/
typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c);
/**
* This is called after a connection has been destroyed.
*/
typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c);
/**
* This is the message processing calback.
* It is called with the message data.
*/
typedef void (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c,
void *data, size_t size);
struct qb_ipcs_service_handlers {
qb_ipcs_connection_accept_fn connection_accept;
qb_ipcs_connection_created_fn connection_created;
qb_ipcs_msg_process_fn msg_process;
qb_ipcs_connection_destroyed_fn connection_destroyed;
};
/**
* Create a new IPC server.
*/
qb_ipcs_service_pt qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers);
/**
* Set your callbacks.
*/
void qb_ipcs_service_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_service_handlers *handlers);
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_poll_handlers *handlers);
/**
* run the new IPC server.
*/
-int32_t qb_ipcs_run(qb_ipcs_service_pt s, qb_handle_t poll);
+int32_t qb_ipcs_run(qb_ipcs_service_pt s, void *loop_pt);
/**
* Destroy the IPC server.
*/
void qb_ipcs_destroy(qb_ipcs_service_pt s);
/**
* send a response to a incomming request.
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Increment the connection's reference counter.
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service.
* (as passed into qb_ipcs_create()
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCS_H_DEFINED */
diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h
new file mode 100644
index 0000000..6d1bf6f
--- /dev/null
+++ b/include/qb/qbloop.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef QB_LOOP_DEFINED
+#define QB_LOOP_DEFINED
+
+/*
+ * Main loop functions
+ */
+enum qb_loop_priority {
+ QB_LOOP_LOW = 0,
+ QB_LOOP_MED = 1,
+ QB_LOOP_HIGH = 2,
+};
+
+typedef struct qb_loop qb_loop_t;
+
+qb_loop_t * qb_loop_create(void);
+void qb_loop_stop(qb_loop_t *l);
+void qb_loop_run(qb_loop_t *l);
+
+
+/*
+ * Job API
+ */
+
+typedef void *qb_loop_job_handle;
+
+typedef void (*qb_loop_job_dispatch_fn)(void *data);
+
+int32_t qb_loop_job_add(qb_loop_t *l,
+ enum qb_loop_priority p,
+ void *data,
+ qb_loop_job_dispatch_fn dispatch_fn);
+
+/*
+ * Timer API
+ */
+
+typedef void *qb_loop_timer_handle;
+#define qb_poll_timer_handle qb_loop_timer_handle
+
+typedef void (*qb_loop_timer_dispatch_fn)(void *data);
+
+int32_t qb_loop_timer_add(qb_loop_t *l,
+ enum qb_loop_priority p,
+ int32_t msec_duration,
+ void *data,
+ qb_loop_timer_dispatch_fn timer_fn,
+ qb_loop_timer_handle * timer_handle_out);
+
+int32_t qb_loop_timer_del(qb_loop_t *l, qb_loop_timer_handle th);
+
+/*
+ * Poll API
+ */
+typedef void (*qb_loop_poll_low_fds_event_fn) (int32_t not_enough, int32_t fds_available);
+int32_t qb_loop_poll_low_fds_event_set(
+ qb_loop_t *loop,
+ qb_loop_poll_low_fds_event_fn fn);
+
+typedef int32_t (*qb_loop_poll_dispatch_fn) (int32_t fd, int32_t revents, void *data);
+
+
+int32_t qb_loop_poll_add(qb_loop_t *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ void *data,
+ qb_loop_poll_dispatch_fn dispatch_fn);
+
+int32_t qb_loop_poll_mod(qb_loop_t *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ qb_loop_poll_dispatch_fn dispatch_fn);
+
+int32_t qb_loop_poll_del(qb_loop_t *l, int32_t fd);
+
+#endif /* QB_LOOP_DEFINED */
+
diff --git a/include/qb/qbpoll.h b/include/qb/qbpoll.h
deleted file mode 100644
index 009ca3c..0000000
--- a/include/qb/qbpoll.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (C) 2003-2010 Red Hat, Inc.
- *
- * Author: Steven Dake <sdake@redhat.com>
- *
- * This file is part of libqb.
- *
- * libqb is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 2.1 of the License, or
- * (at your option) any later version.
- *
- * libqb is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with libqb. If not, see <http://www.gnu.org/licenses/>.
- */
-#ifndef QB_POLL_H_DEFINED
-#define QB_POLL_H_DEFINED
-
-#include <qb/qbhdb.h>
-#include <pthread.h>
-
-/* *INDENT-OFF* */
-#ifdef __cplusplus
-extern "C" {
-#endif
-/* *INDENT-ON* */
-
-typedef void (*qb_poll_low_fds_event_fn) (int32_t not_enough, int32_t fds_available);
-
-typedef void *qb_poll_timer_handle;
-typedef void *qb_poll_job_handle;
-
-/**
- * return < 0 for removal
- * return == 0 for no-op
- * return > 0 for work done
- */
-typedef int32_t (*qb_poll_job_execute_fn_t) (void *data);
-
-qb_handle_t qb_poll_create(void);
-
-int32_t qb_poll_destroy(qb_handle_t hdb_handle);
-
-int32_t qb_poll_dispatch_add(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- void *data,
- int32_t (*dispatch_fn) (qb_handle_t handle,
- int32_t fd, int32_t revents, void *data));
-
-int32_t qb_poll_dispatch_modify(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- int32_t (*dispatch_fn) (qb_handle_t hdb_handle_t,
- int32_t fd,
- int32_t revents, void *data));
-
-int32_t qb_poll_dispatch_delete(qb_handle_t handle, int32_t fd);
-
-int32_t qb_poll_low_fds_event_set(
- qb_handle_t handle,
- qb_poll_low_fds_event_fn fn);
-
-int32_t qb_poll_timer_add(qb_handle_t handle,
- int32_t msec_in_future, void *data,
- void (*timer_fn) (void *data),
- qb_poll_timer_handle * timer_handle_out);
-
-int32_t qb_poll_timer_delete(qb_handle_t handle, qb_poll_timer_handle timer_handle);
-
-int32_t qb_poll_job_add(qb_handle_t handle,
- void *data,
- qb_poll_job_execute_fn_t execute_fn,
- qb_poll_job_handle * handle_out);
-int32_t qb_poll_job_delete(qb_handle_t poll_handle, qb_poll_job_handle job_handle);
-
-int32_t qb_poll_run(qb_handle_t handle);
-
-int32_t qb_poll_stop(qb_handle_t handle);
-
-/* *INDENT-OFF* */
-#ifdef __cplusplus
-}
-#endif
-/* *INDENT-ON* */
-
-#endif /* QB_POLL_H_DEFINED */
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 6758208..63ca286 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -1,47 +1,48 @@
#
# Copyright (C) 2010 Red Hat, Inc.
#
# Author: Angus Salkeld <asaslkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
MAINTAINERCLEANFILES = Makefile.in
-noinst_HEADERS = ipc_int.h util_int.h ringbuffer_int.h
+noinst_HEADERS = ipc_int.h util_int.h ringbuffer_int.h loop_int.h
#
# Here are a set of rules to help you update your library version information:
# Start with version information of ‘0:0:0’ for each libtool library.
# Update the version information only immediately before a public release of your software.
# More frequent updates are unnecessary, and only guarantee that the current interface number gets larger faster.
# If the library source code has changed at all since the last update, then increment revision (‘c:r:a’ becomes ‘c:r+1:a’).
# If any interfaces have been added, removed, or changed since the last update, increment current, and set revision to 0.
# If any interfaces have been added since the last public release, then increment age.
# If any interfaces have been removed since the last public release, then set age to 0.
#
lib_LTLIBRARIES = libqb.la
libqb_la_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
libqb_la_LDFLAGS = -version-info 0:0:0
-libqb_la_SOURCES = util.c poll.c timer.c hdb.c ringbuffer.c ringbuffer_helper.c \
+libqb_la_SOURCES = util.c hdb.c ringbuffer.c ringbuffer_helper.c timer.c \
+ loop.c loop_poll.c loop_timer.c loop_job.c \
ipcc.c ipcs.c ipc_posix_mq.c ipc_sysv_mq.c ipc_shm.c ipc_us.c
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libqb.pc
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 76cdb41..9555d48 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,191 +1,189 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include <unistd.h>
#include "config.h"
#include <dirent.h>
#include <mqueue.h>
#include <qb/qblist.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
/*
* Darwin claims to support process shared synchronization
* but it really does not. The unistd.h header file is wrong.
*/
#if defined(QB_DARWIN) || defined(__UCLIBC__)
#undef _POSIX_THREAD_PROCESS_SHARED
#define _POSIX_THREAD_PROCESS_SHARED -1
#endif
#ifndef _POSIX_THREAD_PROCESS_SHARED
#define _POSIX_THREAD_PROCESS_SHARED -1
#endif
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
#endif
/*
Client Server
SEND CONN REQ ->
ACCEPT & CREATE queues
or DENY
<- SEND ACCEPT(with details)/DENY
*/
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
uint32_t max_msg_size __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
int32_t connection_type __attribute__ ((aligned(8)));
uint32_t max_msg_size __attribute__ ((aligned(8)));
char request[PATH_MAX] __attribute__ ((aligned(8)));
char response[PATH_MAX] __attribute__ ((aligned(8)));
char event[PATH_MAX] __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
struct qb_ipc_one_way {
size_t max_msg_size;
union {
struct {
mqd_t q;
char name[NAME_MAX];
} pmq;
struct {
int32_t q;
int32_t key;
} smq;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
};
struct qb_ipcc_connection {
char name[NAME_MAX];
enum qb_ipc_type type;
int32_t needs_sock_for_poll;
int32_t sock;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
char *receive_buf;
};
int32_t qb_ipc_us_send(int32_t s, const void *msg, size_t len);
int32_t qb_ipc_us_recv (int32_t s, void *msg, size_t len);
int32_t qb_ipc_us_recv_ready(int32_t s, int32_t ms_timeout);
int32_t qb_ipcc_us_connect(const char *socket_name, int32_t *sock_pt);
void qb_ipcc_us_disconnect (int32_t sock);
int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_soc_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
struct qb_ipcs_funcs {
void (*destroy)(struct qb_ipcs_service *s);
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
};
struct qb_ipcs_service {
enum qb_ipc_type type;
char name[NAME_MAX];
int32_t service_id;
pid_t pid;
int32_t needs_sock_for_poll;
int32_t server_sock;
- qb_handle_t poll_handle;
+ void* loop_pt;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
struct qb_list_head connections;
};
struct qb_ipcs_connection {
int32_t refcount;
pid_t pid;
uid_t euid;
gid_t egid;
int32_t sock;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service;
struct qb_list_head list;
char *receive_buf;
void *context;
};
int32_t qb_ipcs_pmq_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_soc_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_smq_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_shm_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
-int32_t qb_ipcs_dispatch_connection_request(qb_handle_t hdb_handle_t,
- int32_t fd, int32_t revents, void *data);
-int32_t qb_ipcs_dispatch_service_request(qb_handle_t hdb_handle_t,
- int32_t fd, int32_t revents, void *data);
+int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
+int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
void qb_ipcs_disconnect(struct qb_ipcs_connection *c);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipc_posix_mq.c b/lib/ipc_posix_mq.c
index 2616f33..3cb0f08 100644
--- a/lib/ipc_posix_mq.c
+++ b/lib/ipc_posix_mq.c
@@ -1,418 +1,419 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <mqueue.h>
#include <sys/resource.h>
#include "ipc_int.h"
#include "util_int.h"
#include <qb/qbdefs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#define QB_REQUEST_Q_LEN 3
#define QB_RESPONSE_Q_LEN 1
#define QB_EVENT_Q_LEN 3
static size_t q_space_used = 0;
#ifdef QB_LINUX
#define QB_RLIMIT_CHANGE_NEEDED 1
#endif /* QB_LINUX */
/*
* utility functions
* --------------------------------------------------------
*/
static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len)
{
int32_t res = 0;
#ifdef QB_RLIMIT_CHANGE_NEEDED
struct rlimit rlim;
size_t q_space_needed;
#endif /* QB_RLIMIT_CHANGE_NEEDED */
#ifdef QB_RLIMIT_CHANGE_NEEDED
if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno;
qb_util_log(LOG_ERR, "getrlimit failed");
return res;
}
q_space_needed = q_space_used + (max_msg_size * q_len * 4 / 3);
qb_util_log(LOG_DEBUG, "rlimit:%d needed:%zu used:%zu",
(int)rlim.rlim_cur, q_space_needed, q_space_used);
if (rlim.rlim_cur < q_space_needed) {
rlim.rlim_cur = q_space_needed;
}
if (rlim.rlim_max < q_space_needed) {
rlim.rlim_max = q_space_needed;
}
if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno;
qb_util_log(LOG_ERR, "setrlimit failed");
}
#endif /* QB_RLIMIT_CHANGE_NEEDED */
return res;
}
static int32_t posix_mq_open(struct qb_ipc_one_way *one_way,
const char *name, size_t q_len)
{
int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len);
if (res != 0) {
return res;
}
one_way->u.pmq.q = mq_open(name, O_RDWR);
if (one_way->u.pmq.q == (mqd_t) - 1) {
res = -errno;
perror("mq_open");
return res;
}
strcpy(one_way->u.pmq.name, name);
q_space_used += one_way->max_msg_size * q_len;
return 0;
}
static int32_t posix_mq_create(struct qb_ipcs_connection *c,
struct qb_ipc_one_way *one_way,
const char *name, size_t q_len)
{
struct mq_attr attr;
mqd_t q = 0;
int32_t res = 0;
mode_t m = 0600;
size_t max_msg_size = one_way->max_msg_size;
res = posix_mq_increase_limits(max_msg_size, q_len);
if (res != 0) {
return res;
}
try_smaller:
if (q != 0) {
max_msg_size = max_msg_size / 2;
q_len--;
}
// qb_util_log(LOG_DEBUG, "%s() max_msg_size:%zu q_len:%zu", __func__,
// max_msg_size, q_len);
attr.mq_flags = O_NONBLOCK;
attr.mq_maxmsg = q_len;
attr.mq_msgsize = max_msg_size;
q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr);
if (q == (mqd_t) - 1 && errno == ENOMEM) {
if (max_msg_size > 9000 && q_len > 3) {
goto try_smaller;
}
}
if (q == (mqd_t) - 1) {
res = -errno;
qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s",
name, strerror(errno));
return res;
}
q_space_used += max_msg_size * q_len;
one_way->max_msg_size = max_msg_size;
one_way->u.pmq.q = q;
strcpy(one_way->u.pmq.name, name);
res = fchown(q, c->euid, c->egid);
if (res == -1) {
res = -errno;
qb_util_log(LOG_ERR, "fchown:%s %s", name, strerror(errno));
mq_close(q);
mq_unlink(name);
}
return res;
}
static ssize_t qb_ipc_pmq_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1);
if (res != 0) {
return -errno;
}
return msg_len;
}
static ssize_t qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way,
const struct iovec* iov,
size_t iov_len)
{
int32_t total_size = 0;
int32_t i;
int32_t res = 0;
char *data = NULL;
char *pt = NULL;
for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len;
}
data = malloc(total_size);
pt = data;
for (i = 0; i < iov_len; i++) {
memcpy(pt, iov[i].iov_base, iov[i].iov_len);
pt += iov[i].iov_len;
}
res = mq_send(one_way->u.pmq.q, data, total_size, 1);
free(data);
if (res != 0) {
return -errno;
}
return total_size;
}
static ssize_t qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way,
void *msg_ptr,
size_t msg_len,
int32_t ms_timeout)
{
uint32_t msg_prio;
struct timespec ts_timeout;
ssize_t res;
if (ms_timeout >= 0) {
clock_gettime(CLOCK_REALTIME, &ts_timeout);
qb_timespec_add_ms(&ts_timeout, ms_timeout);
}
mq_receive_again:
if (ms_timeout >= 0) {
res = mq_timedreceive(one_way->u.pmq.q,
(char *)msg_ptr,
one_way->max_msg_size,
&msg_prio,
&ts_timeout);
} else {
res = mq_receive(one_way->u.pmq.q,
(char *)msg_ptr,
one_way->max_msg_size,
&msg_prio);
}
if (res == -1) {
switch (errno) {
case EINTR:
goto mq_receive_again;
break;
case EAGAIN:
res = -ETIMEDOUT;
break;
case ETIMEDOUT:
res = -errno;
break;
default:
res = -errno;
qb_util_log(LOG_ERR,
"error waiting for mq_timedreceive : %s",
strerror(errno));
break;
}
}
return res;
}
/*
* client functions
* --------------------------------------------------------
*/
static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_request_header hdr;
qb_util_log(LOG_DEBUG, "%s()\n", __func__);
if (c->needs_sock_for_poll) {
return;
}
hdr.id = QB_IPC_MSG_DISCONNECT;
hdr.size = sizeof(hdr);
mq_send(c->request.u.pmq.q, (const char *)&hdr, hdr.size, 30);
mq_close(c->event.u.pmq.q);
mq_close(c->response.u.pmq.q);
mq_close(c->request.u.pmq.q);
mq_unlink(c->event.u.pmq.name);
mq_unlink(c->request.u.pmq.name);
mq_unlink(c->response.u.pmq.name);
}
int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{
int32_t res = 0;
c->funcs.send = qb_ipc_pmq_send;
c->funcs.sendv = qb_ipc_pmq_sendv;
c->funcs.recv = qb_ipc_pmq_recv;
c->funcs.disconnect = qb_ipcc_pmq_disconnect;
#if defined(QB_LINUX) || defined(QB_BSD)
c->needs_sock_for_poll = QB_FALSE;
#else
c->needs_sock_for_poll = QB_TRUE;
#endif
if (strlen(c->name) > (NAME_MAX - 20)) {
return -EINVAL;
}
res = posix_mq_open(&c->request, response->request,
QB_REQUEST_Q_LEN);
if (res != 0) {
perror("mq_open:REQUEST");
return res;
}
res = posix_mq_open(&c->response, response->response,
QB_RESPONSE_Q_LEN);
if (res != 0) {
perror("mq_open:RESPONSE");
goto cleanup_request;
}
res = posix_mq_open(&c->event, response->event, QB_EVENT_Q_LEN);
if (res != 0) {
perror("mq_open:EVENT");
goto cleanup_request_response;
}
return 0;
cleanup_request_response:
mq_close(c->response.u.pmq.q);
cleanup_request:
mq_close(c->request.u.pmq.q);
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static void qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c)
{
struct qb_ipc_response_header msg;
msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg);
msg.error = 0;
qb_ipc_pmq_send(&c->event, &msg, msg.size);
mq_close(c->event.u.pmq.q);
mq_close(c->response.u.pmq.q);
mq_close(c->request.u.pmq.q);
mq_unlink(c->event.u.pmq.name);
mq_unlink(c->request.u.pmq.name);
mq_unlink(c->response.u.pmq.name);
}
static void qb_ipcs_pmq_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
qb_util_log(LOG_DEBUG, "%s\n", __func__);
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
iter_next = iter->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
}
static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res = 0;
snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid);
snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid);
snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid);
res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN);
if (res < 0) {
goto cleanup;
}
res = posix_mq_create(c, &c->response, r->response, QB_RESPONSE_Q_LEN);
if (res < 0) {
goto cleanup_request;
}
res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN);
if (res < 0) {
goto cleanup_request_response;
}
if (!s->needs_sock_for_poll) {
- qb_poll_dispatch_add(s->poll_handle, c->request.u.pmq.q,
+ qb_loop_poll_add(s->loop_pt, QB_LOOP_HIGH,
+ c->request.u.pmq.q,
POLLIN | POLLPRI | POLLNVAL,
c, qb_ipcs_dispatch_service_request);
}
r->hdr.error = 0;
return 0;
cleanup_request_response:
mq_close(c->response.u.pmq.q);
mq_unlink(r->response);
cleanup_request:
mq_close(c->request.u.pmq.q);
mq_unlink(r->request);
cleanup:
r->hdr.error = res;
return res;
}
int32_t qb_ipcs_pmq_create(struct qb_ipcs_service * s)
{
s->funcs.recv = qb_ipc_pmq_recv;
s->funcs.send = qb_ipc_pmq_send;
s->funcs.sendv = qb_ipc_pmq_sendv;
s->funcs.connect = qb_ipcs_pmq_connect;
s->funcs.disconnect = qb_ipcs_pmq_disconnect;
s->funcs.destroy = qb_ipcs_pmq_destroy;
#if defined(QB_LINUX) || defined(QB_BSD)
s->needs_sock_for_poll = QB_FALSE;
#else
s->needs_sock_for_poll = QB_TRUE;
#endif
return 0;
}
diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c
index 7dd41c3..b679f04 100644
--- a/lib/ipc_shm.c
+++ b/lib/ipc_shm.c
@@ -1,274 +1,274 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "ipc_int.h"
#include "util_int.h"
#include <qb/qbdefs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include <qb/qbrb.h>
/*
* utility functions
* --------------------------------------------------------
*/
/*
* client functions
* --------------------------------------------------------
*/
static void qb_ipcc_shm_disconnect(struct qb_ipcc_connection *c)
{
qb_rb_close(c->request.u.shm.rb, QB_FALSE);
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
qb_rb_close(c->event.u.shm.rb, QB_FALSE);
}
static ssize_t qb_ipc_shm_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
return qb_rb_chunk_write(one_way->u.shm.rb, msg_ptr, msg_len);
}
static ssize_t qb_ipc_shm_sendv(struct qb_ipc_one_way *one_way,
const struct iovec* iov,
size_t iov_len)
{
char *dest;
int32_t res = 0;
int32_t total_size = 0;
int32_t i;
char *pt = NULL;
for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len;
}
dest = qb_rb_chunk_alloc(one_way->u.shm.rb, total_size);
if (dest == NULL) {
return -errno;
}
pt = dest;
for (i = 0; i < iov_len; i++) {
memcpy(pt, iov[i].iov_base, iov[i].iov_len);
pt += iov[i].iov_len;
}
res = qb_rb_chunk_commit(one_way->u.shm.rb, total_size);
if (res < 0) {
return res;
}
return total_size;
}
static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way,
void *msg_ptr,
size_t msg_len,
int32_t ms_timeout)
{
ssize_t res = qb_rb_chunk_read(one_way->u.shm.rb,
(void *)msg_ptr,
msg_len,
ms_timeout);
if (res == -ETIMEDOUT) {
return -EAGAIN;
}
return res;
}
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{
int32_t res = 0;
c->funcs.send = qb_ipc_shm_send;
c->funcs.sendv = qb_ipc_shm_sendv;
c->funcs.recv = qb_ipc_shm_recv;
c->funcs.disconnect = qb_ipcc_shm_disconnect;
c->needs_sock_for_poll = QB_TRUE;
if (strlen(c->name) > (NAME_MAX - 20)) {
errno = EINVAL;
return -errno;
}
c->request.u.shm.rb = qb_rb_open(response->request, c->request.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS);
if (c->request.u.shm.rb == NULL) {
perror("qb_rb_open:REQUEST");
return -errno;
}
c->response.u.shm.rb = qb_rb_open(response->response,
c->response.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS);
if (c->response.u.shm.rb == NULL) {
perror("qb_rb_open:RESPONSE");
goto cleanup_request;
}
c->event.u.shm.rb = qb_rb_open(response->event,
c->response.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS);
if (c->event.u.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:EVENT");
goto cleanup_request_response;
}
return 0;
cleanup_request_response:
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
cleanup_request:
qb_rb_close(c->request.u.shm.rb, QB_FALSE);
qb_util_log(LOG_DEBUG, "connection failed %d\n", res);
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static void qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c)
{
struct qb_ipc_response_header msg;
int32_t peer_alive = QB_TRUE;
if (c->sock == -1) {
peer_alive = QB_FALSE;
}
msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg);
msg.error = 0;
if (c->response.u.shm.rb) {
if (peer_alive) {
qb_ipc_shm_send(&c->event, &msg, msg.size);
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
} else {
qb_rb_close(c->response.u.shm.rb, QB_TRUE);
}
}
if (c->event.u.shm.rb) {
qb_rb_close(c->event.u.shm.rb, !peer_alive);
}
if (c->request.u.shm.rb) {
qb_rb_close(c->request.u.shm.rb, !peer_alive);
}
}
static void qb_ipcs_shm_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
qb_util_log(LOG_DEBUG, "destroying server\n");
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
iter_next = iter->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
}
static int32_t qb_ipcs_shm_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
qb_util_log(LOG_DEBUG, "connecting to client [%d]\n", c->pid);
snprintf(r->request, NAME_MAX, "%s-request-%d", s->name, c->pid);
snprintf(r->response, NAME_MAX, "%s-response-%d", s->name, c->pid);
snprintf(r->event, NAME_MAX, "%s-event-%d", s->name, c->pid);
qb_util_log(LOG_DEBUG, "rb_open:%s", r->request);
c->request.u.shm.rb = qb_rb_open(r->request,
c->request.max_msg_size,
QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS);
if (c->request.u.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:REQUEST");
goto cleanup;
}
res = qb_rb_chown(c->request.u.shm.rb, c->euid, c->egid);
c->response.u.shm.rb = qb_rb_open(r->response,
c->response.max_msg_size,
QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS);
if (c->response.u.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:RESPONSE");
goto cleanup_request;
}
res = qb_rb_chown(c->response.u.shm.rb, c->euid, c->egid);
c->event.u.shm.rb = qb_rb_open(r->event,
c->event.max_msg_size,
QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS);
if (c->event.u.shm.rb == NULL) {
res = -errno;
perror("mq_open:EVENT");
goto cleanup_request_response;
}
res = qb_rb_chown(c->event.u.shm.rb, c->euid, c->egid);
r->hdr.error = 0;
return 0;
cleanup_request_response:
qb_rb_close(c->request.u.shm.rb, QB_FALSE);
cleanup_request:
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
cleanup:
r->hdr.error = res;
return res;
}
int32_t qb_ipcs_shm_create(struct qb_ipcs_service *s)
{
s->funcs.destroy = qb_ipcs_shm_destroy;
s->funcs.recv = qb_ipc_shm_recv;
s->funcs.send = qb_ipc_shm_send;
s->funcs.sendv = qb_ipc_shm_sendv;
s->funcs.connect = qb_ipcs_shm_connect;
s->funcs.disconnect = qb_ipcs_shm_disconnect;
s->needs_sock_for_poll = QB_TRUE;
return 0;
}
diff --git a/lib/ipc_sysv_mq.c b/lib/ipc_sysv_mq.c
index ef59192..dd224b1 100644
--- a/lib/ipc_sysv_mq.c
+++ b/lib/ipc_sysv_mq.c
@@ -1,398 +1,398 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <sys/ipc.h>
#include <sys/msg.h>
#include <qb/qbdefs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include "ipc_int.h"
#include "util_int.h"
#ifndef MSGMAX
#define MSGMAX 8192
#endif
#define MY_DATA_SIZE 8000
struct my_msgbuf {
int32_t id __attribute__ ((aligned(8)));
char data[MY_DATA_SIZE] __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
/*
* utility functions
* --------------------------------------------------------
*/
static int32_t sysv_mq_unnamed_create(struct qb_ipcs_connection *c,
struct qb_ipc_one_way *queue)
{
struct msqid_ds info;
int32_t res = 0;
retry_creating_the_q:
queue->u.smq.key = random();
queue->u.smq.q =
msgget(queue->u.smq.key,
IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR);
if (queue->u.smq.q == -1 && errno == EEXIST) {
goto retry_creating_the_q;
} else if (queue->u.smq.q == -1) {
return -errno;
}
/*
* change the queue size and change the ownership to that of
* the client so they can access it.
*/
res = msgctl(queue->u.smq.q, IPC_STAT, &info);
if (res != 0) {
res = -errno;
qb_util_log(LOG_ERR, "error getting sysv-mq info : %s",
strerror(errno));
return res;
}
if (info.msg_perm.uid != 0) {
qb_util_log(LOG_WARNING,
"not enough privileges to increase msg_qbytes");
return res;
}
info.msg_qbytes = 2 * queue->max_msg_size;
info.msg_perm.uid = c->euid;
info.msg_perm.gid = c->egid;
res = msgctl(queue->u.smq.q, IPC_SET, &info);
if (res != 0) {
res = -errno;
qb_util_log(LOG_ERR,
"error modifing the SYSV message queue : %s",
strerror(errno));
return res;
}
return 0;
}
static int32_t sysv_split_and_send(mqd_t q, const void *msg_ptr,
size_t msg_len, int32_t last_chunk)
{
int32_t res;
int32_t sent = 0;
#ifdef PACK_MESSAGES
char *progress = (char *)msg_ptr;
struct my_msgbuf buf;
size_t to_send_now; /* to send in this message */
size_t to_send_next; /* to send in next message */
do {
to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE);
to_send_next = msg_len - (sent + to_send_now);
/* receiver used the ID to check to see if there
* is more to recieve for this message.
*/
if (last_chunk) {
buf.id = to_send_next + 1;
} else {
buf.id = to_send_next + 1 + msg_len;
}
memcpy(buf.data, progress, to_send_now);
res = msgsnd(q, &buf, to_send_now, IPC_NOWAIT);
if (res == 0) {
sent += to_send_now;
progress += to_send_now;
} else {
goto return_status;
}
} while (sent < msg_len);
return_status:
#else
res = msgsnd(q, msg_ptr, msg_len, IPC_NOWAIT);
sent = msg_len;
#endif
if (res == -1) {
return -errno;
}
return sent;
}
/*
* client functions
* --------------------------------------------------------
*/
static ssize_t qb_ipc_smq_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
return sysv_split_and_send(one_way->u.smq.q, msg_ptr, msg_len, QB_TRUE);
}
static ssize_t qb_ipc_smq_sendv(struct qb_ipc_one_way *one_way,
const struct iovec *iov, size_t iov_len)
{
int32_t res;
int32_t sent = 0;
struct my_msgbuf buf;
int32_t i;
for (i = 0; i < iov_len; i++) {
if (iov[i].iov_len <= MY_DATA_SIZE) {
if (i == iov_len-1) {
buf.id = 1;
} else {
buf.id = i + iov[i].iov_len;
}
memcpy(buf.data, iov[i].iov_base, iov[i].iov_len);
res = msgsnd(one_way->u.smq.q, &buf, iov[i].iov_len, IPC_NOWAIT);
if (res == 0) {
res = iov[i].iov_len;
} else {
res = -errno;
}
} else {
res = sysv_split_and_send(one_way->u.smq.q, iov[i].iov_base,
iov[i].iov_len, (i == iov_len-1));
}
if (res > 0) {
sent += res;
} else {
return res;
}
}
return sent;
}
static ssize_t qb_ipc_smq_recv(struct qb_ipc_one_way *one_way,
void *msg_ptr,
size_t msg_len,
int32_t ms_timeout)
{
ssize_t res;
ssize_t received = 0;
#ifdef PACK_MESSAGES
char *progress = (char *)msg_ptr;
struct my_msgbuf buf;
do {
try_again:
res = msgrcv(one_way->u.smq.q, &buf, MY_DATA_SIZE, 0, IPC_NOWAIT);
if (res == -1 && errno == ENOMSG) {
goto try_again;
}
//printf("res:%zd, ID:%d\n", res, buf.id);
if (res == -1) {
goto return_status;
}
memcpy(progress, buf.data, res);
received += res;
progress += res;
} while (buf.id > 1);
return_status:
#else
res = msgrcv(one_way->u.smq.q, msg_ptr, msg_len, 0, IPC_NOWAIT);
received = res;
#endif
if (res == -1 && errno == ENOMSG) {
/* just to be consistent with other IPC types.
*/
return -EAGAIN;
}
if (res == -1) {
perror(__func__);
return -errno;
}
return received;
}
static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_request_header hdr;
qb_util_log(LOG_DEBUG, "%s()\n", __func__);
//if (c->needs_sock_for_poll) {
// return;
//}
hdr.id = QB_IPC_MSG_DISCONNECT;
hdr.size = sizeof(hdr);
sysv_split_and_send(c->request.u.smq.q, (const char *)&hdr, hdr.size,
QB_TRUE);
msgctl(c->event.u.smq.q, IPC_RMID, NULL);
msgctl(c->response.u.smq.q, IPC_RMID, NULL);
msgctl(c->request.u.smq.q, IPC_RMID, NULL);
}
int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{
int32_t res = 0;
c->funcs.send = qb_ipc_smq_send;
c->funcs.sendv = qb_ipc_smq_sendv;
c->funcs.recv = qb_ipc_smq_recv;
c->funcs.disconnect = qb_ipcc_smq_disconnect;
c->type = QB_IPC_SYSV_MQ;
c->needs_sock_for_poll = QB_TRUE;
if (strlen(c->name) > (NAME_MAX - 20)) {
return -EINVAL;
}
memcpy(&c->request.u.smq.key, response->request, sizeof(uint32_t));
c->request.u.smq.q = msgget(c->request.u.smq.key, IPC_NOWAIT);
if (c->request.u.smq.q == -1) {
res = -errno;
perror("msgget:REQUEST");
goto cleanup;
}
memcpy(&c->response.u.smq.key, response->response, sizeof(uint32_t));
c->response.u.smq.q = msgget(c->response.u.smq.key, IPC_NOWAIT);
if (c->response.u.smq.q == -1) {
res = -errno;
perror("msgget:RESPONSE");
goto cleanup;
}
memcpy(&c->event.u.smq.key, response->event, sizeof(uint32_t));
c->event.u.smq.q = msgget(c->event.u.smq.key, IPC_NOWAIT);
if (c->event.u.smq.q == -1) {
res = -errno;
perror("msgget:EVENT");
goto cleanup;
}
cleanup:
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static void qb_ipcs_smq_disconnect(struct qb_ipcs_connection *c)
{
struct qb_ipc_response_header msg;
if (c->sock != -1) {
msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg);
msg.error = 0;
qb_ipc_smq_send(&c->event, &msg, msg.size);
} else {
msgctl(c->event.u.smq.q, IPC_RMID, NULL);
msgctl(c->response.u.smq.q, IPC_RMID, NULL);
msgctl(c->request.u.smq.q, IPC_RMID, NULL);
}
}
static void qb_ipcs_smq_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
qb_util_log(LOG_DEBUG, "%s\n", __func__);
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
iter_next = iter->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
}
static int32_t qb_ipcs_smq_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res = 0;
res = sysv_mq_unnamed_create(c, &c->request);
if (res < 0) {
res = -errno;
goto cleanup;
}
memcpy(r->request, &c->request.u.smq.key, sizeof(int32_t));
res = sysv_mq_unnamed_create(c, &c->response);
if (res < 0) {
res = -errno;
goto cleanup_request;
}
memcpy(r->response, &c->response.u.smq.key, sizeof(int32_t));
res = sysv_mq_unnamed_create(c, &c->event);
if (res < 0) {
res = -errno;
goto cleanup_request_response;
}
memcpy(r->event, &c->event.u.smq.key, sizeof(int32_t));
r->hdr.error = 0;
return 0;
cleanup_request:
msgctl(c->request.u.smq.q, IPC_RMID, NULL);
cleanup_request_response:
msgctl(c->response.u.smq.q, IPC_RMID, NULL);
cleanup:
r->hdr.error = res;
return res;
}
#if 0
static int32_t qb_ipcs_smq_is_msg_ready(struct qb_ipcs_service *s)
{
struct msqid_ds info;
int32_t res = msgctl(s->u.smq.q, IPC_STAT, &info);
if (res == 0) {
return info.msg_qnum;
} else {
perror("is_msg_ready");
}
return -errno;
}
#endif
int32_t qb_ipcs_smq_create(struct qb_ipcs_service *s)
{
s->funcs.destroy = qb_ipcs_smq_destroy;
s->funcs.connect = qb_ipcs_smq_connect;
s->funcs.disconnect = qb_ipcs_smq_disconnect;
s->funcs.send = qb_ipc_smq_send;
s->funcs.sendv = qb_ipc_smq_sendv;
s->funcs.recv = qb_ipc_smq_recv;
s->needs_sock_for_poll = QB_TRUE;
return 0;
}
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index b4581e5..48fa6e0 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -1,632 +1,630 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#include <qb/qbipcs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include "util_int.h"
#include "ipc_int.h"
#define SERVER_BACKLOG 5
#if defined(QB_LINUX) || defined(QB_SOLARIS)
#define QB_SUN_LEN(a) sizeof(*(a))
#else
#define QB_SUN_LEN(a) SUN_LEN(a)
#endif
-static int32_t qb_ipcs_us_connection_acceptor(qb_handle_t handle,
- int fd, int revent, void *data);
+static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
#ifdef SO_NOSIGPIPE
static void socket_nosigpipe(int32_t s)
{
int32_t on = 1;
setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&on, sizeof(on));
}
#endif
static void set_cloexec_flag(int32_t fd)
{
int32_t oldflags = fcntl(fd, F_GETFD, 0);
if (oldflags < 0) {
oldflags = 0;
}
oldflags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, oldflags);
}
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
int32_t qb_ipc_us_send(int32_t s, const void *msg, size_t len)
{
int32_t result;
struct msghdr msg_send;
struct iovec iov_send;
char *rbuf = (char *)msg;
int32_t processed = 0;
msg_send.msg_iov = &iov_send;
msg_send.msg_iovlen = 1;
msg_send.msg_name = 0;
msg_send.msg_namelen = 0;
#if !defined(QB_SOLARIS)
msg_send.msg_control = 0;
msg_send.msg_controllen = 0;
msg_send.msg_flags = 0;
#else
msg_send.msg_accrights = NULL;
msg_send.msg_accrightslen = 0;
#endif
retry_send:
iov_send.iov_base = &rbuf[processed];
iov_send.iov_len = len - processed;
result = sendmsg(s, &msg_send, MSG_NOSIGNAL);
if (result == -1 && errno == EAGAIN) {
goto retry_send;
}
if (result == -1) {
return -errno;
}
processed += result;
if (processed != len) {
goto retry_send;
}
return processed;
}
static ssize_t qb_ipc_us_recv_msghdr(int32_t s,
struct msghdr *hdr, char *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
retry_recv:
hdr->msg_iov->iov_base = &msg[processed];
hdr->msg_iov->iov_len = len - processed;
result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
return -errno;
}
#if defined(QB_SOLARIS) || defined(QB_BSD) || defined(QB_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (result == 0) {
return -errno; //ENOTCONN
}
#endif
processed += result;
if (processed != len) {
goto retry_recv;
}
assert(processed == len);
return processed;
}
int32_t qb_ipc_us_recv_ready(int32_t s, int32_t ms_timeout)
{
struct pollfd ufds;
int32_t poll_events;
ufds.fd = s;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll (&ufds, 1, ms_timeout);
if ((poll_events == -1 && errno == EINTR) ||
poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
} else if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
return -ESHUTDOWN;
}
return 0;
}
int32_t qb_ipc_us_recv(int32_t s, void *msg, size_t len)
{
struct msghdr msg_recv;
struct iovec iov_recv;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#if !defined (QB_SOLARIS)
msg_recv.msg_control = 0;
msg_recv.msg_controllen = 0;
msg_recv.msg_flags = 0;
#else
msg_recv.msg_accrights = NULL;
msg_recv.msg_accrightslen = 0;
#endif
return qb_ipc_us_recv_msghdr(s, &msg_recv, msg, len);
}
static int32_t qb_ipcs_uc_recv_and_auth(struct qb_ipcs_connection *c)
{
int32_t res = 0;
struct msghdr msg_recv;
struct iovec iov_recv;
struct qb_ipc_connection_request setup_msg;
#ifdef QB_LINUX
struct cmsghdr *cmsg;
char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))];
int off = 0;
int on = 1;
struct ucred *cred;
#endif
msg_recv.msg_flags = 0;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#ifdef QB_LINUX
msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof(cmsg_cred);
#endif
#ifdef QB_SOLARIS
msg_recv.msg_accrights = 0;
msg_recv.msg_accrightslen = 0;
#endif /* QB_SOLARIS */
iov_recv.iov_base = &setup_msg;
iov_recv.iov_len = sizeof(struct qb_ipc_connection_request);
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = qb_ipc_us_recv_msghdr(c->sock, &msg_recv, (char *)&setup_msg,
sizeof(struct qb_ipc_connection_request));
if (res < 0) {
goto cleanup_and_return;
}
if (res != sizeof(struct qb_ipc_connection_request)) {
res = -EIO;
goto cleanup_and_return;
}
c->request.max_msg_size = setup_msg.max_msg_size;
c->response.max_msg_size = setup_msg.max_msg_size;
c->event.max_msg_size = setup_msg.max_msg_size;
res = -EBADMSG;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
if (getpeerucred(c->sock, &uc) == 0) {
res = 0;
c->euid = ucred_geteuid(uc);
c->egid = ucred_getegid(uc);
c->pid = ucred_getpid(uc);
ucred_free(uc);
} else {
res = -errno;
}
}
#elif HAVE_GETPEEREID
/*
* Usually MacOSX systems
*/
{
/*
* TODO get the peer's pid.
* c->pid = ?;
*/
if (getpeereid(c->sock, &c->euid, &c->egid) == 0) {
res = 0;
} else {
res = -errno;
}
}
#elif SO_PASSCRED
/*
* Usually Linux systems
*/
cmsg = CMSG_FIRSTHDR(&msg_recv);
assert(cmsg);
cred = (struct ucred *)CMSG_DATA(cmsg);
if (cred) {
res = 0;
c->pid = cred->pid;
c->euid = cred->uid;
c->egid = cred->gid;
} else {
res = -EBADMSG;
}
#else /* no credentials */
res = -ENOTSUP;
#endif /* no credentials */
cleanup_and_return:
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
if (res == 0) {
if (c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid,
c->egid);
} else {
res = 0;
}
}
return res;
}
int32_t qb_ipcc_us_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
#if defined(QB_SOLARIS)
request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
#else
request_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif
if (request_fd == -1) {
return -errno;
}
#ifdef SO_NOSIGPIPE
socket_nosigpipe(request_fd);
#endif /* SO_NOSIGPIPE */
set_cloexec_flag(request_fd);
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
address.sun_len = SUN_LEN(&address);
#endif
#if defined(QB_LINUX)
sprintf(address.sun_path + 1, "%s", socket_name);
#else
sprintf(address.sun_path, "%s/%s", SOCKETDIR, socket_name);
#endif
if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) {
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return -errno;
}
void qb_ipcc_us_disconnect(int32_t sock)
{
shutdown(sock, SHUT_RDWR);
close(sock);
}
#if 0
cs_error_t coroipcc_dispatch_get(hdb_handle_t handle, void **data, int timeout)
{
struct pollfd ufds;
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
char *data_addr;
cs_error_t error = CS_OK;
int res;
error =
hdb_error_to_cs(hdb_handle_get
(&ipc_hdb, handle, (void **)&ipc_instance));
if (error != CS_OK) {
return (error);
}
*data = NULL;
ufds.fd = ipc_instance->fd;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll(&ufds, 1, timeout);
if (poll_events == -1 && errno == EINTR) {
error = CS_ERR_TRY_AGAIN;
goto error_put;
} else if (poll_events == -1) {
error = CS_ERR_LIBRARY;
goto error_put;
} else if (poll_events == 0) {
error = CS_ERR_TRY_AGAIN;
goto error_put;
}
if (poll_events == 1 && (ufds.revents & (POLLERR | POLLHUP))) {
error = CS_ERR_LIBRARY;
goto error_put;
}
error = socket_recv(ipc_instance->fd, &buf, 1);
assert(error == CS_OK);
if (shared_mem_dispatch_bytes_left(ipc_instance) > 500000) {
/*
* Notify coroipcs to flush any pending dispatch messages
*/
res =
ipc_sem_post(ipc_instance->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
if (res != CS_OK) {
error = CS_ERR_LIBRARY;
goto error_put;
}
}
data_addr = ipc_instance->dispatch_buffer;
data_addr = &data_addr[ipc_instance->control_buffer->read];
*data = (void *)data_addr;
return (CS_OK);
error_put:
hdb_handle_put(&ipc_hdb, handle);
return (error);
}
#endif
/*
**************************************************************************
* SERVER
*/
int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
char error_str[100];
/*
* Create socket for IPC clients, name socket, listen for connections
*/
#if defined(QB_SOLARIS)
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
#else
s->server_sock = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif
if (s->server_sock == -1) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Cannot create server socket: %s\n", error_str);
return res;
}
set_cloexec_flag(s->server_sock);
res = fcntl(s->server_sock, F_SETFL, O_NONBLOCK);
if (res == -1) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_CRIT,
"Could not set non-blocking operation on server socket: %s\n",
error_str);
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
#if defined(QB_LINUX)
sprintf(un_addr.sun_path + 1, "%s", s->name);
#else
{
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s\n",
SOCKETDIR);
goto error_close;
}
sprintf(un_addr.sun_path, "%s/%s", SOCKETDIR, s->name);
unlink(un_addr.sun_path);
}
#endif
res =
bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_CRIT,
"Could not bind AF_UNIX (%s): %s.\n",
un_addr.sun_path, error_str);
goto error_close;
}
/*
* Allow eveyrone to write to the socket since the IPC layer handles
* security automatically
*/
#if !defined(QB_LINUX)
res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR, "listen failed: %s.\n", error_str);
}
- qb_poll_dispatch_add(s->poll_handle, s->server_sock,
+ qb_loop_poll_add(s->loop_pt, QB_LOOP_HIGH, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return 0;
error_close:
close(s->server_sock);
return res;
}
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
qb_util_log(LOG_INFO, "withdrawing server sockets\n");
shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock);
return 0;
}
-static int32_t qb_ipcs_us_connection_acceptor(qb_handle_t handle,
- int fd, int revent, void *data)
+static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
struct sockaddr_un un_addr;
int32_t new_fd;
struct qb_ipcs_connection *c;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
struct qb_ipc_connection_response response;
int32_t res;
socklen_t addrlen = sizeof(struct sockaddr_un);
char error_str[100];
retry_accept:
errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1 && errno == EBADF) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection:(fd: %d) [%d] %s\n",
fd, errno, error_str);
return -1;
}
if (new_fd == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection: [%d] %s\n",
errno, error_str);
return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
set_cloexec_flag(new_fd);
res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
if (res == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not set non-blocking operation on library connection: %s\n",
error_str);
close(new_fd);
return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
c = qb_ipcs_connection_alloc(s);
c->sock = new_fd;
res = qb_ipcs_uc_recv_and_auth(c);
if (res == 0) {
qb_util_log(LOG_INFO, "IPC credentials authenticated");
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
qb_list_add(&c->list, &s->connections);
c->receive_buf = malloc(c->request.max_msg_size);
if (s->needs_sock_for_poll) {
- qb_poll_dispatch_add(s->poll_handle, c->sock,
+ qb_loop_poll_add(s->loop_pt, QB_LOOP_HIGH, c->sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
}
}
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
qb_ipc_us_send(c->sock, &response, response.hdr.size);
if (res == 0) {
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
} else if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials.");
} else {
strerror_r(-response.hdr.error, error_str, 100);
qb_util_log(LOG_ERR, "Error in connection setup: %s.",
error_str);
}
if (res != 0) {
qb_ipcs_disconnect(c);
}
return 0;
}
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 404894e..69f6c1a 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -1,299 +1,295 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "util_int.h"
#include "ipc_int.h"
#include <qb/qbdefs.h>
#include <qb/qbipcs.h>
static void qb_ipcs_destroy_internal(void *data);
QB_HDB_DECLARE(qb_ipc_services, qb_ipcs_destroy_internal);
qb_ipcs_service_pt qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers)
{
struct qb_ipcs_service *s;
qb_ipcs_service_pt h;
qb_hdb_handle_create(&qb_ipc_services,
sizeof(struct qb_ipcs_service), &h);
qb_hdb_handle_get(&qb_ipc_services, h, (void **)&s);
s->pid = getpid();
s->type = type;
s->needs_sock_for_poll = QB_FALSE;
s->service_id = service_id;
strncpy(s->name, name, NAME_MAX);
s->serv_fns.connection_accept = handlers->connection_accept;
s->serv_fns.connection_created = handlers->connection_created;
s->serv_fns.msg_process = handlers->msg_process;
s->serv_fns.connection_destroyed = handlers->connection_destroyed;
qb_list_init(&s->connections);
qb_hdb_handle_put(&qb_ipc_services, h);
return h;
}
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt pt,
struct qb_ipcs_poll_handlers *handlers)
{
struct qb_ipcs_service *s;
qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
s->poll_fns.dispatch_add = handlers->dispatch_add;
s->poll_fns.dispatch_rm = handlers->dispatch_rm;
qb_hdb_handle_put(&qb_ipc_services, pt);
}
-int32_t qb_ipcs_run(qb_ipcs_service_pt pt, qb_handle_t poll_handle)
+int32_t qb_ipcs_run(qb_ipcs_service_pt pt, void *loop_pt)
{
int32_t res;
struct qb_ipcs_service *s;
qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
- s->poll_handle = poll_handle;
+ s->loop_pt = loop_pt;
res = qb_ipcs_us_publish(s);
if (res < 0) {
qb_hdb_handle_put(&qb_ipc_services, pt);
return res;
}
switch (s->type) {
case QB_IPC_SOCKET:
res = 0;
break;
case QB_IPC_SHM:
res = qb_ipcs_shm_create((struct qb_ipcs_service *)s);
break;
case QB_IPC_POSIX_MQ:
res = qb_ipcs_pmq_create((struct qb_ipcs_service *)s);
break;
case QB_IPC_SYSV_MQ:
res = qb_ipcs_smq_create((struct qb_ipcs_service *)s);
break;
default:
res = -EINVAL;
break;
}
if (res < 0) {
qb_ipcs_us_withdraw(s);
}
qb_hdb_handle_put(&qb_ipc_services, pt);
return res;
}
void qb_ipcs_destroy(qb_ipcs_service_pt pt)
{
qb_hdb_handle_put(&qb_ipc_services, pt);
qb_hdb_handle_destroy(&qb_ipc_services, pt);
}
static void qb_ipcs_destroy_internal(void *data)
{
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
s->funcs.destroy(s);
}
ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
size_t size)
{
ssize_t res;
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->response, data, size);
qb_ipcs_connection_ref_dec(c);
return res;
}
ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void *data,
size_t size)
{
ssize_t res;
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->event, data, size);
qb_ipc_us_send(c->sock, data, 1);
qb_ipcs_connection_ref_dec(c);
return res;
}
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len)
{
ssize_t res;
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.sendv(&c->event, iov, iov_len);
qb_ipc_us_send(c->sock, &res, 1);
qb_ipcs_connection_ref_dec(c);
return res;
}
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = malloc(sizeof(struct qb_ipcs_connection));
c->refcount = 1;
c->service = s;
c->pid = 0;
c->euid = -1;
c->egid = -1;
c->sock = -1;
qb_list_init(&c->list);
c->receive_buf = NULL;
return c;
}
void qb_ipcs_connection_ref_inc(struct qb_ipcs_connection *c)
{
// lock
c->refcount++;
//qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
// unlock
}
void qb_ipcs_connection_ref_dec(struct qb_ipcs_connection *c)
{
// lock
c->refcount--;
//qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
if (c->refcount == 0) {
qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
qb_list_del(&c->list);
// unlock
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
}
c->service->funcs.disconnect(c);
qb_ipcc_us_disconnect(c->sock);
if (c->receive_buf) {
free(c->receive_buf);
}
} else {
// unlock
}
}
int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection *c)
{
return c->service->service_id;
}
void qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
qb_util_log(LOG_DEBUG, "%s()", __func__);
qb_ipcs_connection_ref_dec(c);
}
static int32_t _process_request_(struct qb_ipcs_connection *c)
{
int32_t res = 0;
struct qb_ipc_request_header *hdr;
hdr = (struct qb_ipc_request_header *)c->receive_buf;
qb_ipcs_connection_ref_inc(c);
get_msg_with_live_connection:
res = c->service->funcs.recv(&c->request, hdr, c->request.max_msg_size, 0);
if (res == -EAGAIN) {
goto get_msg_with_live_connection;
}
if (res < 0) {
qb_util_log(LOG_DEBUG, "%s(): %s", __func__, strerror(-res));
goto cleanup;
}
- qb_util_log(LOG_DEBUG, "%s() service:%d msg:%d", __func__,
- c->service->service_id, hdr->id);
if (hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func__);
qb_ipcs_disconnect(c);
res = -ESHUTDOWN;
} else {
c->service->serv_fns.msg_process(c, hdr, hdr->size);
res = 0;
}
cleanup:
qb_ipcs_connection_ref_dec(c);
return res;
}
-int32_t qb_ipcs_dispatch_service_request(qb_handle_t handle,
- int32_t fd, int32_t revents,
+int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents,
void *data)
{
return _process_request_((struct qb_ipcs_connection *)data);
}
-int32_t qb_ipcs_dispatch_connection_request(qb_handle_t handle,
- int32_t fd, int32_t revents,
+int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
void *data)
{
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
char one_byte;
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "%s HUP", __func__);
qb_ipcc_us_disconnect(c->sock);
c->sock = -1;
qb_ipcs_connection_ref_dec(c);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
}
if (c->service->needs_sock_for_poll) {
qb_ipc_us_recv(c->sock, &one_byte, 1);
}
return _process_request_(c);
}
void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
{
c->context = context;
}
void *qb_ipcs_context_get(struct qb_ipcs_connection *c)
{
return c->context;
}
diff --git a/lib/loop.c b/lib/loop.c
new file mode 100644
index 0000000..5a98d67
--- /dev/null
+++ b/lib/loop.c
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+
+static struct qb_loop_source * timer_source;
+static struct qb_loop_source * job_source;
+static struct qb_loop_source * fd_source;
+
+static int32_t qb_loop_run_level(struct qb_loop_level *level)
+{
+ struct qb_loop_item *job;
+ struct qb_list_head *iter, *iter_next;
+ int32_t processed = 0;
+
+ for (iter = level->job_head.next;
+ iter != &level->job_head;
+ iter = iter_next) {
+ if (processed >= level->to_process) {
+ break;
+ }
+ iter_next = iter->next;
+ job = qb_list_entry(iter, struct qb_loop_item, list);
+ qb_list_del (&job->list);
+ qb_list_init (&job->list);
+ job->source->dispatch_and_take_back(job, level->priority);
+ if (level->l->stop_requested) {
+ printf("%s:%d pr:%d STOP!!!\n", __func__, __LINE__, level->priority);
+ return processed;
+ }
+ processed++;
+ }
+ return processed;
+}
+
+
+struct qb_loop * qb_loop_create(void)
+{
+ struct qb_loop *l = malloc(sizeof(struct qb_loop));
+ int32_t p;
+
+ for (p = QB_LOOP_LOW; p <= QB_LOOP_HIGH; p++) {
+ l->level[p].priority = p;
+ l->level[p].to_process = 4;
+ l->level[p].l = l;
+
+ qb_list_init(&l->level[p].job_head);
+ qb_list_init(&l->level[p].wait_head);
+ }
+
+ l->stop_requested = QB_FALSE;
+ qb_list_init(&l->source_head);
+ // install sources
+ timer_source = qb_loop_timer_init(l);
+ job_source = qb_loop_jobs_init(l);
+ fd_source = qb_loop_poll_init(l);
+
+ return l;
+}
+
+void qb_loop_stop(struct qb_loop *l)
+{
+ l->stop_requested = QB_TRUE;
+}
+
+void qb_loop_run(struct qb_loop *l)
+{
+ int32_t p;
+ int32_t p_stop;
+ int32_t todo;
+ int32_t done;
+ int32_t ms_timeout;
+ int32_t fd_poll_done = QB_FALSE;
+
+ do {
+ p_stop = QB_LOOP_HIGH;
+ todo = 0;
+ poll_again:
+ if (!fd_poll_done) {
+ todo += fd_source->poll(fd_source, 0);
+ }
+ todo += job_source->poll(job_source, 0);
+ todo += timer_source->poll(timer_source, 0);
+
+ for (p = QB_LOOP_HIGH; p >= p_stop; p--) {
+ done = qb_loop_run_level(&l->level[p]);
+ if (l->stop_requested) {
+ return;
+ }
+ todo -= done;
+ }
+ if (p_stop > QB_LOOP_LOW) {
+ p_stop--;
+ fd_poll_done = QB_FALSE;
+ goto poll_again;
+ }
+ if (todo == 0) {
+ ms_timeout = qb_loop_timer_msec_duration_to_expire(timer_source);
+ todo = fd_source->poll(fd_source, ms_timeout);
+ fd_poll_done = QB_TRUE;
+ } else {
+ fd_poll_done = QB_FALSE;
+ }
+ } while (!l->stop_requested);
+}
+
diff --git a/lib/loop_int.h b/lib/loop_int.h
new file mode 100644
index 0000000..157fd3a
--- /dev/null
+++ b/lib/loop_int.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef QB_LOOP_INT_DEFINED
+#define QB_LOOP_INT_DEFINED
+
+#include <qb/qbloop.h>
+
+struct qb_loop;
+struct qb_loop_item;
+
+struct qb_loop_item {
+ struct qb_list_head list;
+ struct qb_loop_source *source;
+ void *user_data;
+};
+
+struct qb_loop_level {
+ enum qb_loop_priority priority;
+ int32_t to_process;
+ struct qb_list_head wait_head;
+ struct qb_list_head job_head;
+ struct qb_loop *l;
+};
+
+struct qb_loop_source {
+ struct qb_loop *l;
+ void (*dispatch_and_take_back)(struct qb_loop_item *i,
+ enum qb_loop_priority p);
+ int32_t (*poll)(struct qb_loop_source* s, int32_t ms_timeout);
+ struct qb_list_head list;
+};
+
+struct qb_loop {
+ struct qb_loop_level level[3];
+ int32_t stop_requested;
+ struct qb_list_head source_head;
+};
+
+struct qb_loop_source *
+qb_loop_jobs_init(struct qb_loop *l);
+
+struct qb_loop_source*
+qb_loop_timer_init(struct qb_loop *l);
+
+struct qb_loop_source*
+qb_loop_poll_init(struct qb_loop *l);
+
+int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_source);
+
+
+#endif /* QB_LOOP_INT_DEFINED */
+
diff --git a/lib/loop_job.c b/lib/loop_job.c
new file mode 100644
index 0000000..7396437
--- /dev/null
+++ b/lib/loop_job.c
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2006-2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+
+struct qb_loop_job {
+ struct qb_loop_item item;
+ qb_loop_job_dispatch_fn dispatch_fn;
+};
+
+static struct qb_loop_source * my_src;
+
+
+static void job_dispatch(struct qb_loop_item * item,
+ enum qb_loop_priority p)
+{
+ struct qb_loop_job *job = qb_list_entry(item, struct qb_loop_job, item);
+
+ job->dispatch_fn(job->item.user_data);
+ free(job);
+
+ // this is a one-shot so don't re-add
+}
+
+static int32_t get_more_jobs(struct qb_loop_source* s, int32_t ms_timeout)
+{
+ struct qb_list_head* iter;
+ struct qb_list_head* iter_next;
+ int32_t p;
+ int32_t new_jobs = 0;
+
+ // this is simple, move jobs from wait_head to job_head
+ // TODO use qb_list_splice
+ for (p = QB_LOOP_LOW; p <= QB_LOOP_HIGH; p++) {
+ for (iter = s->l->level[p].wait_head.next;
+ iter != &s->l->level[p].wait_head;
+ iter = iter_next) {
+ iter_next = iter->next;
+ qb_list_del(iter);
+ qb_list_init(iter);
+ qb_list_add_tail(iter, &s->l->level[p].job_head);
+ }
+ qb_list_init(&s->l->level[p].wait_head);
+ new_jobs += qb_list_length(&s->l->level[p].job_head);
+ }
+ return new_jobs;
+}
+
+struct qb_loop_source *
+qb_loop_jobs_init(struct qb_loop *l)
+{
+ my_src = malloc(sizeof(struct qb_loop_source));
+ my_src->l = l;
+ my_src->dispatch_and_take_back = job_dispatch;
+ my_src->poll = get_more_jobs;
+
+ qb_list_init(&my_src->list);
+ qb_list_add_tail(&my_src->list, &l->source_head);
+ return my_src;
+}
+
+int32_t qb_loop_job_add(struct qb_loop *l,
+ enum qb_loop_priority p,
+ void *data,
+ qb_loop_job_dispatch_fn dispatch_fn)
+{
+ struct qb_loop_job *job = malloc(sizeof(struct qb_loop_job));
+
+ job->dispatch_fn = dispatch_fn;
+ job->item.user_data = data;
+ job->item.source = my_src;
+
+ qb_list_init(&job->item.list);
+ qb_list_add_tail(&job->item.list, &l->level[p].wait_head);
+
+ return 0;
+}
+
diff --git a/lib/loop_poll.c b/lib/loop_poll.c
new file mode 100644
index 0000000..63b2afc
--- /dev/null
+++ b/lib/loop_poll.c
@@ -0,0 +1,298 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+#include <sys/poll.h>
+#include <sys/resource.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+
+/* logs, std(in|out|err), pipe */
+#define POLL_FDS_USED_MISC 50
+
+struct qb_poll_entry {
+ struct qb_loop_item item;
+ struct pollfd ufd;
+ qb_loop_poll_dispatch_fn dispatch_fn;
+ enum qb_loop_priority p;
+ int32_t install_pos;
+};
+
+struct qb_poll_source {
+ struct qb_loop_source s;
+ struct pollfd *ufds;
+ int32_t poll_entry_count;
+ struct qb_poll_entry *poll_entries;
+ qb_loop_poll_low_fds_event_fn low_fds_event_fn;
+ int32_t not_enough_fds;
+};
+
+static struct qb_poll_source * my_src;
+
+static void poll_dispatch_and_take_back(struct qb_loop_item * item,
+ enum qb_loop_priority p)
+{
+ struct qb_poll_entry *pe = (struct qb_poll_entry *)item;
+ int32_t res;
+ int32_t idx = pe->install_pos;
+
+ res = pe->dispatch_fn(pe->ufd.fd, pe->ufd.revents, pe->item.user_data);
+ pe = &my_src->poll_entries[idx];
+ if (res < 0) {
+ pe->ufd.fd = -1; /* empty entry */
+ }
+ pe->ufd.revents = 0;
+}
+
+static void poll_fds_usage_check(struct qb_poll_source *s)
+{
+ struct rlimit lim;
+ static int32_t socks_limit = 0;
+ int32_t send_event = 0;
+ int32_t socks_used = 0;
+ int32_t socks_avail = 0;
+ int32_t i;
+
+ if (socks_limit == 0) {
+ if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
+ char error_str[100];
+ strerror_r(errno, error_str, 100);
+ printf("getrlimit: %s\n", error_str);
+ return;
+ }
+ socks_limit = lim.rlim_cur;
+ socks_limit -= POLL_FDS_USED_MISC;
+ if (socks_limit < 0) {
+ socks_limit = 0;
+ }
+ }
+
+ for (i = 0; i < s->poll_entry_count; i++) {
+ if (s->poll_entries[i].ufd.fd != -1) {
+ socks_used++;
+ }
+ }
+ socks_avail = socks_limit - socks_used;
+ if (socks_avail < 0) {
+ socks_avail = 0;
+ }
+ send_event = 0;
+ if (s->not_enough_fds) {
+ if (socks_avail > 2) {
+ s->not_enough_fds = 0;
+ send_event = 1;
+ }
+ } else {
+ if (socks_avail <= 1) {
+ s->not_enough_fds = 1;
+ send_event = 1;
+ }
+ }
+ if (send_event && s->low_fds_event_fn) {
+ s->low_fds_event_fn(s->not_enough_fds,
+ socks_avail);
+ }
+}
+
+static int32_t poll_and_add_to_jobs(struct qb_loop_source* src, int32_t ms_timeout)
+{
+ int32_t i;
+ int32_t res;
+ int32_t new_jobs = 0;
+ struct qb_poll_entry * pe;
+ struct qb_poll_source * s = (struct qb_poll_source *)src;
+
+ poll_fds_usage_check(s);
+
+ for (i = 0; i < s->poll_entry_count; i++) {
+ memcpy(&s->ufds[i], &s->poll_entries[i].ufd, sizeof(struct pollfd));
+ }
+
+ retry_poll:
+ res = poll(s->ufds, s->poll_entry_count, ms_timeout);
+ if (errno == EINTR && res == -1) {
+ goto retry_poll;
+ } else if (res == -1) {
+ return -errno;
+ }
+
+ for (i = 0; i < s->poll_entry_count; i++) {
+ if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) {
+ // empty
+ continue;
+ }
+ pe = &s->poll_entries[i];
+ if (s->ufds[i].revents == pe->ufd.revents) {
+ // entry already in the job queue.
+ continue;
+ }
+ pe->ufd.revents = s->ufds[i].revents;
+ qb_list_init(&pe->item.list);
+ qb_list_add_tail(&pe->item.list, &s->s.l->level[pe->p].job_head);
+ new_jobs++;
+ }
+
+ return new_jobs;
+}
+
+
+struct qb_loop_source*
+qb_loop_poll_init(struct qb_loop *l)
+{
+ my_src = malloc(sizeof(struct qb_poll_source));
+ my_src->s.l = l;
+ my_src->s.dispatch_and_take_back = poll_dispatch_and_take_back;
+ my_src->s.poll = poll_and_add_to_jobs;
+
+ my_src->poll_entries = 0;
+ my_src->ufds = 0;
+ my_src->poll_entry_count = 0;
+ my_src->low_fds_event_fn = NULL;
+ my_src->not_enough_fds = 0;
+
+ qb_list_init(&my_src->s.list);
+ qb_list_add_tail(&my_src->s.list, &l->source_head);
+ return (struct qb_loop_source*)my_src;
+}
+
+int32_t qb_loop_poll_low_fds_event_set(
+ qb_loop_t *l,
+ qb_loop_poll_low_fds_event_fn fn)
+{
+ my_src->low_fds_event_fn = fn;
+
+ return 0;
+}
+
+
+int32_t qb_loop_poll_add(struct qb_loop *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ void *data,
+ qb_loop_poll_dispatch_fn dispatch_fn)
+{
+ struct qb_poll_entry *poll_entries;
+ struct qb_poll_entry *pe;
+ struct pollfd *ufds;
+ int32_t found = 0;
+ int32_t install_pos;
+ int32_t res = 0;
+ int32_t new_size = 0;
+
+ for (found = 0, install_pos = 0;
+ install_pos < my_src->poll_entry_count; install_pos++) {
+ if (my_src->poll_entries[install_pos].ufd.fd == -1) {
+ found = 1;
+ break;
+ }
+ }
+
+ if (found == 0) {
+ /*
+ * Grow pollfd list
+ */
+ new_size = (my_src->poll_entry_count + 1) * sizeof(struct qb_poll_entry);
+ poll_entries = realloc(my_src->poll_entries, new_size);
+ if (poll_entries == NULL) {
+ return -ENOMEM;
+ }
+ my_src->poll_entries = poll_entries;
+
+ new_size = (my_src->poll_entry_count+ 1) * sizeof(struct pollfd);
+ ufds = realloc(my_src->ufds, new_size);
+ if (ufds == NULL) {
+ return -ENOMEM;
+ }
+ my_src->ufds = ufds;
+
+ my_src->poll_entry_count += 1;
+ install_pos = my_src->poll_entry_count - 1;
+ }
+
+ /*
+ * Install new dispatch handler
+ */
+ pe = &my_src->poll_entries[install_pos];
+ pe->install_pos = install_pos;
+ pe->ufd.fd = fd;
+ pe->ufd.events = events;
+ pe->ufd.revents = 0;
+ pe->dispatch_fn = dispatch_fn;
+ pe->item.user_data = data;
+ pe->item.source = (struct qb_loop_source*)my_src;
+ pe->p = p;
+
+ return (res);
+}
+
+int32_t qb_loop_poll_mod(struct qb_loop *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ qb_loop_poll_dispatch_fn dispatch_fn)
+{
+ int32_t i;
+ struct qb_poll_entry *pe;
+
+ /*
+ * Find file descriptor to modify events and dispatch function
+ */
+ for (i = 0; i < my_src->poll_entry_count; i++) {
+ pe = &my_src->poll_entries[i];
+ if (pe->ufd.fd == fd) {
+ pe->ufd.events = events;
+ pe->dispatch_fn = dispatch_fn;
+ pe->p = p;
+ return 0;
+ }
+ }
+
+ return -EBADF;
+}
+
+int32_t qb_loop_poll_del(struct qb_loop *l, int32_t fd)
+{
+ int32_t i;
+ struct qb_poll_entry *pe;
+
+ /*
+ * Find file descriptor to modify events and dispatch function
+ */
+ for (i = 0; i < my_src->poll_entry_count; i++) {
+ pe = &my_src->poll_entries[i];
+ if (pe->ufd.fd == fd) {
+ my_src->ufds[i].fd = -1;
+ my_src->ufds[i].events = 0;
+ my_src->ufds[i].revents = 0;
+ pe->ufd.fd = -1;
+ pe->ufd.events = 0;
+ pe->ufd.revents = 0;
+ return 0;
+ }
+ }
+
+ return -EBADF;
+}
+
+
diff --git a/lib/loop_timer.c b/lib/loop_timer.c
new file mode 100644
index 0000000..a1da762
--- /dev/null
+++ b/lib/loop_timer.c
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+#include <pthread.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+#include "tlist.h"
+
+struct qb_loop_timer {
+ struct qb_loop_item item;
+ qb_loop_timer_dispatch_fn dispatch_fn;
+ enum qb_loop_priority p;
+};
+
+struct qb_timer_source {
+ struct qb_loop_source s;
+ struct timerlist timerlist;
+};
+
+static struct qb_timer_source * my_src;
+
+static void timer_dispatch(struct qb_loop_item * item,
+ enum qb_loop_priority p)
+{
+ struct qb_loop_timer *timer = (struct qb_loop_timer *)item;
+
+ timer->dispatch_fn(timer->item.user_data);
+ free(timer);
+}
+
+static int32_t expired_timers;
+static void make_job_from_tmo(void *data)
+{
+ struct qb_loop_timer *t = (struct qb_loop_timer *)data;
+ struct qb_loop *l = t->item.source->l;
+ qb_list_init(&t->item.list);
+ qb_list_add_tail(&t->item.list, &l->level[t->p].job_head);
+ expired_timers++;
+}
+
+static int32_t expire_the_timers(struct qb_loop_source* s, int32_t ms_timeout)
+{
+ struct qb_timer_source *ts = (struct qb_timer_source *)s;
+ expired_timers = 0;
+ timerlist_expire(&ts->timerlist);
+ return expired_timers;
+}
+
+
+int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_source)
+{
+ uint64_t left = timerlist_msec_duration_to_expire(&my_src->timerlist);
+ if (left != -1 && left > 0xFFFFFFFF) {
+ left = 0xFFFFFFFE;
+ }
+ return left;
+}
+
+struct qb_loop_source*
+qb_loop_timer_init(struct qb_loop *l)
+{
+ my_src = malloc(sizeof(struct qb_timer_source));
+ my_src->s.l = l;
+ my_src->s.dispatch_and_take_back = timer_dispatch;
+ my_src->s.poll = expire_the_timers;
+
+ qb_list_init(&my_src->s.list);
+ qb_list_add_tail(&my_src->s.list, &l->source_head);
+
+ timerlist_init(&my_src->timerlist);
+
+ return (struct qb_loop_source*)my_src;
+}
+
+int32_t qb_loop_timer_add(struct qb_loop *l,
+ enum qb_loop_priority p,
+ int32_t msec_duration,
+ void *data,
+ qb_loop_timer_dispatch_fn timer_fn,
+ qb_loop_timer_handle * timer_handle_out)
+{
+ struct qb_loop_timer *t;
+
+ if (timer_handle_out == NULL) {
+ return -ENOENT;
+ }
+ t = malloc(sizeof(struct qb_loop_timer));
+ t->item.user_data = data;
+ t->item.source = (struct qb_loop_source*)my_src;
+ t->dispatch_fn = timer_fn;
+ t->p = p;
+ qb_list_init(&t->item.list);
+
+ timerlist_add_duration(&my_src->timerlist,
+ make_job_from_tmo, t,
+ ((uint64_t)msec_duration) * QB_TIME_NS_IN_MSEC,
+ timer_handle_out);
+
+ return 0;
+}
+
+int32_t qb_loop_timer_del(struct qb_loop *l, qb_loop_timer_handle th)
+{
+ int32_t res = 0;
+
+ if (th == NULL) {
+ return -EINVAL;
+ }
+
+ timerlist_del(&my_src->timerlist, (void *)th);
+
+ return (res);
+}
+
+
diff --git a/lib/poll.c b/lib/poll.c
deleted file mode 100644
index 3e223c0..0000000
--- a/lib/poll.c
+++ /dev/null
@@ -1,631 +0,0 @@
-/*
- * Copyright (C) 2006-2010 Red Hat, Inc.
- *
- * Author: Steven Dake <sdake@redhat.com>
- *
- * This file is part of libqb.
- *
- * libqb is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 2.1 of the License, or
- * (at your option) any later version.
- *
- * libqb is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with libqb. If not, see <http://www.gnu.org/licenses/>.
- */
-#include "os_base.h"
-
-#include <pthread.h>
-#include <sys/poll.h>
-
-#include <qb/qbdefs.h>
-#include <qb/qbhdb.h>
-#include <qb/qbpoll.h>
-#include <qb/qblist.h>
-#include "tlist.h"
-#include "util_int.h"
-#include <sys/resource.h>
-
-typedef int32_t (*dispatch_fn_t) (qb_handle_t hdb_handle, int32_t fd, int32_t revents,
- void *data);
-
-struct qb_poll_entry {
- struct pollfd ufd;
- dispatch_fn_t dispatch_fn;
- void *data;
-};
-
-struct qb_poll_job {
- qb_poll_job_execute_fn_t execute_fn;
- void *data;
- struct qb_list_head list;
-};
-
-struct qb_poll_instance {
- struct qb_poll_entry *poll_entries;
- struct pollfd *ufds;
- int32_t poll_entry_count;
- struct timerlist timerlist;
- int32_t stop_requested;
- int32_t pipefds[2];
- qb_poll_low_fds_event_fn low_fds_event_fn;
- int32_t not_enough_fds;
- struct qb_list_head job_list;
-};
-
-QB_HDB_DECLARE(poll_instance_database, NULL);
-
-static int dummy_dispatch_fn (qb_handle_t handle, int fd, int revents, void *data) {
- return (0);
-}
-
-qb_handle_t qb_poll_create(void)
-{
- qb_handle_t handle;
- struct qb_poll_instance *poll_instance;
- int32_t res;
-
- res = qb_hdb_handle_create(&poll_instance_database,
- sizeof(struct qb_poll_instance), &handle);
- if (res != 0) {
- goto error_exit;
- }
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_destroy;
- }
-
- poll_instance->poll_entries = 0;
- poll_instance->ufds = 0;
- poll_instance->poll_entry_count = 0;
- poll_instance->stop_requested = 0;
- timerlist_init(&poll_instance->timerlist);
- poll_instance->not_enough_fds = 0;
- qb_list_init(&poll_instance->job_list);
-
- res = pipe (poll_instance->pipefds);
- if (res != 0) {
- goto error_destroy;
- }
-
- /*
- * Allow changes in modify to propogate into new poll instance
- */
- res = qb_poll_dispatch_add (
- handle,
- poll_instance->pipefds[0],
- POLLIN,
- NULL,
- dummy_dispatch_fn);
- if (res != 0) {
- goto error_destroy;
- }
-
- return (handle);
-
-error_destroy:
- qb_hdb_handle_destroy(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_destroy(qb_handle_t handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- close(poll_instance->pipefds[0]);
- close(poll_instance->pipefds[1]);
- free(poll_instance->poll_entries);
- free(poll_instance->ufds);
-
- qb_hdb_handle_destroy(&poll_instance_database, handle);
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_dispatch_add(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- void *data,
- int32_t (*dispatch_fn) (qb_handle_t hdb_handle_t,
- int32_t fd, int32_t revents, void *data))
-{
- struct qb_poll_instance *poll_instance;
- struct qb_poll_entry *poll_entries;
- struct pollfd *ufds;
- int32_t found = 0;
- int32_t install_pos;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (found = 0, install_pos = 0;
- install_pos < poll_instance->poll_entry_count; install_pos++) {
- if (poll_instance->poll_entries[install_pos].ufd.fd == -1) {
- found = 1;
- break;
- }
- }
-
- if (found == 0) {
- /*
- * Grow pollfd list
- */
- poll_entries =
- (struct qb_poll_entry *)realloc(poll_instance->poll_entries,
- (poll_instance->
- poll_entry_count +
- 1) *
- sizeof(struct
- qb_poll_entry));
- if (poll_entries == NULL) {
- res = -ENOMEM;
- goto error_put;
- }
- poll_instance->poll_entries = poll_entries;
-
- ufds = (struct pollfd *)realloc(poll_instance->ufds,
- (poll_instance->poll_entry_count
- + 1) * sizeof(struct pollfd));
- if (ufds == NULL) {
- res = -ENOMEM;
- goto error_put;
- }
- poll_instance->ufds = ufds;
-
- poll_instance->poll_entry_count += 1;
- install_pos = poll_instance->poll_entry_count - 1;
- }
-
- /*
- * Install new dispatch handler
- */
- poll_instance->poll_entries[install_pos].ufd.fd = fd;
- poll_instance->poll_entries[install_pos].ufd.events = events;
- poll_instance->poll_entries[install_pos].ufd.revents = 0;
- poll_instance->poll_entries[install_pos].dispatch_fn = dispatch_fn;
- poll_instance->poll_entries[install_pos].data = data;
-
-error_put:
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_dispatch_modify(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- int32_t (*dispatch_fn) (qb_handle_t hdb_handle_t,
- int32_t fd,
- int32_t revents, void *data))
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- /*
- * Find file descriptor to modify events and dispatch function
- */
- res = -EBADF;
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd == fd) {
- int change_notify = 0;
-
- if (poll_instance->poll_entries[i].ufd.events != events) {
- change_notify = 1;
- }
- poll_instance->poll_entries[i].ufd.events = events;
- poll_instance->poll_entries[i].dispatch_fn =
- dispatch_fn;
- if (change_notify) {
- char buf = 1;
- write (poll_instance->pipefds[1], &buf, 1);
- }
-
- res = 0;
- break;
- }
- }
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_dispatch_delete(qb_handle_t handle, int32_t fd)
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- /*
- * Find dispatch fd to delete
- */
- res = -EBADF;
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd == fd) {
- poll_instance->ufds[i].fd = -1;
- poll_instance->poll_entries[i].ufd.fd = -1;
- poll_instance->poll_entries[i].ufd.revents = 0;
-
- res = 0;
- break;
- }
- }
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_timer_add(qb_handle_t handle,
- int32_t msec_duration, void *data,
- void (*timer_fn) (void *data),
- qb_poll_timer_handle * timer_handle_out)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
-
- if (timer_handle_out == NULL) {
- res = -ENOENT;
- goto error_exit;
- }
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- timerlist_add_duration(&poll_instance->timerlist,
- timer_fn, data,
- ((uint64_t)msec_duration) * 1000000ULL,
- timer_handle_out);
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_timer_delete(qb_handle_t handle, qb_poll_timer_handle th)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
-
- if (th == 0) {
- return (0);
- }
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- timerlist_del(&poll_instance->timerlist, (void *)th);
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_job_add(qb_handle_t poll_handle,
- void *data,
- qb_poll_job_execute_fn_t execute_fn,
- qb_poll_job_handle * handle_out)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
- struct qb_poll_job *job;
-
- if (handle_out == NULL) {
- res = -ENOENT;
- goto error_exit;
- }
-
- res = qb_hdb_handle_get(&poll_instance_database, poll_handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
- job = malloc(sizeof(struct qb_poll_job));
- job->execute_fn = execute_fn;
- job->data = data;
- qb_list_init(&job->list);
- qb_list_add(&job->list, &poll_instance->job_list);
- handle_out = (qb_poll_job_handle)job;
-
- qb_hdb_handle_put(&poll_instance_database, poll_handle);
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_job_delete(qb_handle_t poll_handle, qb_poll_job_handle job_handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
- struct qb_poll_job *job = (struct qb_poll_job *)job_handle;
-
- if (job_handle == NULL) {
- res = -ENOENT;
- goto error_exit;
- }
-
- res = qb_hdb_handle_get(&poll_instance_database, poll_handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
- qb_list_del(&job->list);
- free(job);
-
- qb_hdb_handle_put(&poll_instance_database, poll_handle);
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_stop(qb_handle_t handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- poll_instance->stop_requested = 1;
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-error_exit:
- return (res);
-}
-
-static int32_t _qb_poll_job_run(struct qb_poll_instance *poll_instance)
-{
- struct qb_poll_job *job = NULL;
- struct qb_list_head *iter;
- size_t jobs_run = 0;
-
- for (iter = poll_instance->job_list.next;
- iter != &poll_instance->job_list;
- iter = iter->next) {
- job = qb_list_entry(iter, struct qb_poll_job, list);
- if (job == NULL) {
- continue;
- }
- jobs_run += job->execute_fn(job->data);
- if (jobs_run > 10) {
- break;
- }
- }
- return (jobs_run > 0);
-}
-
-int32_t qb_poll_low_fds_event_set(
- qb_handle_t handle,
- qb_poll_low_fds_event_fn fn)
-{
- struct qb_poll_instance *poll_instance;
-
- if (qb_hdb_handle_get (&poll_instance_database, handle,
- (void *)&poll_instance) != 0) {
- return -ENOENT;
- }
-
- poll_instance->low_fds_event_fn = fn;
-
- qb_hdb_handle_put (&poll_instance_database, handle);
- return 0;
-}
-
-/* logs, std(in|out|err), pipe */
-#define POLL_FDS_USED_MISC 50
-
-static void poll_fds_usage_check(struct qb_poll_instance *poll_instance)
-{
- struct rlimit lim;
- static int32_t socks_limit = 0;
- int32_t send_event = 0;
- int32_t socks_used = 0;
- int32_t socks_avail = 0;
- int32_t i;
-
- if (socks_limit == 0) {
- if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
- char error_str[100];
- strerror_r(errno, error_str, 100);
- printf("getrlimit: %s\n", error_str);
- return;
- }
- socks_limit = lim.rlim_cur;
- socks_limit -= POLL_FDS_USED_MISC;
- if (socks_limit < 0) {
- socks_limit = 0;
- }
- }
-
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd != -1) {
- socks_used++;
- }
- }
- socks_avail = socks_limit - socks_used;
- if (socks_avail < 0) {
- socks_avail = 0;
- }
- send_event = 0;
- if (poll_instance->not_enough_fds) {
- if (socks_avail > 2) {
- poll_instance->not_enough_fds = 0;
- send_event = 1;
- }
- } else {
- if (socks_avail <= 1) {
- poll_instance->not_enough_fds = 1;
- send_event = 1;
- }
- }
- if (send_event) {
- poll_instance->low_fds_event_fn(poll_instance->not_enough_fds,
- socks_avail);
- }
-}
-
-
-int32_t qb_poll_run(qb_handle_t handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- uint64_t expire_timeout_msec = -1;
- int32_t res;
- int32_t poll_entry_count;
- int32_t job_executed;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (;;) {
-rebuild_poll:
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- memcpy(&poll_instance->ufds[i],
- &poll_instance->poll_entries[i].ufd,
- sizeof(struct pollfd));
- }
- job_executed = _qb_poll_job_run(poll_instance);
-
- if (job_executed == QB_TRUE) {
- expire_timeout_msec = 0;
- } else {
- expire_timeout_msec =
- timerlist_msec_duration_to_expire
- (&poll_instance->timerlist);
- if (!qb_list_empty(&poll_instance->job_list) &&
- expire_timeout_msec > 50) {
- expire_timeout_msec = 50;
- }
- }
- poll_fds_usage_check(poll_instance);
-
- if (expire_timeout_msec != -1
- && expire_timeout_msec > 0xFFFFFFFF) {
- expire_timeout_msec = 0xFFFFFFFE;
- }
-
-retry_poll:
- if (poll_instance->stop_requested) {
- return (0);
- }
- res = poll(poll_instance->ufds,
- poll_instance->poll_entry_count,
- expire_timeout_msec);
- if (poll_instance->stop_requested) {
- return (0);
- }
- if (errno == EINTR && res == -1) {
- goto retry_poll;
- } else if (res == -1) {
- res = -errno;
- goto error_exit;
- }
-
- if (poll_instance->ufds[0].revents) {
- char buf;
- read (poll_instance->ufds[0].fd, &buf, 1);
- goto rebuild_poll;
- }
- poll_entry_count = poll_instance->poll_entry_count;
- for (i = 0; i < poll_entry_count; i++) {
- if (poll_instance->ufds[i].fd != -1 &&
- poll_instance->ufds[i].revents) {
-
- res =
- poll_instance->poll_entries[i].
- dispatch_fn(handle,
- poll_instance->ufds[i].fd,
- poll_instance->ufds[i].revents,
- poll_instance->poll_entries[i].
- data);
-
- /*
- * Remove dispatch functions that return -1
- */
- if (res < 0) {
- poll_instance->poll_entries[i].ufd.fd = -1; /* empty entry */
- }
- }
- }
- timerlist_expire(&poll_instance->timerlist);
- } /* for (;;) */
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-error_exit:
- return res;
-}
-
-#ifdef COMPILE_OUT
-void qb_poll_print_state(qb_handle_t handle, int32_t fd)
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- int32_t res = 0;
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- res = -ENOENT;
- exit(1);
- }
-
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd == fd) {
- printf("fd %d\n",
- poll_instance->poll_entries[i].ufd.fd);
- printf("events %d\n",
- poll_instance->poll_entries[i].ufd.events);
- printf("dispatch_fn %p\n",
- poll_instance->poll_entries[i].dispatch_fn);
- }
- }
-}
-
-#endif
diff --git a/libqb.spec.in b/libqb.spec.in
index 9997f2c..f3ac4ec 100644
--- a/libqb.spec.in
+++ b/libqb.spec.in
@@ -1,84 +1,84 @@
%global alphatag @alphatag@
Name: libqb
Summary: The Quarterback Client Server Developer Library
Version: @version@
Release: 1%{?alphatag:.%{alphatag}}%{?dist}
License: LGPL-2.1
Group: System Environment/Libraries
URL: http://www.libqb.org
Source0: https://fedorahosted.org/releases/q/u/quarterback/%{name}-%{version}.tar.gz
BuildRequires: autoconf automake
BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
%prep
%setup -q -n %{name}-%{version}
./autogen.sh
%{configure}
%build
make %{_smp_mflags}
%install
rm -rf %{buildroot}
make install DESTDIR=%{buildroot}
## tree fixup
# drop static libs
rm -f %{buildroot}%{_libdir}/*.a
# drop docs and html docs for now
rm -rf %{buildroot}%{_docdir}/*
%clean
rm -rf %{buildroot}
%description
This package contains libqb libraries.
%files -n libqb
%defattr(-,root,root,-)
%doc COPYING
%{_libdir}/libqb.so.*
%post -n libqb -p /sbin/ldconfig
%postun -n libqb -p /sbin/ldconfig
%package -n libqb-devel
Summary: The Quarterback Development Kit
Group: Development/Libraries
Requires: libqb = %{version}-%{release}
Requires: pkgconfig
Provides: libqb-devel = %{version}
%description -n libqb-devel
This package contains include files and man pages used to develop using
The Quarterback APIs.
%files -n libqb-devel
%defattr(-,root,root,-)
%doc COPYING README
%dir %{_includedir}/qb/
%{_libdir}/libqb.so
%{_libdir}/libqb.la
%{_libdir}/pkgconfig/libqb.pc
-%{_includedir}/qb/qbpoll.h
+%{_includedir}/qb/qbloop.h
%{_includedir}/qb/qbhdb.h
%{_includedir}/qb/qblist.h
%{_includedir}/qb/qbtimer.h
%{_includedir}/qb/qbipcc.h
%{_includedir}/qb/qbipcs.h
%{_includedir}/qb/qbipc_common.h
%{_includedir}/qb/qbrb.h
%{_includedir}/qb/qbutil.h
%{_includedir}/qb/qbdefs.h
%{_mandir}/man3/qb*3*
%changelog
* @date@ Autotools generated version <nobody@nowhere.org> - @version@-1.@alphatag@
- Autotools generated version
diff --git a/tests/.gitignore b/tests/.gitignore
index a8c14a8..cb9f7ec 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -1,9 +1,10 @@
check_plugin
check_ipc
check_hash
check_rb
bmc
bmcpt
bms
rbreader
rbwriter
+loop
diff --git a/tests/Makefile.am b/tests/Makefile.am
index cfc4d5f..7b78cb0 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,58 +1,63 @@
# Copyright (c) 2010 Red Hat, Inc.
#
# Authors: Angus Salkeld <asalkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
#
-noinst_PROGRAMS = bmc bmcpt bms rbwriter rbreader
+noinst_PROGRAMS = bmc bmcpt bms rbwriter rbreader loop
bmc_SOURCES = bmc.c $(top_builddir)/include/qb/qbipcc.h
bmc_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bmc_LDADD = -lrt $(top_builddir)/lib/libqb.la
bmcpt_SOURCES = bmcpt.c $(top_builddir)/include/qb/qbipcc.h
bmcpt_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bmcpt_LDADD = -lrt $(top_builddir)/lib/libqb.la
bms_SOURCES = bms.c $(top_builddir)/include/qb/qbipcs.h
bms_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bms_LDADD = -lrt $(top_builddir)/lib/libqb.la
rbwriter_SOURCES = rbwriter.c $(top_builddir)/include/qb/qbrb.h
rbwriter_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
rbwriter_LDADD = -lrt $(top_builddir)/lib/libqb.la
rbreader_SOURCES = rbreader.c $(top_builddir)/include/qb/qbrb.h
rbreader_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
rbreader_LDADD = -lrt $(top_builddir)/lib/libqb.la
+loop_SOURCES = loop.c $(top_builddir)/include/qb/qbloop.h
+loop_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
+loop_LDADD = -lrt $(top_builddir)/lib/libqb.la
+
+
if HAVE_CHECK
TESTS = check_rb check_ipc
check_PROGRAMS = check_rb check_ipc
check_rb_SOURCES = check_rb.c $(top_builddir)/include/qb/qbrb.h
check_rb_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
check_rb_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
check_ipc_SOURCES = check_ipc.c $(top_builddir)/include/qb/qbipcc.h $(top_builddir)/include/qb/qbipcs.h
check_ipc_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
check_ipc_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
endif
diff --git a/tests/bms.c b/tests/bms.c
index 0c64189..e891d6d 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -1,192 +1,192 @@
/*
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake <sdake@redhat.com>
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#include <stdarg.h>
#include <sched.h>
#include <qb/qbutil.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include <qb/qbipcs.h>
int32_t blocking = 1;
int32_t verbose = 0;
-static qb_handle_t bms_poll_handle;
+static qb_loop_t *bms_loop;
static qb_ipcs_service_pt s1;
static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
#if 0
if (uid == 0 && gid == 0) {
if (verbose) {
printf("%s:%d %s authenticated connection\n",
__FILE__, __LINE__, __func__);
}
return 1;
}
printf("%s:%d %s() BAD user!\n", __FILE__, __LINE__, __func__);
return 0;
#else
return 0;
#endif
}
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
if (verbose) {
printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
}
}
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
if (verbose) {
printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
}
}
static void s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response;
ssize_t res;
if (verbose > 2) {
printf("%s:%d %s > msg:%d, size:%d\n",
__FILE__, __LINE__, __func__,
req_pt->id, req_pt->size);
}
response.size = sizeof(struct qb_ipc_response_header);
response.id = 13;
response.error = 0;
if (blocking == 1) {
res = qb_ipcs_response_send(c, &response,
sizeof(response));
if (res < 0) {
perror("qb_ipcs_response_send");
}
}
}
static void ipc_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
fprintf(stderr, "%s:%d [%d] %s\n", file_name, file_line, severity, msg);
}
static void sigusr1_handler(int32_t num)
{
printf("%s(%d)\n", __func__, num);
qb_ipcs_destroy(s1);
exit(0);
}
static void show_usage(const char *name)
{
printf("usage: \n");
printf("%s <options>\n", name);
printf("\n");
printf(" options:\n");
printf("\n");
printf(" -n non-blocking ipc (default blocking)\n");
printf(" -v verbose\n");
printf(" -h show this help text\n");
printf(" -m use shared memory\n");
printf(" -p use posix message queues\n");
printf(" -s use sysv message queues\n");
printf("\n");
}
int32_t main(int32_t argc, char *argv[])
{
const char *options = "nvhmps";
int32_t opt;
enum qb_ipc_type ipc_type = QB_IPC_SHM;
struct qb_ipcs_service_handlers sh = {
.connection_accept = s1_connection_accept_fn,
.connection_created = s1_connection_created_fn,
.msg_process = s1_msg_process_fn,
.connection_destroyed = s1_connection_destroyed_fn,
};
while ((opt = getopt(argc, argv, options)) != -1) {
switch (opt) {
case 'm':
ipc_type = QB_IPC_SHM;
break;
case 's':
ipc_type = QB_IPC_SYSV_MQ;
break;
case 'p':
ipc_type = QB_IPC_POSIX_MQ;
break;
case 'n': /* non-blocking */
blocking = 0;
break;
case 'v':
verbose++;
break;
case 'h':
default:
show_usage(argv[0]);
exit(0);
break;
}
}
signal(SIGINT, sigusr1_handler);
signal(SIGILL, sigusr1_handler);
signal(SIGTERM, sigusr1_handler);
qb_util_set_log_function(ipc_log_fn);
- bms_poll_handle = qb_poll_create();
+ bms_loop = qb_loop_create();
s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
if (s1 == 0) {
perror("qb_ipcs_create");
exit(1);
}
- qb_ipcs_run(s1, bms_poll_handle);
- qb_poll_run(bms_poll_handle);
+ qb_ipcs_run(s1, bms_loop);
+ qb_loop_run(bms_loop);
return EXIT_SUCCESS;
}
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 6a44adc..19d013b 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -1,363 +1,366 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <syslog.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#include <check.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#define IPC_NAME "ipc_test"
#define MAX_MSG_SIZE (8192*16)
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
enum my_msg_ids {
IPC_MSG_REQ_TX_RX,
IPC_MSG_RES_TX_RX,
IPC_MSG_REQ_DISPATCH,
IPC_MSG_RES_DISPATCH
};
/* Test Cases
*
* 1) basic send & recv differnet message sizes
*
* 2) send message to start dispatch (confirm receipt)
*
* 3) flow control
*
* 4) authentication
*
* 5) thread safety
*
* 6) cleanup
*
* 7) service availabilty
*
* 8) multiple services
*/
-static qb_handle_t bms_poll_handle;
+static qb_loop_t *my_loop;
static qb_ipcs_service_pt s1;
static void sigterm_handler(int32_t num)
{
qb_ipcs_destroy(s1);
- qb_poll_stop(bms_poll_handle);
+ qb_loop_stop(my_loop);
exit(0);
}
static void s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response;
ssize_t res;
if (req_pt->id == IPC_MSG_REQ_TX_RX) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_TX_RX;
response.error = 0;
res = qb_ipcs_response_send(c, &response,
sizeof(response));
if (res < 0) {
perror("qb_ipcs_response_send");
}
} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
if (res < 0) {
perror("qb_ipcs_dispatch_send");
}
}
}
static void ipc_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
if (severity < LOG_INFO)
fprintf(stderr, "%s:%d [%d] %s\n", file_name, file_line, severity, msg);
}
static void run_ipc_server(void)
{
int32_t res;
struct qb_ipcs_service_handlers sh = {
.connection_accept = NULL,
.connection_created = NULL,
.msg_process = s1_msg_process_fn,
.connection_destroyed = NULL,
};
signal(SIGTERM, sigterm_handler);
- bms_poll_handle = qb_poll_create();
+ my_loop = qb_loop_create();
s1 = qb_ipcs_create(IPC_NAME, 4, ipc_type, &sh);
fail_if(s1 == 0);
- res = qb_ipcs_run(s1, bms_poll_handle);
+ res = qb_ipcs_run(s1, my_loop);
ck_assert_int_eq(res, 0);
- qb_poll_run(bms_poll_handle);
+ qb_loop_run(my_loop);
}
static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void))
{
pid_t pid = fork ();
if (pid == -1) {
fprintf (stderr, "Can't fork\n");
return -1;
}
if (pid == 0) {
run_ipc_server_fn();
return 0;
}
return pid;
}
static int32_t stop_process(pid_t pid)
{
kill(pid, SIGTERM);
waitpid(pid, NULL, 0);
return 0;
}
-
-static char buffer[1024 * 1024];
+#define IPC_BUF_SIZE (1024 * 1024)
+static char buffer[IPC_BUF_SIZE];
static int32_t bmc_send_nozc(uint32_t size)
{
struct qb_ipc_request_header *req_header = (struct qb_ipc_request_header *)buffer;
struct qb_ipc_response_header res_header;
int32_t res;
req_header->id = IPC_MSG_REQ_TX_RX;
req_header->size = sizeof(struct qb_ipc_request_header) + size;
repeat_send:
res = qb_ipcc_send(conn, req_header, req_header->size);
if (res < 0) {
if (res == -EAGAIN || res == -ENOMEM) {
goto repeat_send;
} else if (res == -EINVAL || res == -EINTR) {
perror("qb_ipcc_send");
return -1;
} else {
errno = -res;
perror("qb_ipcc_send");
goto repeat_send;
}
}
repeat_recv:
res = qb_ipcc_recv(conn,
&res_header,
sizeof(struct qb_ipc_response_header));
if (res == -EAGAIN) {
goto repeat_recv;
}
if (res == -EINTR) {
return -1;
}
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header.id, IPC_MSG_RES_TX_RX);
ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
return 0;
}
static void test_ipc_txrx(void)
{
int32_t j;
int32_t c = 0;
size_t size;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
for (j = 1; j < 19; j++) {
size = (10 * j * j * j) + sizeof(struct qb_ipc_request_header);
if (size >= MAX_MSG_SIZE)
break;
if (bmc_send_nozc(size) == -1) {
break;
}
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_txrx_shm)
{
ipc_type = QB_IPC_SHM;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_pmq)
{
ipc_type = QB_IPC_POSIX_MQ;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_smq)
{
ipc_type = QB_IPC_SYSV_MQ;
test_ipc_txrx();
}
END_TEST
static void test_ipc_dispatch(void)
{
int32_t res;
int32_t j;
int32_t c = 0;
pid_t pid;
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header *res_header = (struct qb_ipc_response_header*)buffer;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
req_header.id = IPC_MSG_REQ_DISPATCH;
req_header.size = sizeof(struct qb_ipc_request_header);
repeat_send:
res = qb_ipcc_send(conn, &req_header, req_header.size);
if (res < 0) {
if (res == -EAGAIN || res == -ENOMEM) {
goto repeat_send;
} else if (res == -EINVAL || res == -EINTR) {
perror("qb_ipcc_send");
return;
} else {
errno = -res;
perror("qb_ipcc_send");
goto repeat_send;
}
}
repeat_event_recv:
res = qb_ipcc_event_recv(conn, res_header, IPC_BUF_SIZE, 0);
if (res < 0) {
if (res == -EAGAIN) {
goto repeat_event_recv;
} else {
errno = -res;
perror("qb_ipcc_send");
goto repeat_send;
}
}
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header->id, IPC_MSG_RES_DISPATCH);
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_disp_shm)
{
ipc_type = QB_IPC_SHM;
test_ipc_dispatch();
}
END_TEST
static Suite *ipc_suite(void)
{
TCase *tc;
+ uid_t uid;
Suite *s = suite_create("ipc");
tc = tcase_create("ipc_txrx_shm");
tcase_add_test(tc, test_ipc_txrx_shm);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
-#if 0
- tc = tcase_create("ipc_txrx_posix_mq");
- tcase_add_test(tc, test_ipc_txrx_pmq);
- tcase_set_timeout(tc, 10);
- suite_add_tcase(s, tc);
-#endif
- tc = tcase_create("ipc_txrx_sysv_mq");
- tcase_add_test(tc, test_ipc_txrx_smq);
- tcase_set_timeout(tc, 10);
- suite_add_tcase(s, tc);
+ uid = geteuid();
+ if (uid == 0) {
+ tc = tcase_create("ipc_txrx_posix_mq");
+ tcase_add_test(tc, test_ipc_txrx_pmq);
+ tcase_set_timeout(tc, 10);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("ipc_txrx_sysv_mq");
+ tcase_add_test(tc, test_ipc_txrx_smq);
+ tcase_set_timeout(tc, 10);
+ suite_add_tcase(s, tc);
+ }
tc = tcase_create("ipc_dispatch_shm");
tcase_add_test(tc, test_ipc_disp_shm);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
return s;
}
int32_t main(void)
{
int32_t number_failed;
Suite *s = ipc_suite();
SRunner *sr = srunner_create(s);
qb_util_set_log_function(ipc_log_fn);
srunner_run_all(sr, CK_NORMAL);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/check_rb.c b/tests/check_rb.c
index c35bebb..11c2097 100644
--- a/tests/check_rb.c
+++ b/tests/check_rb.c
@@ -1,227 +1,229 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
#include <errno.h>
#include <check.h>
+
+#include <qb/qbdefs.h>
#include <qb/qbrb.h>
#include <qb/qbipc_common.h>
#include <qb/qbutil.h>
START_TEST(test_ring_buffer1)
{
char my_buf[512];
struct qb_ipc_request_header *hdr;
char *str;
qb_ringbuffer_t *rb;
int32_t i;
int32_t b;
ssize_t actual;
ssize_t avail;
rb = qb_rb_open("test1", 200, QB_RB_FLAG_CREATE);
fail_if(rb == NULL);
for (b = 0; b < 3; b++) {
hdr = (struct qb_ipc_request_header *) my_buf;
str = my_buf + sizeof(struct qb_ipc_request_header);
for (i = 0; i < 900; i++) {
hdr->id = __LINE__ + i;
hdr->size =
sprintf(str, "ID: %d (%s + i(%d)) -- %s-%s!",
hdr->id, "actually the line number", i,
__func__, __FILE__) + 1;
hdr->size += sizeof(struct qb_ipc_request_header);
avail = qb_rb_space_free(rb);
actual = qb_rb_chunk_write(rb, hdr, hdr->size);
if (avail < (hdr->size + (2 * sizeof(uint32_t)))) {
ck_assert_int_eq(actual, -ENOMEM);
} else {
ck_assert_int_eq(actual, hdr->size);
}
}
memset(my_buf, 0, sizeof(my_buf));
hdr = (struct qb_ipc_request_header *) my_buf;
str = my_buf + sizeof(struct qb_ipc_request_header);
for (i = 0; i < 15; i++) {
actual = qb_rb_chunk_read(rb, hdr, 512, 0);
if (actual < 0) {
ck_assert_int_eq(0, qb_rb_chunks_used(rb));
break;
}
str[actual - sizeof(struct qb_ipc_request_header)] = '\0';
ck_assert_int_eq(actual, hdr->size);
}
}
qb_rb_close(rb, QB_FALSE);
}
END_TEST
/*
* nice size (int64)
*/
START_TEST(test_ring_buffer2)
{
qb_ringbuffer_t *t;
int32_t i;
int64_t v = 7891034;
int64_t *new_data;
ssize_t l;
t = qb_rb_open("test2", 200 * sizeof(int64_t), QB_RB_FLAG_CREATE);
fail_if(t == NULL);
for (i = 0; i < 200; i++) {
l = qb_rb_chunk_write(t, &v, sizeof(v));
ck_assert_int_eq(l, sizeof(v));
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l < 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, sizeof(v));
fail_unless(v == *new_data);
qb_rb_chunk_reclaim(t);
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_write(t, &v, sizeof(v));
ck_assert_int_eq(l, sizeof(v));
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l == 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, sizeof(v));
fail_unless(v == *new_data);
qb_rb_chunk_reclaim(t);
}
qb_rb_close(t, QB_FALSE);
}
END_TEST
/*
* odd size (10)
*/
START_TEST(test_ring_buffer3)
{
qb_ringbuffer_t *t;
int32_t i;
char v[] = "1234567891";
char out[32];
ssize_t l;
size_t len = strlen(v) + 1;
t = qb_rb_open("test3", 10, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE);
fail_if(t == NULL);
for (i = 0; i < 9000; i++) {
l = qb_rb_chunk_write(t, v, len);
ck_assert_int_eq(l, len);
}
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_read(t, (void *)out, 32, 0);
if (l < 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, len);
ck_assert_str_eq(v, out);
}
qb_rb_close(t, QB_FALSE);
}
END_TEST
START_TEST(test_ring_buffer4)
{
qb_ringbuffer_t *t;
char data[] = "1234567891";
int32_t i;
char *new_data;
ssize_t l;
t = qb_rb_open("test4", 10, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE);
fail_if(t == NULL);
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_write(t, data, strlen(data));
ck_assert_int_eq(l, strlen(data));
if (i == 0) {
data[0] = 'b';
}
}
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l == 0) {
break;
}
ck_assert_int_eq(l, strlen(data));
qb_rb_chunk_reclaim(t);
}
qb_rb_close(t, QB_FALSE);
}
END_TEST
static Suite *rb_suite(void)
{
TCase *tc_load;
Suite *s = suite_create("ringbuffer");
tc_load = tcase_create("test01");
tcase_add_test(tc_load, test_ring_buffer1);
tcase_add_test(tc_load, test_ring_buffer2);
tcase_add_test(tc_load, test_ring_buffer3);
tcase_add_test(tc_load, test_ring_buffer4);
suite_add_tcase(s, tc_load);
return s;
}
static void libqb_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
if (severity < LOG_INFO)
printf("libqb: %s:%d %s\n", file_name, file_line, msg);
}
int32_t main(void)
{
int32_t number_failed;
Suite *s = rb_suite();
SRunner *sr = srunner_create(s);
qb_util_set_log_function(libqb_log_fn);
srunner_run_all(sr, CK_NORMAL);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/loop.c b/tests/loop.c
new file mode 100644
index 0000000..182cf63
--- /dev/null
+++ b/tests/loop.c
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+#include <sys/poll.h>
+
+#include <qb/qbloop.h>
+
+static struct qb_loop *l;
+static qb_loop_timer_handle th;
+
+static void job_3_9(void *data) { printf("%s\n", __func__); }
+static void job_1_2(void *data) { printf("%s\n", __func__); }
+static void job_2_4(void *data) { printf("%s\n", __func__); }
+static void job_3_5(void *data) { printf("%s\n", __func__); }
+static void job_3_6(void *data) { printf("%s\n", __func__); }
+static void job_1_1(void *data) { printf("%s\n", __func__); }
+static void job_3_7(void *data) { printf("%s\n", __func__); }
+static void job_2_3(void *data) { printf("%s\n", __func__); }
+static void job_2_8(void *data) { printf("%s\n", __func__); }
+static void job_1_9(void *data) { printf("%s\n", __func__); }
+
+static void more_important_jobs(void *data)
+{
+ printf("%s\n", __func__);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_2);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_9);
+}
+
+static void more_jobs(void *data)
+{
+ printf("%s\n", __func__);
+ qb_loop_timer_add(l, QB_LOOP_HIGH, 3109, NULL, job_1_1, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_7);
+ qb_loop_timer_add(l, QB_LOOP_LOW, 1000, NULL, more_important_jobs, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_7);
+ qb_loop_timer_add(l, QB_LOOP_LOW, 2341, NULL, job_3_7, &th);
+ qb_loop_timer_add(l, QB_LOOP_LOW, 900, NULL, job_3_6, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_5);
+ qb_loop_timer_add(l, QB_LOOP_MED, 4000, NULL, more_jobs, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_9);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_9);
+ qb_loop_job_add(l, QB_LOOP_MED, NULL, job_2_3);
+}
+
+static int32_t read_stdin(int32_t fd, int32_t revents, void *data)
+{
+ char buf[100];
+ ssize_t len = read(fd, buf, 100);
+ buf[len-1] = '\0';
+ printf("typed > \"%s\"\n", buf);
+ if (strcmp(buf, "more") == 0) {
+ more_jobs(NULL);
+ }
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_9);
+ return 0;
+}
+
+int main(int argc, char * argv[])
+{
+
+ l = qb_loop_create();
+
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_9);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_2_4);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_2);
+ qb_loop_job_add(l, QB_LOOP_MED, NULL, job_3_7);
+// qb_loop_timer_add(l, QB_LOOP_HIGH, 40, NULL, more_jobs, &th);
+ qb_loop_job_add(l, QB_LOOP_MED, NULL, job_2_8);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_6);
+
+ qb_loop_poll_add(l, QB_LOOP_LOW, 0, POLLIN | POLLPRI | POLLNVAL,
+ NULL, read_stdin);
+
+ qb_loop_run(l);
+ return 0;
+}
+
+

File Metadata

Mime Type
text/x-diff
Expires
Thu, Feb 27, 3:29 AM (1 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1466108
Default Alt Text
(131 KB)

Event Timeline