Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3153439
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
16 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/ring.c b/ring.c
index 6e134a02..a1ab999c 100644
--- a/ring.c
+++ b/ring.c
@@ -1,524 +1,521 @@
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include "knethandle.h"
#include "netutils.h"
#include "utils.h"
#define KNET_MAX_EVENTS 8
#define KNET_PING_TIMERES 200000
#define KNET_DATABUFSIZE 131072 /* 128k */
-#define KNET_PINGBUFSIZE (sizeof(struct knet_frame) + sizeof(struct timespec))
+#define KNET_PINGBUFSIZE sizeof(struct knet_frame)
static void *knet_control_thread(void *data);
static void *knet_heartbt_thread(void *data);
knet_handle_t knet_handle_new(int fd)
{
knet_handle_t knet_h;
struct epoll_event ev;
if ((knet_h = malloc(sizeof(struct knet_handle))) == NULL)
return NULL;
memset(knet_h, 0, sizeof(struct knet_handle));
if ((knet_h->databuf = malloc(KNET_DATABUFSIZE))== NULL)
goto exit_fail1;
memset(knet_h->databuf, 0, KNET_DATABUFSIZE);
if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL)
goto exit_fail2;
memset(knet_h->pingbuf, 0, KNET_PINGBUFSIZE);
if (pthread_rwlock_init(&knet_h->list_rwlock, NULL) != 0)
goto exit_fail3;
knet_h->sockfd = fd;
knet_h->epollfd = epoll_create(KNET_MAX_EVENTS);
if (knet_h->epollfd < 0)
goto exit_fail4;
if (knet_fdset_cloexec(knet_h->epollfd) != 0)
goto exit_fail5;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd;
if (epoll_ctl(knet_h->epollfd,
EPOLL_CTL_ADD, knet_h->sockfd, &ev) != 0)
goto exit_fail5;
if (pthread_create(&knet_h->control_thread, 0,
knet_control_thread, (void *) knet_h) != 0)
goto exit_fail5;
if (pthread_create(&knet_h->heartbt_thread, 0,
knet_heartbt_thread, (void *) knet_h) != 0)
goto exit_fail6;
return knet_h;
exit_fail6:
pthread_cancel(knet_h->control_thread);
exit_fail5:
close(knet_h->epollfd);
exit_fail4:
pthread_rwlock_destroy(&knet_h->list_rwlock);
exit_fail3:
free(knet_h->databuf);
exit_fail2:
free(knet_h->pingbuf);
exit_fail1:
free(knet_h);
return NULL;
}
int knet_handle_free(knet_handle_t knet_h)
{
void *retval;
if ((knet_h->host_head != NULL) || (knet_h->listener_head != NULL))
goto exit_busy;
pthread_cancel(knet_h->heartbt_thread);
pthread_join(knet_h->heartbt_thread, &retval);
if (retval != PTHREAD_CANCELED)
goto exit_busy;
pthread_cancel(knet_h->control_thread);
pthread_join(knet_h->control_thread, &retval);
if (retval != PTHREAD_CANCELED)
goto exit_busy;
close(knet_h->epollfd);
pthread_rwlock_destroy(&knet_h->list_rwlock);
free(knet_h->databuf);
free(knet_h->pingbuf);
free(knet_h);
return 0;
exit_busy:
errno = EBUSY;
return -EBUSY;
}
void knet_handle_setfwd(knet_handle_t knet_h, int enabled)
{
knet_h->enabled = (enabled == 1) ? 1 : 0;
}
int knet_host_acquire(knet_handle_t knet_h, struct knet_host **head, int writelock)
{
int ret;
if (writelock != 0)
ret = pthread_rwlock_wrlock(&knet_h->list_rwlock);
else
ret = pthread_rwlock_rdlock(&knet_h->list_rwlock);
if (head)
*head = (ret == 0) ? knet_h->host_head : NULL;
return ret;
}
int knet_host_release(knet_handle_t knet_h)
{
return pthread_rwlock_unlock(&knet_h->list_rwlock);
}
int knet_listener_acquire(knet_handle_t knet_h, struct knet_listener **head, int writelock)
{
int ret;
if (writelock != 0)
ret = pthread_rwlock_wrlock(&knet_h->list_rwlock);
else
ret = pthread_rwlock_rdlock(&knet_h->list_rwlock);
if (head)
*head = (ret == 0) ? knet_h->listener_head : NULL;
return ret;
}
int knet_listener_release(knet_handle_t knet_h)
{
return pthread_rwlock_unlock(&knet_h->list_rwlock);
}
int knet_host_add(knet_handle_t knet_h, struct knet_host *host)
{
if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
return -1;
/* pushing new host to the front */
host->next = knet_h->host_head;
knet_h->host_head = host;
pthread_rwlock_unlock(&knet_h->list_rwlock);
return 0;
}
int knet_host_remove(knet_handle_t knet_h, struct knet_host *host)
{
struct knet_host *hp;
if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
return -1;
/* TODO: use a doubly-linked list? */
if (host == knet_h->host_head) {
knet_h->host_head = host->next;
} else {
for (hp = knet_h->host_head; hp != NULL; hp = hp->next) {
if (host == hp->next) {
hp->next = hp->next->next;
break;
}
}
}
pthread_rwlock_unlock(&knet_h->list_rwlock);
return 0;
}
int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener)
{
int value;
struct epoll_event ev;
listener->sock = socket(listener->address.ss_family, SOCK_DGRAM, 0);
if (listener->sock < 0)
return listener->sock;
value = KNET_RING_RCVBUFF;
setsockopt(listener->sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value));
if (knet_fdset_cloexec(listener->sock) != 0)
goto exit_fail1;
if (bind(listener->sock, (struct sockaddr *) &listener->address,
sizeof(struct sockaddr_storage)) != 0)
goto exit_fail1;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = listener->sock;
if (epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, listener->sock, &ev) != 0)
goto exit_fail1;
if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
goto exit_fail2;
/* pushing new host to the front */
listener->next = knet_h->listener_head;
knet_h->listener_head = listener;
pthread_rwlock_unlock(&knet_h->list_rwlock);
return 0;
exit_fail2:
epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
exit_fail1:
close(listener->sock);
return -1;
}
int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener)
{
int err;
struct epoll_event ev; /* kernel < 2.6.9 bug (see epoll_ctl man) */
struct knet_listener *lp;
struct knet_host *i;
struct knet_link *j;
if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
return -1;
err = 0;
for (i = knet_h->host_head; i != NULL; i = i->next) {
for (j = i->link; j != NULL; j = j->next) {
if (j->sock == listener->sock) {
err = EBUSY;
goto exit_fail1;
}
}
}
/* TODO: use a doubly-linked list? */
if (listener == knet_h->listener_head) {
knet_h->listener_head = knet_h->listener_head->next;
} else {
for (lp = knet_h->listener_head; lp != NULL; lp = lp->next) {
if (listener == lp->next) {
lp->next = lp->next->next;
break;
}
}
}
epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
close(listener->sock);
exit_fail1:
pthread_rwlock_unlock(&knet_h->list_rwlock);
if (err != 0)
errno = err;
return -err;
}
void knet_link_timeout(struct knet_link *lnk,
time_t interval, time_t timeout, int precision)
{
lnk->ping_interval = interval * 1000; /* microseconds */
lnk->pong_timeout = timeout * 1000; /* microseconds */
lnk->latency_fix = precision;
lnk->latency_exp = precision - \
((lnk->ping_interval * precision) / 8000000);
}
static void knet_send_data(knet_handle_t knet_h)
{
ssize_t len, snt;
struct knet_host *i;
struct knet_link *j;
- len = read(knet_h->sockfd, knet_h->databuf + 1,
- KNET_DATABUFSIZE - sizeof(struct knet_frame));
+ len = read(knet_h->sockfd, knet_h->databuf->kf_data,
+ KNET_DATABUFSIZE - KNET_FRAME_SIZE);
if (len == 0) {
/* TODO: disconnection, should never happen! */
return;
}
- len += sizeof(struct knet_frame);
+ len += sizeof(KNET_FRAME_SIZE);
if (knet_h->enabled != 1) /* data forward is disabled */
return;
/* TODO: packet inspection */
- knet_h->databuf->type = KNET_FRAME_DATA;
+ knet_h->databuf->kf_type = KNET_FRAME_DATA;
if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
return;
for (i = knet_h->host_head; i != NULL; i = i->next) {
for (j = i->link; j != NULL; j = j->next) {
if (j->enabled != 1) /* link is disabled */
continue;
snt = sendto(j->sock, knet_h->databuf, len, MSG_DONTWAIT,
(struct sockaddr *) &j->address,
sizeof(struct sockaddr_storage));
if ((i->active == 0) && (snt == len))
break;
}
}
pthread_rwlock_unlock(&knet_h->list_rwlock);
}
static void knet_recv_frame(knet_handle_t knet_h, int sockfd)
{
ssize_t len;
struct sockaddr_storage address;
socklen_t addrlen = sizeof(struct sockaddr_storage);
struct knet_host *i;
struct knet_link *j, *link_src;
unsigned long long latency_last;
- struct timespec pong;
if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
return;
len = recvfrom(sockfd, knet_h->databuf, KNET_DATABUFSIZE,
MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);
- if (len < sizeof(struct knet_frame))
+ if (len < (KNET_FRAME_SIZE + 1))
goto exit_unlock;
- if (ntohl(knet_h->databuf->magic) != KNET_FRAME_MAGIC)
+ if (ntohl(knet_h->databuf->kf_magic) != KNET_FRAME_MAGIC)
goto exit_unlock;
- if (knet_h->databuf->version != KNET_FRAME_VERSION)
+ if (knet_h->databuf->kf_version != KNET_FRAME_VERSION)
goto exit_unlock;
/* searching host/link, TODO: improve lookup */
link_src = NULL;
for (i = knet_h->host_head; i != NULL; i = i->next) {
for (j = i->link; j != NULL; j = j->next) {
if (cmpaddr(&address, addrlen,
&j->address, addrlen) == 0) {
link_src = j;
break;
}
}
if (link_src)
break;
}
if (link_src == NULL) /* host/link not found */
goto exit_unlock;
- switch (knet_h->databuf->type) {
+ switch (knet_h->databuf->kf_type) {
case KNET_FRAME_DATA:
if (knet_h->enabled != 1) /* data forward is disabled */
break;
write(knet_h->sockfd,
- knet_h->databuf + 1, len - sizeof(struct knet_frame));
+ knet_h->databuf->kf_data, len - sizeof(struct knet_frame));
break;
case KNET_FRAME_PING:
- knet_h->databuf->type = KNET_FRAME_PONG;
+ knet_h->databuf->kf_type = KNET_FRAME_PONG;
sendto(j->sock, knet_h->databuf, len,
MSG_DONTWAIT, (struct sockaddr *) &j->address,
sizeof(struct sockaddr_storage));
break;
case KNET_FRAME_PONG:
clock_gettime(CLOCK_MONOTONIC, &j->pong_last);
- memcpy(&pong, knet_h->databuf + 1, sizeof(struct timespec));
-
- timespec_diff(pong, j->pong_last, &latency_last);
+ timespec_diff(knet_h->databuf->kf_time, j->pong_last, &latency_last);
latency_last /= 1000;
if (latency_last < j->pong_timeout)
j->enabled = 1; /* TODO: might need write lock */
j->latency *= j->latency_exp;
j->latency += latency_last * (j->latency_fix - j->latency_exp);
j->latency /= j->latency_fix;
break;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->list_rwlock);
}
static void knet_heartbeat_check_each(knet_handle_t knet_h, struct knet_link *j)
{
struct timespec clock_now;
- unsigned long long diff_ping, diff_pong;
+ unsigned long long diff_ping;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0)
return;
timespec_diff(j->ping_last, clock_now, &diff_ping);
- diff_ping /= 1000;
- if (diff_ping >= j->ping_interval) {
- clock_gettime(CLOCK_MONOTONIC, &j->ping_last);
+ if ((diff_ping / 1000) >= j->ping_interval) {
+ int len;
- memmove(knet_h->pingbuf + 1,
- &j->ping_last, sizeof(struct timespec));
+ memmove(&knet_h->pingbuf->kf_time, &clock_now, sizeof(struct timespec));
- sendto(j->sock, knet_h->pingbuf, KNET_PINGBUFSIZE,
+ len = sendto(j->sock, knet_h->pingbuf, KNET_PINGBUFSIZE,
MSG_DONTWAIT, (struct sockaddr *) &j->address,
sizeof(struct sockaddr_storage));
+
+ if (len == KNET_PINGBUFSIZE)
+ memmove(&j->ping_last, &clock_now, sizeof(struct timespec));
}
if (j->enabled == 1) {
- timespec_diff(j->pong_last, clock_now, &diff_pong);
- diff_pong /= 1000;
+ timespec_diff(j->pong_last, clock_now, &diff_ping);
- if (diff_pong >= j->pong_timeout)
+ if ((diff_ping / 1000) >= j->pong_timeout)
j->enabled = 0; /* TODO: might need write lock */
}
}
static void *knet_heartbt_thread(void *data)
{
knet_handle_t knet_h;
struct knet_host *i;
struct knet_link *j;
knet_h = (knet_handle_t) data;
/* preparing ping buffer */
- knet_h->pingbuf->magic = htonl(KNET_FRAME_MAGIC);
- knet_h->pingbuf->version = KNET_FRAME_VERSION;
- knet_h->pingbuf->type = KNET_FRAME_PING;
+ knet_h->pingbuf->kf_magic = htonl(KNET_FRAME_MAGIC);
+ knet_h->pingbuf->kf_version = KNET_FRAME_VERSION;
+ knet_h->pingbuf->kf_type = KNET_FRAME_PING;
while (1) {
usleep(KNET_PING_TIMERES);
if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
continue;
for (i = knet_h->host_head; i != NULL; i = i->next) {
for (j = i->link; j != NULL; j = j->next)
knet_heartbeat_check_each(knet_h, j);
}
pthread_rwlock_unlock(&knet_h->list_rwlock);
}
return NULL;
}
static void *knet_control_thread(void *data)
{
int i, nev;
knet_handle_t knet_h;
struct epoll_event events[KNET_MAX_EVENTS];
knet_h = (knet_handle_t) data;
/* preparing data buffer */
- knet_h->databuf->magic = htonl(KNET_FRAME_MAGIC);
- knet_h->databuf->version = KNET_FRAME_VERSION;
+ knet_h->databuf->kf_magic = htonl(KNET_FRAME_MAGIC);
+ knet_h->databuf->kf_version = KNET_FRAME_VERSION;
while (1) {
nev = epoll_wait(knet_h->epollfd, events, KNET_MAX_EVENTS, -1);
for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->sockfd) {
knet_send_data(knet_h);
} else {
knet_recv_frame(knet_h, events[i].data.fd);
}
}
}
return NULL;
}
diff --git a/ring.h b/ring.h
index 4c8cd9d7..01cfba02 100644
--- a/ring.h
+++ b/ring.h
@@ -1,78 +1,96 @@
#ifndef __RING_H__
#define __RING_H__
#include <stdint.h>
#include <netinet/in.h>
typedef struct knet_handle *knet_handle_t;
#define KNET_RING_DEFPORT 50000
#define KNET_RING_RCVBUFF 8388608
#define KNET_MAX_HOST_LEN 64
struct knet_host {
uint8_t node_id;
char name[KNET_MAX_HOST_LEN];
unsigned int active:1; /* data packets are sent to all links */
struct knet_listener *listener;
struct knet_link *link;
struct knet_host *next;
};
struct knet_link {
int sock;
char ipaddr[KNET_MAX_HOST_LEN];
char port[6];
struct sockaddr_storage address;
unsigned int enabled:1; /* link is enabled for data */
unsigned long long latency; /* average latency computed by fix/exp */
unsigned int latency_exp;
unsigned int latency_fix;
unsigned long long ping_interval;
unsigned long long pong_timeout;
struct timespec ping_last;
struct timespec pong_last;
struct knet_link *next;
};
struct knet_listener {
int sock;
char ipaddr[KNET_MAX_HOST_LEN];
char port[6];
struct sockaddr_storage address;
struct knet_listener *next;
};
+union knet_frame_data {
+ uint8_t kfd_data[0];
+ struct {
+ uint16_t kfd_node;
+ uint8_t kfd_link;
+ struct timespec kfd_time;
+ };
+} __attribute__((packed));
+
+struct knet_frame {
+ uint32_t kf_magic;
+ uint8_t kf_version;
+ uint8_t kf_type;
+ uint16_t __pad;
+ union knet_frame_data kf_payload;
+} __attribute__((packed));
+
+#define kf_data kf_payload.kfd_data
+#define kf_node kf_payload.kfd_node
+#define kf_link kf_payload.kfd_link
+#define kf_time kf_payload.kfd_time
+
+#define KNET_FRAME_SIZE (sizeof(struct knet_frame) \
+ - sizeof(union knet_frame_data))
+
#define KNET_FRAME_MAGIC 0x12344321
#define KNET_FRAME_VERSION 0x01
#define KNET_FRAME_DATA 0x00
#define KNET_FRAME_PING 0x01
#define KNET_FRAME_PONG 0x02
-struct knet_frame {
- uint32_t magic;
- uint8_t version;
- uint8_t type;
- uint16_t __pad;
-} __attribute__((packed));
-
knet_handle_t knet_handle_new(int fd);
int knet_handle_free(knet_handle_t knet_h);
void knet_handle_setfwd(knet_handle_t knet_h, int enabled);
int knet_host_acquire(knet_handle_t knet_h, struct knet_host **head, int writelock);
int knet_host_release(knet_handle_t knet_h);
int knet_host_add(knet_handle_t khandle, struct knet_host *host);
int knet_host_remove(knet_handle_t khandle, struct knet_host *host);
void knet_link_timeout(struct knet_link *lnk, time_t interval, time_t timeout, int precision);
int knet_listener_acquire(knet_handle_t knet_h, struct knet_listener **head, int writelock);
int knet_listener_release(knet_handle_t knet_h);
int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener);
int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener);
#endif
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Feb 25, 7:03 PM (1 d, 6 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464941
Default Alt Text
(16 KB)
Attached To
Mode
rK kronosnet
Attached
Detach File
Event Timeline
Log In to Comment