Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3156065
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
131 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/docs/Makefile.am b/docs/Makefile.am
index 6dfd264..a2005eb 100644
--- a/docs/Makefile.am
+++ b/docs/Makefile.am
@@ -1,46 +1,46 @@
# Copyright (C) 2010 Red Hat, Inc.
#
# Authors: Angus Salkeld <asalkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
MAINTAINERCLEANFILES = Makefile.in
EXTRA_DIST = man.dox html.dox
if HAVE_DOXYGEN
inc_dir = $(top_srcdir)/include/qb
dependant_headers = $(wildcard $(inc_dir)/qb*.h)
dist_man_MANS = man3/qbipcc.h.3 man3/qbipcs.h.3 \
man3/qbhdb.h.3 man3/qbipc_common.h.3 man3/qblist.h.3 \
- man3/qbpoll.h.3 man3/qbtimer.h.3 man3/qbutil.h.3
+ man3/qbloop.h.3 man3/qbtimer.h.3 man3/qbutil.h.3
$(dist_man_MANS): man.dox $(dependant_headers)
@mkdir -p man3
@doxygen man.dox
doxygen: html.dox
@mkdir -p html
@doxygen html.dox
else
doxygen:
@echo WARNING: no doxygen to build man pages!
endif
clean-generic:
rm -rf html man3
diff --git a/include/qb/Makefile.am b/include/qb/Makefile.am
index 28a3a74..4cc1b3e 100644
--- a/include/qb/Makefile.am
+++ b/include/qb/Makefile.am
@@ -1,26 +1,27 @@
# Copyright (c) 2010 Red Hat, Inc.
#
# Authors: Angus Salkeld <asalkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
#
MAINTAINERCLEANFILES = Makefile.in
instdir = $(includedir)/qb/
-inst_HEADERS = qbhdb.h qblist.h qbpoll.h qbtimer.h \
- qbipcc.h qbipcs.h qbipc_common.h \
- qbrb.h qbutil.h qbdefs.h
+inst_HEADERS = qbhdb.h qblist.h qbdefs.h qbtimer.h \
+ qbloop.h qbrb.h qbutil.h \
+ qbipcc.h qbipcs.h qbipc_common.h
+
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 727234d..6d98471 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -1,155 +1,155 @@
/*
* Copyright (C) 2006-2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>,
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPCS_H_DEFINED
#define QB_IPCS_H_DEFINED
#include <stdlib.h>
#include <qb/qbipc_common.h>
#include <qb/qbhdb.h>
/* *INDENT-OFF* */
#ifdef __cplusplus
extern "C" {
#endif
/* *INDENT-ON* */
struct qb_ipcs_connection;
typedef struct qb_ipcs_connection qb_ipcs_connection_t;
typedef qb_handle_t qb_ipcs_service_pt;
typedef int32_t (*qb_ipcs_dispatch_fn_t) (qb_ipcs_service_pt s, int32_t fd, int32_t revents,
void *data);
typedef int32_t (*qb_ipcs_dispatch_add_fn)(qb_ipcs_service_pt s, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_rm_fn)(qb_ipcs_service_pt s, int32_t fd, int32_t events,
void *data, qb_ipcs_dispatch_fn_t fn);
struct qb_ipcs_poll_handlers {
qb_ipcs_dispatch_add_fn dispatch_add;
qb_ipcs_dispatch_rm_fn dispatch_rm;
};
/**
* This callback is to check wether you want to accept a new connection.
*
* The type of checks you should do are authentication, service availabilty
* or process resource constraints.
* @return 0 to accept or -errno to indicate a failure (sent back to the client)
*/
typedef int32_t (*qb_ipcs_connection_accept_fn) (qb_ipcs_connection_t *c, uid_t uid, gid_t gid);
/**
* This is called after a new connection has been created.
*/
typedef void (*qb_ipcs_connection_created_fn) (qb_ipcs_connection_t *c);
/**
* This is called after a connection has been destroyed.
*/
typedef void (*qb_ipcs_connection_destroyed_fn) (qb_ipcs_connection_t *c);
/**
* This is the message processing calback.
* It is called with the message data.
*/
typedef void (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c,
void *data, size_t size);
struct qb_ipcs_service_handlers {
qb_ipcs_connection_accept_fn connection_accept;
qb_ipcs_connection_created_fn connection_created;
qb_ipcs_msg_process_fn msg_process;
qb_ipcs_connection_destroyed_fn connection_destroyed;
};
/**
* Create a new IPC server.
*/
qb_ipcs_service_pt qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers);
/**
* Set your callbacks.
*/
void qb_ipcs_service_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_service_handlers *handlers);
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_poll_handlers *handlers);
/**
* run the new IPC server.
*/
-int32_t qb_ipcs_run(qb_ipcs_service_pt s, qb_handle_t poll);
+int32_t qb_ipcs_run(qb_ipcs_service_pt s, void *loop_pt);
/**
* Destroy the IPC server.
*/
void qb_ipcs_destroy(qb_ipcs_service_pt s);
/**
* send a response to a incomming request.
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Increment the connection's reference counter.
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service.
* (as passed into qb_ipcs_create()
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
/* *INDENT-OFF* */
#ifdef __cplusplus
}
#endif
/* *INDENT-ON* */
#endif /* QB_IPCS_H_DEFINED */
diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h
new file mode 100644
index 0000000..6d1bf6f
--- /dev/null
+++ b/include/qb/qbloop.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef QB_LOOP_DEFINED
+#define QB_LOOP_DEFINED
+
+/*
+ * Main loop functions
+ */
+enum qb_loop_priority {
+ QB_LOOP_LOW = 0,
+ QB_LOOP_MED = 1,
+ QB_LOOP_HIGH = 2,
+};
+
+typedef struct qb_loop qb_loop_t;
+
+qb_loop_t * qb_loop_create(void);
+void qb_loop_stop(qb_loop_t *l);
+void qb_loop_run(qb_loop_t *l);
+
+
+/*
+ * Job API
+ */
+
+typedef void *qb_loop_job_handle;
+
+typedef void (*qb_loop_job_dispatch_fn)(void *data);
+
+int32_t qb_loop_job_add(qb_loop_t *l,
+ enum qb_loop_priority p,
+ void *data,
+ qb_loop_job_dispatch_fn dispatch_fn);
+
+/*
+ * Timer API
+ */
+
+typedef void *qb_loop_timer_handle;
+#define qb_poll_timer_handle qb_loop_timer_handle
+
+typedef void (*qb_loop_timer_dispatch_fn)(void *data);
+
+int32_t qb_loop_timer_add(qb_loop_t *l,
+ enum qb_loop_priority p,
+ int32_t msec_duration,
+ void *data,
+ qb_loop_timer_dispatch_fn timer_fn,
+ qb_loop_timer_handle * timer_handle_out);
+
+int32_t qb_loop_timer_del(qb_loop_t *l, qb_loop_timer_handle th);
+
+/*
+ * Poll API
+ */
+typedef void (*qb_loop_poll_low_fds_event_fn) (int32_t not_enough, int32_t fds_available);
+int32_t qb_loop_poll_low_fds_event_set(
+ qb_loop_t *loop,
+ qb_loop_poll_low_fds_event_fn fn);
+
+typedef int32_t (*qb_loop_poll_dispatch_fn) (int32_t fd, int32_t revents, void *data);
+
+
+int32_t qb_loop_poll_add(qb_loop_t *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ void *data,
+ qb_loop_poll_dispatch_fn dispatch_fn);
+
+int32_t qb_loop_poll_mod(qb_loop_t *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ qb_loop_poll_dispatch_fn dispatch_fn);
+
+int32_t qb_loop_poll_del(qb_loop_t *l, int32_t fd);
+
+#endif /* QB_LOOP_DEFINED */
+
diff --git a/include/qb/qbpoll.h b/include/qb/qbpoll.h
deleted file mode 100644
index 009ca3c..0000000
--- a/include/qb/qbpoll.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (C) 2003-2010 Red Hat, Inc.
- *
- * Author: Steven Dake <sdake@redhat.com>
- *
- * This file is part of libqb.
- *
- * libqb is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 2.1 of the License, or
- * (at your option) any later version.
- *
- * libqb is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with libqb. If not, see <http://www.gnu.org/licenses/>.
- */
-#ifndef QB_POLL_H_DEFINED
-#define QB_POLL_H_DEFINED
-
-#include <qb/qbhdb.h>
-#include <pthread.h>
-
-/* *INDENT-OFF* */
-#ifdef __cplusplus
-extern "C" {
-#endif
-/* *INDENT-ON* */
-
-typedef void (*qb_poll_low_fds_event_fn) (int32_t not_enough, int32_t fds_available);
-
-typedef void *qb_poll_timer_handle;
-typedef void *qb_poll_job_handle;
-
-/**
- * return < 0 for removal
- * return == 0 for no-op
- * return > 0 for work done
- */
-typedef int32_t (*qb_poll_job_execute_fn_t) (void *data);
-
-qb_handle_t qb_poll_create(void);
-
-int32_t qb_poll_destroy(qb_handle_t hdb_handle);
-
-int32_t qb_poll_dispatch_add(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- void *data,
- int32_t (*dispatch_fn) (qb_handle_t handle,
- int32_t fd, int32_t revents, void *data));
-
-int32_t qb_poll_dispatch_modify(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- int32_t (*dispatch_fn) (qb_handle_t hdb_handle_t,
- int32_t fd,
- int32_t revents, void *data));
-
-int32_t qb_poll_dispatch_delete(qb_handle_t handle, int32_t fd);
-
-int32_t qb_poll_low_fds_event_set(
- qb_handle_t handle,
- qb_poll_low_fds_event_fn fn);
-
-int32_t qb_poll_timer_add(qb_handle_t handle,
- int32_t msec_in_future, void *data,
- void (*timer_fn) (void *data),
- qb_poll_timer_handle * timer_handle_out);
-
-int32_t qb_poll_timer_delete(qb_handle_t handle, qb_poll_timer_handle timer_handle);
-
-int32_t qb_poll_job_add(qb_handle_t handle,
- void *data,
- qb_poll_job_execute_fn_t execute_fn,
- qb_poll_job_handle * handle_out);
-int32_t qb_poll_job_delete(qb_handle_t poll_handle, qb_poll_job_handle job_handle);
-
-int32_t qb_poll_run(qb_handle_t handle);
-
-int32_t qb_poll_stop(qb_handle_t handle);
-
-/* *INDENT-OFF* */
-#ifdef __cplusplus
-}
-#endif
-/* *INDENT-ON* */
-
-#endif /* QB_POLL_H_DEFINED */
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 6758208..63ca286 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -1,47 +1,48 @@
#
# Copyright (C) 2010 Red Hat, Inc.
#
# Author: Angus Salkeld <asaslkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
MAINTAINERCLEANFILES = Makefile.in
-noinst_HEADERS = ipc_int.h util_int.h ringbuffer_int.h
+noinst_HEADERS = ipc_int.h util_int.h ringbuffer_int.h loop_int.h
#
# Here are a set of rules to help you update your library version information:
# Start with version information of ‘0:0:0’ for each libtool library.
# Update the version information only immediately before a public release of your software.
# More frequent updates are unnecessary, and only guarantee that the current interface number gets larger faster.
# If the library source code has changed at all since the last update, then increment revision (‘c:r:a’ becomes ‘c:r+1:a’).
# If any interfaces have been added, removed, or changed since the last update, increment current, and set revision to 0.
# If any interfaces have been added since the last public release, then increment age.
# If any interfaces have been removed since the last public release, then set age to 0.
#
lib_LTLIBRARIES = libqb.la
libqb_la_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
libqb_la_LDFLAGS = -version-info 0:0:0
-libqb_la_SOURCES = util.c poll.c timer.c hdb.c ringbuffer.c ringbuffer_helper.c \
+libqb_la_SOURCES = util.c hdb.c ringbuffer.c ringbuffer_helper.c timer.c \
+ loop.c loop_poll.c loop_timer.c loop_job.c \
ipcc.c ipcs.c ipc_posix_mq.c ipc_sysv_mq.c ipc_shm.c ipc_us.c
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libqb.pc
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 76cdb41..9555d48 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -1,191 +1,189 @@
/*
* Copyright (C) 2009 Red Hat, Inc.
*
* Author: Steven Dake <sdake@redhat.com>
* Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QB_IPC_INT_H_DEFINED
#define QB_IPC_INT_H_DEFINED
#include <unistd.h>
#include "config.h"
#include <dirent.h>
#include <mqueue.h>
#include <qb/qblist.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
#include <qb/qbipc_common.h>
#include <qb/qbrb.h>
/*
* Darwin claims to support process shared synchronization
* but it really does not. The unistd.h header file is wrong.
*/
#if defined(QB_DARWIN) || defined(__UCLIBC__)
#undef _POSIX_THREAD_PROCESS_SHARED
#define _POSIX_THREAD_PROCESS_SHARED -1
#endif
#ifndef _POSIX_THREAD_PROCESS_SHARED
#define _POSIX_THREAD_PROCESS_SHARED -1
#endif
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
#endif
/*
Client Server
SEND CONN REQ ->
ACCEPT & CREATE queues
or DENY
<- SEND ACCEPT(with details)/DENY
*/
struct qb_ipc_connection_request {
struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
uint32_t max_msg_size __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
struct qb_ipc_connection_response {
struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
int32_t connection_type __attribute__ ((aligned(8)));
uint32_t max_msg_size __attribute__ ((aligned(8)));
char request[PATH_MAX] __attribute__ ((aligned(8)));
char response[PATH_MAX] __attribute__ ((aligned(8)));
char event[PATH_MAX] __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
struct qb_ipcc_connection;
struct qb_ipc_one_way {
size_t max_msg_size;
union {
struct {
mqd_t q;
char name[NAME_MAX];
} pmq;
struct {
int32_t q;
int32_t key;
} smq;
struct {
qb_ringbuffer_t *rb;
} shm;
} u;
};
struct qb_ipcc_funcs {
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec *iov, size_t iov_len);
void (*disconnect)(struct qb_ipcc_connection* c);
};
struct qb_ipcc_connection {
char name[NAME_MAX];
enum qb_ipc_type type;
int32_t needs_sock_for_poll;
int32_t sock;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
char *receive_buf;
};
int32_t qb_ipc_us_send(int32_t s, const void *msg, size_t len);
int32_t qb_ipc_us_recv (int32_t s, void *msg, size_t len);
int32_t qb_ipc_us_recv_ready(int32_t s, int32_t ms_timeout);
int32_t qb_ipcc_us_connect(const char *socket_name, int32_t *sock_pt);
void qb_ipcc_us_disconnect (int32_t sock);
int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_soc_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
struct qb_ipcs_funcs {
void (*destroy)(struct qb_ipcs_service *s);
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
ssize_t (*recv)(struct qb_ipc_one_way *one_way, void *buf, size_t buf_size, int32_t timeout);
ssize_t (*send)(struct qb_ipc_one_way *one_way, const void *data, size_t size);
ssize_t (*sendv)(struct qb_ipc_one_way *one_way, const struct iovec* iov, size_t iov_len);
};
struct qb_ipcs_service {
enum qb_ipc_type type;
char name[NAME_MAX];
int32_t service_id;
pid_t pid;
int32_t needs_sock_for_poll;
int32_t server_sock;
- qb_handle_t poll_handle;
+ void* loop_pt;
struct qb_ipcs_service_handlers serv_fns;
struct qb_ipcs_poll_handlers poll_fns;
struct qb_ipcs_funcs funcs;
struct qb_list_head connections;
};
struct qb_ipcs_connection {
int32_t refcount;
pid_t pid;
uid_t euid;
gid_t egid;
int32_t sock;
struct qb_ipc_one_way request;
struct qb_ipc_one_way response;
struct qb_ipc_one_way event;
struct qb_ipcs_service *service;
struct qb_list_head list;
char *receive_buf;
void *context;
};
int32_t qb_ipcs_pmq_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_soc_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_smq_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_shm_create(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_publish(struct qb_ipcs_service *s);
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
-int32_t qb_ipcs_dispatch_connection_request(qb_handle_t hdb_handle_t,
- int32_t fd, int32_t revents, void *data);
-int32_t qb_ipcs_dispatch_service_request(qb_handle_t hdb_handle_t,
- int32_t fd, int32_t revents, void *data);
+int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
+int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
void qb_ipcs_disconnect(struct qb_ipcs_connection *c);
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipc_posix_mq.c b/lib/ipc_posix_mq.c
index 2616f33..3cb0f08 100644
--- a/lib/ipc_posix_mq.c
+++ b/lib/ipc_posix_mq.c
@@ -1,418 +1,419 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <mqueue.h>
#include <sys/resource.h>
#include "ipc_int.h"
#include "util_int.h"
#include <qb/qbdefs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#define QB_REQUEST_Q_LEN 3
#define QB_RESPONSE_Q_LEN 1
#define QB_EVENT_Q_LEN 3
static size_t q_space_used = 0;
#ifdef QB_LINUX
#define QB_RLIMIT_CHANGE_NEEDED 1
#endif /* QB_LINUX */
/*
* utility functions
* --------------------------------------------------------
*/
static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len)
{
int32_t res = 0;
#ifdef QB_RLIMIT_CHANGE_NEEDED
struct rlimit rlim;
size_t q_space_needed;
#endif /* QB_RLIMIT_CHANGE_NEEDED */
#ifdef QB_RLIMIT_CHANGE_NEEDED
if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno;
qb_util_log(LOG_ERR, "getrlimit failed");
return res;
}
q_space_needed = q_space_used + (max_msg_size * q_len * 4 / 3);
qb_util_log(LOG_DEBUG, "rlimit:%d needed:%zu used:%zu",
(int)rlim.rlim_cur, q_space_needed, q_space_used);
if (rlim.rlim_cur < q_space_needed) {
rlim.rlim_cur = q_space_needed;
}
if (rlim.rlim_max < q_space_needed) {
rlim.rlim_max = q_space_needed;
}
if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno;
qb_util_log(LOG_ERR, "setrlimit failed");
}
#endif /* QB_RLIMIT_CHANGE_NEEDED */
return res;
}
static int32_t posix_mq_open(struct qb_ipc_one_way *one_way,
const char *name, size_t q_len)
{
int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len);
if (res != 0) {
return res;
}
one_way->u.pmq.q = mq_open(name, O_RDWR);
if (one_way->u.pmq.q == (mqd_t) - 1) {
res = -errno;
perror("mq_open");
return res;
}
strcpy(one_way->u.pmq.name, name);
q_space_used += one_way->max_msg_size * q_len;
return 0;
}
static int32_t posix_mq_create(struct qb_ipcs_connection *c,
struct qb_ipc_one_way *one_way,
const char *name, size_t q_len)
{
struct mq_attr attr;
mqd_t q = 0;
int32_t res = 0;
mode_t m = 0600;
size_t max_msg_size = one_way->max_msg_size;
res = posix_mq_increase_limits(max_msg_size, q_len);
if (res != 0) {
return res;
}
try_smaller:
if (q != 0) {
max_msg_size = max_msg_size / 2;
q_len--;
}
// qb_util_log(LOG_DEBUG, "%s() max_msg_size:%zu q_len:%zu", __func__,
// max_msg_size, q_len);
attr.mq_flags = O_NONBLOCK;
attr.mq_maxmsg = q_len;
attr.mq_msgsize = max_msg_size;
q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr);
if (q == (mqd_t) - 1 && errno == ENOMEM) {
if (max_msg_size > 9000 && q_len > 3) {
goto try_smaller;
}
}
if (q == (mqd_t) - 1) {
res = -errno;
qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s",
name, strerror(errno));
return res;
}
q_space_used += max_msg_size * q_len;
one_way->max_msg_size = max_msg_size;
one_way->u.pmq.q = q;
strcpy(one_way->u.pmq.name, name);
res = fchown(q, c->euid, c->egid);
if (res == -1) {
res = -errno;
qb_util_log(LOG_ERR, "fchown:%s %s", name, strerror(errno));
mq_close(q);
mq_unlink(name);
}
return res;
}
static ssize_t qb_ipc_pmq_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1);
if (res != 0) {
return -errno;
}
return msg_len;
}
static ssize_t qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way,
const struct iovec* iov,
size_t iov_len)
{
int32_t total_size = 0;
int32_t i;
int32_t res = 0;
char *data = NULL;
char *pt = NULL;
for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len;
}
data = malloc(total_size);
pt = data;
for (i = 0; i < iov_len; i++) {
memcpy(pt, iov[i].iov_base, iov[i].iov_len);
pt += iov[i].iov_len;
}
res = mq_send(one_way->u.pmq.q, data, total_size, 1);
free(data);
if (res != 0) {
return -errno;
}
return total_size;
}
static ssize_t qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way,
void *msg_ptr,
size_t msg_len,
int32_t ms_timeout)
{
uint32_t msg_prio;
struct timespec ts_timeout;
ssize_t res;
if (ms_timeout >= 0) {
clock_gettime(CLOCK_REALTIME, &ts_timeout);
qb_timespec_add_ms(&ts_timeout, ms_timeout);
}
mq_receive_again:
if (ms_timeout >= 0) {
res = mq_timedreceive(one_way->u.pmq.q,
(char *)msg_ptr,
one_way->max_msg_size,
&msg_prio,
&ts_timeout);
} else {
res = mq_receive(one_way->u.pmq.q,
(char *)msg_ptr,
one_way->max_msg_size,
&msg_prio);
}
if (res == -1) {
switch (errno) {
case EINTR:
goto mq_receive_again;
break;
case EAGAIN:
res = -ETIMEDOUT;
break;
case ETIMEDOUT:
res = -errno;
break;
default:
res = -errno;
qb_util_log(LOG_ERR,
"error waiting for mq_timedreceive : %s",
strerror(errno));
break;
}
}
return res;
}
/*
* client functions
* --------------------------------------------------------
*/
static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_request_header hdr;
qb_util_log(LOG_DEBUG, "%s()\n", __func__);
if (c->needs_sock_for_poll) {
return;
}
hdr.id = QB_IPC_MSG_DISCONNECT;
hdr.size = sizeof(hdr);
mq_send(c->request.u.pmq.q, (const char *)&hdr, hdr.size, 30);
mq_close(c->event.u.pmq.q);
mq_close(c->response.u.pmq.q);
mq_close(c->request.u.pmq.q);
mq_unlink(c->event.u.pmq.name);
mq_unlink(c->request.u.pmq.name);
mq_unlink(c->response.u.pmq.name);
}
int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{
int32_t res = 0;
c->funcs.send = qb_ipc_pmq_send;
c->funcs.sendv = qb_ipc_pmq_sendv;
c->funcs.recv = qb_ipc_pmq_recv;
c->funcs.disconnect = qb_ipcc_pmq_disconnect;
#if defined(QB_LINUX) || defined(QB_BSD)
c->needs_sock_for_poll = QB_FALSE;
#else
c->needs_sock_for_poll = QB_TRUE;
#endif
if (strlen(c->name) > (NAME_MAX - 20)) {
return -EINVAL;
}
res = posix_mq_open(&c->request, response->request,
QB_REQUEST_Q_LEN);
if (res != 0) {
perror("mq_open:REQUEST");
return res;
}
res = posix_mq_open(&c->response, response->response,
QB_RESPONSE_Q_LEN);
if (res != 0) {
perror("mq_open:RESPONSE");
goto cleanup_request;
}
res = posix_mq_open(&c->event, response->event, QB_EVENT_Q_LEN);
if (res != 0) {
perror("mq_open:EVENT");
goto cleanup_request_response;
}
return 0;
cleanup_request_response:
mq_close(c->response.u.pmq.q);
cleanup_request:
mq_close(c->request.u.pmq.q);
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static void qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c)
{
struct qb_ipc_response_header msg;
msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg);
msg.error = 0;
qb_ipc_pmq_send(&c->event, &msg, msg.size);
mq_close(c->event.u.pmq.q);
mq_close(c->response.u.pmq.q);
mq_close(c->request.u.pmq.q);
mq_unlink(c->event.u.pmq.name);
mq_unlink(c->request.u.pmq.name);
mq_unlink(c->response.u.pmq.name);
}
static void qb_ipcs_pmq_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
qb_util_log(LOG_DEBUG, "%s\n", __func__);
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
iter_next = iter->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
}
static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res = 0;
snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid);
snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid);
snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid);
res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN);
if (res < 0) {
goto cleanup;
}
res = posix_mq_create(c, &c->response, r->response, QB_RESPONSE_Q_LEN);
if (res < 0) {
goto cleanup_request;
}
res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN);
if (res < 0) {
goto cleanup_request_response;
}
if (!s->needs_sock_for_poll) {
- qb_poll_dispatch_add(s->poll_handle, c->request.u.pmq.q,
+ qb_loop_poll_add(s->loop_pt, QB_LOOP_HIGH,
+ c->request.u.pmq.q,
POLLIN | POLLPRI | POLLNVAL,
c, qb_ipcs_dispatch_service_request);
}
r->hdr.error = 0;
return 0;
cleanup_request_response:
mq_close(c->response.u.pmq.q);
mq_unlink(r->response);
cleanup_request:
mq_close(c->request.u.pmq.q);
mq_unlink(r->request);
cleanup:
r->hdr.error = res;
return res;
}
int32_t qb_ipcs_pmq_create(struct qb_ipcs_service * s)
{
s->funcs.recv = qb_ipc_pmq_recv;
s->funcs.send = qb_ipc_pmq_send;
s->funcs.sendv = qb_ipc_pmq_sendv;
s->funcs.connect = qb_ipcs_pmq_connect;
s->funcs.disconnect = qb_ipcs_pmq_disconnect;
s->funcs.destroy = qb_ipcs_pmq_destroy;
#if defined(QB_LINUX) || defined(QB_BSD)
s->needs_sock_for_poll = QB_FALSE;
#else
s->needs_sock_for_poll = QB_TRUE;
#endif
return 0;
}
diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c
index 7dd41c3..b679f04 100644
--- a/lib/ipc_shm.c
+++ b/lib/ipc_shm.c
@@ -1,274 +1,274 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "ipc_int.h"
#include "util_int.h"
#include <qb/qbdefs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include <qb/qbrb.h>
/*
* utility functions
* --------------------------------------------------------
*/
/*
* client functions
* --------------------------------------------------------
*/
static void qb_ipcc_shm_disconnect(struct qb_ipcc_connection *c)
{
qb_rb_close(c->request.u.shm.rb, QB_FALSE);
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
qb_rb_close(c->event.u.shm.rb, QB_FALSE);
}
static ssize_t qb_ipc_shm_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
return qb_rb_chunk_write(one_way->u.shm.rb, msg_ptr, msg_len);
}
static ssize_t qb_ipc_shm_sendv(struct qb_ipc_one_way *one_way,
const struct iovec* iov,
size_t iov_len)
{
char *dest;
int32_t res = 0;
int32_t total_size = 0;
int32_t i;
char *pt = NULL;
for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len;
}
dest = qb_rb_chunk_alloc(one_way->u.shm.rb, total_size);
if (dest == NULL) {
return -errno;
}
pt = dest;
for (i = 0; i < iov_len; i++) {
memcpy(pt, iov[i].iov_base, iov[i].iov_len);
pt += iov[i].iov_len;
}
res = qb_rb_chunk_commit(one_way->u.shm.rb, total_size);
if (res < 0) {
return res;
}
return total_size;
}
static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way,
void *msg_ptr,
size_t msg_len,
int32_t ms_timeout)
{
ssize_t res = qb_rb_chunk_read(one_way->u.shm.rb,
(void *)msg_ptr,
msg_len,
ms_timeout);
if (res == -ETIMEDOUT) {
return -EAGAIN;
}
return res;
}
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{
int32_t res = 0;
c->funcs.send = qb_ipc_shm_send;
c->funcs.sendv = qb_ipc_shm_sendv;
c->funcs.recv = qb_ipc_shm_recv;
c->funcs.disconnect = qb_ipcc_shm_disconnect;
c->needs_sock_for_poll = QB_TRUE;
if (strlen(c->name) > (NAME_MAX - 20)) {
errno = EINVAL;
return -errno;
}
c->request.u.shm.rb = qb_rb_open(response->request, c->request.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS);
if (c->request.u.shm.rb == NULL) {
perror("qb_rb_open:REQUEST");
return -errno;
}
c->response.u.shm.rb = qb_rb_open(response->response,
c->response.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS);
if (c->response.u.shm.rb == NULL) {
perror("qb_rb_open:RESPONSE");
goto cleanup_request;
}
c->event.u.shm.rb = qb_rb_open(response->event,
c->response.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS);
if (c->event.u.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:EVENT");
goto cleanup_request_response;
}
return 0;
cleanup_request_response:
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
cleanup_request:
qb_rb_close(c->request.u.shm.rb, QB_FALSE);
qb_util_log(LOG_DEBUG, "connection failed %d\n", res);
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static void qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c)
{
struct qb_ipc_response_header msg;
int32_t peer_alive = QB_TRUE;
if (c->sock == -1) {
peer_alive = QB_FALSE;
}
msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg);
msg.error = 0;
if (c->response.u.shm.rb) {
if (peer_alive) {
qb_ipc_shm_send(&c->event, &msg, msg.size);
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
} else {
qb_rb_close(c->response.u.shm.rb, QB_TRUE);
}
}
if (c->event.u.shm.rb) {
qb_rb_close(c->event.u.shm.rb, !peer_alive);
}
if (c->request.u.shm.rb) {
qb_rb_close(c->request.u.shm.rb, !peer_alive);
}
}
static void qb_ipcs_shm_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
qb_util_log(LOG_DEBUG, "destroying server\n");
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
iter_next = iter->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
}
static int32_t qb_ipcs_shm_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res;
qb_util_log(LOG_DEBUG, "connecting to client [%d]\n", c->pid);
snprintf(r->request, NAME_MAX, "%s-request-%d", s->name, c->pid);
snprintf(r->response, NAME_MAX, "%s-response-%d", s->name, c->pid);
snprintf(r->event, NAME_MAX, "%s-event-%d", s->name, c->pid);
qb_util_log(LOG_DEBUG, "rb_open:%s", r->request);
c->request.u.shm.rb = qb_rb_open(r->request,
c->request.max_msg_size,
QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS);
if (c->request.u.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:REQUEST");
goto cleanup;
}
res = qb_rb_chown(c->request.u.shm.rb, c->euid, c->egid);
c->response.u.shm.rb = qb_rb_open(r->response,
c->response.max_msg_size,
QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS);
if (c->response.u.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:RESPONSE");
goto cleanup_request;
}
res = qb_rb_chown(c->response.u.shm.rb, c->euid, c->egid);
c->event.u.shm.rb = qb_rb_open(r->event,
c->event.max_msg_size,
QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS);
if (c->event.u.shm.rb == NULL) {
res = -errno;
perror("mq_open:EVENT");
goto cleanup_request_response;
}
res = qb_rb_chown(c->event.u.shm.rb, c->euid, c->egid);
r->hdr.error = 0;
return 0;
cleanup_request_response:
qb_rb_close(c->request.u.shm.rb, QB_FALSE);
cleanup_request:
qb_rb_close(c->response.u.shm.rb, QB_FALSE);
cleanup:
r->hdr.error = res;
return res;
}
int32_t qb_ipcs_shm_create(struct qb_ipcs_service *s)
{
s->funcs.destroy = qb_ipcs_shm_destroy;
s->funcs.recv = qb_ipc_shm_recv;
s->funcs.send = qb_ipc_shm_send;
s->funcs.sendv = qb_ipc_shm_sendv;
s->funcs.connect = qb_ipcs_shm_connect;
s->funcs.disconnect = qb_ipcs_shm_disconnect;
s->needs_sock_for_poll = QB_TRUE;
return 0;
}
diff --git a/lib/ipc_sysv_mq.c b/lib/ipc_sysv_mq.c
index ef59192..dd224b1 100644
--- a/lib/ipc_sysv_mq.c
+++ b/lib/ipc_sysv_mq.c
@@ -1,398 +1,398 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include <sys/ipc.h>
#include <sys/msg.h>
#include <qb/qbdefs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include "ipc_int.h"
#include "util_int.h"
#ifndef MSGMAX
#define MSGMAX 8192
#endif
#define MY_DATA_SIZE 8000
struct my_msgbuf {
int32_t id __attribute__ ((aligned(8)));
char data[MY_DATA_SIZE] __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
/*
* utility functions
* --------------------------------------------------------
*/
static int32_t sysv_mq_unnamed_create(struct qb_ipcs_connection *c,
struct qb_ipc_one_way *queue)
{
struct msqid_ds info;
int32_t res = 0;
retry_creating_the_q:
queue->u.smq.key = random();
queue->u.smq.q =
msgget(queue->u.smq.key,
IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR);
if (queue->u.smq.q == -1 && errno == EEXIST) {
goto retry_creating_the_q;
} else if (queue->u.smq.q == -1) {
return -errno;
}
/*
* change the queue size and change the ownership to that of
* the client so they can access it.
*/
res = msgctl(queue->u.smq.q, IPC_STAT, &info);
if (res != 0) {
res = -errno;
qb_util_log(LOG_ERR, "error getting sysv-mq info : %s",
strerror(errno));
return res;
}
if (info.msg_perm.uid != 0) {
qb_util_log(LOG_WARNING,
"not enough privileges to increase msg_qbytes");
return res;
}
info.msg_qbytes = 2 * queue->max_msg_size;
info.msg_perm.uid = c->euid;
info.msg_perm.gid = c->egid;
res = msgctl(queue->u.smq.q, IPC_SET, &info);
if (res != 0) {
res = -errno;
qb_util_log(LOG_ERR,
"error modifing the SYSV message queue : %s",
strerror(errno));
return res;
}
return 0;
}
static int32_t sysv_split_and_send(mqd_t q, const void *msg_ptr,
size_t msg_len, int32_t last_chunk)
{
int32_t res;
int32_t sent = 0;
#ifdef PACK_MESSAGES
char *progress = (char *)msg_ptr;
struct my_msgbuf buf;
size_t to_send_now; /* to send in this message */
size_t to_send_next; /* to send in next message */
do {
to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE);
to_send_next = msg_len - (sent + to_send_now);
/* receiver used the ID to check to see if there
* is more to recieve for this message.
*/
if (last_chunk) {
buf.id = to_send_next + 1;
} else {
buf.id = to_send_next + 1 + msg_len;
}
memcpy(buf.data, progress, to_send_now);
res = msgsnd(q, &buf, to_send_now, IPC_NOWAIT);
if (res == 0) {
sent += to_send_now;
progress += to_send_now;
} else {
goto return_status;
}
} while (sent < msg_len);
return_status:
#else
res = msgsnd(q, msg_ptr, msg_len, IPC_NOWAIT);
sent = msg_len;
#endif
if (res == -1) {
return -errno;
}
return sent;
}
/*
* client functions
* --------------------------------------------------------
*/
static ssize_t qb_ipc_smq_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{
return sysv_split_and_send(one_way->u.smq.q, msg_ptr, msg_len, QB_TRUE);
}
static ssize_t qb_ipc_smq_sendv(struct qb_ipc_one_way *one_way,
const struct iovec *iov, size_t iov_len)
{
int32_t res;
int32_t sent = 0;
struct my_msgbuf buf;
int32_t i;
for (i = 0; i < iov_len; i++) {
if (iov[i].iov_len <= MY_DATA_SIZE) {
if (i == iov_len-1) {
buf.id = 1;
} else {
buf.id = i + iov[i].iov_len;
}
memcpy(buf.data, iov[i].iov_base, iov[i].iov_len);
res = msgsnd(one_way->u.smq.q, &buf, iov[i].iov_len, IPC_NOWAIT);
if (res == 0) {
res = iov[i].iov_len;
} else {
res = -errno;
}
} else {
res = sysv_split_and_send(one_way->u.smq.q, iov[i].iov_base,
iov[i].iov_len, (i == iov_len-1));
}
if (res > 0) {
sent += res;
} else {
return res;
}
}
return sent;
}
static ssize_t qb_ipc_smq_recv(struct qb_ipc_one_way *one_way,
void *msg_ptr,
size_t msg_len,
int32_t ms_timeout)
{
ssize_t res;
ssize_t received = 0;
#ifdef PACK_MESSAGES
char *progress = (char *)msg_ptr;
struct my_msgbuf buf;
do {
try_again:
res = msgrcv(one_way->u.smq.q, &buf, MY_DATA_SIZE, 0, IPC_NOWAIT);
if (res == -1 && errno == ENOMSG) {
goto try_again;
}
//printf("res:%zd, ID:%d\n", res, buf.id);
if (res == -1) {
goto return_status;
}
memcpy(progress, buf.data, res);
received += res;
progress += res;
} while (buf.id > 1);
return_status:
#else
res = msgrcv(one_way->u.smq.q, msg_ptr, msg_len, 0, IPC_NOWAIT);
received = res;
#endif
if (res == -1 && errno == ENOMSG) {
/* just to be consistent with other IPC types.
*/
return -EAGAIN;
}
if (res == -1) {
perror(__func__);
return -errno;
}
return received;
}
static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_request_header hdr;
qb_util_log(LOG_DEBUG, "%s()\n", __func__);
//if (c->needs_sock_for_poll) {
// return;
//}
hdr.id = QB_IPC_MSG_DISCONNECT;
hdr.size = sizeof(hdr);
sysv_split_and_send(c->request.u.smq.q, (const char *)&hdr, hdr.size,
QB_TRUE);
msgctl(c->event.u.smq.q, IPC_RMID, NULL);
msgctl(c->response.u.smq.q, IPC_RMID, NULL);
msgctl(c->request.u.smq.q, IPC_RMID, NULL);
}
int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{
int32_t res = 0;
c->funcs.send = qb_ipc_smq_send;
c->funcs.sendv = qb_ipc_smq_sendv;
c->funcs.recv = qb_ipc_smq_recv;
c->funcs.disconnect = qb_ipcc_smq_disconnect;
c->type = QB_IPC_SYSV_MQ;
c->needs_sock_for_poll = QB_TRUE;
if (strlen(c->name) > (NAME_MAX - 20)) {
return -EINVAL;
}
memcpy(&c->request.u.smq.key, response->request, sizeof(uint32_t));
c->request.u.smq.q = msgget(c->request.u.smq.key, IPC_NOWAIT);
if (c->request.u.smq.q == -1) {
res = -errno;
perror("msgget:REQUEST");
goto cleanup;
}
memcpy(&c->response.u.smq.key, response->response, sizeof(uint32_t));
c->response.u.smq.q = msgget(c->response.u.smq.key, IPC_NOWAIT);
if (c->response.u.smq.q == -1) {
res = -errno;
perror("msgget:RESPONSE");
goto cleanup;
}
memcpy(&c->event.u.smq.key, response->event, sizeof(uint32_t));
c->event.u.smq.q = msgget(c->event.u.smq.key, IPC_NOWAIT);
if (c->event.u.smq.q == -1) {
res = -errno;
perror("msgget:EVENT");
goto cleanup;
}
cleanup:
return res;
}
/*
* service functions
* --------------------------------------------------------
*/
static void qb_ipcs_smq_disconnect(struct qb_ipcs_connection *c)
{
struct qb_ipc_response_header msg;
if (c->sock != -1) {
msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg);
msg.error = 0;
qb_ipc_smq_send(&c->event, &msg, msg.size);
} else {
msgctl(c->event.u.smq.q, IPC_RMID, NULL);
msgctl(c->response.u.smq.q, IPC_RMID, NULL);
msgctl(c->request.u.smq.q, IPC_RMID, NULL);
}
}
static void qb_ipcs_smq_destroy(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
qb_util_log(LOG_DEBUG, "%s\n", __func__);
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
iter_next = iter->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list);
if (c == NULL) {
continue;
}
qb_ipcs_disconnect(c);
}
}
static int32_t qb_ipcs_smq_connect(struct qb_ipcs_service *s,
struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{
int32_t res = 0;
res = sysv_mq_unnamed_create(c, &c->request);
if (res < 0) {
res = -errno;
goto cleanup;
}
memcpy(r->request, &c->request.u.smq.key, sizeof(int32_t));
res = sysv_mq_unnamed_create(c, &c->response);
if (res < 0) {
res = -errno;
goto cleanup_request;
}
memcpy(r->response, &c->response.u.smq.key, sizeof(int32_t));
res = sysv_mq_unnamed_create(c, &c->event);
if (res < 0) {
res = -errno;
goto cleanup_request_response;
}
memcpy(r->event, &c->event.u.smq.key, sizeof(int32_t));
r->hdr.error = 0;
return 0;
cleanup_request:
msgctl(c->request.u.smq.q, IPC_RMID, NULL);
cleanup_request_response:
msgctl(c->response.u.smq.q, IPC_RMID, NULL);
cleanup:
r->hdr.error = res;
return res;
}
#if 0
static int32_t qb_ipcs_smq_is_msg_ready(struct qb_ipcs_service *s)
{
struct msqid_ds info;
int32_t res = msgctl(s->u.smq.q, IPC_STAT, &info);
if (res == 0) {
return info.msg_qnum;
} else {
perror("is_msg_ready");
}
return -errno;
}
#endif
int32_t qb_ipcs_smq_create(struct qb_ipcs_service *s)
{
s->funcs.destroy = qb_ipcs_smq_destroy;
s->funcs.connect = qb_ipcs_smq_connect;
s->funcs.disconnect = qb_ipcs_smq_disconnect;
s->funcs.send = qb_ipc_smq_send;
s->funcs.sendv = qb_ipc_smq_sendv;
s->funcs.recv = qb_ipc_smq_recv;
s->needs_sock_for_poll = QB_TRUE;
return 0;
}
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index b4581e5..48fa6e0 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -1,632 +1,630 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#if defined(HAVE_GETPEERUCRED)
#include <ucred.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#include <qb/qbipcs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include "util_int.h"
#include "ipc_int.h"
#define SERVER_BACKLOG 5
#if defined(QB_LINUX) || defined(QB_SOLARIS)
#define QB_SUN_LEN(a) sizeof(*(a))
#else
#define QB_SUN_LEN(a) SUN_LEN(a)
#endif
-static int32_t qb_ipcs_us_connection_acceptor(qb_handle_t handle,
- int fd, int revent, void *data);
+static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);
#ifdef SO_NOSIGPIPE
static void socket_nosigpipe(int32_t s)
{
int32_t on = 1;
setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&on, sizeof(on));
}
#endif
static void set_cloexec_flag(int32_t fd)
{
int32_t oldflags = fcntl(fd, F_GETFD, 0);
if (oldflags < 0) {
oldflags = 0;
}
oldflags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, oldflags);
}
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
int32_t qb_ipc_us_send(int32_t s, const void *msg, size_t len)
{
int32_t result;
struct msghdr msg_send;
struct iovec iov_send;
char *rbuf = (char *)msg;
int32_t processed = 0;
msg_send.msg_iov = &iov_send;
msg_send.msg_iovlen = 1;
msg_send.msg_name = 0;
msg_send.msg_namelen = 0;
#if !defined(QB_SOLARIS)
msg_send.msg_control = 0;
msg_send.msg_controllen = 0;
msg_send.msg_flags = 0;
#else
msg_send.msg_accrights = NULL;
msg_send.msg_accrightslen = 0;
#endif
retry_send:
iov_send.iov_base = &rbuf[processed];
iov_send.iov_len = len - processed;
result = sendmsg(s, &msg_send, MSG_NOSIGNAL);
if (result == -1 && errno == EAGAIN) {
goto retry_send;
}
if (result == -1) {
return -errno;
}
processed += result;
if (processed != len) {
goto retry_send;
}
return processed;
}
static ssize_t qb_ipc_us_recv_msghdr(int32_t s,
struct msghdr *hdr, char *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
retry_recv:
hdr->msg_iov->iov_base = &msg[processed];
hdr->msg_iov->iov_len = len - processed;
result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
return -errno;
}
#if defined(QB_SOLARIS) || defined(QB_BSD) || defined(QB_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (result == 0) {
return -errno; //ENOTCONN
}
#endif
processed += result;
if (processed != len) {
goto retry_recv;
}
assert(processed == len);
return processed;
}
int32_t qb_ipc_us_recv_ready(int32_t s, int32_t ms_timeout)
{
struct pollfd ufds;
int32_t poll_events;
ufds.fd = s;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll (&ufds, 1, ms_timeout);
if ((poll_events == -1 && errno == EINTR) ||
poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
} else if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
return -ESHUTDOWN;
}
return 0;
}
int32_t qb_ipc_us_recv(int32_t s, void *msg, size_t len)
{
struct msghdr msg_recv;
struct iovec iov_recv;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#if !defined (QB_SOLARIS)
msg_recv.msg_control = 0;
msg_recv.msg_controllen = 0;
msg_recv.msg_flags = 0;
#else
msg_recv.msg_accrights = NULL;
msg_recv.msg_accrightslen = 0;
#endif
return qb_ipc_us_recv_msghdr(s, &msg_recv, msg, len);
}
static int32_t qb_ipcs_uc_recv_and_auth(struct qb_ipcs_connection *c)
{
int32_t res = 0;
struct msghdr msg_recv;
struct iovec iov_recv;
struct qb_ipc_connection_request setup_msg;
#ifdef QB_LINUX
struct cmsghdr *cmsg;
char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))];
int off = 0;
int on = 1;
struct ucred *cred;
#endif
msg_recv.msg_flags = 0;
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0;
#ifdef QB_LINUX
msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof(cmsg_cred);
#endif
#ifdef QB_SOLARIS
msg_recv.msg_accrights = 0;
msg_recv.msg_accrightslen = 0;
#endif /* QB_SOLARIS */
iov_recv.iov_base = &setup_msg;
iov_recv.iov_len = sizeof(struct qb_ipc_connection_request);
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
res = qb_ipc_us_recv_msghdr(c->sock, &msg_recv, (char *)&setup_msg,
sizeof(struct qb_ipc_connection_request));
if (res < 0) {
goto cleanup_and_return;
}
if (res != sizeof(struct qb_ipc_connection_request)) {
res = -EIO;
goto cleanup_and_return;
}
c->request.max_msg_size = setup_msg.max_msg_size;
c->response.max_msg_size = setup_msg.max_msg_size;
c->event.max_msg_size = setup_msg.max_msg_size;
res = -EBADMSG;
/*
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
* retrieval mechanisms for various Platforms
*/
#ifdef HAVE_GETPEERUCRED
/*
* Solaris and some BSD systems
*/
{
ucred_t *uc = NULL;
if (getpeerucred(c->sock, &uc) == 0) {
res = 0;
c->euid = ucred_geteuid(uc);
c->egid = ucred_getegid(uc);
c->pid = ucred_getpid(uc);
ucred_free(uc);
} else {
res = -errno;
}
}
#elif HAVE_GETPEEREID
/*
* Usually MacOSX systems
*/
{
/*
* TODO get the peer's pid.
* c->pid = ?;
*/
if (getpeereid(c->sock, &c->euid, &c->egid) == 0) {
res = 0;
} else {
res = -errno;
}
}
#elif SO_PASSCRED
/*
* Usually Linux systems
*/
cmsg = CMSG_FIRSTHDR(&msg_recv);
assert(cmsg);
cred = (struct ucred *)CMSG_DATA(cmsg);
if (cred) {
res = 0;
c->pid = cred->pid;
c->euid = cred->uid;
c->egid = cred->gid;
} else {
res = -EBADMSG;
}
#else /* no credentials */
res = -ENOTSUP;
#endif /* no credentials */
cleanup_and_return:
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif
if (res == 0) {
if (c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
c->euid,
c->egid);
} else {
res = 0;
}
}
return res;
}
int32_t qb_ipcc_us_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
#if defined(QB_SOLARIS)
request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
#else
request_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif
if (request_fd == -1) {
return -errno;
}
#ifdef SO_NOSIGPIPE
socket_nosigpipe(request_fd);
#endif /* SO_NOSIGPIPE */
set_cloexec_flag(request_fd);
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
address.sun_len = SUN_LEN(&address);
#endif
#if defined(QB_LINUX)
sprintf(address.sun_path + 1, "%s", socket_name);
#else
sprintf(address.sun_path, "%s/%s", SOCKETDIR, socket_name);
#endif
if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) {
goto error_connect;
}
*sock_pt = request_fd;
return 0;
error_connect:
close(request_fd);
*sock_pt = -1;
return -errno;
}
void qb_ipcc_us_disconnect(int32_t sock)
{
shutdown(sock, SHUT_RDWR);
close(sock);
}
#if 0
cs_error_t coroipcc_dispatch_get(hdb_handle_t handle, void **data, int timeout)
{
struct pollfd ufds;
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
char *data_addr;
cs_error_t error = CS_OK;
int res;
error =
hdb_error_to_cs(hdb_handle_get
(&ipc_hdb, handle, (void **)&ipc_instance));
if (error != CS_OK) {
return (error);
}
*data = NULL;
ufds.fd = ipc_instance->fd;
ufds.events = POLLIN;
ufds.revents = 0;
poll_events = poll(&ufds, 1, timeout);
if (poll_events == -1 && errno == EINTR) {
error = CS_ERR_TRY_AGAIN;
goto error_put;
} else if (poll_events == -1) {
error = CS_ERR_LIBRARY;
goto error_put;
} else if (poll_events == 0) {
error = CS_ERR_TRY_AGAIN;
goto error_put;
}
if (poll_events == 1 && (ufds.revents & (POLLERR | POLLHUP))) {
error = CS_ERR_LIBRARY;
goto error_put;
}
error = socket_recv(ipc_instance->fd, &buf, 1);
assert(error == CS_OK);
if (shared_mem_dispatch_bytes_left(ipc_instance) > 500000) {
/*
* Notify coroipcs to flush any pending dispatch messages
*/
res =
ipc_sem_post(ipc_instance->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
if (res != CS_OK) {
error = CS_ERR_LIBRARY;
goto error_put;
}
}
data_addr = ipc_instance->dispatch_buffer;
data_addr = &data_addr[ipc_instance->control_buffer->read];
*data = (void *)data_addr;
return (CS_OK);
error_put:
hdb_handle_put(&ipc_hdb, handle);
return (error);
}
#endif
/*
**************************************************************************
* SERVER
*/
int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s)
{
struct sockaddr_un un_addr;
int32_t res;
char error_str[100];
/*
* Create socket for IPC clients, name socket, listen for connections
*/
#if defined(QB_SOLARIS)
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
#else
s->server_sock = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif
if (s->server_sock == -1) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Cannot create server socket: %s\n", error_str);
return res;
}
set_cloexec_flag(s->server_sock);
res = fcntl(s->server_sock, F_SETFL, O_NONBLOCK);
if (res == -1) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_CRIT,
"Could not set non-blocking operation on server socket: %s\n",
error_str);
goto error_close;
}
memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr);
#endif
#if defined(QB_LINUX)
sprintf(un_addr.sun_path + 1, "%s", s->name);
#else
{
struct stat stat_out;
res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno;
qb_util_log(LOG_CRIT,
"Required directory not present %s\n",
SOCKETDIR);
goto error_close;
}
sprintf(un_addr.sun_path, "%s/%s", SOCKETDIR, s->name);
unlink(un_addr.sun_path);
}
#endif
res =
bind(s->server_sock, (struct sockaddr *)&un_addr,
QB_SUN_LEN(&un_addr));
if (res) {
res = -errno;
strerror_r(errno, error_str, 100);
qb_util_log(LOG_CRIT,
"Could not bind AF_UNIX (%s): %s.\n",
un_addr.sun_path, error_str);
goto error_close;
}
/*
* Allow eveyrone to write to the socket since the IPC layer handles
* security automatically
*/
#if !defined(QB_LINUX)
res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR, "listen failed: %s.\n", error_str);
}
- qb_poll_dispatch_add(s->poll_handle, s->server_sock,
+ qb_loop_poll_add(s->loop_pt, QB_LOOP_HIGH, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor);
return 0;
error_close:
close(s->server_sock);
return res;
}
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
qb_util_log(LOG_INFO, "withdrawing server sockets\n");
shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock);
return 0;
}
-static int32_t qb_ipcs_us_connection_acceptor(qb_handle_t handle,
- int fd, int revent, void *data)
+static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{
struct sockaddr_un un_addr;
int32_t new_fd;
struct qb_ipcs_connection *c;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
struct qb_ipc_connection_response response;
int32_t res;
socklen_t addrlen = sizeof(struct sockaddr_un);
char error_str[100];
retry_accept:
errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
if (new_fd == -1 && errno == EBADF) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection:(fd: %d) [%d] %s\n",
fd, errno, error_str);
return -1;
}
if (new_fd == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection: [%d] %s\n",
errno, error_str);
return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
set_cloexec_flag(new_fd);
res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
if (res == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not set non-blocking operation on library connection: %s\n",
error_str);
close(new_fd);
return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
c = qb_ipcs_connection_alloc(s);
c->sock = new_fd;
res = qb_ipcs_uc_recv_and_auth(c);
if (res == 0) {
qb_util_log(LOG_INFO, "IPC credentials authenticated");
res = s->funcs.connect(s, c, &response);
if (res != 0) {
goto send_response;
}
qb_list_add(&c->list, &s->connections);
c->receive_buf = malloc(c->request.max_msg_size);
if (s->needs_sock_for_poll) {
- qb_poll_dispatch_add(s->poll_handle, c->sock,
+ qb_loop_poll_add(s->loop_pt, QB_LOOP_HIGH, c->sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
}
}
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
response.hdr.error = res;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
qb_ipc_us_send(c->sock, &response, response.hdr.size);
if (res == 0) {
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
} else if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials.");
} else {
strerror_r(-response.hdr.error, error_str, 100);
qb_util_log(LOG_ERR, "Error in connection setup: %s.",
error_str);
}
if (res != 0) {
qb_ipcs_disconnect(c);
}
return 0;
}
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 404894e..69f6c1a 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -1,299 +1,295 @@
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os_base.h"
#include "util_int.h"
#include "ipc_int.h"
#include <qb/qbdefs.h>
#include <qb/qbipcs.h>
static void qb_ipcs_destroy_internal(void *data);
QB_HDB_DECLARE(qb_ipc_services, qb_ipcs_destroy_internal);
qb_ipcs_service_pt qb_ipcs_create(const char *name,
int32_t service_id,
enum qb_ipc_type type,
struct qb_ipcs_service_handlers *handlers)
{
struct qb_ipcs_service *s;
qb_ipcs_service_pt h;
qb_hdb_handle_create(&qb_ipc_services,
sizeof(struct qb_ipcs_service), &h);
qb_hdb_handle_get(&qb_ipc_services, h, (void **)&s);
s->pid = getpid();
s->type = type;
s->needs_sock_for_poll = QB_FALSE;
s->service_id = service_id;
strncpy(s->name, name, NAME_MAX);
s->serv_fns.connection_accept = handlers->connection_accept;
s->serv_fns.connection_created = handlers->connection_created;
s->serv_fns.msg_process = handlers->msg_process;
s->serv_fns.connection_destroyed = handlers->connection_destroyed;
qb_list_init(&s->connections);
qb_hdb_handle_put(&qb_ipc_services, h);
return h;
}
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt pt,
struct qb_ipcs_poll_handlers *handlers)
{
struct qb_ipcs_service *s;
qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
s->poll_fns.dispatch_add = handlers->dispatch_add;
s->poll_fns.dispatch_rm = handlers->dispatch_rm;
qb_hdb_handle_put(&qb_ipc_services, pt);
}
-int32_t qb_ipcs_run(qb_ipcs_service_pt pt, qb_handle_t poll_handle)
+int32_t qb_ipcs_run(qb_ipcs_service_pt pt, void *loop_pt)
{
int32_t res;
struct qb_ipcs_service *s;
qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
- s->poll_handle = poll_handle;
+ s->loop_pt = loop_pt;
res = qb_ipcs_us_publish(s);
if (res < 0) {
qb_hdb_handle_put(&qb_ipc_services, pt);
return res;
}
switch (s->type) {
case QB_IPC_SOCKET:
res = 0;
break;
case QB_IPC_SHM:
res = qb_ipcs_shm_create((struct qb_ipcs_service *)s);
break;
case QB_IPC_POSIX_MQ:
res = qb_ipcs_pmq_create((struct qb_ipcs_service *)s);
break;
case QB_IPC_SYSV_MQ:
res = qb_ipcs_smq_create((struct qb_ipcs_service *)s);
break;
default:
res = -EINVAL;
break;
}
if (res < 0) {
qb_ipcs_us_withdraw(s);
}
qb_hdb_handle_put(&qb_ipc_services, pt);
return res;
}
void qb_ipcs_destroy(qb_ipcs_service_pt pt)
{
qb_hdb_handle_put(&qb_ipc_services, pt);
qb_hdb_handle_destroy(&qb_ipc_services, pt);
}
static void qb_ipcs_destroy_internal(void *data)
{
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
s->funcs.destroy(s);
}
ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
size_t size)
{
ssize_t res;
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->response, data, size);
qb_ipcs_connection_ref_dec(c);
return res;
}
ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void *data,
size_t size)
{
ssize_t res;
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->event, data, size);
qb_ipc_us_send(c->sock, data, 1);
qb_ipcs_connection_ref_dec(c);
return res;
}
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len)
{
ssize_t res;
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.sendv(&c->event, iov, iov_len);
qb_ipc_us_send(c->sock, &res, 1);
qb_ipcs_connection_ref_dec(c);
return res;
}
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
struct qb_ipcs_connection *c = malloc(sizeof(struct qb_ipcs_connection));
c->refcount = 1;
c->service = s;
c->pid = 0;
c->euid = -1;
c->egid = -1;
c->sock = -1;
qb_list_init(&c->list);
c->receive_buf = NULL;
return c;
}
void qb_ipcs_connection_ref_inc(struct qb_ipcs_connection *c)
{
// lock
c->refcount++;
//qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
// unlock
}
void qb_ipcs_connection_ref_dec(struct qb_ipcs_connection *c)
{
// lock
c->refcount--;
//qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
if (c->refcount == 0) {
qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
qb_list_del(&c->list);
// unlock
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
}
c->service->funcs.disconnect(c);
qb_ipcc_us_disconnect(c->sock);
if (c->receive_buf) {
free(c->receive_buf);
}
} else {
// unlock
}
}
int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection *c)
{
return c->service->service_id;
}
void qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
qb_util_log(LOG_DEBUG, "%s()", __func__);
qb_ipcs_connection_ref_dec(c);
}
static int32_t _process_request_(struct qb_ipcs_connection *c)
{
int32_t res = 0;
struct qb_ipc_request_header *hdr;
hdr = (struct qb_ipc_request_header *)c->receive_buf;
qb_ipcs_connection_ref_inc(c);
get_msg_with_live_connection:
res = c->service->funcs.recv(&c->request, hdr, c->request.max_msg_size, 0);
if (res == -EAGAIN) {
goto get_msg_with_live_connection;
}
if (res < 0) {
qb_util_log(LOG_DEBUG, "%s(): %s", __func__, strerror(-res));
goto cleanup;
}
- qb_util_log(LOG_DEBUG, "%s() service:%d msg:%d", __func__,
- c->service->service_id, hdr->id);
if (hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func__);
qb_ipcs_disconnect(c);
res = -ESHUTDOWN;
} else {
c->service->serv_fns.msg_process(c, hdr, hdr->size);
res = 0;
}
cleanup:
qb_ipcs_connection_ref_dec(c);
return res;
}
-int32_t qb_ipcs_dispatch_service_request(qb_handle_t handle,
- int32_t fd, int32_t revents,
+int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents,
void *data)
{
return _process_request_((struct qb_ipcs_connection *)data);
}
-int32_t qb_ipcs_dispatch_connection_request(qb_handle_t handle,
- int32_t fd, int32_t revents,
+int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
void *data)
{
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
char one_byte;
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "%s HUP", __func__);
qb_ipcc_us_disconnect(c->sock);
c->sock = -1;
qb_ipcs_connection_ref_dec(c);
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
}
if (c->service->needs_sock_for_poll) {
qb_ipc_us_recv(c->sock, &one_byte, 1);
}
return _process_request_(c);
}
void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
{
c->context = context;
}
void *qb_ipcs_context_get(struct qb_ipcs_connection *c)
{
return c->context;
}
diff --git a/lib/loop.c b/lib/loop.c
new file mode 100644
index 0000000..5a98d67
--- /dev/null
+++ b/lib/loop.c
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+
+static struct qb_loop_source * timer_source;
+static struct qb_loop_source * job_source;
+static struct qb_loop_source * fd_source;
+
+static int32_t qb_loop_run_level(struct qb_loop_level *level)
+{
+ struct qb_loop_item *job;
+ struct qb_list_head *iter, *iter_next;
+ int32_t processed = 0;
+
+ for (iter = level->job_head.next;
+ iter != &level->job_head;
+ iter = iter_next) {
+ if (processed >= level->to_process) {
+ break;
+ }
+ iter_next = iter->next;
+ job = qb_list_entry(iter, struct qb_loop_item, list);
+ qb_list_del (&job->list);
+ qb_list_init (&job->list);
+ job->source->dispatch_and_take_back(job, level->priority);
+ if (level->l->stop_requested) {
+ printf("%s:%d pr:%d STOP!!!\n", __func__, __LINE__, level->priority);
+ return processed;
+ }
+ processed++;
+ }
+ return processed;
+}
+
+
+struct qb_loop * qb_loop_create(void)
+{
+ struct qb_loop *l = malloc(sizeof(struct qb_loop));
+ int32_t p;
+
+ for (p = QB_LOOP_LOW; p <= QB_LOOP_HIGH; p++) {
+ l->level[p].priority = p;
+ l->level[p].to_process = 4;
+ l->level[p].l = l;
+
+ qb_list_init(&l->level[p].job_head);
+ qb_list_init(&l->level[p].wait_head);
+ }
+
+ l->stop_requested = QB_FALSE;
+ qb_list_init(&l->source_head);
+ // install sources
+ timer_source = qb_loop_timer_init(l);
+ job_source = qb_loop_jobs_init(l);
+ fd_source = qb_loop_poll_init(l);
+
+ return l;
+}
+
+void qb_loop_stop(struct qb_loop *l)
+{
+ l->stop_requested = QB_TRUE;
+}
+
+void qb_loop_run(struct qb_loop *l)
+{
+ int32_t p;
+ int32_t p_stop;
+ int32_t todo;
+ int32_t done;
+ int32_t ms_timeout;
+ int32_t fd_poll_done = QB_FALSE;
+
+ do {
+ p_stop = QB_LOOP_HIGH;
+ todo = 0;
+ poll_again:
+ if (!fd_poll_done) {
+ todo += fd_source->poll(fd_source, 0);
+ }
+ todo += job_source->poll(job_source, 0);
+ todo += timer_source->poll(timer_source, 0);
+
+ for (p = QB_LOOP_HIGH; p >= p_stop; p--) {
+ done = qb_loop_run_level(&l->level[p]);
+ if (l->stop_requested) {
+ return;
+ }
+ todo -= done;
+ }
+ if (p_stop > QB_LOOP_LOW) {
+ p_stop--;
+ fd_poll_done = QB_FALSE;
+ goto poll_again;
+ }
+ if (todo == 0) {
+ ms_timeout = qb_loop_timer_msec_duration_to_expire(timer_source);
+ todo = fd_source->poll(fd_source, ms_timeout);
+ fd_poll_done = QB_TRUE;
+ } else {
+ fd_poll_done = QB_FALSE;
+ }
+ } while (!l->stop_requested);
+}
+
diff --git a/lib/loop_int.h b/lib/loop_int.h
new file mode 100644
index 0000000..157fd3a
--- /dev/null
+++ b/lib/loop_int.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef QB_LOOP_INT_DEFINED
+#define QB_LOOP_INT_DEFINED
+
+#include <qb/qbloop.h>
+
+struct qb_loop;
+struct qb_loop_item;
+
+struct qb_loop_item {
+ struct qb_list_head list;
+ struct qb_loop_source *source;
+ void *user_data;
+};
+
+struct qb_loop_level {
+ enum qb_loop_priority priority;
+ int32_t to_process;
+ struct qb_list_head wait_head;
+ struct qb_list_head job_head;
+ struct qb_loop *l;
+};
+
+struct qb_loop_source {
+ struct qb_loop *l;
+ void (*dispatch_and_take_back)(struct qb_loop_item *i,
+ enum qb_loop_priority p);
+ int32_t (*poll)(struct qb_loop_source* s, int32_t ms_timeout);
+ struct qb_list_head list;
+};
+
+struct qb_loop {
+ struct qb_loop_level level[3];
+ int32_t stop_requested;
+ struct qb_list_head source_head;
+};
+
+struct qb_loop_source *
+qb_loop_jobs_init(struct qb_loop *l);
+
+struct qb_loop_source*
+qb_loop_timer_init(struct qb_loop *l);
+
+struct qb_loop_source*
+qb_loop_poll_init(struct qb_loop *l);
+
+int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_source);
+
+
+#endif /* QB_LOOP_INT_DEFINED */
+
diff --git a/lib/loop_job.c b/lib/loop_job.c
new file mode 100644
index 0000000..7396437
--- /dev/null
+++ b/lib/loop_job.c
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2006-2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+
+struct qb_loop_job {
+ struct qb_loop_item item;
+ qb_loop_job_dispatch_fn dispatch_fn;
+};
+
+static struct qb_loop_source * my_src;
+
+
+static void job_dispatch(struct qb_loop_item * item,
+ enum qb_loop_priority p)
+{
+ struct qb_loop_job *job = qb_list_entry(item, struct qb_loop_job, item);
+
+ job->dispatch_fn(job->item.user_data);
+ free(job);
+
+ // this is a one-shot so don't re-add
+}
+
+static int32_t get_more_jobs(struct qb_loop_source* s, int32_t ms_timeout)
+{
+ struct qb_list_head* iter;
+ struct qb_list_head* iter_next;
+ int32_t p;
+ int32_t new_jobs = 0;
+
+ // this is simple, move jobs from wait_head to job_head
+ // TODO use qb_list_splice
+ for (p = QB_LOOP_LOW; p <= QB_LOOP_HIGH; p++) {
+ for (iter = s->l->level[p].wait_head.next;
+ iter != &s->l->level[p].wait_head;
+ iter = iter_next) {
+ iter_next = iter->next;
+ qb_list_del(iter);
+ qb_list_init(iter);
+ qb_list_add_tail(iter, &s->l->level[p].job_head);
+ }
+ qb_list_init(&s->l->level[p].wait_head);
+ new_jobs += qb_list_length(&s->l->level[p].job_head);
+ }
+ return new_jobs;
+}
+
+struct qb_loop_source *
+qb_loop_jobs_init(struct qb_loop *l)
+{
+ my_src = malloc(sizeof(struct qb_loop_source));
+ my_src->l = l;
+ my_src->dispatch_and_take_back = job_dispatch;
+ my_src->poll = get_more_jobs;
+
+ qb_list_init(&my_src->list);
+ qb_list_add_tail(&my_src->list, &l->source_head);
+ return my_src;
+}
+
+int32_t qb_loop_job_add(struct qb_loop *l,
+ enum qb_loop_priority p,
+ void *data,
+ qb_loop_job_dispatch_fn dispatch_fn)
+{
+ struct qb_loop_job *job = malloc(sizeof(struct qb_loop_job));
+
+ job->dispatch_fn = dispatch_fn;
+ job->item.user_data = data;
+ job->item.source = my_src;
+
+ qb_list_init(&job->item.list);
+ qb_list_add_tail(&job->item.list, &l->level[p].wait_head);
+
+ return 0;
+}
+
diff --git a/lib/loop_poll.c b/lib/loop_poll.c
new file mode 100644
index 0000000..63b2afc
--- /dev/null
+++ b/lib/loop_poll.c
@@ -0,0 +1,298 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+#include <sys/poll.h>
+#include <sys/resource.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+
+/* logs, std(in|out|err), pipe */
+#define POLL_FDS_USED_MISC 50
+
+struct qb_poll_entry {
+ struct qb_loop_item item;
+ struct pollfd ufd;
+ qb_loop_poll_dispatch_fn dispatch_fn;
+ enum qb_loop_priority p;
+ int32_t install_pos;
+};
+
+struct qb_poll_source {
+ struct qb_loop_source s;
+ struct pollfd *ufds;
+ int32_t poll_entry_count;
+ struct qb_poll_entry *poll_entries;
+ qb_loop_poll_low_fds_event_fn low_fds_event_fn;
+ int32_t not_enough_fds;
+};
+
+static struct qb_poll_source * my_src;
+
+static void poll_dispatch_and_take_back(struct qb_loop_item * item,
+ enum qb_loop_priority p)
+{
+ struct qb_poll_entry *pe = (struct qb_poll_entry *)item;
+ int32_t res;
+ int32_t idx = pe->install_pos;
+
+ res = pe->dispatch_fn(pe->ufd.fd, pe->ufd.revents, pe->item.user_data);
+ pe = &my_src->poll_entries[idx];
+ if (res < 0) {
+ pe->ufd.fd = -1; /* empty entry */
+ }
+ pe->ufd.revents = 0;
+}
+
+static void poll_fds_usage_check(struct qb_poll_source *s)
+{
+ struct rlimit lim;
+ static int32_t socks_limit = 0;
+ int32_t send_event = 0;
+ int32_t socks_used = 0;
+ int32_t socks_avail = 0;
+ int32_t i;
+
+ if (socks_limit == 0) {
+ if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
+ char error_str[100];
+ strerror_r(errno, error_str, 100);
+ printf("getrlimit: %s\n", error_str);
+ return;
+ }
+ socks_limit = lim.rlim_cur;
+ socks_limit -= POLL_FDS_USED_MISC;
+ if (socks_limit < 0) {
+ socks_limit = 0;
+ }
+ }
+
+ for (i = 0; i < s->poll_entry_count; i++) {
+ if (s->poll_entries[i].ufd.fd != -1) {
+ socks_used++;
+ }
+ }
+ socks_avail = socks_limit - socks_used;
+ if (socks_avail < 0) {
+ socks_avail = 0;
+ }
+ send_event = 0;
+ if (s->not_enough_fds) {
+ if (socks_avail > 2) {
+ s->not_enough_fds = 0;
+ send_event = 1;
+ }
+ } else {
+ if (socks_avail <= 1) {
+ s->not_enough_fds = 1;
+ send_event = 1;
+ }
+ }
+ if (send_event && s->low_fds_event_fn) {
+ s->low_fds_event_fn(s->not_enough_fds,
+ socks_avail);
+ }
+}
+
+static int32_t poll_and_add_to_jobs(struct qb_loop_source* src, int32_t ms_timeout)
+{
+ int32_t i;
+ int32_t res;
+ int32_t new_jobs = 0;
+ struct qb_poll_entry * pe;
+ struct qb_poll_source * s = (struct qb_poll_source *)src;
+
+ poll_fds_usage_check(s);
+
+ for (i = 0; i < s->poll_entry_count; i++) {
+ memcpy(&s->ufds[i], &s->poll_entries[i].ufd, sizeof(struct pollfd));
+ }
+
+ retry_poll:
+ res = poll(s->ufds, s->poll_entry_count, ms_timeout);
+ if (errno == EINTR && res == -1) {
+ goto retry_poll;
+ } else if (res == -1) {
+ return -errno;
+ }
+
+ for (i = 0; i < s->poll_entry_count; i++) {
+ if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) {
+ // empty
+ continue;
+ }
+ pe = &s->poll_entries[i];
+ if (s->ufds[i].revents == pe->ufd.revents) {
+ // entry already in the job queue.
+ continue;
+ }
+ pe->ufd.revents = s->ufds[i].revents;
+ qb_list_init(&pe->item.list);
+ qb_list_add_tail(&pe->item.list, &s->s.l->level[pe->p].job_head);
+ new_jobs++;
+ }
+
+ return new_jobs;
+}
+
+
+struct qb_loop_source*
+qb_loop_poll_init(struct qb_loop *l)
+{
+ my_src = malloc(sizeof(struct qb_poll_source));
+ my_src->s.l = l;
+ my_src->s.dispatch_and_take_back = poll_dispatch_and_take_back;
+ my_src->s.poll = poll_and_add_to_jobs;
+
+ my_src->poll_entries = 0;
+ my_src->ufds = 0;
+ my_src->poll_entry_count = 0;
+ my_src->low_fds_event_fn = NULL;
+ my_src->not_enough_fds = 0;
+
+ qb_list_init(&my_src->s.list);
+ qb_list_add_tail(&my_src->s.list, &l->source_head);
+ return (struct qb_loop_source*)my_src;
+}
+
+int32_t qb_loop_poll_low_fds_event_set(
+ qb_loop_t *l,
+ qb_loop_poll_low_fds_event_fn fn)
+{
+ my_src->low_fds_event_fn = fn;
+
+ return 0;
+}
+
+
+int32_t qb_loop_poll_add(struct qb_loop *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ void *data,
+ qb_loop_poll_dispatch_fn dispatch_fn)
+{
+ struct qb_poll_entry *poll_entries;
+ struct qb_poll_entry *pe;
+ struct pollfd *ufds;
+ int32_t found = 0;
+ int32_t install_pos;
+ int32_t res = 0;
+ int32_t new_size = 0;
+
+ for (found = 0, install_pos = 0;
+ install_pos < my_src->poll_entry_count; install_pos++) {
+ if (my_src->poll_entries[install_pos].ufd.fd == -1) {
+ found = 1;
+ break;
+ }
+ }
+
+ if (found == 0) {
+ /*
+ * Grow pollfd list
+ */
+ new_size = (my_src->poll_entry_count + 1) * sizeof(struct qb_poll_entry);
+ poll_entries = realloc(my_src->poll_entries, new_size);
+ if (poll_entries == NULL) {
+ return -ENOMEM;
+ }
+ my_src->poll_entries = poll_entries;
+
+ new_size = (my_src->poll_entry_count+ 1) * sizeof(struct pollfd);
+ ufds = realloc(my_src->ufds, new_size);
+ if (ufds == NULL) {
+ return -ENOMEM;
+ }
+ my_src->ufds = ufds;
+
+ my_src->poll_entry_count += 1;
+ install_pos = my_src->poll_entry_count - 1;
+ }
+
+ /*
+ * Install new dispatch handler
+ */
+ pe = &my_src->poll_entries[install_pos];
+ pe->install_pos = install_pos;
+ pe->ufd.fd = fd;
+ pe->ufd.events = events;
+ pe->ufd.revents = 0;
+ pe->dispatch_fn = dispatch_fn;
+ pe->item.user_data = data;
+ pe->item.source = (struct qb_loop_source*)my_src;
+ pe->p = p;
+
+ return (res);
+}
+
+int32_t qb_loop_poll_mod(struct qb_loop *l,
+ enum qb_loop_priority p,
+ int32_t fd,
+ int32_t events,
+ qb_loop_poll_dispatch_fn dispatch_fn)
+{
+ int32_t i;
+ struct qb_poll_entry *pe;
+
+ /*
+ * Find file descriptor to modify events and dispatch function
+ */
+ for (i = 0; i < my_src->poll_entry_count; i++) {
+ pe = &my_src->poll_entries[i];
+ if (pe->ufd.fd == fd) {
+ pe->ufd.events = events;
+ pe->dispatch_fn = dispatch_fn;
+ pe->p = p;
+ return 0;
+ }
+ }
+
+ return -EBADF;
+}
+
+int32_t qb_loop_poll_del(struct qb_loop *l, int32_t fd)
+{
+ int32_t i;
+ struct qb_poll_entry *pe;
+
+ /*
+ * Find file descriptor to modify events and dispatch function
+ */
+ for (i = 0; i < my_src->poll_entry_count; i++) {
+ pe = &my_src->poll_entries[i];
+ if (pe->ufd.fd == fd) {
+ my_src->ufds[i].fd = -1;
+ my_src->ufds[i].events = 0;
+ my_src->ufds[i].revents = 0;
+ pe->ufd.fd = -1;
+ pe->ufd.events = 0;
+ pe->ufd.revents = 0;
+ return 0;
+ }
+ }
+
+ return -EBADF;
+}
+
+
diff --git a/lib/loop_timer.c b/lib/loop_timer.c
new file mode 100644
index 0000000..a1da762
--- /dev/null
+++ b/lib/loop_timer.c
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+#include <pthread.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qblist.h>
+#include <qb/qbloop.h>
+#include "loop_int.h"
+#include "tlist.h"
+
+struct qb_loop_timer {
+ struct qb_loop_item item;
+ qb_loop_timer_dispatch_fn dispatch_fn;
+ enum qb_loop_priority p;
+};
+
+struct qb_timer_source {
+ struct qb_loop_source s;
+ struct timerlist timerlist;
+};
+
+static struct qb_timer_source * my_src;
+
+static void timer_dispatch(struct qb_loop_item * item,
+ enum qb_loop_priority p)
+{
+ struct qb_loop_timer *timer = (struct qb_loop_timer *)item;
+
+ timer->dispatch_fn(timer->item.user_data);
+ free(timer);
+}
+
+static int32_t expired_timers;
+static void make_job_from_tmo(void *data)
+{
+ struct qb_loop_timer *t = (struct qb_loop_timer *)data;
+ struct qb_loop *l = t->item.source->l;
+ qb_list_init(&t->item.list);
+ qb_list_add_tail(&t->item.list, &l->level[t->p].job_head);
+ expired_timers++;
+}
+
+static int32_t expire_the_timers(struct qb_loop_source* s, int32_t ms_timeout)
+{
+ struct qb_timer_source *ts = (struct qb_timer_source *)s;
+ expired_timers = 0;
+ timerlist_expire(&ts->timerlist);
+ return expired_timers;
+}
+
+
+int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_source)
+{
+ uint64_t left = timerlist_msec_duration_to_expire(&my_src->timerlist);
+ if (left != -1 && left > 0xFFFFFFFF) {
+ left = 0xFFFFFFFE;
+ }
+ return left;
+}
+
+struct qb_loop_source*
+qb_loop_timer_init(struct qb_loop *l)
+{
+ my_src = malloc(sizeof(struct qb_timer_source));
+ my_src->s.l = l;
+ my_src->s.dispatch_and_take_back = timer_dispatch;
+ my_src->s.poll = expire_the_timers;
+
+ qb_list_init(&my_src->s.list);
+ qb_list_add_tail(&my_src->s.list, &l->source_head);
+
+ timerlist_init(&my_src->timerlist);
+
+ return (struct qb_loop_source*)my_src;
+}
+
+int32_t qb_loop_timer_add(struct qb_loop *l,
+ enum qb_loop_priority p,
+ int32_t msec_duration,
+ void *data,
+ qb_loop_timer_dispatch_fn timer_fn,
+ qb_loop_timer_handle * timer_handle_out)
+{
+ struct qb_loop_timer *t;
+
+ if (timer_handle_out == NULL) {
+ return -ENOENT;
+ }
+ t = malloc(sizeof(struct qb_loop_timer));
+ t->item.user_data = data;
+ t->item.source = (struct qb_loop_source*)my_src;
+ t->dispatch_fn = timer_fn;
+ t->p = p;
+ qb_list_init(&t->item.list);
+
+ timerlist_add_duration(&my_src->timerlist,
+ make_job_from_tmo, t,
+ ((uint64_t)msec_duration) * QB_TIME_NS_IN_MSEC,
+ timer_handle_out);
+
+ return 0;
+}
+
+int32_t qb_loop_timer_del(struct qb_loop *l, qb_loop_timer_handle th)
+{
+ int32_t res = 0;
+
+ if (th == NULL) {
+ return -EINVAL;
+ }
+
+ timerlist_del(&my_src->timerlist, (void *)th);
+
+ return (res);
+}
+
+
diff --git a/lib/poll.c b/lib/poll.c
deleted file mode 100644
index 3e223c0..0000000
--- a/lib/poll.c
+++ /dev/null
@@ -1,631 +0,0 @@
-/*
- * Copyright (C) 2006-2010 Red Hat, Inc.
- *
- * Author: Steven Dake <sdake@redhat.com>
- *
- * This file is part of libqb.
- *
- * libqb is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 2.1 of the License, or
- * (at your option) any later version.
- *
- * libqb is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with libqb. If not, see <http://www.gnu.org/licenses/>.
- */
-#include "os_base.h"
-
-#include <pthread.h>
-#include <sys/poll.h>
-
-#include <qb/qbdefs.h>
-#include <qb/qbhdb.h>
-#include <qb/qbpoll.h>
-#include <qb/qblist.h>
-#include "tlist.h"
-#include "util_int.h"
-#include <sys/resource.h>
-
-typedef int32_t (*dispatch_fn_t) (qb_handle_t hdb_handle, int32_t fd, int32_t revents,
- void *data);
-
-struct qb_poll_entry {
- struct pollfd ufd;
- dispatch_fn_t dispatch_fn;
- void *data;
-};
-
-struct qb_poll_job {
- qb_poll_job_execute_fn_t execute_fn;
- void *data;
- struct qb_list_head list;
-};
-
-struct qb_poll_instance {
- struct qb_poll_entry *poll_entries;
- struct pollfd *ufds;
- int32_t poll_entry_count;
- struct timerlist timerlist;
- int32_t stop_requested;
- int32_t pipefds[2];
- qb_poll_low_fds_event_fn low_fds_event_fn;
- int32_t not_enough_fds;
- struct qb_list_head job_list;
-};
-
-QB_HDB_DECLARE(poll_instance_database, NULL);
-
-static int dummy_dispatch_fn (qb_handle_t handle, int fd, int revents, void *data) {
- return (0);
-}
-
-qb_handle_t qb_poll_create(void)
-{
- qb_handle_t handle;
- struct qb_poll_instance *poll_instance;
- int32_t res;
-
- res = qb_hdb_handle_create(&poll_instance_database,
- sizeof(struct qb_poll_instance), &handle);
- if (res != 0) {
- goto error_exit;
- }
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_destroy;
- }
-
- poll_instance->poll_entries = 0;
- poll_instance->ufds = 0;
- poll_instance->poll_entry_count = 0;
- poll_instance->stop_requested = 0;
- timerlist_init(&poll_instance->timerlist);
- poll_instance->not_enough_fds = 0;
- qb_list_init(&poll_instance->job_list);
-
- res = pipe (poll_instance->pipefds);
- if (res != 0) {
- goto error_destroy;
- }
-
- /*
- * Allow changes in modify to propogate into new poll instance
- */
- res = qb_poll_dispatch_add (
- handle,
- poll_instance->pipefds[0],
- POLLIN,
- NULL,
- dummy_dispatch_fn);
- if (res != 0) {
- goto error_destroy;
- }
-
- return (handle);
-
-error_destroy:
- qb_hdb_handle_destroy(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_destroy(qb_handle_t handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- close(poll_instance->pipefds[0]);
- close(poll_instance->pipefds[1]);
- free(poll_instance->poll_entries);
- free(poll_instance->ufds);
-
- qb_hdb_handle_destroy(&poll_instance_database, handle);
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_dispatch_add(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- void *data,
- int32_t (*dispatch_fn) (qb_handle_t hdb_handle_t,
- int32_t fd, int32_t revents, void *data))
-{
- struct qb_poll_instance *poll_instance;
- struct qb_poll_entry *poll_entries;
- struct pollfd *ufds;
- int32_t found = 0;
- int32_t install_pos;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (found = 0, install_pos = 0;
- install_pos < poll_instance->poll_entry_count; install_pos++) {
- if (poll_instance->poll_entries[install_pos].ufd.fd == -1) {
- found = 1;
- break;
- }
- }
-
- if (found == 0) {
- /*
- * Grow pollfd list
- */
- poll_entries =
- (struct qb_poll_entry *)realloc(poll_instance->poll_entries,
- (poll_instance->
- poll_entry_count +
- 1) *
- sizeof(struct
- qb_poll_entry));
- if (poll_entries == NULL) {
- res = -ENOMEM;
- goto error_put;
- }
- poll_instance->poll_entries = poll_entries;
-
- ufds = (struct pollfd *)realloc(poll_instance->ufds,
- (poll_instance->poll_entry_count
- + 1) * sizeof(struct pollfd));
- if (ufds == NULL) {
- res = -ENOMEM;
- goto error_put;
- }
- poll_instance->ufds = ufds;
-
- poll_instance->poll_entry_count += 1;
- install_pos = poll_instance->poll_entry_count - 1;
- }
-
- /*
- * Install new dispatch handler
- */
- poll_instance->poll_entries[install_pos].ufd.fd = fd;
- poll_instance->poll_entries[install_pos].ufd.events = events;
- poll_instance->poll_entries[install_pos].ufd.revents = 0;
- poll_instance->poll_entries[install_pos].dispatch_fn = dispatch_fn;
- poll_instance->poll_entries[install_pos].data = data;
-
-error_put:
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_dispatch_modify(qb_handle_t handle,
- int32_t fd,
- int32_t events,
- int32_t (*dispatch_fn) (qb_handle_t hdb_handle_t,
- int32_t fd,
- int32_t revents, void *data))
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- /*
- * Find file descriptor to modify events and dispatch function
- */
- res = -EBADF;
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd == fd) {
- int change_notify = 0;
-
- if (poll_instance->poll_entries[i].ufd.events != events) {
- change_notify = 1;
- }
- poll_instance->poll_entries[i].ufd.events = events;
- poll_instance->poll_entries[i].dispatch_fn =
- dispatch_fn;
- if (change_notify) {
- char buf = 1;
- write (poll_instance->pipefds[1], &buf, 1);
- }
-
- res = 0;
- break;
- }
- }
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_dispatch_delete(qb_handle_t handle, int32_t fd)
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- int32_t res = 0;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- /*
- * Find dispatch fd to delete
- */
- res = -EBADF;
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd == fd) {
- poll_instance->ufds[i].fd = -1;
- poll_instance->poll_entries[i].ufd.fd = -1;
- poll_instance->poll_entries[i].ufd.revents = 0;
-
- res = 0;
- break;
- }
- }
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_timer_add(qb_handle_t handle,
- int32_t msec_duration, void *data,
- void (*timer_fn) (void *data),
- qb_poll_timer_handle * timer_handle_out)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
-
- if (timer_handle_out == NULL) {
- res = -ENOENT;
- goto error_exit;
- }
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- timerlist_add_duration(&poll_instance->timerlist,
- timer_fn, data,
- ((uint64_t)msec_duration) * 1000000ULL,
- timer_handle_out);
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_timer_delete(qb_handle_t handle, qb_poll_timer_handle th)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
-
- if (th == 0) {
- return (0);
- }
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- timerlist_del(&poll_instance->timerlist, (void *)th);
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_job_add(qb_handle_t poll_handle,
- void *data,
- qb_poll_job_execute_fn_t execute_fn,
- qb_poll_job_handle * handle_out)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
- struct qb_poll_job *job;
-
- if (handle_out == NULL) {
- res = -ENOENT;
- goto error_exit;
- }
-
- res = qb_hdb_handle_get(&poll_instance_database, poll_handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
- job = malloc(sizeof(struct qb_poll_job));
- job->execute_fn = execute_fn;
- job->data = data;
- qb_list_init(&job->list);
- qb_list_add(&job->list, &poll_instance->job_list);
- handle_out = (qb_poll_job_handle)job;
-
- qb_hdb_handle_put(&poll_instance_database, poll_handle);
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_job_delete(qb_handle_t poll_handle, qb_poll_job_handle job_handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res = 0;
- struct qb_poll_job *job = (struct qb_poll_job *)job_handle;
-
- if (job_handle == NULL) {
- res = -ENOENT;
- goto error_exit;
- }
-
- res = qb_hdb_handle_get(&poll_instance_database, poll_handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
- qb_list_del(&job->list);
- free(job);
-
- qb_hdb_handle_put(&poll_instance_database, poll_handle);
-error_exit:
- return (res);
-}
-
-int32_t qb_poll_stop(qb_handle_t handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t res;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- poll_instance->stop_requested = 1;
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-error_exit:
- return (res);
-}
-
-static int32_t _qb_poll_job_run(struct qb_poll_instance *poll_instance)
-{
- struct qb_poll_job *job = NULL;
- struct qb_list_head *iter;
- size_t jobs_run = 0;
-
- for (iter = poll_instance->job_list.next;
- iter != &poll_instance->job_list;
- iter = iter->next) {
- job = qb_list_entry(iter, struct qb_poll_job, list);
- if (job == NULL) {
- continue;
- }
- jobs_run += job->execute_fn(job->data);
- if (jobs_run > 10) {
- break;
- }
- }
- return (jobs_run > 0);
-}
-
-int32_t qb_poll_low_fds_event_set(
- qb_handle_t handle,
- qb_poll_low_fds_event_fn fn)
-{
- struct qb_poll_instance *poll_instance;
-
- if (qb_hdb_handle_get (&poll_instance_database, handle,
- (void *)&poll_instance) != 0) {
- return -ENOENT;
- }
-
- poll_instance->low_fds_event_fn = fn;
-
- qb_hdb_handle_put (&poll_instance_database, handle);
- return 0;
-}
-
-/* logs, std(in|out|err), pipe */
-#define POLL_FDS_USED_MISC 50
-
-static void poll_fds_usage_check(struct qb_poll_instance *poll_instance)
-{
- struct rlimit lim;
- static int32_t socks_limit = 0;
- int32_t send_event = 0;
- int32_t socks_used = 0;
- int32_t socks_avail = 0;
- int32_t i;
-
- if (socks_limit == 0) {
- if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
- char error_str[100];
- strerror_r(errno, error_str, 100);
- printf("getrlimit: %s\n", error_str);
- return;
- }
- socks_limit = lim.rlim_cur;
- socks_limit -= POLL_FDS_USED_MISC;
- if (socks_limit < 0) {
- socks_limit = 0;
- }
- }
-
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd != -1) {
- socks_used++;
- }
- }
- socks_avail = socks_limit - socks_used;
- if (socks_avail < 0) {
- socks_avail = 0;
- }
- send_event = 0;
- if (poll_instance->not_enough_fds) {
- if (socks_avail > 2) {
- poll_instance->not_enough_fds = 0;
- send_event = 1;
- }
- } else {
- if (socks_avail <= 1) {
- poll_instance->not_enough_fds = 1;
- send_event = 1;
- }
- }
- if (send_event) {
- poll_instance->low_fds_event_fn(poll_instance->not_enough_fds,
- socks_avail);
- }
-}
-
-
-int32_t qb_poll_run(qb_handle_t handle)
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- uint64_t expire_timeout_msec = -1;
- int32_t res;
- int32_t poll_entry_count;
- int32_t job_executed;
-
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- goto error_exit;
- }
-
- for (;;) {
-rebuild_poll:
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- memcpy(&poll_instance->ufds[i],
- &poll_instance->poll_entries[i].ufd,
- sizeof(struct pollfd));
- }
- job_executed = _qb_poll_job_run(poll_instance);
-
- if (job_executed == QB_TRUE) {
- expire_timeout_msec = 0;
- } else {
- expire_timeout_msec =
- timerlist_msec_duration_to_expire
- (&poll_instance->timerlist);
- if (!qb_list_empty(&poll_instance->job_list) &&
- expire_timeout_msec > 50) {
- expire_timeout_msec = 50;
- }
- }
- poll_fds_usage_check(poll_instance);
-
- if (expire_timeout_msec != -1
- && expire_timeout_msec > 0xFFFFFFFF) {
- expire_timeout_msec = 0xFFFFFFFE;
- }
-
-retry_poll:
- if (poll_instance->stop_requested) {
- return (0);
- }
- res = poll(poll_instance->ufds,
- poll_instance->poll_entry_count,
- expire_timeout_msec);
- if (poll_instance->stop_requested) {
- return (0);
- }
- if (errno == EINTR && res == -1) {
- goto retry_poll;
- } else if (res == -1) {
- res = -errno;
- goto error_exit;
- }
-
- if (poll_instance->ufds[0].revents) {
- char buf;
- read (poll_instance->ufds[0].fd, &buf, 1);
- goto rebuild_poll;
- }
- poll_entry_count = poll_instance->poll_entry_count;
- for (i = 0; i < poll_entry_count; i++) {
- if (poll_instance->ufds[i].fd != -1 &&
- poll_instance->ufds[i].revents) {
-
- res =
- poll_instance->poll_entries[i].
- dispatch_fn(handle,
- poll_instance->ufds[i].fd,
- poll_instance->ufds[i].revents,
- poll_instance->poll_entries[i].
- data);
-
- /*
- * Remove dispatch functions that return -1
- */
- if (res < 0) {
- poll_instance->poll_entries[i].ufd.fd = -1; /* empty entry */
- }
- }
- }
- timerlist_expire(&poll_instance->timerlist);
- } /* for (;;) */
-
- qb_hdb_handle_put(&poll_instance_database, handle);
-error_exit:
- return res;
-}
-
-#ifdef COMPILE_OUT
-void qb_poll_print_state(qb_handle_t handle, int32_t fd)
-{
- struct qb_poll_instance *poll_instance;
- int32_t i;
- int32_t res = 0;
- res = qb_hdb_handle_get(&poll_instance_database, handle,
- (void *)&poll_instance);
- if (res != 0) {
- res = -ENOENT;
- exit(1);
- }
-
- for (i = 0; i < poll_instance->poll_entry_count; i++) {
- if (poll_instance->poll_entries[i].ufd.fd == fd) {
- printf("fd %d\n",
- poll_instance->poll_entries[i].ufd.fd);
- printf("events %d\n",
- poll_instance->poll_entries[i].ufd.events);
- printf("dispatch_fn %p\n",
- poll_instance->poll_entries[i].dispatch_fn);
- }
- }
-}
-
-#endif
diff --git a/libqb.spec.in b/libqb.spec.in
index 9997f2c..f3ac4ec 100644
--- a/libqb.spec.in
+++ b/libqb.spec.in
@@ -1,84 +1,84 @@
%global alphatag @alphatag@
Name: libqb
Summary: The Quarterback Client Server Developer Library
Version: @version@
Release: 1%{?alphatag:.%{alphatag}}%{?dist}
License: LGPL-2.1
Group: System Environment/Libraries
URL: http://www.libqb.org
Source0: https://fedorahosted.org/releases/q/u/quarterback/%{name}-%{version}.tar.gz
BuildRequires: autoconf automake
BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
%prep
%setup -q -n %{name}-%{version}
./autogen.sh
%{configure}
%build
make %{_smp_mflags}
%install
rm -rf %{buildroot}
make install DESTDIR=%{buildroot}
## tree fixup
# drop static libs
rm -f %{buildroot}%{_libdir}/*.a
# drop docs and html docs for now
rm -rf %{buildroot}%{_docdir}/*
%clean
rm -rf %{buildroot}
%description
This package contains libqb libraries.
%files -n libqb
%defattr(-,root,root,-)
%doc COPYING
%{_libdir}/libqb.so.*
%post -n libqb -p /sbin/ldconfig
%postun -n libqb -p /sbin/ldconfig
%package -n libqb-devel
Summary: The Quarterback Development Kit
Group: Development/Libraries
Requires: libqb = %{version}-%{release}
Requires: pkgconfig
Provides: libqb-devel = %{version}
%description -n libqb-devel
This package contains include files and man pages used to develop using
The Quarterback APIs.
%files -n libqb-devel
%defattr(-,root,root,-)
%doc COPYING README
%dir %{_includedir}/qb/
%{_libdir}/libqb.so
%{_libdir}/libqb.la
%{_libdir}/pkgconfig/libqb.pc
-%{_includedir}/qb/qbpoll.h
+%{_includedir}/qb/qbloop.h
%{_includedir}/qb/qbhdb.h
%{_includedir}/qb/qblist.h
%{_includedir}/qb/qbtimer.h
%{_includedir}/qb/qbipcc.h
%{_includedir}/qb/qbipcs.h
%{_includedir}/qb/qbipc_common.h
%{_includedir}/qb/qbrb.h
%{_includedir}/qb/qbutil.h
%{_includedir}/qb/qbdefs.h
%{_mandir}/man3/qb*3*
%changelog
* @date@ Autotools generated version <nobody@nowhere.org> - @version@-1.@alphatag@
- Autotools generated version
diff --git a/tests/.gitignore b/tests/.gitignore
index a8c14a8..cb9f7ec 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -1,9 +1,10 @@
check_plugin
check_ipc
check_hash
check_rb
bmc
bmcpt
bms
rbreader
rbwriter
+loop
diff --git a/tests/Makefile.am b/tests/Makefile.am
index cfc4d5f..7b78cb0 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,58 +1,63 @@
# Copyright (c) 2010 Red Hat, Inc.
#
# Authors: Angus Salkeld <asalkeld@redhat.com>
#
# This file is part of libqb.
#
# libqb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# libqb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with libqb. If not, see <http://www.gnu.org/licenses/>.
#
-noinst_PROGRAMS = bmc bmcpt bms rbwriter rbreader
+noinst_PROGRAMS = bmc bmcpt bms rbwriter rbreader loop
bmc_SOURCES = bmc.c $(top_builddir)/include/qb/qbipcc.h
bmc_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bmc_LDADD = -lrt $(top_builddir)/lib/libqb.la
bmcpt_SOURCES = bmcpt.c $(top_builddir)/include/qb/qbipcc.h
bmcpt_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bmcpt_LDADD = -lrt $(top_builddir)/lib/libqb.la
bms_SOURCES = bms.c $(top_builddir)/include/qb/qbipcs.h
bms_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bms_LDADD = -lrt $(top_builddir)/lib/libqb.la
rbwriter_SOURCES = rbwriter.c $(top_builddir)/include/qb/qbrb.h
rbwriter_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
rbwriter_LDADD = -lrt $(top_builddir)/lib/libqb.la
rbreader_SOURCES = rbreader.c $(top_builddir)/include/qb/qbrb.h
rbreader_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
rbreader_LDADD = -lrt $(top_builddir)/lib/libqb.la
+loop_SOURCES = loop.c $(top_builddir)/include/qb/qbloop.h
+loop_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
+loop_LDADD = -lrt $(top_builddir)/lib/libqb.la
+
+
if HAVE_CHECK
TESTS = check_rb check_ipc
check_PROGRAMS = check_rb check_ipc
check_rb_SOURCES = check_rb.c $(top_builddir)/include/qb/qbrb.h
check_rb_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
check_rb_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
check_ipc_SOURCES = check_ipc.c $(top_builddir)/include/qb/qbipcc.h $(top_builddir)/include/qb/qbipcs.h
check_ipc_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
check_ipc_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
endif
diff --git a/tests/bms.c b/tests/bms.c
index 0c64189..e891d6d 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -1,192 +1,192 @@
/*
* Copyright (c) 2006-2009 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Steven Dake <sdake@redhat.com>
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <sched.h>
#include <time.h>
#include <stdarg.h>
#include <sched.h>
#include <qb/qbutil.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#include <qb/qbipcs.h>
int32_t blocking = 1;
int32_t verbose = 0;
-static qb_handle_t bms_poll_handle;
+static qb_loop_t *bms_loop;
static qb_ipcs_service_pt s1;
static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
#if 0
if (uid == 0 && gid == 0) {
if (verbose) {
printf("%s:%d %s authenticated connection\n",
__FILE__, __LINE__, __func__);
}
return 1;
}
printf("%s:%d %s() BAD user!\n", __FILE__, __LINE__, __func__);
return 0;
#else
return 0;
#endif
}
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
if (verbose) {
printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
}
}
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
if (verbose) {
printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
}
}
static void s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response;
ssize_t res;
if (verbose > 2) {
printf("%s:%d %s > msg:%d, size:%d\n",
__FILE__, __LINE__, __func__,
req_pt->id, req_pt->size);
}
response.size = sizeof(struct qb_ipc_response_header);
response.id = 13;
response.error = 0;
if (blocking == 1) {
res = qb_ipcs_response_send(c, &response,
sizeof(response));
if (res < 0) {
perror("qb_ipcs_response_send");
}
}
}
static void ipc_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
fprintf(stderr, "%s:%d [%d] %s\n", file_name, file_line, severity, msg);
}
static void sigusr1_handler(int32_t num)
{
printf("%s(%d)\n", __func__, num);
qb_ipcs_destroy(s1);
exit(0);
}
static void show_usage(const char *name)
{
printf("usage: \n");
printf("%s <options>\n", name);
printf("\n");
printf(" options:\n");
printf("\n");
printf(" -n non-blocking ipc (default blocking)\n");
printf(" -v verbose\n");
printf(" -h show this help text\n");
printf(" -m use shared memory\n");
printf(" -p use posix message queues\n");
printf(" -s use sysv message queues\n");
printf("\n");
}
int32_t main(int32_t argc, char *argv[])
{
const char *options = "nvhmps";
int32_t opt;
enum qb_ipc_type ipc_type = QB_IPC_SHM;
struct qb_ipcs_service_handlers sh = {
.connection_accept = s1_connection_accept_fn,
.connection_created = s1_connection_created_fn,
.msg_process = s1_msg_process_fn,
.connection_destroyed = s1_connection_destroyed_fn,
};
while ((opt = getopt(argc, argv, options)) != -1) {
switch (opt) {
case 'm':
ipc_type = QB_IPC_SHM;
break;
case 's':
ipc_type = QB_IPC_SYSV_MQ;
break;
case 'p':
ipc_type = QB_IPC_POSIX_MQ;
break;
case 'n': /* non-blocking */
blocking = 0;
break;
case 'v':
verbose++;
break;
case 'h':
default:
show_usage(argv[0]);
exit(0);
break;
}
}
signal(SIGINT, sigusr1_handler);
signal(SIGILL, sigusr1_handler);
signal(SIGTERM, sigusr1_handler);
qb_util_set_log_function(ipc_log_fn);
- bms_poll_handle = qb_poll_create();
+ bms_loop = qb_loop_create();
s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
if (s1 == 0) {
perror("qb_ipcs_create");
exit(1);
}
- qb_ipcs_run(s1, bms_poll_handle);
- qb_poll_run(bms_poll_handle);
+ qb_ipcs_run(s1, bms_loop);
+ qb_loop_run(bms_loop);
return EXIT_SUCCESS;
}
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 6a44adc..19d013b 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -1,363 +1,366 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <syslog.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#include <check.h>
#include <qb/qbipcc.h>
#include <qb/qbipcs.h>
-#include <qb/qbpoll.h>
+#include <qb/qbloop.h>
#define IPC_NAME "ipc_test"
#define MAX_MSG_SIZE (8192*16)
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
enum my_msg_ids {
IPC_MSG_REQ_TX_RX,
IPC_MSG_RES_TX_RX,
IPC_MSG_REQ_DISPATCH,
IPC_MSG_RES_DISPATCH
};
/* Test Cases
*
* 1) basic send & recv differnet message sizes
*
* 2) send message to start dispatch (confirm receipt)
*
* 3) flow control
*
* 4) authentication
*
* 5) thread safety
*
* 6) cleanup
*
* 7) service availabilty
*
* 8) multiple services
*/
-static qb_handle_t bms_poll_handle;
+static qb_loop_t *my_loop;
static qb_ipcs_service_pt s1;
static void sigterm_handler(int32_t num)
{
qb_ipcs_destroy(s1);
- qb_poll_stop(bms_poll_handle);
+ qb_loop_stop(my_loop);
exit(0);
}
static void s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
{
struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
struct qb_ipc_response_header response;
ssize_t res;
if (req_pt->id == IPC_MSG_REQ_TX_RX) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_TX_RX;
response.error = 0;
res = qb_ipcs_response_send(c, &response,
sizeof(response));
if (res < 0) {
perror("qb_ipcs_response_send");
}
} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
response.size = sizeof(struct qb_ipc_response_header);
response.id = IPC_MSG_RES_DISPATCH;
response.error = 0;
res = qb_ipcs_event_send(c, &response,
sizeof(response));
if (res < 0) {
perror("qb_ipcs_dispatch_send");
}
}
}
static void ipc_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
if (severity < LOG_INFO)
fprintf(stderr, "%s:%d [%d] %s\n", file_name, file_line, severity, msg);
}
static void run_ipc_server(void)
{
int32_t res;
struct qb_ipcs_service_handlers sh = {
.connection_accept = NULL,
.connection_created = NULL,
.msg_process = s1_msg_process_fn,
.connection_destroyed = NULL,
};
signal(SIGTERM, sigterm_handler);
- bms_poll_handle = qb_poll_create();
+ my_loop = qb_loop_create();
s1 = qb_ipcs_create(IPC_NAME, 4, ipc_type, &sh);
fail_if(s1 == 0);
- res = qb_ipcs_run(s1, bms_poll_handle);
+ res = qb_ipcs_run(s1, my_loop);
ck_assert_int_eq(res, 0);
- qb_poll_run(bms_poll_handle);
+ qb_loop_run(my_loop);
}
static int32_t run_function_in_new_process(void (*run_ipc_server_fn)(void))
{
pid_t pid = fork ();
if (pid == -1) {
fprintf (stderr, "Can't fork\n");
return -1;
}
if (pid == 0) {
run_ipc_server_fn();
return 0;
}
return pid;
}
static int32_t stop_process(pid_t pid)
{
kill(pid, SIGTERM);
waitpid(pid, NULL, 0);
return 0;
}
-
-static char buffer[1024 * 1024];
+#define IPC_BUF_SIZE (1024 * 1024)
+static char buffer[IPC_BUF_SIZE];
static int32_t bmc_send_nozc(uint32_t size)
{
struct qb_ipc_request_header *req_header = (struct qb_ipc_request_header *)buffer;
struct qb_ipc_response_header res_header;
int32_t res;
req_header->id = IPC_MSG_REQ_TX_RX;
req_header->size = sizeof(struct qb_ipc_request_header) + size;
repeat_send:
res = qb_ipcc_send(conn, req_header, req_header->size);
if (res < 0) {
if (res == -EAGAIN || res == -ENOMEM) {
goto repeat_send;
} else if (res == -EINVAL || res == -EINTR) {
perror("qb_ipcc_send");
return -1;
} else {
errno = -res;
perror("qb_ipcc_send");
goto repeat_send;
}
}
repeat_recv:
res = qb_ipcc_recv(conn,
&res_header,
sizeof(struct qb_ipc_response_header));
if (res == -EAGAIN) {
goto repeat_recv;
}
if (res == -EINTR) {
return -1;
}
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header.id, IPC_MSG_RES_TX_RX);
ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
return 0;
}
static void test_ipc_txrx(void)
{
int32_t j;
int32_t c = 0;
size_t size;
pid_t pid;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
for (j = 1; j < 19; j++) {
size = (10 * j * j * j) + sizeof(struct qb_ipc_request_header);
if (size >= MAX_MSG_SIZE)
break;
if (bmc_send_nozc(size) == -1) {
break;
}
}
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_txrx_shm)
{
ipc_type = QB_IPC_SHM;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_pmq)
{
ipc_type = QB_IPC_POSIX_MQ;
test_ipc_txrx();
}
END_TEST
START_TEST(test_ipc_txrx_smq)
{
ipc_type = QB_IPC_SYSV_MQ;
test_ipc_txrx();
}
END_TEST
static void test_ipc_dispatch(void)
{
int32_t res;
int32_t j;
int32_t c = 0;
pid_t pid;
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header *res_header = (struct qb_ipc_response_header*)buffer;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(IPC_NAME, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
req_header.id = IPC_MSG_REQ_DISPATCH;
req_header.size = sizeof(struct qb_ipc_request_header);
repeat_send:
res = qb_ipcc_send(conn, &req_header, req_header.size);
if (res < 0) {
if (res == -EAGAIN || res == -ENOMEM) {
goto repeat_send;
} else if (res == -EINVAL || res == -EINTR) {
perror("qb_ipcc_send");
return;
} else {
errno = -res;
perror("qb_ipcc_send");
goto repeat_send;
}
}
repeat_event_recv:
res = qb_ipcc_event_recv(conn, res_header, IPC_BUF_SIZE, 0);
if (res < 0) {
if (res == -EAGAIN) {
goto repeat_event_recv;
} else {
errno = -res;
perror("qb_ipcc_send");
goto repeat_send;
}
}
ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
ck_assert_int_eq(res_header->id, IPC_MSG_RES_DISPATCH);
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_disp_shm)
{
ipc_type = QB_IPC_SHM;
test_ipc_dispatch();
}
END_TEST
static Suite *ipc_suite(void)
{
TCase *tc;
+ uid_t uid;
Suite *s = suite_create("ipc");
tc = tcase_create("ipc_txrx_shm");
tcase_add_test(tc, test_ipc_txrx_shm);
tcase_set_timeout(tc, 6);
suite_add_tcase(s, tc);
-#if 0
- tc = tcase_create("ipc_txrx_posix_mq");
- tcase_add_test(tc, test_ipc_txrx_pmq);
- tcase_set_timeout(tc, 10);
- suite_add_tcase(s, tc);
-#endif
- tc = tcase_create("ipc_txrx_sysv_mq");
- tcase_add_test(tc, test_ipc_txrx_smq);
- tcase_set_timeout(tc, 10);
- suite_add_tcase(s, tc);
+ uid = geteuid();
+ if (uid == 0) {
+ tc = tcase_create("ipc_txrx_posix_mq");
+ tcase_add_test(tc, test_ipc_txrx_pmq);
+ tcase_set_timeout(tc, 10);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("ipc_txrx_sysv_mq");
+ tcase_add_test(tc, test_ipc_txrx_smq);
+ tcase_set_timeout(tc, 10);
+ suite_add_tcase(s, tc);
+ }
tc = tcase_create("ipc_dispatch_shm");
tcase_add_test(tc, test_ipc_disp_shm);
tcase_set_timeout(tc, 16);
suite_add_tcase(s, tc);
return s;
}
int32_t main(void)
{
int32_t number_failed;
Suite *s = ipc_suite();
SRunner *sr = srunner_create(s);
qb_util_set_log_function(ipc_log_fn);
srunner_run_all(sr, CK_NORMAL);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/check_rb.c b/tests/check_rb.c
index c35bebb..11c2097 100644
--- a/tests/check_rb.c
+++ b/tests/check_rb.c
@@ -1,227 +1,229 @@
/*
* Copyright (c) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
#include <errno.h>
#include <check.h>
+
+#include <qb/qbdefs.h>
#include <qb/qbrb.h>
#include <qb/qbipc_common.h>
#include <qb/qbutil.h>
START_TEST(test_ring_buffer1)
{
char my_buf[512];
struct qb_ipc_request_header *hdr;
char *str;
qb_ringbuffer_t *rb;
int32_t i;
int32_t b;
ssize_t actual;
ssize_t avail;
rb = qb_rb_open("test1", 200, QB_RB_FLAG_CREATE);
fail_if(rb == NULL);
for (b = 0; b < 3; b++) {
hdr = (struct qb_ipc_request_header *) my_buf;
str = my_buf + sizeof(struct qb_ipc_request_header);
for (i = 0; i < 900; i++) {
hdr->id = __LINE__ + i;
hdr->size =
sprintf(str, "ID: %d (%s + i(%d)) -- %s-%s!",
hdr->id, "actually the line number", i,
__func__, __FILE__) + 1;
hdr->size += sizeof(struct qb_ipc_request_header);
avail = qb_rb_space_free(rb);
actual = qb_rb_chunk_write(rb, hdr, hdr->size);
if (avail < (hdr->size + (2 * sizeof(uint32_t)))) {
ck_assert_int_eq(actual, -ENOMEM);
} else {
ck_assert_int_eq(actual, hdr->size);
}
}
memset(my_buf, 0, sizeof(my_buf));
hdr = (struct qb_ipc_request_header *) my_buf;
str = my_buf + sizeof(struct qb_ipc_request_header);
for (i = 0; i < 15; i++) {
actual = qb_rb_chunk_read(rb, hdr, 512, 0);
if (actual < 0) {
ck_assert_int_eq(0, qb_rb_chunks_used(rb));
break;
}
str[actual - sizeof(struct qb_ipc_request_header)] = '\0';
ck_assert_int_eq(actual, hdr->size);
}
}
qb_rb_close(rb, QB_FALSE);
}
END_TEST
/*
* nice size (int64)
*/
START_TEST(test_ring_buffer2)
{
qb_ringbuffer_t *t;
int32_t i;
int64_t v = 7891034;
int64_t *new_data;
ssize_t l;
t = qb_rb_open("test2", 200 * sizeof(int64_t), QB_RB_FLAG_CREATE);
fail_if(t == NULL);
for (i = 0; i < 200; i++) {
l = qb_rb_chunk_write(t, &v, sizeof(v));
ck_assert_int_eq(l, sizeof(v));
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l < 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, sizeof(v));
fail_unless(v == *new_data);
qb_rb_chunk_reclaim(t);
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_write(t, &v, sizeof(v));
ck_assert_int_eq(l, sizeof(v));
}
for (i = 0; i < 100; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l == 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, sizeof(v));
fail_unless(v == *new_data);
qb_rb_chunk_reclaim(t);
}
qb_rb_close(t, QB_FALSE);
}
END_TEST
/*
* odd size (10)
*/
START_TEST(test_ring_buffer3)
{
qb_ringbuffer_t *t;
int32_t i;
char v[] = "1234567891";
char out[32];
ssize_t l;
size_t len = strlen(v) + 1;
t = qb_rb_open("test3", 10, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE);
fail_if(t == NULL);
for (i = 0; i < 9000; i++) {
l = qb_rb_chunk_write(t, v, len);
ck_assert_int_eq(l, len);
}
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_read(t, (void *)out, 32, 0);
if (l < 0) {
/* no more to read */
break;
}
ck_assert_int_eq(l, len);
ck_assert_str_eq(v, out);
}
qb_rb_close(t, QB_FALSE);
}
END_TEST
START_TEST(test_ring_buffer4)
{
qb_ringbuffer_t *t;
char data[] = "1234567891";
int32_t i;
char *new_data;
ssize_t l;
t = qb_rb_open("test4", 10, QB_RB_FLAG_CREATE | QB_RB_FLAG_OVERWRITE);
fail_if(t == NULL);
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_write(t, data, strlen(data));
ck_assert_int_eq(l, strlen(data));
if (i == 0) {
data[0] = 'b';
}
}
for (i = 0; i < 2000; i++) {
l = qb_rb_chunk_peek(t, (void **)&new_data, 0);
if (l == 0) {
break;
}
ck_assert_int_eq(l, strlen(data));
qb_rb_chunk_reclaim(t);
}
qb_rb_close(t, QB_FALSE);
}
END_TEST
static Suite *rb_suite(void)
{
TCase *tc_load;
Suite *s = suite_create("ringbuffer");
tc_load = tcase_create("test01");
tcase_add_test(tc_load, test_ring_buffer1);
tcase_add_test(tc_load, test_ring_buffer2);
tcase_add_test(tc_load, test_ring_buffer3);
tcase_add_test(tc_load, test_ring_buffer4);
suite_add_tcase(s, tc_load);
return s;
}
static void libqb_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
if (severity < LOG_INFO)
printf("libqb: %s:%d %s\n", file_name, file_line, msg);
}
int32_t main(void)
{
int32_t number_failed;
Suite *s = rb_suite();
SRunner *sr = srunner_create(s);
qb_util_set_log_function(libqb_log_fn);
srunner_run_all(sr, CK_NORMAL);
number_failed = srunner_ntests_failed(sr);
srunner_free(sr);
return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/tests/loop.c b/tests/loop.c
new file mode 100644
index 0000000..182cf63
--- /dev/null
+++ b/tests/loop.c
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * Author: Angus Salkeld <asalkeld@redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "os_base.h"
+#include <sys/poll.h>
+
+#include <qb/qbloop.h>
+
+static struct qb_loop *l;
+static qb_loop_timer_handle th;
+
+static void job_3_9(void *data) { printf("%s\n", __func__); }
+static void job_1_2(void *data) { printf("%s\n", __func__); }
+static void job_2_4(void *data) { printf("%s\n", __func__); }
+static void job_3_5(void *data) { printf("%s\n", __func__); }
+static void job_3_6(void *data) { printf("%s\n", __func__); }
+static void job_1_1(void *data) { printf("%s\n", __func__); }
+static void job_3_7(void *data) { printf("%s\n", __func__); }
+static void job_2_3(void *data) { printf("%s\n", __func__); }
+static void job_2_8(void *data) { printf("%s\n", __func__); }
+static void job_1_9(void *data) { printf("%s\n", __func__); }
+
+static void more_important_jobs(void *data)
+{
+ printf("%s\n", __func__);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_2);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_9);
+}
+
+static void more_jobs(void *data)
+{
+ printf("%s\n", __func__);
+ qb_loop_timer_add(l, QB_LOOP_HIGH, 3109, NULL, job_1_1, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_7);
+ qb_loop_timer_add(l, QB_LOOP_LOW, 1000, NULL, more_important_jobs, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_7);
+ qb_loop_timer_add(l, QB_LOOP_LOW, 2341, NULL, job_3_7, &th);
+ qb_loop_timer_add(l, QB_LOOP_LOW, 900, NULL, job_3_6, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_5);
+ qb_loop_timer_add(l, QB_LOOP_MED, 4000, NULL, more_jobs, &th);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_9);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_9);
+ qb_loop_job_add(l, QB_LOOP_MED, NULL, job_2_3);
+}
+
+static int32_t read_stdin(int32_t fd, int32_t revents, void *data)
+{
+ char buf[100];
+ ssize_t len = read(fd, buf, 100);
+ buf[len-1] = '\0';
+ printf("typed > \"%s\"\n", buf);
+ if (strcmp(buf, "more") == 0) {
+ more_jobs(NULL);
+ }
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_9);
+ return 0;
+}
+
+int main(int argc, char * argv[])
+{
+
+ l = qb_loop_create();
+
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_9);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_2_4);
+ qb_loop_job_add(l, QB_LOOP_HIGH, NULL, job_1_2);
+ qb_loop_job_add(l, QB_LOOP_MED, NULL, job_3_7);
+// qb_loop_timer_add(l, QB_LOOP_HIGH, 40, NULL, more_jobs, &th);
+ qb_loop_job_add(l, QB_LOOP_MED, NULL, job_2_8);
+ qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_3_6);
+
+ qb_loop_poll_add(l, QB_LOOP_LOW, 0, POLLIN | POLLPRI | POLLNVAL,
+ NULL, read_stdin);
+
+ qb_loop_run(l);
+ return 0;
+}
+
+
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Feb 27, 3:29 AM (1 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1466108
Default Alt Text
(131 KB)
Attached To
Mode
rQ LibQB
Attached
Detach File
Event Timeline
Log In to Comment