Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4511961
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
7 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/ring.c b/ring.c
index e46c1259..4caf31ee 100644
--- a/ring.c
+++ b/ring.c
@@ -1,203 +1,287 @@
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include "ring.h"
#include "utils.h"
+#define KNET_MAX_EVENTS 8
+
/*
 * Per-instance state behind the opaque knet_handle_t.
 */
struct __knet_handle {
/* socketpair() endpoints: sock[0] is registered with epollfd in
 * knet_handle_new(), sock[1] is what knet_handle_getfd() returns */
+ int sock[2];
/* epoll instance created in knet_handle_new() and polled by
 * knet_control_thread() */
+ int epollfd;
/* head of the singly-linked host list; knet_host_add() pushes to the front */
struct knet_host *host_head;
/* thread created by knet_handle_new() to run knet_control_thread() */
+ pthread_t control_thread;
/* write-locked by knet_host_add()/knet_host_remove(), read-locked by
 * knet_host_foreach() */
pthread_rwlock_t host_rwlock;
};
/*
 * Event loop run by the thread that knet_handle_new() creates.
 *
 * data is the knet_handle_t handed to pthread_create(). Each pass waits on
 * the handle's epoll instance for at most 500 ms, collecting up to
 * KNET_MAX_EVENTS events, then walks them. Both branches are still TODO
 * stubs, so no data is read here yet. The loop has no exit condition, so
 * the trailing return is never reached.
 *
 * NOTE(review): a negative epoll_wait() result is not reported; the for
 * loop simply does not execute in that case.
 * NOTE(review): knet_host_add() stores a pointer in ev.data for link
 * sockets while this loop inspects data.fd; epoll_data is a union, so
 * confirm how link events are meant to be told apart from sock[0].
 */
+static void *knet_control_thread(void *data)
+{
+ int i, nev;
+ knet_handle_t knet_h;
+ struct epoll_event events[KNET_MAX_EVENTS];
+
+ knet_h = (knet_handle_t) data;
+
+ while(1) {
+ nev = epoll_wait(knet_h->epollfd, events, KNET_MAX_EVENTS, 500);
+
+ for (i = 0; i < nev; i++) {
+ if (events[i].data.fd == knet_h->sock[0]) {
+ /* TODO: read data, inspect and deliver */
+ }
+ else {
+ /* TODO: read frame, process or dispatch */
+ }
+ }
+ }
+
+ return NULL;
+}
+
/*
 * Allocate and initialise a new handle.
 *
 * In order: zeroed allocation, host list rwlock, AF_UNIX stream socketpair,
 * epoll instance with sock[0] registered for EPOLLIN, then the control
 * thread. Returns the handle, or NULL on failure. Once the allocation has
 * succeeded, the exit_fail labels release what was already set up, in
 * reverse order of acquisition.
 *
 * NOTE(review): memset() is declared in <string.h>, which is not among the
 * includes visible at the top of this file - confirm it arrives indirectly.
 */
knet_handle_t knet_handle_new(void)
{
knet_handle_t knet_h;
+ struct epoll_event ev;
knet_h = malloc(sizeof(struct __knet_handle));
if (knet_h == NULL)
return NULL;
memset(knet_h, 0, sizeof(struct __knet_handle));
- if (pthread_rwlock_init(&knet_h->host_rwlock, NULL) != 0) {
- free(knet_h);
- return NULL;
- }
+ if (pthread_rwlock_init(&knet_h->host_rwlock, NULL) != 0)
+ goto exit_fail1;
+
+ if (socketpair(AF_UNIX, SOCK_STREAM, IPPROTO_IP, knet_h->sock) != 0)
+ goto exit_fail2;
+
/* NOTE(review): epoll_create1() documents EPOLL_CLOEXEC as its flag;
 * FD_CLOEXEC is the fcntl(F_SETFD) flag - confirm this call succeeds */
+ knet_h->epollfd = epoll_create1(FD_CLOEXEC);
+
+ if (knet_h->epollfd < 0)
+ goto exit_fail3;
+
/* only events and data.fd are assigned; ev is not zeroed beforehand */
+ ev.events = EPOLLIN;
+ ev.data.fd = knet_h->sock[0];
+
+ if (epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, knet_h->sock[0], &ev) != 0)
+ goto exit_fail4;
+
+ if (pthread_create(&knet_h->control_thread,
+ 0, knet_control_thread, (void *) knet_h) != 0)
+ goto exit_fail4;
return knet_h;
+
/* error unwinding: each label falls through to the ones below it */
+exit_fail4:
+ close(knet_h->epollfd);
+
+exit_fail3:
+ close(knet_h->sock[0]);
+ close(knet_h->sock[1]);
+
+exit_fail2:
+ pthread_rwlock_destroy(&knet_h->host_rwlock);
+
+exit_fail1:
+ free(knet_h);
+ return NULL;
+}
+
/*
 * Return sock[1], the end of the handle's socketpair that
 * knet_handle_new() does not register with the epoll instance.
 * knet_h is dereferenced without a NULL check.
 */
+int knet_handle_getfd(knet_handle_t knet_h)
+{
+ return knet_h->sock[1];
}
/*
 * Register every link socket of host with the handle's epoll instance and
 * push host onto the front of the host list, all under the write lock.
 *
 * Returns 0 on success, -1 if the write lock cannot be taken.
 * epoll_ctl() failures are currently ignored (see the TODO in the loop).
 */
int knet_host_add(knet_handle_t knet_h, struct knet_host *host)
{
+ struct knet_link *lp;
+ struct epoll_event ev;
+
+ memset(&ev, 0, sizeof(struct epoll_event));
+ ev.events = EPOLLIN;
+
if (pthread_rwlock_wrlock(&knet_h->host_rwlock) != 0)
return -1;
/* NOTE(review): epoll_data is a union, so the data.ptr store in this loop
 * replaces the data.fd value written just before it - confirm which member
 * the control thread is expected to read for link events */
+ for (lp = host->link; lp != NULL; lp = lp->next) {
+ ev.data.fd = lp->sock;
+ ev.data.ptr = lp;
+ /* TODO: check for errors? */
+ epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, lp->sock, &ev);
+ }
+
/* pushing new host to the front */
host->next = knet_h->host_head;
knet_h->host_head = host;
pthread_rwlock_unlock(&knet_h->host_rwlock);
return 0;
}
/*
 * Unlink host from the handle's host list and remove each of its link
 * sockets from the epoll instance, all under the write lock.
 *
 * Returns 0 on success, -1 if the write lock cannot be taken. The result
 * is also 0 when host is not found in the list; its link sockets are
 * still passed to EPOLL_CTL_DEL in that case.
 */
int knet_host_remove(knet_handle_t knet_h, struct knet_host *host)
{
- struct knet_host *khp;
+ struct knet_host *hp;
+ struct knet_link *lp;
if (pthread_rwlock_wrlock(&knet_h->host_rwlock) != 0)
return -1;
/* TODO: use a doubly-linked list? */
if (host == knet_h->host_head) {
knet_h->host_head = host->next;
} else {
/* search for the predecessor of host and make it skip over host */
- for (khp = knet_h->host_head; khp != NULL; khp = khp->next) {
- if (host == khp->next) {
- khp->next = khp->next->next;
+ for (hp = knet_h->host_head; hp != NULL; hp = hp->next) {
+ if (host == hp->next) {
+ hp->next = hp->next->next;
break;
}
}
}
+ /* NOTE: kernel versions before 2.6.9 required a non-NULL pointer
+ * TODO: check for EPOLL_CTL_DEL errors? */
+ for (lp = host->link; lp != NULL; lp = lp->next)
+ epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, lp->sock, 0);
+
pthread_rwlock_unlock(&knet_h->host_rwlock);
return 0;
}
/*
 * Call action(host, data) for each host in the list while holding the
 * read lock; the walk stops at the first non-zero return from action.
 *
 * Returns 0 once the walk is over (whether or not action stopped it
 * early), -1 if the read lock cannot be taken. With the NULL check on
 * action, a NULL callback walks the whole list without calling anything.
 */
int knet_host_foreach(knet_handle_t knet_h, int (*action)(struct knet_host *, void *), void *data)
{
struct knet_host *i;
if (pthread_rwlock_rdlock(&knet_h->host_rwlock) != 0)
return -1;
for (i = knet_h->host_head; i != NULL; i = i->next) {
- if (action(i, data) != 0)
+ if (action && action(i, data) != 0)
break;
}
pthread_rwlock_unlock(&knet_h->host_rwlock);
return 0;
}
/*
 * Open a datagram socket in the family of address, request a receive
 * buffer of KNET_RING_RCVBUFF bytes, mark the descriptor close-on-exec
 * and bind it to address.
 *
 * Returns the bound descriptor on success. If socket() itself fails its
 * (negative) result is returned; any later failure closes the descriptor
 * and returns -1. A failure to set the receive buffer is only logged.
 */
int knet_bind(struct sockaddr *address, socklen_t addrlen)
{
	int fd;
	int fdflags;
	int rcvbuf = KNET_RING_RCVBUFF;

	fd = socket(address->sa_family, SOCK_DGRAM, 0);
	if (fd < 0) {
		log_error("Unable to open netsocket error");
		return fd;
	}

	/* best effort: carry on with the default buffer size on failure */
	if (setsockopt(fd, SOL_SOCKET, SO_RCVBUFFORCE, &rcvbuf, sizeof(rcvbuf)) != 0)
		log_error("Unable to set receive buffer");

	/* read-modify-write so existing descriptor flags are preserved */
	fdflags = fcntl(fd, F_GETFD, 0);
	if (fdflags < 0) {
		log_error("Unable to get close-on-exec flag");
		close(fd);
		return -1;
	}

	if (fcntl(fd, F_SETFD, fdflags | FD_CLOEXEC) < 0) {
		log_error("Unable to set close-on-exec flag");
		close(fd);
		return -1;
	}

	if (bind(fd, address, addrlen) < 0) {
		log_error("Unable to bind to ring socket");
		close(fd);
		return -1;
	}

	return fd;
}
/*
 * Receive one frame from sockfd without blocking and act on its type.
 *
 * frame/len describe the caller's receive buffer.
 *
 * Returns:
 *   - the recvfrom() result unchanged when it is <= 0;
 *   - the number of bytes received for DATA and PONG frames;
 *   - 0 for a PING frame, after rewriting it as PONG and sending it back
 *     to the sender (the sendto() result is not checked);
 *   - -1 with errno = EBADMSG when the frame is shorter than the header,
 *     or has an unexpected magic, version or type.
 */
ssize_t knet_dispatch(int sockfd, struct knet_frame *frame, size_t len)
{
	struct sockaddr_storage peer;
	socklen_t peerlen = sizeof(struct sockaddr_storage);
	ssize_t nread;

	nread = recvfrom(sockfd, frame, len, MSG_DONTWAIT,
			 (struct sockaddr *) &peer, &peerlen);
	if (nread <= 0)
		return nread;

	/* header checks, evaluated in order: length, magic, version */
	if ((nread < sizeof(struct knet_frame)) ||
	    (ntohl(frame->magic) != KNET_FRAME_MAGIC) ||
	    (frame->version != KNET_FRAME_VERSION)) /* TODO: versioning */
		goto bad_frame;

	if (frame->type == KNET_FRAME_PING) {
		/* answer in place, echoing the frame back to where it came from */
		frame->type = KNET_FRAME_PONG;
		sendto(sockfd, frame, nread, MSG_DONTWAIT,
		       (struct sockaddr *) &peer, peerlen);
		return 0;
	}

	if (frame->type == KNET_FRAME_DATA)
		return nread;

	if (frame->type == KNET_FRAME_PONG) {
		/* TODO: find the link and mark enabled */
		return nread;
	}

bad_frame:
	errno = EBADMSG;
	return -1;
}
/*
 * Send frame (len bytes) to host and to every host chained after it.
 *
 * For each host the links are tried in list order. DATA frames skip links
 * that are not enabled; other frame types go out on every link. For a
 * DATA frame to a host that is not active, sending stops at the first
 * link on which the whole frame was written. Send errors are not reported.
 */
void knet_send(struct knet_host *host, struct knet_frame *frame, size_t len)
{
	struct knet_host *dst;
	struct knet_link *path;
	ssize_t sent;

	for (dst = host; dst != NULL; dst = dst->next) {
		if (frame->type == KNET_FRAME_DATA) {
			/* TODO: packet inspection, might continue */
		}

		for (path = dst->link; path != NULL; path = path->next) {
			int is_data = (frame->type == KNET_FRAME_DATA);

			if (!is_data || path->enabled) {
				sent = sendto(path->sock, frame, len, MSG_DONTWAIT,
					      (struct sockaddr *) &path->address,
					      sizeof(struct sockaddr_storage));

				/* one complete send is enough for a non-active host */
				if (is_data && (!dst->active) && (sent == len))
					break;
			}
		}
	}
}
diff --git a/ring.h b/ring.h
index 3ff86376..96e54c48 100644
--- a/ring.h
+++ b/ring.h
@@ -1,48 +1,50 @@
#ifndef __RING_H__
#define __RING_H__
#include <stdint.h>
#include <netinet/in.h>
typedef struct __knet_handle *knet_handle_t;
#define KNET_RING_DEFPORT 50000
#define KNET_RING_RCVBUFF 8192
/* One remote host: a list of links plus the next host in the chain. */
struct knet_host {
unsigned int active:1; /* data packets are sent to all links */
/* head of this host's link list, walked through knet_link.next */
struct knet_link *link;
/* next host; followed by knet_send() and by the handle's host list */
struct knet_host *next;
};
/* One path to a host: a socket plus the address frames are sent to. */
struct knet_link {
/* descriptor used for sendto() in knet_send() and registered with epoll
 * in knet_host_add() */
int sock;
unsigned int enabled:1; /* link is enabled for data */
/* destination address passed to sendto() in knet_send() */
struct sockaddr_storage address;
/* next link of the same host, NULL at the end of the list */
struct knet_link *next;
};
/* expected knet_frame.magic; knet_dispatch() compares it after ntohl() */
#define KNET_FRAME_MAGIC 0x12344321
/* the only knet_frame.version value knet_dispatch() accepts */
#define KNET_FRAME_VERSION 0x01
/* values of knet_frame.type; knet_dispatch() answers a PING by sending the
 * same frame back with its type changed to PONG */
#define KNET_FRAME_DATA 0x00
#define KNET_FRAME_PING 0x01
#define KNET_FRAME_PONG 0x02
/* Frame header: 8 bytes, packed so the compiler inserts no padding. */
struct knet_frame {
uint32_t magic; /* passed through ntohl() before being checked */
uint8_t version;
uint8_t type; /* one of the KNET_FRAME_DATA/PING/PONG values */
uint16_t __pad;
} __attribute__((packed));
/* allocate a handle and start its control thread; NULL on failure */
knet_handle_t knet_handle_new(void);
/* return the sock[1] end of the handle's socketpair */
+int knet_handle_getfd(knet_handle_t knet_h);
+
/* host list management; each returns -1 if the list lock cannot be taken,
 * 0 otherwise */
int knet_host_add(knet_handle_t khandle, struct knet_host *host);
int knet_host_remove(knet_handle_t khandle, struct knet_host *host);
int knet_host_foreach(knet_handle_t khandle, int (*action)(struct knet_host *, void *), void *data);
/* create and bind a close-on-exec datagram socket; negative on failure */
int knet_bind(struct sockaddr *address, socklen_t addrlen);
/* receive one frame without blocking; -1 with errno EBADMSG on a bad frame */
ssize_t knet_dispatch(int sockfd, struct knet_frame *frame, size_t len);
/* send frame over the links of host and of every host chained after it */
void knet_send(struct knet_host *host, struct knet_frame *frame, size_t len);
#endif
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Jun 25, 3:23 AM (1 d, 57 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952001
Default Alt Text
(7 KB)
Attached To
Mode
rK kronosnet
Attached
Detach File
Event Timeline
Log In to Comment