diff --git a/libknet/handle.c b/libknet/handle.c index 11f9bc95..9a7e1c3f 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,452 +1,461 @@ #include "config.h" #include #include #include #include #include #include #include "libknet-private.h" #include "nsscrypto.h" #define KNET_MAX_EVENTS 8 #define KNET_PING_TIMERES 200000 static void *_handle_tap_to_links_thread(void *data); static void *_handle_recv_from_links_thread(void *data); static void *_handle_heartbt_thread(void *data); knet_handle_t knet_handle_new(const struct knet_handle_cfg *knet_handle_cfg) { knet_handle_t knet_h; struct epoll_event ev; /* * validate incoming config request */ if (knet_handle_cfg == NULL) { errno = EINVAL; return NULL; } if (knet_handle_cfg->fd <= 0) { errno = EINVAL; return NULL; } if ((knet_h = malloc(sizeof(struct knet_handle))) == NULL) return NULL; memset(knet_h, 0, sizeof(struct knet_handle)); if (crypto_init(knet_h, knet_handle_cfg) < 0) goto exit_fail1; if ((knet_h->tap_to_links_buf = malloc(KNET_DATABUFSIZE))== NULL) goto exit_fail2; memset(knet_h->tap_to_links_buf, 0, KNET_DATABUFSIZE); if ((knet_h->recv_from_links_buf = malloc(KNET_DATABUFSIZE))== NULL) goto exit_fail3; memset(knet_h->recv_from_links_buf, 0, KNET_DATABUFSIZE); if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL) goto exit_fail4; memset(knet_h->pingbuf, 0, KNET_PINGBUFSIZE); if (pthread_rwlock_init(&knet_h->list_rwlock, NULL) != 0) goto exit_fail5; knet_h->sockfd = knet_handle_cfg->fd; knet_h->tap_to_links_epollfd = epoll_create(KNET_MAX_EVENTS); knet_h->recv_from_links_epollfd = epoll_create(KNET_MAX_EVENTS); knet_h->node_id = knet_handle_cfg->node_id; knet_h->dst_nodeid_offset = knet_handle_cfg->dst_nodeid_offset; knet_h->dst_nodeid_len = knet_handle_cfg->dst_nodeid_len; if ((knet_h->tap_to_links_epollfd < 0) || (knet_h->recv_from_links_epollfd < 0)) goto exit_fail6; if ((_fdset_cloexec(knet_h->tap_to_links_epollfd) != 0) || (_fdset_cloexec(knet_h->recv_from_links_epollfd != 0))) goto exit_fail6; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->sockfd; if (epoll_ctl(knet_h->tap_to_links_epollfd, EPOLL_CTL_ADD, knet_h->sockfd, &ev) != 0) goto exit_fail6; if (pthread_create(&knet_h->tap_to_links_thread, 0, _handle_tap_to_links_thread, (void *) knet_h) != 0) goto exit_fail6; if (pthread_create(&knet_h->recv_from_links_thread, 0, _handle_recv_from_links_thread, (void *) knet_h) != 0) goto exit_fail7; if (pthread_create(&knet_h->heartbt_thread, 0, _handle_heartbt_thread, (void *) knet_h) != 0) goto exit_fail8; return knet_h; exit_fail8: pthread_cancel(knet_h->recv_from_links_thread); exit_fail7: pthread_cancel(knet_h->tap_to_links_thread); exit_fail6: if (knet_h->tap_to_links_epollfd >= 0) close(knet_h->tap_to_links_epollfd); if (knet_h->recv_from_links_epollfd >= 0) close(knet_h->recv_from_links_epollfd); pthread_rwlock_destroy(&knet_h->list_rwlock); exit_fail5: free(knet_h->pingbuf); exit_fail4: free(knet_h->recv_from_links_buf); exit_fail3: free(knet_h->tap_to_links_buf); exit_fail2: crypto_fini(knet_h); exit_fail1: free(knet_h); return NULL; } int knet_handle_free(knet_handle_t knet_h) { void *retval; if ((knet_h->host_head != NULL) || (knet_h->listener_head != NULL)) goto exit_busy; pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; pthread_cancel(knet_h->tap_to_links_thread); pthread_join(knet_h->tap_to_links_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; close(knet_h->tap_to_links_epollfd); close(knet_h->recv_from_links_epollfd); pthread_rwlock_destroy(&knet_h->list_rwlock); free(knet_h->tap_to_links_buf); free(knet_h->recv_from_links_buf); free(knet_h->pingbuf); crypto_fini(knet_h); free(knet_h); return 0; exit_busy: errno = EBUSY; return -EBUSY; } void knet_handle_setfwd(knet_handle_t knet_h, int enabled) { knet_h->enabled = (enabled == 1) ? 1 : 0; } void knet_link_timeout(struct knet_link *lnk, time_t interval, time_t timeout, int precision) { lnk->ping_interval = interval * 1000; /* microseconds */ lnk->pong_timeout = timeout * 1000; /* microseconds */ lnk->latency_fix = precision; lnk->latency_exp = precision - \ ((lnk->ping_interval * precision) / 8000000); } static void _handle_tap_to_links(knet_handle_t knet_h) { int j; ssize_t len, snt, outlen; struct knet_host *i; len = read(knet_h->sockfd, knet_h->tap_to_links_buf->kf_data, - KNET_DATABUFSIZE - KNET_FRAME_SIZE); + KNET_DATABUFSIZE - (KNET_FRAME_SIZE + sizeof(seq_num_t))); if (len == 0) { /* TODO: disconnection, should never happen! */ return; } - len += KNET_FRAME_SIZE; + len += KNET_FRAME_SIZE + sizeof(seq_num_t); if (knet_h->enabled != 1) /* data forward is disabled */ return; /* TODO: packet inspection */ if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) return; + /* Use bcast only pckts for now since we have no pckt inspector */ + knet_h->tap_to_links_buf->kf_seq_num = ++knet_h->bcast_seq_num; + if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->tap_to_links_buf, len, knet_h->tap_to_links_buf_crypt, &outlen) < 0) return; for (i = knet_h->host_head; i != NULL; i = i->next) { for (j = 0; j < KNET_MAX_LINK; j++) { - if (i->link[j].enabled != 1) /* link is disabled */ + if (i->link[j].ready != 1) /* link is not configured */ + continue; + if (i->link[j].enabled != 1) /* link is not enabled */ continue; snt = sendto(i->link[j].sock, knet_h->tap_to_links_buf_crypt, outlen, MSG_DONTWAIT, (struct sockaddr *) &i->link[j].address, sizeof(struct sockaddr_storage)); - if ((i->active == 0) && (snt == len)) + if ((i->active == 0) && (snt == outlen)) break; } } pthread_rwlock_unlock(&knet_h->list_rwlock); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd) { ssize_t len, outlen; struct sockaddr_storage address; socklen_t addrlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) return; addrlen = sizeof(struct sockaddr_storage); len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE, MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen); if (crypto_authenticate_and_decrypt(knet_h->crypto_instance, (unsigned char *)knet_h->recv_from_links_buf, &len) < 0) goto exit_unlock; if (len < (KNET_FRAME_SIZE + 1)) goto exit_unlock; if (ntohl(knet_h->recv_from_links_buf->kf_magic) != KNET_FRAME_MAGIC) goto exit_unlock; if (knet_h->recv_from_links_buf->kf_version != KNET_FRAME_VERSION) goto exit_unlock; - src_host = NULL; + knet_h->recv_from_links_buf->kf_node = ntohs(knet_h->recv_from_links_buf->kf_node); + src_host = knet_h->host_index[knet_h->recv_from_links_buf->kf_node]; + if (src_host == NULL) { /* host not found */ + goto exit_unlock; + } + src_link = NULL; if ((knet_h->recv_from_links_buf->kf_type & KNET_FRAME_PMSK) != 0) { - knet_h->recv_from_links_buf->kf_node = ntohs(knet_h->recv_from_links_buf->kf_node); - src_host = knet_h->host_index[knet_h->recv_from_links_buf->kf_node]; - - if (src_host == NULL) /* host not found */ - goto exit_unlock; - src_link = src_host->link + (knet_h->recv_from_links_buf->kf_link % KNET_MAX_LINK); } switch (knet_h->recv_from_links_buf->kf_type) { case KNET_FRAME_DATA: if (knet_h->enabled != 1) /* data forward is disabled */ break; + if (!knet_should_deliver(src_host, 1, knet_h->recv_from_links_buf->kf_seq_num)) + break; + write(knet_h->sockfd, - knet_h->recv_from_links_buf->kf_data, len - KNET_FRAME_SIZE); + knet_h->recv_from_links_buf->kf_data, len - (KNET_FRAME_SIZE + sizeof(seq_num_t))); + + knet_has_been_delivered(src_host, 1, knet_h->recv_from_links_buf->kf_seq_num); break; case KNET_FRAME_PING: knet_h->recv_from_links_buf->kf_type = KNET_FRAME_PONG; knet_h->recv_from_links_buf->kf_node = htons(knet_h->node_id); if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->recv_from_links_buf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) break; - sendto(src_link->sock, knet_h->recv_from_links_buf_crypt, outlen, MSG_DONTWAIT, (struct sockaddr *) &src_link->address, sizeof(struct sockaddr_storage)); break; case KNET_FRAME_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->pong_last); timespec_diff(knet_h->recv_from_links_buf->kf_time, src_link->pong_last, &latency_last); src_link->latency = ((src_link->latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->latency < src_link->pong_timeout) { if (!src_link->enabled) { src_link->enabled = 1; /* TODO: notify packet inspector */ } } break; default: goto exit_unlock; } exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); } static void _handle_check_each(knet_handle_t knet_h, struct knet_link *dst_link) { int len; ssize_t outlen; struct timespec clock_now, pong_last; unsigned long long diff_ping; /* caching last pong to avoid race conditions */ pong_last = dst_link->pong_last; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) return; timespec_diff(dst_link->ping_last, clock_now, &diff_ping); if (diff_ping >= (dst_link->ping_interval * 1000llu)) { knet_h->pingbuf->kf_time = clock_now; knet_h->pingbuf->kf_link = dst_link->link_id; if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->pingbuf, KNET_PINGBUFSIZE, knet_h->pingbuf_crypt, &outlen) < 0) return; len = sendto(dst_link->sock, knet_h->pingbuf_crypt, outlen, MSG_DONTWAIT, (struct sockaddr *) &dst_link->address, sizeof(struct sockaddr_storage)); if (len == outlen) dst_link->ping_last = clock_now; } if (dst_link->enabled == 1) { timespec_diff(pong_last, clock_now, &diff_ping); if (diff_ping >= (dst_link->pong_timeout * 1000llu)) { dst_link->enabled = 0; /* TODO: might need write lock */ /* TODO: notify packet inspector */ } } } static void *_handle_heartbt_thread(void *data) { int j; knet_handle_t knet_h; struct knet_host *i; knet_h = (knet_handle_t) data; /* preparing ping buffer */ knet_h->pingbuf->kf_magic = htonl(KNET_FRAME_MAGIC); knet_h->pingbuf->kf_version = KNET_FRAME_VERSION; knet_h->pingbuf->kf_type = KNET_FRAME_PING; knet_h->pingbuf->kf_node = htons(knet_h->node_id); while (1) { usleep(KNET_PING_TIMERES); if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) continue; for (i = knet_h->host_head; i != NULL; i = i->next) { for (j = 0; j < KNET_MAX_LINK; j++) { if (i->link[j].ready != 1) continue; _handle_check_each(knet_h, &i->link[j]); } } pthread_rwlock_unlock(&knet_h->list_rwlock); } return NULL; } static void *_handle_tap_to_links_thread(void *data) { knet_handle_t knet_h; struct epoll_event events[KNET_MAX_EVENTS]; knet_h = (knet_handle_t) data; /* preparing data buffer */ knet_h->tap_to_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC); knet_h->tap_to_links_buf->kf_version = KNET_FRAME_VERSION; knet_h->tap_to_links_buf->kf_type = KNET_FRAME_DATA; + knet_h->tap_to_links_buf->kf_node = htons(knet_h->node_id); while (1) { if (epoll_wait(knet_h->tap_to_links_epollfd, events, KNET_MAX_EVENTS, -1) >= 1) _handle_tap_to_links(knet_h); } return NULL; } static void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_MAX_EVENTS]; while (1) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd); } } return NULL; } diff --git a/libknet/host.c b/libknet/host.c index fe012f0a..d20ccc80 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,150 +1,197 @@ #include "config.h" #include #include #include #include #include "libknet-private.h" int knet_host_get(knet_handle_t knet_h, uint16_t node_id, struct knet_host **host) { int ret; if ((ret = pthread_rwlock_rdlock(&knet_h->list_rwlock)) != 0) return ret; *host = knet_h->host_index[node_id]; if (*host == NULL) { pthread_rwlock_unlock(&knet_h->list_rwlock); errno = ENOENT; return ENOENT; } return 0; } int knet_host_acquire(knet_handle_t knet_h, struct knet_host **host) { int ret; if ((ret = pthread_rwlock_rdlock(&knet_h->list_rwlock)) != 0) return ret; *host = knet_h->host_head; return 0; } int knet_host_release(knet_handle_t knet_h, struct knet_host **host) { int ret; *host = NULL; if ((ret = pthread_rwlock_unlock(&knet_h->list_rwlock)) != 0) return ret; return 0; } int knet_host_foreach(knet_handle_t knet_h, knet_link_fn_t linkfun, struct knet_host_search *data) { int lockstatus; struct knet_host *host; lockstatus = pthread_rwlock_rdlock(&knet_h->list_rwlock); if ((lockstatus != 0) && (lockstatus != EDEADLK)) return lockstatus; for (host = knet_h->host_head; host != NULL; host = host->next) { if ((linkfun(knet_h, host, data)) != KNET_HOST_FOREACH_NEXT) break; } if (lockstatus == 0) pthread_rwlock_unlock(&knet_h->list_rwlock); return 0; } int knet_host_add(knet_handle_t knet_h, uint16_t node_id) { int i, ret = 0; /* success */ struct knet_host *host; if ((ret = pthread_rwlock_wrlock(&knet_h->list_rwlock)) != 0) goto exit_clean; if (knet_h->host_index[node_id] != NULL) { errno = ret = EEXIST; goto exit_unlock; } if ((host = malloc(sizeof(struct knet_host))) == NULL) goto exit_unlock; memset(host, 0, sizeof(struct knet_host)); host->node_id = node_id; for (i = 0; i < KNET_MAX_LINK; i++) host->link[i].link_id = i; /* adding new host to the index */ knet_h->host_index[node_id] = host; /* TODO: keep hosts ordered */ /* pushing new host to the front */ host->next = knet_h->host_head; knet_h->host_head = host; exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); exit_clean: return ret; } int knet_host_remove(knet_handle_t knet_h, uint16_t node_id) { int ret = 0; /* success */ struct knet_host *i, *removed; if ((ret = pthread_rwlock_wrlock(&knet_h->list_rwlock)) != 0) goto exit_clean; if (knet_h->host_index[node_id] == NULL) { errno = ret = EINVAL; goto exit_unlock; } removed = NULL; /* removing host from list */ if (knet_h->host_head->node_id == node_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (i = knet_h->host_head; i->next != NULL; i = i->next) { if (i->next->node_id == node_id) { removed = i->next; i->next = removed->next; break; } } } if (removed != NULL) { knet_h->host_index[node_id] = NULL; free(removed); } exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); exit_clean: return ret; } + +/* bcast = 0 -> unicast packet | 1 -> broadcast|mcast */ + +/* make this bcast/ucast aware */ +int knet_should_deliver(struct knet_host *host, int bcast, seq_num_t seq_num) +{ + size_t i, j; /* circular buffer indexes */ + seq_num_t seq_dist; + + seq_dist = (seq_num < host->bcast_seq_num_rx) ? + (SEQ_MAX - seq_num) + host->bcast_seq_num_rx : host->bcast_seq_num_rx - seq_num; + + j = seq_num % KNET_CBUFFER_SIZE; + + if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ + return (host->bcast_circular_buffer[j] == 0) ? 1 : 0; + } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { + memset(host->bcast_circular_buffer, 0, KNET_CBUFFER_SIZE); + host->bcast_seq_num_rx = seq_num; + } + + /* cleaning up circular buffer */ + i = (host->bcast_seq_num_rx + 1) % KNET_CBUFFER_SIZE; + + if (i > j) { + memset(host->bcast_circular_buffer + i, 0, KNET_CBUFFER_SIZE - i); + memset(host->bcast_circular_buffer, 0, j + 1); + } else { + memset(host->bcast_circular_buffer + i, 0, j - i + 1); + } + + host->bcast_seq_num_rx = seq_num; + + return 1; +} + +void knet_has_been_delivered(struct knet_host *host, int bcast, seq_num_t seq_num) +{ + + if (bcast) { + host->bcast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; + } else { + host->ucast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; + } + + return; +} diff --git a/libknet/libknet-private.h b/libknet/libknet-private.h index 2170bf04..151d60a4 100644 --- a/libknet/libknet-private.h +++ b/libknet/libknet-private.h @@ -1,50 +1,51 @@ #ifndef __KNETHANDLE_H__ #define __KNETHANDLE_H__ /* NOTE: you shouldn't need to include this header normally, it is provided for * testing purpose only. */ #include "libknet.h" #define KNET_DATABUFSIZE 131072 /* 128k */ #define KNET_PINGBUFSIZE sizeof(struct knet_frame) #define timespec_diff(start, end, diff) \ do { \ if (end.tv_sec > start.tv_sec) \ *(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \ + end.tv_nsec - start.tv_nsec; \ else \ *(diff) = end.tv_nsec - start.tv_nsec; \ } while (0); struct knet_handle { int sockfd; int tap_to_links_epollfd; int recv_from_links_epollfd; uint16_t node_id; unsigned int enabled:1; struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; struct knet_listener *listener_head; struct knet_frame *tap_to_links_buf; unsigned char *tap_to_links_buf_crypt; struct knet_frame *recv_from_links_buf; unsigned char *recv_from_links_buf_crypt; struct knet_frame *pingbuf; unsigned char *pingbuf_crypt; pthread_t tap_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_rwlock_t list_rwlock; struct crypto_instance *crypto_instance; off_t dst_nodeid_offset; size_t dst_nodeid_len; seq_num_t bcast_seq_num; - seq_num_t ucast_seq_num; }; int _fdset_cloexec(int fd); +int knet_should_deliver(struct knet_host *host, int bcast, seq_num_t seq_num); +void knet_has_been_delivered(struct knet_host *host, int bcast, seq_num_t seq_num); #endif diff --git a/libknet/libknet.h b/libknet/libknet.h index 53466a04..6843cef5 100644 --- a/libknet/libknet.h +++ b/libknet/libknet.h @@ -1,136 +1,141 @@ #ifndef __RING_H__ #define __RING_H__ #include #include typedef struct knet_handle *knet_handle_t; #define KNET_RING_DEFPORT 50000 #define KNET_RING_RCVBUFF 8388608 #define KNET_MAX_HOST 65536 #define KNET_MAX_LINK 8 #define KNET_MAX_HOST_LEN 64 +#define KNET_CBUFFER_SIZE 4096 +/* +typedef uint64_t seq_num_t; +#define SEQ_MAX UINT64_MAX +*/ +typedef uint16_t seq_num_t; +#define SEQ_MAX UINT16_MAX + struct knet_link { uint8_t link_id; int sock; char ipaddr[KNET_MAX_HOST_LEN]; char port[6]; struct sockaddr_storage address; unsigned int ready:1; /* link is configured and ready to be used */ unsigned int enabled:1; /* link is enabled for data */ unsigned long long latency; /* average latency computed by fix/exp */ unsigned int latency_exp; unsigned int latency_fix; unsigned long long ping_interval; unsigned long long pong_timeout; struct timespec ping_last; struct timespec pong_last; }; struct knet_host { uint16_t node_id; char name[KNET_MAX_HOST_LEN]; unsigned int active:1; /* data packets are sent to all links */ + char bcast_circular_buffer[KNET_CBUFFER_SIZE]; + seq_num_t bcast_seq_num_rx; + char ucast_circular_buffer[KNET_CBUFFER_SIZE]; + seq_num_t ucast_seq_num_tx; + seq_num_t ucast_seq_num_rx; struct knet_listener *listener; struct knet_link link[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_listener { int sock; char ipaddr[KNET_MAX_HOST_LEN]; char port[6]; struct sockaddr_storage address; struct knet_listener *next; }; -/* change those to uint8 and UINT8_MAX to test rollover */ -/* -typedef uint32_t seq_num_t; -#define SEQ_MAX UINT32_MAX -*/ -typedef uint16_t seq_num_t; -#define SEQ_MAX UINT16_MAX - union knet_frame_data { struct { seq_num_t kfd_seq_num; uint8_t kfd_data[0]; } data; struct { uint8_t kfd_link; struct timespec kfd_time; } ping; } __attribute__((packed)); struct knet_frame { uint32_t kf_magic; uint8_t kf_version; uint8_t kf_type; uint16_t kf_node; union knet_frame_data kf_payload; } __attribute__((packed)); #define kf_seq_num kf_payload.data.kfd_seq_num #define kf_data kf_payload.data.kfd_data #define kf_link kf_payload.ping.kfd_link #define kf_time kf_payload.ping.kfd_time #define KNET_FRAME_SIZE (sizeof(struct knet_frame) - sizeof(union knet_frame_data)) #define KNET_FRAME_MAGIC 0x12344321 #define KNET_FRAME_VERSION 0x01 #define KNET_FRAME_DATA 0x00 #define KNET_FRAME_PING 0x81 #define KNET_FRAME_PONG 0x82 #define KNET_FRAME_PMSK 0x80 /* ping/pong packet mask */ #define KNET_MIN_KEY_LEN 1024 #define KNET_MAX_KEY_LEN 4096 struct knet_handle_cfg { int fd; uint16_t node_id; char *crypto_cipher_type; char *crypto_hash_type; unsigned char *private_key; unsigned int private_key_len; off_t dst_nodeid_offset; /* destination node_id offset in data packet */ size_t dst_nodeid_len; /* 0 = broadcast to all, 1|2 256/64k nodes, > 2 == 2 */ }; knet_handle_t knet_handle_new(const struct knet_handle_cfg *knet_handle_cfg); void knet_handle_setfwd(knet_handle_t knet_h, int enabled); int knet_handle_free(knet_handle_t knet_h); int knet_host_add(knet_handle_t knet_h, uint16_t node_id); int knet_host_acquire(knet_handle_t knet_h, struct knet_host **host); int knet_host_get(knet_handle_t knet_h, uint16_t node_id, struct knet_host **host); int knet_host_release(knet_handle_t knet_h, struct knet_host **host); int knet_host_remove(knet_handle_t knet_h, uint16_t node_id); void knet_link_timeout(struct knet_link *lnk, time_t interval, time_t timeout, int precision); #define KNET_HOST_FOREACH_NEXT 0 /* next host */ #define KNET_HOST_FOREACH_FOUND 1 /* host found, exit loop */ struct knet_host_search { int param1; /* user parameter 1 */ void *data1; /* user data pointer 1 */ void *data2; /* user data pointer 2 */ int retval; /* search return value */ }; typedef int (*knet_link_fn_t)(knet_handle_t knet_h, struct knet_host *host, struct knet_host_search *data); int knet_host_foreach(knet_handle_t knet_h, knet_link_fn_t linkfun, struct knet_host_search *data); int knet_listener_acquire(knet_handle_t knet_h, struct knet_listener **head, int writelock); int knet_listener_release(knet_handle_t knet_h); int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener); int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener); #endif