diff --git a/libknet/handle.c b/libknet/handle.c index f56565c7..c21ec49d 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,359 +1,403 @@ #include "config.h" #include #include #include #include #include #include #include "libknet-private.h" #define KNET_MAX_EVENTS 8 #define KNET_PING_TIMERES 200000 #define KNET_DATABUFSIZE 131072 /* 128k */ #define KNET_PINGBUFSIZE sizeof(struct knet_frame) -static void *_handle_control_thread(void *data); +static void *_handle_tap_to_links_thread(void *data); +static void *_handle_recv_from_links_thread(void *data); static void *_handle_heartbt_thread(void *data); knet_handle_t knet_handle_new(int fd, uint16_t node_id) { knet_handle_t knet_h; struct epoll_event ev; if ((knet_h = malloc(sizeof(struct knet_handle))) == NULL) return NULL; memset(knet_h, 0, sizeof(struct knet_handle)); - if ((knet_h->databuf = malloc(KNET_DATABUFSIZE))== NULL) + if ((knet_h->tap_to_links_buf = malloc(KNET_DATABUFSIZE))== NULL) goto exit_fail1; - memset(knet_h->databuf, 0, KNET_DATABUFSIZE); + memset(knet_h->tap_to_links_buf, 0, KNET_DATABUFSIZE); - if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL) + if ((knet_h->recv_from_links_buf = malloc(KNET_DATABUFSIZE))== NULL) goto exit_fail2; + memset(knet_h->recv_from_links_buf, 0, KNET_DATABUFSIZE); + + if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL) + goto exit_fail3; + memset(knet_h->pingbuf, 0, KNET_PINGBUFSIZE); if (pthread_rwlock_init(&knet_h->list_rwlock, NULL) != 0) - goto exit_fail3; + goto exit_fail4; knet_h->sockfd = fd; - knet_h->epollfd = epoll_create(KNET_MAX_EVENTS); + knet_h->tap_to_links_epollfd = epoll_create(KNET_MAX_EVENTS); + knet_h->recv_from_links_epollfd = epoll_create(KNET_MAX_EVENTS); knet_h->node_id = node_id; - if (knet_h->epollfd < 0) - goto exit_fail4; + if ((knet_h->tap_to_links_epollfd < 0) || + (knet_h->recv_from_links_epollfd < 0)) + goto exit_fail5; - if (_fdset_cloexec(knet_h->epollfd) != 0) + if ((_fdset_cloexec(knet_h->tap_to_links_epollfd) != 0) || + (_fdset_cloexec(knet_h->recv_from_links_epollfd != 0))) goto exit_fail5; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->sockfd; - if (epoll_ctl(knet_h->epollfd, + if (epoll_ctl(knet_h->tap_to_links_epollfd, EPOLL_CTL_ADD, knet_h->sockfd, &ev) != 0) goto exit_fail5; - if (pthread_create(&knet_h->control_thread, 0, - _handle_control_thread, (void *) knet_h) != 0) + if (pthread_create(&knet_h->tap_to_links_thread, 0, + _handle_tap_to_links_thread, (void *) knet_h) != 0) goto exit_fail5; + if (pthread_create(&knet_h->recv_from_links_thread, 0, + _handle_recv_from_links_thread, (void *) knet_h) != 0) + goto exit_fail6; + if (pthread_create(&knet_h->heartbt_thread, 0, _handle_heartbt_thread, (void *) knet_h) != 0) - goto exit_fail6; + goto exit_fail7; return knet_h; +exit_fail7: + pthread_cancel(knet_h->recv_from_links_thread); + exit_fail6: - pthread_cancel(knet_h->control_thread); + pthread_cancel(knet_h->tap_to_links_thread); exit_fail5: - close(knet_h->epollfd); + if (knet_h->tap_to_links_epollfd >= 0) + close(knet_h->tap_to_links_epollfd); + if (knet_h->recv_from_links_epollfd >= 0) + close(knet_h->recv_from_links_epollfd); -exit_fail4: pthread_rwlock_destroy(&knet_h->list_rwlock); +exit_fail4: + free(knet_h->pingbuf); + exit_fail3: - free(knet_h->databuf); + free(knet_h->recv_from_links_buf); exit_fail2: - free(knet_h->pingbuf); + free(knet_h->tap_to_links_buf); exit_fail1: free(knet_h); return NULL; } int knet_handle_free(knet_handle_t knet_h) { void *retval; if ((knet_h->host_head != NULL) || (knet_h->listener_head != NULL)) goto exit_busy; pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; - pthread_cancel(knet_h->control_thread); - pthread_join(knet_h->control_thread, &retval); + pthread_cancel(knet_h->tap_to_links_thread); + pthread_join(knet_h->tap_to_links_thread, &retval); + if (retval != PTHREAD_CANCELED) + goto exit_busy; + + pthread_cancel(knet_h->recv_from_links_thread); + pthread_join(knet_h->recv_from_links_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; - close(knet_h->epollfd); + close(knet_h->tap_to_links_epollfd); + close(knet_h->recv_from_links_epollfd); pthread_rwlock_destroy(&knet_h->list_rwlock); - free(knet_h->databuf); + free(knet_h->tap_to_links_buf); + free(knet_h->recv_from_links_buf); free(knet_h->pingbuf); free(knet_h); return 0; exit_busy: errno = EBUSY; return -EBUSY; } void knet_handle_setfwd(knet_handle_t knet_h, int enabled) { knet_h->enabled = (enabled == 1) ? 1 : 0; } void knet_link_timeout(struct knet_link *lnk, time_t interval, time_t timeout, int precision) { lnk->ping_interval = interval * 1000; /* microseconds */ lnk->pong_timeout = timeout * 1000; /* microseconds */ lnk->latency_fix = precision; lnk->latency_exp = precision - \ ((lnk->ping_interval * precision) / 8000000); } -static void _handle_data_send(knet_handle_t knet_h) +static void _handle_tap_to_links(knet_handle_t knet_h) { int j; ssize_t len, snt; struct knet_host *i; - len = read(knet_h->sockfd, knet_h->databuf->kf_data, + len = read(knet_h->sockfd, knet_h->tap_to_links_buf->kf_data, KNET_DATABUFSIZE - KNET_FRAME_SIZE); if (len == 0) { /* TODO: disconnection, should never happen! */ return; } len += KNET_FRAME_SIZE; if (knet_h->enabled != 1) /* data forward is disabled */ return; /* TODO: packet inspection */ - knet_h->databuf->kf_type = KNET_FRAME_DATA; + knet_h->tap_to_links_buf->kf_type = KNET_FRAME_DATA; if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) return; for (i = knet_h->host_head; i != NULL; i = i->next) { for (j = 0; j < KNET_MAX_LINK; j++) { if (i->link[j].enabled != 1) /* link is disabled */ continue; snt = sendto(i->link[j].sock, - knet_h->databuf, len, MSG_DONTWAIT, + knet_h->tap_to_links_buf, len, MSG_DONTWAIT, (struct sockaddr *) &i->link[j].address, sizeof(struct sockaddr_storage)); if ((i->active == 0) && (snt == len)) break; } } pthread_rwlock_unlock(&knet_h->list_rwlock); } -static void _handle_recv_frame(knet_handle_t knet_h, int sockfd) +static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd) { ssize_t len; struct sockaddr_storage address; socklen_t addrlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) return; addrlen = sizeof(struct sockaddr_storage); - len = recvfrom(sockfd, knet_h->databuf, KNET_DATABUFSIZE, + len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE, MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen); if (len < (KNET_FRAME_SIZE + 1)) goto exit_unlock; - if (ntohl(knet_h->databuf->kf_magic) != KNET_FRAME_MAGIC) + if (ntohl(knet_h->recv_from_links_buf->kf_magic) != KNET_FRAME_MAGIC) goto exit_unlock; - if (knet_h->databuf->kf_version != KNET_FRAME_VERSION) + if (knet_h->recv_from_links_buf->kf_version != KNET_FRAME_VERSION) goto exit_unlock; src_host = NULL; src_link = NULL; - if ((knet_h->databuf->kf_type & KNET_FRAME_PMSK) != 0) { - knet_h->databuf->kf_node = ntohs(knet_h->databuf->kf_node); - src_host = knet_h->host_index[knet_h->databuf->kf_node]; + if ((knet_h->recv_from_links_buf->kf_type & KNET_FRAME_PMSK) != 0) { + knet_h->recv_from_links_buf->kf_node = ntohs(knet_h->recv_from_links_buf->kf_node); + src_host = knet_h->host_index[knet_h->recv_from_links_buf->kf_node]; if (src_host == NULL) /* host not found */ goto exit_unlock; src_link = src_host->link + - (knet_h->databuf->kf_link % KNET_MAX_LINK); + (knet_h->recv_from_links_buf->kf_link % KNET_MAX_LINK); } - switch (knet_h->databuf->kf_type) { + switch (knet_h->recv_from_links_buf->kf_type) { case KNET_FRAME_DATA: if (knet_h->enabled != 1) /* data forward is disabled */ break; write(knet_h->sockfd, - knet_h->databuf->kf_data, len - KNET_FRAME_SIZE); + knet_h->recv_from_links_buf->kf_data, len - KNET_FRAME_SIZE); break; case KNET_FRAME_PING: - knet_h->databuf->kf_type = KNET_FRAME_PONG; - knet_h->databuf->kf_node = htons(knet_h->node_id); + knet_h->recv_from_links_buf->kf_type = KNET_FRAME_PONG; + knet_h->recv_from_links_buf->kf_node = htons(knet_h->node_id); - sendto(src_link->sock, knet_h->databuf, len, MSG_DONTWAIT, + sendto(src_link->sock, knet_h->recv_from_links_buf, len, MSG_DONTWAIT, (struct sockaddr *) &src_link->address, sizeof(struct sockaddr_storage)); break; case KNET_FRAME_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->pong_last); - timespec_diff(knet_h->databuf->kf_time, + timespec_diff(knet_h->recv_from_links_buf->kf_time, src_link->pong_last, &latency_last); src_link->latency = ((src_link->latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->latency < src_link->pong_timeout) src_link->enabled = 1; break; default: goto exit_unlock; } exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); } static void _handle_check_each(knet_handle_t knet_h, struct knet_link *dst_link) { int len; struct timespec clock_now, pong_last; unsigned long long diff_ping; /* caching last pong to avoid race conditions */ pong_last = dst_link->pong_last; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) return; timespec_diff(dst_link->ping_last, clock_now, &diff_ping); if (diff_ping >= (dst_link->ping_interval * 1000llu)) { knet_h->pingbuf->kf_time = clock_now; knet_h->pingbuf->kf_link = dst_link->link_id; len = sendto(dst_link->sock, knet_h->pingbuf, KNET_PINGBUFSIZE, MSG_DONTWAIT, (struct sockaddr *) &dst_link->address, sizeof(struct sockaddr_storage)); if (len == KNET_PINGBUFSIZE) dst_link->ping_last = clock_now; } if (dst_link->enabled == 1) { timespec_diff(pong_last, clock_now, &diff_ping); if (diff_ping >= (dst_link->pong_timeout * 1000llu)) dst_link->enabled = 0; /* TODO: might need write lock */ } } static void *_handle_heartbt_thread(void *data) { int j; knet_handle_t knet_h; struct knet_host *i; knet_h = (knet_handle_t) data; /* preparing ping buffer */ knet_h->pingbuf->kf_magic = htonl(KNET_FRAME_MAGIC); knet_h->pingbuf->kf_version = KNET_FRAME_VERSION; knet_h->pingbuf->kf_type = KNET_FRAME_PING; knet_h->pingbuf->kf_node = htons(knet_h->node_id); while (1) { usleep(KNET_PING_TIMERES); if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) continue; for (i = knet_h->host_head; i != NULL; i = i->next) { for (j = 0; j < KNET_MAX_LINK; j++) { if (i->link[j].ready != 1) continue; _handle_check_each(knet_h, &i->link[j]); } } pthread_rwlock_unlock(&knet_h->list_rwlock); } return NULL; } -static void *_handle_control_thread(void *data) +static void *_handle_tap_to_links_thread(void *data) +{ + knet_handle_t knet_h; + struct epoll_event events[KNET_MAX_EVENTS]; + + knet_h = (knet_handle_t) data; + + /* preparing data buffer */ + knet_h->tap_to_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC); + knet_h->tap_to_links_buf->kf_version = KNET_FRAME_VERSION; + + while (1) { + if (epoll_wait(knet_h->tap_to_links_epollfd, events, KNET_MAX_EVENTS, -1) >= 1) + _handle_tap_to_links(knet_h); + } + + return NULL; + +} + +static void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h; struct epoll_event events[KNET_MAX_EVENTS]; knet_h = (knet_handle_t) data; /* preparing data buffer */ - knet_h->databuf->kf_magic = htonl(KNET_FRAME_MAGIC); - knet_h->databuf->kf_version = KNET_FRAME_VERSION; + knet_h->recv_from_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC); + knet_h->recv_from_links_buf->kf_version = KNET_FRAME_VERSION; while (1) { - nev = epoll_wait(knet_h->epollfd, events, KNET_MAX_EVENTS, -1); + nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { - if (events[i].data.fd == knet_h->sockfd) { - _handle_data_send(knet_h); - } else { - _handle_recv_frame(knet_h, events[i].data.fd); - } + _handle_recv_from_links(knet_h, events[i].data.fd); } } return NULL; } diff --git a/libknet/libknet-private.h b/libknet/libknet-private.h index 750986aa..a02accbd 100644 --- a/libknet/libknet-private.h +++ b/libknet/libknet-private.h @@ -1,36 +1,39 @@ #ifndef __KNETHANDLE_H__ #define __KNETHANDLE_H__ /* NOTE: you shouldn't need to include this header normally, it is provided for * testing purpose only. */ #include "libknet.h" #define timespec_diff(start, end, diff) \ do { \ if (end.tv_sec > start.tv_sec) \ *(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \ + end.tv_nsec - start.tv_nsec; \ else \ *(diff) = end.tv_nsec - start.tv_nsec; \ } while (0); struct knet_handle { int sockfd; - int epollfd; + int tap_to_links_epollfd; + int recv_from_links_epollfd; uint16_t node_id; unsigned int enabled:1; struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; struct knet_listener *listener_head; - struct knet_frame *databuf; + struct knet_frame *tap_to_links_buf; + struct knet_frame *recv_from_links_buf; struct knet_frame *pingbuf; - pthread_t control_thread; + pthread_t tap_to_links_thread; + pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_rwlock_t list_rwlock; }; int _fdset_cloexec(int fd); #endif diff --git a/libknet/listener.c b/libknet/listener.c index fbef9117..6237ea98 100644 --- a/libknet/listener.c +++ b/libknet/listener.c @@ -1,122 +1,122 @@ #include "config.h" #include #include #include #include #include #include "libknet-private.h" int knet_listener_acquire(knet_handle_t knet_h, struct knet_listener **head, int writelock) { int ret; if (writelock != 0) ret = pthread_rwlock_wrlock(&knet_h->list_rwlock); else ret = pthread_rwlock_rdlock(&knet_h->list_rwlock); if (head) *head = (ret == 0) ? knet_h->listener_head : NULL; return ret; } int knet_listener_release(knet_handle_t knet_h) { return pthread_rwlock_unlock(&knet_h->list_rwlock); } int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener) { int value; struct epoll_event ev; listener->sock = socket(listener->address.ss_family, SOCK_DGRAM, 0); if (listener->sock < 0) return listener->sock; value = KNET_RING_RCVBUFF; setsockopt(listener->sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)); if (_fdset_cloexec(listener->sock) != 0) goto exit_fail1; if (bind(listener->sock, (struct sockaddr *) &listener->address, sizeof(struct sockaddr_storage)) != 0) goto exit_fail1; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = listener->sock; - if (epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, listener->sock, &ev) != 0) + if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, listener->sock, &ev) != 0) goto exit_fail1; if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0) goto exit_fail2; /* pushing new host to the front */ listener->next = knet_h->listener_head; knet_h->listener_head = listener; pthread_rwlock_unlock(&knet_h->list_rwlock); return 0; exit_fail2: - epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev); + epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev); exit_fail1: close(listener->sock); return -1; } int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener) { int i, ret; struct epoll_event ev; /* kernel < 2.6.9 bug (see epoll_ctl man) */ struct knet_host *host; struct knet_listener *l; if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0) return -EINVAL; ret = 0; /* checking if listener is in use */ for (host = knet_h->host_head; host != NULL; host = host->next) { for (i = 0; i < KNET_MAX_LINK; i++) { if (host->link[i].ready != 1) continue; if (host->link[i].sock == listener->sock) { ret = -EBUSY; goto exit_fail1; } } } /* TODO: use a doubly-linked list? */ if (listener == knet_h->listener_head) { knet_h->listener_head = knet_h->listener_head->next; } else { for (l = knet_h->listener_head; l != NULL; l = l->next) { if (listener == l->next) { l->next = l->next->next; break; } } } - epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev); + epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev); close(listener->sock); exit_fail1: pthread_rwlock_unlock(&knet_h->list_rwlock); if (ret < 0) errno = -ret; return ret; } diff --git a/tests/listener_test.c b/tests/listener_test.c index b4edf350..c6a16c1b 100644 --- a/tests/listener_test.c +++ b/tests/listener_test.c @@ -1,134 +1,134 @@ #include "config.h" #include #include #include #include #include #include "libknet-private.h" #define KNET_TEST_PORT 50000 static knet_handle_t knet_h; struct knet_listener *listener; static void test_add_listener(void) { struct sockaddr_in *address; listener = malloc(sizeof(struct knet_listener)); if (listener == NULL) { printf("Unable to create listener\n"); exit(EXIT_FAILURE); } memset(listener, 0, sizeof(struct knet_listener)); address = (struct sockaddr_in *) &listener->address; address->sin_family = AF_INET; address->sin_port = htons(KNET_TEST_PORT); address->sin_addr.s_addr = INADDR_ANY; if (knet_listener_add(knet_h, listener) != 0) { printf("Unable to add listener\n"); exit(EXIT_FAILURE); } } static void test_add_host(void) { struct knet_host *host; if (knet_host_add(knet_h, 1) != 0) { printf("Unable to add host to knet_handle\n"); exit(EXIT_FAILURE); } knet_host_get(knet_h, 1, &host); host->link[0].sock = listener->sock; host->link[0].ready = 1; knet_host_release(knet_h, &host); } int main(int argc, char *argv[]) { int err, sock; struct epoll_event ev; sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock < 0) { printf("Unable to create new socket\n"); exit(EXIT_FAILURE); } if ((knet_h = knet_handle_new(sock, 1)) == NULL) { printf("Unable to create new knet_handle_t\n"); exit(EXIT_FAILURE); } printf("Adding listener to handle\n"); test_add_listener(); memset(&ev, 0, sizeof(struct epoll_event)); /* don't try this at home :) */ - err = epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, listener->sock, &ev); + err = epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, listener->sock, &ev); if (err != -1) { printf("Listener file descriptor not found in epollfd\n"); exit(EXIT_FAILURE); } printf("Listener file descriptor was added to epollfd\n"); printf("Adding host to handle\n"); test_add_host(); err = knet_listener_remove(knet_h, listener); if (err != -EBUSY) { printf("Listener socket should be in use\n"); exit(EXIT_FAILURE); } printf("Unable to remove listener with active links\n"); printf("Removing host from handle\n"); err = knet_host_remove(knet_h, 1); if (err != 0) { printf("Unable to remove host from knet_handle\n"); exit(EXIT_FAILURE); } printf("Removing listener\n"); err = knet_listener_remove(knet_h, listener); if (err != 0) { printf("Unable to remove listener from knet_handle\n"); exit(EXIT_FAILURE); } /* don't try this at home :) */ - err = epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev); + err = epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev); if (err != -1) { printf("Listener file was present in epollfd\n"); exit(EXIT_FAILURE); } printf("Listener file descriptor was removed from epollfd\n"); if (knet_handle_free(knet_h) != 0) { printf("Unable to free knet_handle\n"); exit(EXIT_FAILURE); } return 0; }