diff --git a/libknet/handle.c b/libknet/handle.c
index c13990b2..98725eb8 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,488 +1,489 @@
 #include "config.h"

 #include <errno.h>
 #include <pthread.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/epoll.h>
 #include <unistd.h>

 #include "libknet-private.h"
 #include "nsscrypto.h"

 #define KNET_MAX_EVENTS 8
 #define KNET_PING_TIMERES 200000

 static void *_handle_tap_to_links_thread(void *data);
 static void *_handle_recv_from_links_thread(void *data);
 static void *_handle_heartbt_thread(void *data);

 knet_handle_t knet_handle_new(const struct knet_handle_cfg *knet_handle_cfg)
 {
 	knet_handle_t knet_h;
 	struct epoll_event ev;

 	/*
 	 * validate incoming config request
 	 */
 	if (knet_handle_cfg == NULL) {
 		errno = EINVAL;
 		return NULL;
 	}

 	if (knet_handle_cfg->fd <= 0) {
 		errno = EINVAL;
 		return NULL;
 	}

 	if ((knet_h = malloc(sizeof(struct knet_handle))) == NULL)
 		return NULL;

 	memset(knet_h, 0, sizeof(struct knet_handle));

 	knet_h->dst_host_filter = knet_handle_cfg->dst_host_filter;
 	knet_h->dst_host_filter_fn = knet_handle_cfg->dst_host_filter_fn;
 	if ((knet_h->dst_host_filter) && (!knet_h->dst_host_filter_fn))
 		goto exit_fail1;

 	if (crypto_init(knet_h, knet_handle_cfg) < 0)
 		goto exit_fail1;

 	if ((knet_h->tap_to_links_buf = malloc(KNET_DATABUFSIZE)) == NULL)
 		goto exit_fail2;

 	memset(knet_h->tap_to_links_buf, 0, KNET_DATABUFSIZE);

 	if ((knet_h->recv_from_links_buf = malloc(KNET_DATABUFSIZE)) == NULL)
 		goto exit_fail3;

 	memset(knet_h->recv_from_links_buf, 0, KNET_DATABUFSIZE);

 	if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE)) == NULL)
 		goto exit_fail4;

 	memset(knet_h->pingbuf, 0, KNET_PINGBUFSIZE);

 	if (pthread_rwlock_init(&knet_h->list_rwlock, NULL) != 0)
 		goto exit_fail5;

 	knet_h->sockfd = knet_handle_cfg->fd;
 	knet_h->tap_to_links_epollfd = epoll_create(KNET_MAX_EVENTS);
 	knet_h->recv_from_links_epollfd = epoll_create(KNET_MAX_EVENTS);
 	knet_h->node_id = knet_handle_cfg->node_id;

 	if ((knet_h->tap_to_links_epollfd < 0) ||
 	    (knet_h->recv_from_links_epollfd < 0))
 		goto exit_fail6;

 	if ((_fdset_cloexec(knet_h->tap_to_links_epollfd) != 0) ||
 	    (_fdset_cloexec(knet_h->recv_from_links_epollfd != 0)))
 		goto exit_fail6;

 	memset(&ev, 0, sizeof(struct epoll_event));

 	ev.events = EPOLLIN;
 	ev.data.fd = knet_h->sockfd;

 	if (epoll_ctl(knet_h->tap_to_links_epollfd,
 		      EPOLL_CTL_ADD, knet_h->sockfd, &ev) != 0)
 		goto exit_fail6;

 	if (pthread_create(&knet_h->tap_to_links_thread, 0,
 			   _handle_tap_to_links_thread, (void *) knet_h) != 0)
 		goto exit_fail6;

 	if (pthread_create(&knet_h->recv_from_links_thread, 0,
 			   _handle_recv_from_links_thread, (void *) knet_h) != 0)
 		goto exit_fail7;

 	if (pthread_create(&knet_h->heartbt_thread, 0,
 			   _handle_heartbt_thread, (void *) knet_h) != 0)
 		goto exit_fail8;

 	return knet_h;

 exit_fail8:
 	pthread_cancel(knet_h->recv_from_links_thread);

 exit_fail7:
 	pthread_cancel(knet_h->tap_to_links_thread);

 exit_fail6:
 	if (knet_h->tap_to_links_epollfd >= 0)
 		close(knet_h->tap_to_links_epollfd);
 	if (knet_h->recv_from_links_epollfd >= 0)
 		close(knet_h->recv_from_links_epollfd);
 	pthread_rwlock_destroy(&knet_h->list_rwlock);

 exit_fail5:
 	free(knet_h->pingbuf);

 exit_fail4:
 	free(knet_h->recv_from_links_buf);

 exit_fail3:
 	free(knet_h->tap_to_links_buf);

 exit_fail2:
 	crypto_fini(knet_h);

 exit_fail1:
 	free(knet_h);
 	return NULL;
 }
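
 /*
  * Note on knet_handle_new() inputs, derived from the checks above:
  * knet_handle_cfg->fd must be an already-open descriptor (> 0) for the
  * tap/application side, knet_handle_cfg->node_id identifies this host in
  * outgoing frames, and when dst_host_filter is set a dst_host_filter_fn
  * callback must also be provided, otherwise initialization is aborted.
  */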

 int knet_handle_free(knet_handle_t knet_h)
 {
 	void *retval;

 	if ((knet_h->host_head != NULL) || (knet_h->listener_head != NULL))
 		goto exit_busy;

 	pthread_cancel(knet_h->heartbt_thread);
 	pthread_join(knet_h->heartbt_thread, &retval);
 	if (retval != PTHREAD_CANCELED)
 		goto exit_busy;

 	pthread_cancel(knet_h->tap_to_links_thread);
 	pthread_join(knet_h->tap_to_links_thread, &retval);
 	if (retval != PTHREAD_CANCELED)
 		goto exit_busy;

 	pthread_cancel(knet_h->recv_from_links_thread);
 	pthread_join(knet_h->recv_from_links_thread, &retval);
 	if (retval != PTHREAD_CANCELED)
 		goto exit_busy;

 	close(knet_h->tap_to_links_epollfd);
 	close(knet_h->recv_from_links_epollfd);

 	pthread_rwlock_destroy(&knet_h->list_rwlock);

 	free(knet_h->tap_to_links_buf);
 	free(knet_h->recv_from_links_buf);
 	free(knet_h->pingbuf);

 	crypto_fini(knet_h);

 	free(knet_h);

 	return 0;

 exit_busy:
 	errno = EBUSY;
 	return -EBUSY;
 }

 void knet_handle_setfwd(knet_handle_t knet_h, int enabled)
 {
 	knet_h->enabled = (enabled == 1) ? 1 : 0;
 }

 void knet_link_timeout(struct knet_link *lnk,
 				time_t interval, time_t timeout, int precision)
 {
 	lnk->ping_interval = interval * 1000; /* microseconds */
 	lnk->pong_timeout = timeout * 1000; /* microseconds */
 	lnk->latency_fix = precision;
 	lnk->latency_exp = precision - \
 			   ((lnk->ping_interval * precision) / 8000000);
 }
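
 /*
  * Latency smoothing (see knet_link_timeout() above and the KNET_FRAME_PONG
  * handling below): each measured round-trip sample is folded into
  * lnk->latency as a weighted average,
  *
  *   latency = (latency * latency_exp +
  *              sample_ms * (latency_fix - latency_exp)) / latency_fix
  *
  * With latency_exp = precision - (ping_interval * precision) / 8000000 the
  * weight given to a new sample works out to ping_interval / 8000000, i.e.
  * the ping interval in seconds divided by 8. For example, with a 1 second
  * ping interval and precision 2048, latency_exp is 1792 and every pong
  * contributes 1/8 of the averaged value.
  */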

 static void _handle_tap_to_links(knet_handle_t knet_h)
 {
-	int j;
 	ssize_t inlen, len, snt, outlen;
-	struct knet_host *i;
+	struct knet_host *dst_host;
+	int link_idx;
 	uint16_t dst_host_ids[KNET_MAX_HOST];
 	size_t dst_host_ids_entries = 0;
 	int bcast = 1;

 	inlen = read(knet_h->sockfd, knet_h->tap_to_links_buf->kf_data,
 		     KNET_DATABUFSIZE - (KNET_FRAME_SIZE + sizeof(seq_num_t)));

 	if (inlen == 0) {
 		/* TODO: disconnection, should never happen! */
 		return;
 	}

 	len = inlen + KNET_FRAME_SIZE + sizeof(seq_num_t);

 	if (knet_h->enabled != 1) /* data forward is disabled */
 		return;

 	if (knet_h->dst_host_filter) {
 		bcast = knet_h->dst_host_filter_fn(
 				(const unsigned char *)knet_h->tap_to_links_buf->kf_data,
 				inlen,
 				knet_h->tap_to_links_buf->kf_node,
 				dst_host_ids,
 				&dst_host_ids_entries);
 		if (bcast < 0)
 			return;

 		if ((!bcast) && (!dst_host_ids_entries))
 			return;
 	}

 	if (!bcast) {
 		// TBD
 	} else {
 		if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
 			return;

 		knet_h->tap_to_links_buf->kf_seq_num = ++knet_h->bcast_seq_num;

 		if (crypto_encrypt_and_sign(knet_h->crypto_instance,
 				(const unsigned char *)knet_h->tap_to_links_buf,
 				len,
 				knet_h->tap_to_links_buf_crypt,
 				&outlen) < 0) {
 			pthread_rwlock_unlock(&knet_h->list_rwlock);
 			return;
 		}

-		for (i = knet_h->host_head; i != NULL; i = i->next) {
-			for (j = 0; j < KNET_MAX_LINK; j++) {
-				if (i->link[j].ready != 1) /* link is not configured */
+		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+			for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
+				if (dst_host->link[link_idx].ready != 1) /* link is not configured */
 					continue;
-				if (i->link[j].enabled != 1) /* link is not enabled */
+				if (dst_host->link[link_idx].enabled != 1) /* link is not enabled */
 					continue;

-				snt = sendto(i->link[j].sock,
+				snt = sendto(dst_host->link[link_idx].sock,
 						knet_h->tap_to_links_buf_crypt,
 						outlen,
 						MSG_DONTWAIT,
-						(struct sockaddr *) &i->link[j].address,
+						(struct sockaddr *) &dst_host->link[link_idx].address,
 						sizeof(struct sockaddr_storage));

-				if ((i->active == 0) && (snt == outlen))
+				if ((dst_host->active == 0) && (snt == outlen))
 					break;
 			}
 		}

 		pthread_rwlock_unlock(&knet_h->list_rwlock);
 	}
 }

 static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd)
 {
 	ssize_t len, outlen;
 	struct sockaddr_storage address;
 	socklen_t addrlen;
 	struct knet_host *src_host;
 	struct knet_link *src_link;
 	unsigned long long latency_last;

 	if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
 		return;

 	addrlen = sizeof(struct sockaddr_storage);
 	len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE,
 		MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);

 	if (crypto_authenticate_and_decrypt(knet_h->crypto_instance,
 			(unsigned char *)knet_h->recv_from_links_buf, &len) < 0)
 		goto exit_unlock;

 	if (len < (KNET_FRAME_SIZE + 1))
 		goto exit_unlock;

 	if (ntohl(knet_h->recv_from_links_buf->kf_magic) != KNET_FRAME_MAGIC)
 		goto exit_unlock;

 	if (knet_h->recv_from_links_buf->kf_version != KNET_FRAME_VERSION)
 		goto exit_unlock;

 	knet_h->recv_from_links_buf->kf_node = ntohs(knet_h->recv_from_links_buf->kf_node);
 	src_host = knet_h->host_index[knet_h->recv_from_links_buf->kf_node];
 	if (src_host == NULL) { /* host not found */
 		goto exit_unlock;
 	}

 	src_link = NULL;

 	if ((knet_h->recv_from_links_buf->kf_type & KNET_FRAME_PMSK) != 0) {
 		src_link = src_host->link +
 			(knet_h->recv_from_links_buf->kf_link % KNET_MAX_LINK);
 	}

 	switch (knet_h->recv_from_links_buf->kf_type) {
 	case KNET_FRAME_DATA:
 		if (knet_h->enabled != 1) /* data forward is disabled */
 			break;

 		if (!knet_should_deliver(src_host, 1, knet_h->recv_from_links_buf->kf_seq_num))
 			break;

 		write(knet_h->sockfd,
 		      knet_h->recv_from_links_buf->kf_data,
 		      len - (KNET_FRAME_SIZE + sizeof(seq_num_t)));

 		knet_has_been_delivered(src_host, 1, knet_h->recv_from_links_buf->kf_seq_num);
 		break;
 	case KNET_FRAME_PING:
 		knet_h->recv_from_links_buf->kf_type = KNET_FRAME_PONG;
 		knet_h->recv_from_links_buf->kf_node = htons(knet_h->node_id);

 		if (crypto_encrypt_and_sign(knet_h->crypto_instance,
 				(const unsigned char *)knet_h->recv_from_links_buf,
 				len,
 				knet_h->recv_from_links_buf_crypt,
 				&outlen) < 0)
 			break;

 		sendto(src_link->sock, knet_h->recv_from_links_buf_crypt, outlen, MSG_DONTWAIT,
 				(struct sockaddr *) &src_link->address,
 				sizeof(struct sockaddr_storage));
 		break;
 	case KNET_FRAME_PONG:
 		clock_gettime(CLOCK_MONOTONIC, &src_link->pong_last);

 		timespec_diff(knet_h->recv_from_links_buf->kf_time,
 				src_link->pong_last, &latency_last);

 		src_link->latency = ((src_link->latency * src_link->latency_exp) +
 			((latency_last / 1000llu) *
 				(src_link->latency_fix - src_link->latency_exp))) /
 					src_link->latency_fix;

 		if (src_link->latency < src_link->pong_timeout) {
 			if (!src_link->enabled) {
 				src_link->enabled = 1;
 				/* TODO: notify packet inspector */
 			}
 		}

 		break;
 	default:
 		goto exit_unlock;
 	}

 exit_unlock:
 	pthread_rwlock_unlock(&knet_h->list_rwlock);
 }

 static void _handle_check_each(knet_handle_t knet_h, struct knet_link *dst_link)
 {
 	int len;
 	ssize_t outlen;
 	struct timespec clock_now, pong_last;
 	unsigned long long diff_ping;

 	/* caching last pong to avoid race conditions */
 	pong_last = dst_link->pong_last;

 	if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0)
 		return;

 	timespec_diff(dst_link->ping_last, clock_now, &diff_ping);

 	if (diff_ping >= (dst_link->ping_interval * 1000llu)) {
 		knet_h->pingbuf->kf_time = clock_now;
 		knet_h->pingbuf->kf_link = dst_link->link_id;

 		if (crypto_encrypt_and_sign(knet_h->crypto_instance,
 				(const unsigned char *)knet_h->pingbuf,
 				KNET_PINGBUFSIZE,
 				knet_h->pingbuf_crypt,
 				&outlen) < 0)
 			return;

 		len = sendto(dst_link->sock, knet_h->pingbuf_crypt, outlen,
 			MSG_DONTWAIT, (struct sockaddr *) &dst_link->address,
 			sizeof(struct sockaddr_storage));

 		if (len == outlen)
 			dst_link->ping_last = clock_now;
 	}

 	if (dst_link->enabled == 1) {
 		timespec_diff(pong_last, clock_now, &diff_ping);

 		if (diff_ping >= (dst_link->pong_timeout * 1000llu)) {
 			dst_link->enabled = 0;
 			/* TODO: might need write lock */
 			/* TODO: notify packet inspector */
 		}
 	}
 }
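
 /*
  * Heartbeat overview: the heartbeat thread below wakes up every
  * KNET_PING_TIMERES microseconds and calls _handle_check_each() for every
  * configured link. A ping is (re)sent once ping_interval has elapsed since
  * ping_last, and an enabled link is marked down again when no pong has been
  * recorded for pong_timeout. Both intervals are kept in microseconds, which
  * is why they are multiplied by 1000 before being compared with the deltas
  * from timespec_diff(), suggesting those deltas are in nanoseconds.
  */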

 static void *_handle_heartbt_thread(void *data)
 {
-	int j;
 	knet_handle_t knet_h;
-	struct knet_host *i;
+	struct knet_host *dst_host;
+	int link_idx;

 	knet_h = (knet_handle_t) data;

 	/* preparing ping buffer */
 	knet_h->pingbuf->kf_magic = htonl(KNET_FRAME_MAGIC);
 	knet_h->pingbuf->kf_version = KNET_FRAME_VERSION;
 	knet_h->pingbuf->kf_type = KNET_FRAME_PING;
 	knet_h->pingbuf->kf_node = htons(knet_h->node_id);

 	while (1) {
 		usleep(KNET_PING_TIMERES);

 		if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
 			continue;

-		for (i = knet_h->host_head; i != NULL; i = i->next) {
-			for (j = 0; j < KNET_MAX_LINK; j++) {
-				if (i->link[j].ready != 1) continue;
-				_handle_check_each(knet_h, &i->link[j]);
+		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+			for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
+				if (dst_host->link[link_idx].ready != 1)
+					continue;
+				_handle_check_each(knet_h, &dst_host->link[link_idx]);
 			}
 		}

 		pthread_rwlock_unlock(&knet_h->list_rwlock);
 	}

 	return NULL;
 }

 static void *_handle_tap_to_links_thread(void *data)
 {
 	knet_handle_t knet_h;
 	struct epoll_event events[KNET_MAX_EVENTS];

 	knet_h = (knet_handle_t) data;

 	/* preparing data buffer */
 	knet_h->tap_to_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC);
 	knet_h->tap_to_links_buf->kf_version = KNET_FRAME_VERSION;
 	knet_h->tap_to_links_buf->kf_type = KNET_FRAME_DATA;
 	knet_h->tap_to_links_buf->kf_node = htons(knet_h->node_id);

 	while (1) {
 		if (epoll_wait(knet_h->tap_to_links_epollfd, events, KNET_MAX_EVENTS, -1) >= 1)
 			_handle_tap_to_links(knet_h);
 	}

 	return NULL;
 }

 static void *_handle_recv_from_links_thread(void *data)
 {
 	int i, nev;
 	knet_handle_t knet_h = (knet_handle_t) data;
 	struct epoll_event events[KNET_MAX_EVENTS];

 	while (1) {
 		nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_MAX_EVENTS, -1);

 		for (i = 0; i < nev; i++) {
 			_handle_recv_from_links(knet_h, events[i].data.fd);
 		}
 	}

 	return NULL;
 }
diff --git a/libknet/host.c b/libknet/host.c
index d20ccc80..ab6416ca 100644
--- a/libknet/host.c
+++ b/libknet/host.c
@@ -1,197 +1,197 @@
 #include "config.h"

 #include <errno.h>
 #include <pthread.h>
 #include <stdlib.h>
 #include <string.h>

 #include "libknet-private.h"

 int knet_host_get(knet_handle_t knet_h, uint16_t node_id, struct knet_host **host)
 {
 	int ret;

 	if ((ret = pthread_rwlock_rdlock(&knet_h->list_rwlock)) != 0)
 		return ret;

 	*host = knet_h->host_index[node_id];

 	if (*host == NULL) {
 		pthread_rwlock_unlock(&knet_h->list_rwlock);
 		errno = ENOENT;
 		return ENOENT;
 	}

 	return 0;
 }

 int knet_host_acquire(knet_handle_t knet_h, struct knet_host **host)
 {
 	int ret;

 	if ((ret = pthread_rwlock_rdlock(&knet_h->list_rwlock)) != 0)
 		return ret;

 	*host = knet_h->host_head;

 	return 0;
 }

 int knet_host_release(knet_handle_t knet_h, struct knet_host **host)
 {
 	int ret;

 	*host = NULL;

 	if ((ret = pthread_rwlock_unlock(&knet_h->list_rwlock)) != 0)
 		return ret;

 	return 0;
 }

 int knet_host_foreach(knet_handle_t knet_h, knet_link_fn_t linkfun, struct knet_host_search *data)
 {
 	int lockstatus;
 	struct knet_host *host;

 	lockstatus = pthread_rwlock_rdlock(&knet_h->list_rwlock);

 	if ((lockstatus != 0) && (lockstatus != EDEADLK))
 		return lockstatus;

 	for (host = knet_h->host_head; host != NULL; host = host->next) {
 		if ((linkfun(knet_h, host, data)) != KNET_HOST_FOREACH_NEXT)
 			break;
 	}

 	if (lockstatus == 0)
 		pthread_rwlock_unlock(&knet_h->list_rwlock);

 	return 0;
 }

 int knet_host_add(knet_handle_t knet_h, uint16_t node_id)
 {
-	int i, ret = 0; /* success */
+	int link_idx, ret = 0; /* success */
 	struct knet_host *host;

 	if ((ret = pthread_rwlock_wrlock(&knet_h->list_rwlock)) != 0)
 		goto exit_clean;

 	if (knet_h->host_index[node_id] != NULL) {
 		errno = ret = EEXIST;
 		goto exit_unlock;
 	}

 	if ((host = malloc(sizeof(struct knet_host))) == NULL)
 		goto exit_unlock;

 	memset(host, 0, sizeof(struct knet_host));

 	host->node_id = node_id;

-	for (i = 0; i < KNET_MAX_LINK; i++)
-		host->link[i].link_id = i;
+	for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++)
+		host->link[link_idx].link_id = link_idx;

 	/* adding new host to the index */
 	knet_h->host_index[node_id] = host;

 	/* TODO: keep hosts ordered */
 	/* pushing new host to the front */
 	host->next = knet_h->host_head;
 	knet_h->host_head = host;

 exit_unlock:
 	pthread_rwlock_unlock(&knet_h->list_rwlock);

 exit_clean:
 	return ret;
 }

 int knet_host_remove(knet_handle_t knet_h, uint16_t node_id)
 {
 	int ret = 0; /* success */
-	struct knet_host *i, *removed;
+	struct knet_host *host, *removed;

 	if ((ret = pthread_rwlock_wrlock(&knet_h->list_rwlock)) != 0)
 		goto exit_clean;

 	if (knet_h->host_index[node_id] == NULL) {
 		errno = ret = EINVAL;
 		goto exit_unlock;
 	}

 	removed = NULL;

 	/* removing host from list */
 	if (knet_h->host_head->node_id == node_id) {
 		removed = knet_h->host_head;
 		knet_h->host_head = removed->next;
 	} else {
-		for (i = knet_h->host_head; i->next != NULL; i = i->next) {
-			if (i->next->node_id == node_id) {
-				removed = i->next;
-				i->next = removed->next;
+		for (host = knet_h->host_head; host->next != NULL; host = host->next) {
+			if (host->next->node_id == node_id) {
+				removed = host->next;
+				host->next = removed->next;
 				break;
 			}
 		}
 	}

 	if (removed != NULL) {
 		knet_h->host_index[node_id] = NULL;
 		free(removed);
 	}

 exit_unlock:
 	pthread_rwlock_unlock(&knet_h->list_rwlock);

 exit_clean:
 	return ret;
 }
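
 /*
  * Duplicate detection (knet_should_deliver / knet_has_been_delivered below):
  * each host tracks the last KNET_CBUFFER_SIZE broadcast sequence numbers in
  * a circular buffer indexed by seq_num % KNET_CBUFFER_SIZE. A frame whose
  * distance from bcast_seq_num_rx still falls inside that window is delivered
  * only if its slot is still clear; a frame far outside the window resets the
  * whole buffer; otherwise the slots between the previous head and the new
  * sequence number are cleared before the frame is accepted.
  */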

 /* bcast = 0 -> unicast packet | 1 -> broadcast|mcast */
 /* make this bcast/ucast aware */
 int knet_should_deliver(struct knet_host *host, int bcast, seq_num_t seq_num)
 {
 	size_t i, j; /* circular buffer indexes */
 	seq_num_t seq_dist;

 	seq_dist = (seq_num < host->bcast_seq_num_rx) ?
 		(SEQ_MAX - seq_num) + host->bcast_seq_num_rx :
 		host->bcast_seq_num_rx - seq_num;

 	j = seq_num % KNET_CBUFFER_SIZE;

 	if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */
 		return (host->bcast_circular_buffer[j] == 0) ? 1 : 0;
 	} else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) {
 		memset(host->bcast_circular_buffer, 0, KNET_CBUFFER_SIZE);
 		host->bcast_seq_num_rx = seq_num;
 	}

 	/* cleaning up circular buffer */
 	i = (host->bcast_seq_num_rx + 1) % KNET_CBUFFER_SIZE;

 	if (i > j) {
 		memset(host->bcast_circular_buffer + i, 0, KNET_CBUFFER_SIZE - i);
 		memset(host->bcast_circular_buffer, 0, j + 1);
 	} else {
 		memset(host->bcast_circular_buffer + i, 0, j - i + 1);
 	}

 	host->bcast_seq_num_rx = seq_num;

 	return 1;
 }

 void knet_has_been_delivered(struct knet_host *host, int bcast, seq_num_t seq_num)
 {
 	if (bcast) {
 		host->bcast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1;
 	} else {
 		host->ucast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1;
 	}

 	return;
 }
diff --git a/libknet/listener.c b/libknet/listener.c
index 6237ea98..ba81b756 100644
--- a/libknet/listener.c
+++ b/libknet/listener.c
@@ -1,122 +1,124 @@
 #include "config.h"

 #include <errno.h>
 #include <pthread.h>
 #include <string.h>
 #include <sys/epoll.h>
 #include <unistd.h>

 #include "libknet-private.h"

 int knet_listener_acquire(knet_handle_t knet_h, struct knet_listener **head, int writelock)
 {
 	int ret;

 	if (writelock != 0)
 		ret = pthread_rwlock_wrlock(&knet_h->list_rwlock);
 	else
 		ret = pthread_rwlock_rdlock(&knet_h->list_rwlock);

 	if (head)
 		*head = (ret == 0) ? knet_h->listener_head : NULL;

 	return ret;
 }

 int knet_listener_release(knet_handle_t knet_h)
 {
 	return pthread_rwlock_unlock(&knet_h->list_rwlock);
 }

 int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener)
 {
 	int value;
 	struct epoll_event ev;

 	listener->sock = socket(listener->address.ss_family, SOCK_DGRAM, 0);

 	if (listener->sock < 0)
 		return listener->sock;

 	value = KNET_RING_RCVBUFF;
 	setsockopt(listener->sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value));

 	if (_fdset_cloexec(listener->sock) != 0)
 		goto exit_fail1;

 	if (bind(listener->sock, (struct sockaddr *) &listener->address,
 					sizeof(struct sockaddr_storage)) != 0)
 		goto exit_fail1;

 	memset(&ev, 0, sizeof(struct epoll_event));

 	ev.events = EPOLLIN;
 	ev.data.fd = listener->sock;

 	if (epoll_ctl(knet_h->recv_from_links_epollfd,
 					EPOLL_CTL_ADD, listener->sock, &ev) != 0)
 		goto exit_fail1;

 	if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
 		goto exit_fail2;

 	/* pushing new host to the front */
 	listener->next = knet_h->listener_head;
 	knet_h->listener_head = listener;

 	pthread_rwlock_unlock(&knet_h->list_rwlock);

 	return 0;

 exit_fail2:
 	epoll_ctl(knet_h->recv_from_links_epollfd,
 					EPOLL_CTL_DEL, listener->sock, &ev);

 exit_fail1:
 	close(listener->sock);
 	return -1;
 }

 int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener)
 {
-	int i, ret;
+	int link_idx, ret;
 	struct epoll_event ev; /* kernel < 2.6.9 bug (see epoll_ctl man) */
 	struct knet_host *host;
-	struct knet_listener *l;
+	struct knet_listener *tmp_listener;

 	if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
 		return -EINVAL;

 	ret = 0;

 	/* checking if listener is in use */
 	for (host = knet_h->host_head; host != NULL; host = host->next) {
-		for (i = 0; i < KNET_MAX_LINK; i++) {
-			if (host->link[i].ready != 1) continue;
+		for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
+			if (host->link[link_idx].ready != 1)
+				continue;

-			if (host->link[i].sock == listener->sock) {
+			if (host->link[link_idx].sock == listener->sock) {
 				ret = -EBUSY;
 				goto exit_fail1;
 			}
 		}
 	}

 	/* TODO: use a doubly-linked list? */
 	if (listener == knet_h->listener_head) {
 		knet_h->listener_head = knet_h->listener_head->next;
 	} else {
-		for (l = knet_h->listener_head; l != NULL; l = l->next) {
-			if (listener == l->next) {
-				l->next = l->next->next;
+		for (tmp_listener = knet_h->listener_head; tmp_listener != NULL; tmp_listener = tmp_listener->next) {
+			if (listener == tmp_listener->next) {
+				tmp_listener->next = tmp_listener->next->next;
 				break;
 			}
 		}
 	}

 	epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
 	close(listener->sock);

 exit_fail1:
 	pthread_rwlock_unlock(&knet_h->list_rwlock);

-	if (ret < 0) errno = -ret;
+	if (ret < 0)
+		errno = -ret;

 	return ret;
 }
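
For context, a minimal caller-side sketch of how the functions touched above fit together. This is not part of the patch: it assumes the struct knet_handle_cfg and struct knet_listener layouts implied by the diff, and it leaves out the tap device setup and per-link configuration, which live outside these files.

#include <stdint.h>
#include <string.h>
#include <netinet/in.h>

#include "libknet-private.h" /* declares the knet_* calls used here */

static knet_handle_t setup_example(int tap_fd, uint16_t my_node_id)
{
	struct knet_handle_cfg cfg;
	/* kept static so the pointer stored in the handle's listener list stays valid */
	static struct knet_listener listener;
	struct sockaddr_in *addr;
	knet_handle_t knet_h;

	memset(&cfg, 0, sizeof(cfg));
	cfg.fd = tap_fd;		/* must be > 0, see knet_handle_new() */
	cfg.node_id = my_node_id;	/* stamped into every outgoing frame */

	if ((knet_h = knet_handle_new(&cfg)) == NULL)
		return NULL;

	/* register the peer (node id 2 is just an example) */
	if (knet_host_add(knet_h, 2) != 0)
		goto out_free;

	/* bind a UDP listener; knet_listener_add() creates the socket itself */
	memset(&listener, 0, sizeof(listener));
	addr = (struct sockaddr_in *) &listener.address;
	addr->sin_family = AF_INET;
	addr->sin_port = htons(50000);
	addr->sin_addr.s_addr = htonl(INADDR_ANY);
	if (knet_listener_add(knet_h, &listener) != 0)
		goto out_remove;

	knet_handle_setfwd(knet_h, 1);	/* start forwarding data frames */
	return knet_h;

out_remove:
	knet_host_remove(knet_h, 2);
out_free:
	knet_handle_free(knet_h);	/* succeeds only once hosts/listeners are gone */
	return NULL;
}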