diff --git a/libknet/host.c b/libknet/host.c index 9672903e..ab6800eb 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,229 +1,227 @@ #include "config.h" #include #include #include #include #include #include "libknet-private.h" int knet_host_get(knet_handle_t knet_h, uint16_t node_id, struct knet_host **host) { int ret; if ((ret = pthread_rwlock_rdlock(&knet_h->list_rwlock)) != 0) return ret; *host = knet_h->host_index[node_id]; if (*host == NULL) { pthread_rwlock_unlock(&knet_h->list_rwlock); errno = ENOENT; return ENOENT; } return 0; } int knet_host_acquire(knet_handle_t knet_h, struct knet_host **host) { int ret; if ((ret = pthread_rwlock_rdlock(&knet_h->list_rwlock)) != 0) return ret; *host = knet_h->host_head; return 0; } int knet_host_release(knet_handle_t knet_h, struct knet_host **host) { int ret; *host = NULL; if ((ret = pthread_rwlock_unlock(&knet_h->list_rwlock)) != 0) return ret; return 0; } int knet_host_foreach(knet_handle_t knet_h, knet_link_fn_t linkfun, struct knet_host_search *data) { int lockstatus; struct knet_host *host; lockstatus = pthread_rwlock_rdlock(&knet_h->list_rwlock); if ((lockstatus != 0) && (lockstatus != EDEADLK)) return lockstatus; for (host = knet_h->host_head; host != NULL; host = host->next) { if ((linkfun(knet_h, host, data)) != KNET_HOST_FOREACH_NEXT) break; } if (lockstatus == 0) pthread_rwlock_unlock(&knet_h->list_rwlock); return 0; } int knet_host_add(knet_handle_t knet_h, uint16_t node_id) { int link_idx, ret = 0; /* success */ - struct knet_host *host, *tail; + struct knet_host *host; if ((ret = pthread_rwlock_wrlock(&knet_h->list_rwlock)) != 0) goto exit_clean; if (knet_h->host_index[node_id] != NULL) { errno = ret = EEXIST; goto exit_unlock; } if ((host = malloc(sizeof(struct knet_host))) == NULL) goto exit_unlock; memset(host, 0, sizeof(struct knet_host)); host->node_id = node_id; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) host->link[link_idx].link_id = link_idx; /* adding new host to the index */ knet_h->host_index[node_id] = host; if (!knet_h->host_head) { knet_h->host_head = host; + knet_h->host_tail = host; } else { - tail = knet_h->host_head; - while (tail->next != NULL) - tail = tail->next; - - tail->next = host; + knet_h->host_tail->next = host; + knet_h->host_tail = host; } exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); exit_clean: return ret; } int knet_host_remove(knet_handle_t knet_h, uint16_t node_id) { int ret = 0; /* success */ struct knet_host *host, *removed; if ((ret = pthread_rwlock_wrlock(&knet_h->list_rwlock)) != 0) goto exit_clean; if (knet_h->host_index[node_id] == NULL) { errno = ret = EINVAL; goto exit_unlock; } removed = NULL; /* removing host from list */ if (knet_h->host_head->node_id == node_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->node_id == node_id) { removed = host->next; host->next = removed->next; break; } } } if (removed != NULL) { knet_h->host_index[node_id] = NULL; free(removed); } exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); exit_clean: return ret; } int knet_host_dst_cache_update(knet_handle_t knet_h, uint16_t node_id) { int write_retry = 0; try_again: if (write(knet_h->pipefd[1], &node_id, sizeof(node_id)) != sizeof(node_id)) { if ((write_retry < 10) && ((errno = EAGAIN) || (errno = EWOULDBLOCK))) { write_retry++; goto try_again; } else { return -1; } } return 0; } /* bcast = 0 -> unicast packet | 1 -> broadcast|mcast */ /* make this bcast/ucast aware */ int knet_should_deliver(struct knet_host *host, int bcast, seq_num_t seq_num) { size_t i, j; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = NULL; seq_num_t *dst_seq_num; if (bcast) { dst_cbuf = host->bcast_circular_buffer; dst_seq_num = &host->bcast_seq_num_rx; } else { dst_cbuf = host->ucast_circular_buffer; dst_seq_num = &host->ucast_seq_num_rx; } seq_dist = (seq_num < *dst_seq_num) ? (SEQ_MAX - seq_num) + *dst_seq_num : *dst_seq_num - seq_num; j = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ return (dst_cbuf[j] == 0) ? 1 : 0; } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; if (i > j) { memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i); memset(dst_cbuf, 0, j + 1); } else { memset(dst_cbuf + i, 0, j - i + 1); } *dst_seq_num = seq_num; return 1; } void knet_has_been_delivered(struct knet_host *host, int bcast, seq_num_t seq_num) { if (bcast) { host->bcast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->ucast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } return; } diff --git a/libknet/libknet-private.h b/libknet/libknet-private.h index efccdcc7..7c95894e 100644 --- a/libknet/libknet-private.h +++ b/libknet/libknet-private.h @@ -1,60 +1,61 @@ #ifndef __LIBKNET_PRIVATE_H__ #define __LIBKNET_PRIVATE_H__ /* * NOTE: you shouldn't need to include this header normally */ #include "libknet.h" #define KNET_DATABUFSIZE 131072 /* 128k */ #define KNET_PINGBUFSIZE sizeof(struct knet_frame) #define timespec_diff(start, end, diff) \ do { \ if (end.tv_sec > start.tv_sec) \ *(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \ + end.tv_nsec - start.tv_nsec; \ else \ *(diff) = end.tv_nsec - start.tv_nsec; \ } while (0); struct knet_handle { uint16_t node_id; unsigned int enabled:1; int sockfd; int pipefd[2]; int tap_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; struct knet_host *host_head; + struct knet_host *host_tail; struct knet_host *host_index[KNET_MAX_HOST]; struct knet_listener *listener_head; struct knet_frame *tap_to_links_buf; unsigned char *tap_to_links_buf_crypt; struct knet_frame *recv_from_links_buf; unsigned char *recv_from_links_buf_crypt; struct knet_frame *pingbuf; unsigned char *pingbuf_crypt; pthread_t tap_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_rwlock_t list_rwlock; struct crypto_instance *crypto_instance; seq_num_t bcast_seq_num_tx; uint8_t dst_host_filter; int (*dst_host_filter_fn) ( const unsigned char *outdata, ssize_t outdata_len, uint16_t src_node_id, uint16_t *dst_host_ids, size_t *dst_host_ids_entries); }; int _fdset_cloexec(int fd); int _fdset_nonblock(int fd); int knet_should_deliver(struct knet_host *host, int bcast, seq_num_t seq_num); void knet_has_been_delivered(struct knet_host *host, int bcast, seq_num_t seq_num); #endif