diff --git a/libknet/handle.c b/libknet/handle.c index 982b8095..8026e1af 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,714 +1,737 @@ #include "config.h" #include #include #include #include #include #include #include "libknet-private.h" #include "crypto.h" #define KNET_MAX_EVENTS 8 #define KNET_PING_TIMERES 200000 static void *_handle_tap_to_links_thread(void *data); static void *_handle_recv_from_links_thread(void *data); static void *_handle_heartbt_thread(void *data); static void *_handle_dst_link_handler_thread(void *data); knet_handle_t knet_handle_new(const struct knet_handle_cfg *knet_handle_cfg) { knet_handle_t knet_h; struct epoll_event ev; /* * validate incoming config request */ if (knet_handle_cfg == NULL) { errno = EINVAL; return NULL; } if (knet_handle_cfg->fd <= 0) { errno = EINVAL; return NULL; } if ((knet_h = malloc(sizeof(struct knet_handle))) == NULL) return NULL; memset(knet_h, 0, sizeof(struct knet_handle)); knet_h->node_id = knet_handle_cfg->node_id; knet_h->sockfd = knet_handle_cfg->fd; if (pipe(knet_h->pipefd)) goto exit_fail1; if ((_fdset_cloexec(knet_h->pipefd[0])) || (_fdset_cloexec(knet_h->pipefd[1])) || (_fdset_nonblock(knet_h->pipefd[0])) || (_fdset_nonblock(knet_h->pipefd[1]))) goto exit_fail2; knet_h->dst_host_filter = knet_handle_cfg->dst_host_filter; knet_h->dst_host_filter_fn = knet_handle_cfg->dst_host_filter_fn; if ((knet_h->dst_host_filter) && (!knet_h->dst_host_filter_fn)) goto exit_fail2; if ((knet_h->tap_to_links_buf = malloc(KNET_DATABUFSIZE))== NULL) goto exit_fail2; memset(knet_h->tap_to_links_buf, 0, KNET_DATABUFSIZE); if ((knet_h->recv_from_links_buf = malloc(KNET_DATABUFSIZE))== NULL) goto exit_fail3; memset(knet_h->recv_from_links_buf, 0, KNET_DATABUFSIZE); if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL) goto exit_fail4; memset(knet_h->pingbuf, 0, KNET_PINGBUFSIZE); if (pthread_rwlock_init(&knet_h->list_rwlock, NULL) != 0) goto exit_fail5; knet_h->tap_to_links_epollfd = epoll_create(KNET_MAX_EVENTS); knet_h->recv_from_links_epollfd = epoll_create(KNET_MAX_EVENTS); knet_h->dst_link_handler_epollfd = epoll_create(KNET_MAX_EVENTS); if ((knet_h->tap_to_links_epollfd < 0) || (knet_h->recv_from_links_epollfd < 0) || (knet_h->dst_link_handler_epollfd < 0)) goto exit_fail6; if ((_fdset_cloexec(knet_h->tap_to_links_epollfd) != 0) || (_fdset_cloexec(knet_h->recv_from_links_epollfd) != 0) || (_fdset_cloexec(knet_h->dst_link_handler_epollfd) != 0)) goto exit_fail6; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->sockfd; if (epoll_ctl(knet_h->tap_to_links_epollfd, EPOLL_CTL_ADD, knet_h->sockfd, &ev) != 0) goto exit_fail6; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->pipefd[0]; if (epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_ADD, knet_h->pipefd[0], &ev) != 0) goto exit_fail6; if (pthread_create(&knet_h->dst_link_handler_thread, 0, _handle_dst_link_handler_thread, (void *) knet_h) != 0) goto exit_fail6; if (pthread_create(&knet_h->tap_to_links_thread, 0, _handle_tap_to_links_thread, (void *) knet_h) != 0) goto exit_fail7; if (pthread_create(&knet_h->recv_from_links_thread, 0, _handle_recv_from_links_thread, (void *) knet_h) != 0) goto exit_fail8; if (pthread_create(&knet_h->heartbt_thread, 0, _handle_heartbt_thread, (void *) knet_h) != 0) goto exit_fail9; return knet_h; exit_fail9: pthread_cancel(knet_h->recv_from_links_thread); exit_fail8: pthread_cancel(knet_h->tap_to_links_thread); exit_fail7: pthread_cancel(knet_h->dst_link_handler_thread); exit_fail6: if (knet_h->tap_to_links_epollfd >= 0) close(knet_h->tap_to_links_epollfd); if (knet_h->recv_from_links_epollfd >= 0) close(knet_h->recv_from_links_epollfd); if (knet_h->dst_link_handler_epollfd >= 0) close(knet_h->dst_link_handler_epollfd); pthread_rwlock_destroy(&knet_h->list_rwlock); exit_fail5: free(knet_h->pingbuf); exit_fail4: free(knet_h->recv_from_links_buf); exit_fail3: free(knet_h->tap_to_links_buf); exit_fail2: close(knet_h->pipefd[0]); close(knet_h->pipefd[1]); exit_fail1: free(knet_h); return NULL; } int knet_handle_free(knet_handle_t knet_h) { void *retval; if ((knet_h->host_head != NULL) || (knet_h->listener_head != NULL)) goto exit_busy; pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; pthread_cancel(knet_h->tap_to_links_thread); pthread_join(knet_h->tap_to_links_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; pthread_cancel(knet_h->dst_link_handler_thread); pthread_join(knet_h->dst_link_handler_thread, &retval); if (retval != PTHREAD_CANCELED) goto exit_busy; close(knet_h->tap_to_links_epollfd); close(knet_h->recv_from_links_epollfd); close(knet_h->dst_link_handler_epollfd); close(knet_h->pipefd[0]); close(knet_h->pipefd[1]); pthread_rwlock_destroy(&knet_h->list_rwlock); free(knet_h->tap_to_links_buf); free(knet_h->recv_from_links_buf); free(knet_h->pingbuf); crypto_fini(knet_h); free(knet_h); return 0; exit_busy: errno = EBUSY; return -EBUSY; } void knet_handle_setfwd(knet_handle_t knet_h, int enabled) { knet_h->enabled = (enabled == 1) ? 1 : 0; } int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) { if (knet_h->enabled) return -1; crypto_fini(knet_h); if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { return 0; } return crypto_init(knet_h, knet_handle_crypto_cfg); } static int knet_link_updown(knet_handle_t knet_h, uint16_t node_id, struct knet_link *lnk, int configured, int connected) { + unsigned int old_configured = lnk->configured; + unsigned int old_connected = lnk->connected; + if ((lnk->configured == configured) && (lnk->connected == connected)) return 0; - if (_dst_cache_update(knet_h, node_id)) - return -1; - lnk->configured = configured; lnk->connected = connected; + if (_dst_cache_update(knet_h, node_id)) { + lnk->configured = old_configured; + lnk->connected = old_connected; + return -1; + } + return 0; } int knet_link_enable(knet_handle_t knet_h, uint16_t node_id, struct knet_link *lnk, int configured) { return knet_link_updown(knet_h, node_id, lnk, configured, lnk->connected); } +int knet_link_priority(knet_handle_t knet_h, uint16_t node_id, struct knet_link *lnk, uint8_t priority) +{ + uint8_t old_priority = lnk->priority; + + if (lnk->priority == priority) + return 0; + + lnk->priority = priority; + + if (_dst_cache_update(knet_h, node_id)) { + lnk->priority = old_priority; + return -1; + } + + return 0; +} + void knet_link_timeout(struct knet_link *lnk, time_t interval, time_t timeout, int precision) { lnk->ping_interval = interval * 1000; /* microseconds */ lnk->pong_timeout = timeout * 1000; /* microseconds */ lnk->latency_fix = precision; lnk->latency_exp = precision - \ ((lnk->ping_interval * precision) / 8000000); } static void _handle_tap_to_links(knet_handle_t knet_h) { ssize_t inlen, len, outlen; struct knet_host *dst_host; int link_idx; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; unsigned char *outbuf = (unsigned char *)knet_h->tap_to_links_buf; inlen = read(knet_h->sockfd, knet_h->tap_to_links_buf->kf_data, KNET_DATABUFSIZE - (KNET_FRAME_SIZE + sizeof(seq_num_t))); if (inlen == 0) { /* TODO: disconnection, should never happen! */ return; } outlen = len = inlen + KNET_FRAME_SIZE + sizeof(seq_num_t); if (knet_h->enabled != 1) /* data forward is disabled */ return; if (knet_h->dst_host_filter) { bcast = knet_h->dst_host_filter_fn( (const unsigned char *)knet_h->tap_to_links_buf->kf_data, inlen, knet_h->tap_to_links_buf->kf_node, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) return; if ((!bcast) && (!dst_host_ids_entries)) return; } if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) return; if (!bcast) { int host_idx; for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; if (!dst_host) continue; knet_h->tap_to_links_buf->kf_seq_num = htons(++dst_host->ucast_seq_num_tx); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->tap_to_links_buf, len, knet_h->tap_to_links_buf_crypt, &outlen) < 0) { pthread_rwlock_unlock(&knet_h->list_rwlock); return; } outbuf = knet_h->tap_to_links_buf_crypt; } for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { sendto(dst_host->link[dst_host->active_links[link_idx]].sock, outbuf, outlen, MSG_DONTWAIT, (struct sockaddr *) &dst_host->link[dst_host->active_links[link_idx]].address, sizeof(struct sockaddr_storage)); if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } } } else { knet_h->tap_to_links_buf->kf_seq_num = htons(++knet_h->bcast_seq_num_tx); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->tap_to_links_buf, len, knet_h->tap_to_links_buf_crypt, &outlen) < 0) { pthread_rwlock_unlock(&knet_h->list_rwlock); return; } outbuf = knet_h->tap_to_links_buf_crypt; } for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { sendto(dst_host->link[dst_host->active_links[link_idx]].sock, outbuf, outlen, MSG_DONTWAIT, (struct sockaddr *) &dst_host->link[dst_host->active_links[link_idx]].address, sizeof(struct sockaddr_storage)); if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } } } pthread_rwlock_unlock(&knet_h->list_rwlock); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd) { ssize_t len, outlen; struct sockaddr_storage address; socklen_t addrlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; unsigned char *outbuf = (unsigned char *)knet_h->recv_from_links_buf; if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) return; addrlen = sizeof(struct sockaddr_storage); len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE, MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen); if (knet_h->crypto_instance) { if (crypto_authenticate_and_decrypt(knet_h->crypto_instance, (unsigned char *)knet_h->recv_from_links_buf, &len) < 0) goto exit_unlock; } if (len < (KNET_FRAME_SIZE + 1)) goto exit_unlock; if (ntohl(knet_h->recv_from_links_buf->kf_magic) != KNET_FRAME_MAGIC) goto exit_unlock; if (knet_h->recv_from_links_buf->kf_version != KNET_FRAME_VERSION) goto exit_unlock; knet_h->recv_from_links_buf->kf_node = ntohs(knet_h->recv_from_links_buf->kf_node); src_host = knet_h->host_index[knet_h->recv_from_links_buf->kf_node]; if (src_host == NULL) { /* host not found */ goto exit_unlock; } src_link = NULL; if ((knet_h->recv_from_links_buf->kf_type & KNET_FRAME_PMSK) != 0) { src_link = src_host->link + (knet_h->recv_from_links_buf->kf_link % KNET_MAX_LINK); } switch (knet_h->recv_from_links_buf->kf_type) { case KNET_FRAME_DATA: if (knet_h->enabled != 1) /* data forward is disabled */ break; knet_h->recv_from_links_buf->kf_seq_num = ntohs(knet_h->recv_from_links_buf->kf_seq_num); if (knet_h->dst_host_filter) { int host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( (const unsigned char *)knet_h->recv_from_links_buf->kf_data, len, knet_h->recv_from_links_buf->kf_node, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) goto exit_unlock; if ((!bcast) && (!dst_host_ids_entries)) goto exit_unlock; /* check if we are dst for this packet */ if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->node_id) { found = 1; break; } } if (!found) goto exit_unlock; } } if (!knet_should_deliver(src_host, bcast, knet_h->recv_from_links_buf->kf_seq_num)) goto exit_unlock; if (write(knet_h->sockfd, knet_h->recv_from_links_buf->kf_data, len - (KNET_FRAME_SIZE + sizeof(seq_num_t))) == len - (KNET_FRAME_SIZE + sizeof(seq_num_t))) knet_has_been_delivered(src_host, bcast, knet_h->recv_from_links_buf->kf_seq_num); break; case KNET_FRAME_PING: outlen = KNET_PINGBUFSIZE; knet_h->recv_from_links_buf->kf_type = KNET_FRAME_PONG; knet_h->recv_from_links_buf->kf_node = htons(knet_h->node_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->recv_from_links_buf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) break; outbuf = knet_h->recv_from_links_buf_crypt; } sendto(src_link->sock, outbuf, outlen, MSG_DONTWAIT, (struct sockaddr *) &src_link->address, sizeof(struct sockaddr_storage)); break; case KNET_FRAME_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->pong_last); timespec_diff(knet_h->recv_from_links_buf->kf_time, src_link->pong_last, &latency_last); src_link->latency = ((src_link->latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->latency < src_link->pong_timeout) { if (!src_link->connected) { knet_link_updown(knet_h, src_host->node_id, src_link, src_link->configured, 1); } } break; default: goto exit_unlock; } exit_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); } static void _handle_dst_link_updates(knet_handle_t knet_h) { uint16_t dst_host_id; struct knet_host *dst_host; int link_idx; int best_priority = -1; if (read(knet_h->pipefd[0], &dst_host_id, sizeof(dst_host_id)) != sizeof(dst_host_id)) return; if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0) return; dst_host = knet_h->host_index[dst_host_id]; if (!dst_host) goto out_unlock; dst_host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (dst_host->link[link_idx].configured != 1) /* link is not configured */ continue; if (dst_host->link[link_idx].connected != 1) /* link is not enabled */ continue; if (dst_host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if (dst_host->link[link_idx].priority > best_priority) { dst_host->active_links[0] = link_idx; best_priority = dst_host->link[link_idx].priority; } dst_host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ dst_host->active_links[dst_host->active_link_entries] = link_idx; dst_host->active_link_entries++; } } /* no active links, we can clean the circular buffers and indexes */ if (!dst_host->active_link_entries) { memset(dst_host->bcast_circular_buffer, 0, KNET_CBUFFER_SIZE); memset(dst_host->ucast_circular_buffer, 0, KNET_CBUFFER_SIZE); dst_host->bcast_seq_num_rx = 0; dst_host->ucast_seq_num_rx = 0; } out_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); return; } static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { int len; ssize_t outlen = KNET_PINGBUFSIZE; struct timespec clock_now, pong_last; unsigned long long diff_ping; unsigned char *outbuf = (unsigned char *)knet_h->pingbuf; /* caching last pong to avoid race conditions */ pong_last = dst_link->pong_last; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) return; timespec_diff(dst_link->ping_last, clock_now, &diff_ping); if (diff_ping >= (dst_link->ping_interval * 1000llu)) { knet_h->pingbuf->kf_time = clock_now; knet_h->pingbuf->kf_link = dst_link->link_id; if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h->crypto_instance, (const unsigned char *)knet_h->pingbuf, KNET_PINGBUFSIZE, knet_h->pingbuf_crypt, &outlen) < 0) return; outbuf = knet_h->pingbuf_crypt; } len = sendto(dst_link->sock, outbuf, outlen, MSG_DONTWAIT, (struct sockaddr *) &dst_link->address, sizeof(struct sockaddr_storage)); if (len == outlen) dst_link->ping_last = clock_now; } if (dst_link->connected == 1) { timespec_diff(pong_last, clock_now, &diff_ping); if (diff_ping >= (dst_link->pong_timeout * 1000llu)) { knet_link_updown(knet_h, dst_host->node_id, dst_link, dst_link->configured, 0); } } } static void *_handle_heartbt_thread(void *data) { knet_handle_t knet_h; struct knet_host *dst_host; int link_idx; knet_h = (knet_handle_t) data; /* preparing ping buffer */ knet_h->pingbuf->kf_magic = htonl(KNET_FRAME_MAGIC); knet_h->pingbuf->kf_version = KNET_FRAME_VERSION; knet_h->pingbuf->kf_type = KNET_FRAME_PING; knet_h->pingbuf->kf_node = htons(knet_h->node_id); while (1) { usleep(KNET_PING_TIMERES); if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) continue; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (dst_host->link[link_idx].configured != 1) continue; _handle_check_each(knet_h, dst_host, &dst_host->link[link_idx]); } } pthread_rwlock_unlock(&knet_h->list_rwlock); } return NULL; } static void *_handle_tap_to_links_thread(void *data) { knet_handle_t knet_h; struct epoll_event events[KNET_MAX_EVENTS]; knet_h = (knet_handle_t) data; /* preparing data buffer */ knet_h->tap_to_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC); knet_h->tap_to_links_buf->kf_version = KNET_FRAME_VERSION; knet_h->tap_to_links_buf->kf_type = KNET_FRAME_DATA; knet_h->tap_to_links_buf->kf_node = htons(knet_h->node_id); while (1) { if (epoll_wait(knet_h->tap_to_links_epollfd, events, KNET_MAX_EVENTS, -1) >= 1) _handle_tap_to_links(knet_h); } return NULL; } static void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_MAX_EVENTS]; while (1) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd); } } return NULL; } static void *_handle_dst_link_handler_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_MAX_EVENTS]; while (1) { if (epoll_wait(knet_h->dst_link_handler_epollfd, events, KNET_MAX_EVENTS, -1) >= 1) _handle_dst_link_updates(knet_h); } return NULL; } diff --git a/libknet/libknet.h b/libknet/libknet.h index ba8362d3..80396596 100644 --- a/libknet/libknet.h +++ b/libknet/libknet.h @@ -1,178 +1,179 @@ #ifndef __LIBKNET_H__ #define __LIBKNET_H__ #include #include typedef struct knet_handle *knet_handle_t; #define KNET_RING_DEFPORT 50000 #define KNET_RING_RCVBUFF 8388608 #define KNET_MAX_HOST 65536 #define KNET_MAX_LINK 8 #define KNET_MAX_HOST_LEN 64 #define KNET_CBUFFER_SIZE 4096 /* typedef uint64_t seq_num_t; #define SEQ_MAX UINT64_MAX */ typedef uint16_t seq_num_t; #define SEQ_MAX UINT16_MAX struct knet_link { uint8_t link_id; int sock; char ipaddr[KNET_MAX_HOST_LEN]; char port[6]; struct sockaddr_storage address; unsigned int configured:1; /* link is configured and ready to be used */ unsigned int connected:1; /* link is enabled for data */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long latency; /* average latency computed by fix/exp */ unsigned int latency_exp; unsigned int latency_fix; unsigned long long ping_interval; unsigned long long pong_timeout; struct timespec ping_last; struct timespec pong_last; }; #define KNET_LINK_POLICY_PASSIVE 0 #define KNET_LINK_POLICY_ACTIVE 1 #define KNET_LINK_POLICY_RR 2 struct knet_host { uint16_t node_id; char name[KNET_MAX_HOST_LEN]; char bcast_circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t bcast_seq_num_rx; char ucast_circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t ucast_seq_num_tx; seq_num_t ucast_seq_num_rx; struct knet_listener *listener; struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; uint8_t link_handler_policy; struct knet_host *next; }; struct knet_listener { int sock; char ipaddr[KNET_MAX_HOST_LEN]; char port[6]; struct sockaddr_storage address; struct knet_listener *next; }; union knet_frame_data { struct { seq_num_t kfd_seq_num; uint8_t kfd_data[0]; } data; struct { uint8_t kfd_link; struct timespec kfd_time; } ping; } __attribute__((packed)); struct knet_frame { uint32_t kf_magic; uint8_t kf_version; uint8_t kf_type; uint16_t kf_node; union knet_frame_data kf_payload; } __attribute__((packed)); #define kf_seq_num kf_payload.data.kfd_seq_num #define kf_data kf_payload.data.kfd_data #define kf_link kf_payload.ping.kfd_link #define kf_time kf_payload.ping.kfd_time #define KNET_FRAME_SIZE (sizeof(struct knet_frame) - sizeof(union knet_frame_data)) #define KNET_FRAME_MAGIC 0x12344321 #define KNET_FRAME_VERSION 0x01 #define KNET_FRAME_DATA 0x00 #define KNET_FRAME_PING 0x81 #define KNET_FRAME_PONG 0x82 #define KNET_FRAME_PMSK 0x80 /* ping/pong packet mask */ #define KNET_MIN_KEY_LEN 1024 #define KNET_MAX_KEY_LEN 4096 #define KNET_DST_FILTER_DISABLE 0 /* pckt goes everywhere */ #define KNET_DST_FILTER_ENABLE 1 /* pckt goes via dst_host_filter, see knet_ether_filter for example */ /* * dst_host_filter_fn should return * -1 on error, pkt is discarded * 0 all good, send pkt to dst_host_ids and there are dst_host_ids_entries in buffer ready * 1 send it to all hosts. contents of dst_host_ids and dst_host_ids_entries is ignored. */ struct knet_handle_cfg { int fd; uint16_t node_id; uint8_t dst_host_filter; int (*dst_host_filter_fn) ( const unsigned char *outdata, ssize_t outdata_len, uint16_t src_node_id, uint16_t *dst_host_ids, size_t *dst_host_ids_entries); }; int ether_host_filter_fn (const unsigned char *outdata, ssize_t outdata_len, uint16_t src_node_id, uint16_t *dst_host_ids, size_t *dst_host_ids_entries); knet_handle_t knet_handle_new(const struct knet_handle_cfg *knet_handle_cfg); void knet_handle_setfwd(knet_handle_t knet_h, int enabled); int knet_handle_free(knet_handle_t knet_h); struct knet_handle_crypto_cfg { char crypto_model[16]; char crypto_cipher_type[16]; char crypto_hash_type[16]; unsigned char private_key[KNET_MAX_KEY_LEN]; unsigned int private_key_len; }; int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg); int knet_host_add(knet_handle_t knet_h, uint16_t node_id); int knet_host_acquire(knet_handle_t knet_h, struct knet_host **host); int knet_host_get(knet_handle_t knet_h, uint16_t node_id, struct knet_host **host); int knet_host_release(knet_handle_t knet_h, struct knet_host **host); int knet_host_remove(knet_handle_t knet_h, uint16_t node_id); int knet_host_set_policy(knet_handle_t knet_h, uint16_t node_id, int policy); int knet_link_enable(knet_handle_t knet_h, uint16_t node_id, struct knet_link *lnk, int configured); void knet_link_timeout(struct knet_link *lnk, time_t interval, time_t timeout, int precision); +int knet_link_priority(knet_handle_t knet_h, uint16_t node_id, struct knet_link *lnk, uint8_t priority); #define KNET_HOST_FOREACH_NEXT 0 /* next host */ #define KNET_HOST_FOREACH_FOUND 1 /* host found, exit loop */ struct knet_host_search { int param1; /* user parameter 1 */ void *data1; /* user data pointer 1 */ void *data2; /* user data pointer 2 */ int retval; /* search return value */ }; typedef int (*knet_link_fn_t)(knet_handle_t knet_h, struct knet_host *host, struct knet_host_search *data); int knet_host_foreach(knet_handle_t knet_h, knet_link_fn_t linkfun, struct knet_host_search *data); int knet_listener_acquire(knet_handle_t knet_h, struct knet_listener **head, int writelock); int knet_listener_release(knet_handle_t knet_h); int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener); int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener); #endif