diff --git a/libknet/host.c b/libknet/host.c index 8c74009e..8c440075 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,782 +1,779 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "host.h" #include "internals.h" #include "logging.h" static void _host_list_update(knet_handle_t knet_h) { struct knet_host *host; knet_h->host_ids_entries = 0; for (host = knet_h->host_head; host != NULL; host = host->next) { knet_h->host_ids[knet_h->host_ids_entries] = host->host_id; knet_h->host_ids_entries++; } } int knet_host_add(knet_handle_t knet_h, uint16_t host_id) { int savederrno = 0, err = 0; struct knet_host *host = NULL; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_index[host_id]) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } host = malloc(sizeof(struct knet_host)); if (!host) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memset(host, 0, sizeof(struct knet_host)); /* * set host_id */ host->host_id = host_id; /* * set default host->name to host_id for logging */ snprintf(host->name, KNET_MAX_HOST_LEN - 1, "%u", host_id); /* * initialize links internal data */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { host->link[link_idx].link_id = link_idx; } /* * add new host to the index */ knet_h->host_index[host_id] = host; /* * add new host to host list */ - if (!knet_h->host_head) { - knet_h->host_head = host; - knet_h->host_tail = host; - } else { - knet_h->host_tail->next = host; - knet_h->host_tail = host; + if (knet_h->host_head) { + host->next = knet_h->host_head; } + knet_h->host_head = host; _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (err < 0) { free(host); } errno = savederrno; return err; } int knet_host_remove(knet_handle_t knet_h, uint16_t host_id) { int savederrno = 0, err = 0; struct knet_host *host, *removed; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } /* * if links are configured we cannot release the host */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].configured) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s", host_id, strerror(savederrno)); goto exit_unlock; } } removed = NULL; /* * removing host from list */ if (knet_h->host_head->host_id == host_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->host_id == host_id) { removed = host->next; host->next = removed->next; break; } } } knet_h->host_index[host_id] = NULL; free(removed); _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_set_name(knet_handle_t knet_h, uint16_t host_id, const char *name) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (!name) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (strlen(name) >= KNET_MAX_HOST_LEN) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(host->name, name, KNET_MAX_HOST_LEN - 1)) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u", host->host_id); goto exit_unlock; } } snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN - 1, "%s", name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_name_by_host_id(knet_handle_t knet_h, uint16_t host_id, char *name) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { savederrno = EINVAL; err = -1; log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id); goto exit_unlock; } snprintf(name, KNET_MAX_HOST_LEN - 1, "%s", knet_h->host_index[host_id]->name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, uint16_t *host_id) { int savederrno = 0, err = 0, found = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } if (!host_id) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) { found = 1; *host_id = host->host_id; break; } } if (!found) { savederrno = ENOENT; err = -1; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_host_list(knet_handle_t knet_h, uint16_t *host_ids, size_t *host_ids_entries) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!host_ids) || (!host_ids_entries)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids)); *host_ids_entries = knet_h->host_ids_entries; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_set_policy(knet_handle_t knet_h, uint16_t host_id, uint8_t policy) { int savederrno = 0, err = 0; uint8_t old_policy; if (!knet_h) { errno = EINVAL; return -1; } if (policy > KNET_LINK_POLICY_RR) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } old_policy = knet_h->host_index[host_id]->link_handler_policy; knet_h->host_index[host_id]->link_handler_policy = policy; if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) { savederrno = errno; err = -1; knet_h->host_index[host_id]->link_handler_policy = old_policy; log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s", host_id, strerror(savederrno)); } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_policy(knet_handle_t knet_h, uint16_t host_id, uint8_t *policy) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!policy) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } *policy = knet_h->host_index[host_id]->link_handler_policy; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_status(knet_handle_t knet_h, uint16_t host_id, struct knet_host_status *status) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &host->status, sizeof(struct knet_host_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, uint16_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data; knet_h->host_status_change_notify_fn = host_status_change_notify_fn; if (knet_h->host_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen) { /* * access here is protected by calling functions */ if (knet_h->fini_in_progress) { return 0; } if (pthread_rwlock_wrlock(&knet_h->host_rwlock) != 0) { log_debug(knet_h, KNET_SUB_HOST, "Unable to get write lock"); return -1; } if (pthread_mutex_lock(&knet_h->host_mutex) != 0) { log_debug(knet_h, KNET_SUB_HOST, "Unable to get mutex lock"); pthread_rwlock_unlock(&knet_h->host_rwlock); return -1; } if (sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != datalen) { log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe"); pthread_mutex_unlock(&knet_h->host_mutex); pthread_rwlock_unlock(&knet_h->host_rwlock); return -1; } pthread_cond_wait(&knet_h->host_cond, &knet_h->host_mutex); pthread_mutex_unlock(&knet_h->host_mutex); pthread_rwlock_unlock(&knet_h->host_rwlock); return 0; } /* * check if a given packet seq num is in the circular buffers * bcast = 0 -> unicast packet | 1 -> broadcast|mcast * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup */ int _seq_num_lookup(struct knet_host *host, int bcast, seq_num_t seq_num, int defrag_buf) { size_t i, j; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = NULL; char *dst_cbuf_defrag = NULL; seq_num_t *dst_seq_num; if (bcast) { dst_cbuf = host->bcast_circular_buffer; dst_cbuf_defrag = host->bcast_circular_buffer_defrag; dst_seq_num = &host->bcast_seq_num_rx; } else { dst_cbuf = host->ucast_circular_buffer; dst_cbuf_defrag = host->ucast_circular_buffer_defrag; dst_seq_num = &host->ucast_seq_num_rx; } if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { seq_dist = *dst_seq_num - seq_num; } j = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { return (dst_cbuf[j] == 0) ? 1 : 0; } else { return (dst_cbuf_defrag[j] == 0) ? 1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; if (i > j) { memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i); memset(dst_cbuf, 0, j + 1); memset(dst_cbuf_defrag + i, 0, KNET_CBUFFER_SIZE - i); memset(dst_cbuf_defrag, 0, j + 1); } else { memset(dst_cbuf + i, 0, j - i + 1); memset(dst_cbuf_defrag + i, 0, j - i + 1); } *dst_seq_num = seq_num; return 1; } void _seq_num_set(struct knet_host *host, int bcast, seq_num_t seq_num, int defrag_buf) { if (!defrag_buf) { if (bcast) { host->bcast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->ucast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } } else { if (bcast) { host->bcast_circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->ucast_circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } } return; } int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host) { int savederrno = 0; uint16_t host_id = host->host_id; if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) { savederrno = errno; log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s", strerror(savederrno)); errno = savederrno; return -1; } return 0; } static void _clear_cbuffers(struct knet_host *host) { int i; memset(host->bcast_circular_buffer, 0, KNET_CBUFFER_SIZE); memset(host->ucast_circular_buffer, 0, KNET_CBUFFER_SIZE); host->bcast_seq_num_rx = 0; host->ucast_seq_num_rx = 0; memset(host->bcast_circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); memset(host->ucast_circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); for (i = 0; i < KNET_MAX_LINK; i++) { memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); } } int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host) { int link_idx; int best_priority = -1; int send_link_idx = 0; uint8_t send_link_status[KNET_MAX_LINK]; int clear_cbuffer = 0; int host_has_remote = 0; int reachable = 0; host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].status.enabled != 1) /* link is not enabled */ continue; if (host->link[link_idx].remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) /* track if remote is connected */ host_has_remote = 1; if (host->link[link_idx].status.connected != 1) /* link is not enabled */ continue; if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */ continue; if ((!host->link[link_idx].host_info_up_sent) && (!host->link[link_idx].donnotremoteupdate)) { send_link_status[send_link_idx] = link_idx; send_link_idx++; /* * detect node coming back to life and reset the buffers */ if (host->link[link_idx].remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) { clear_cbuffer = 1; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if (host->link[link_idx].priority > best_priority) { host->active_links[0] = link_idx; best_priority = host->link[link_idx].priority; } host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ host->active_links[host->active_link_entries] = link_idx; host->active_link_entries++; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { log_debug(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)", host->host_id, host->link[host->active_links[0]].link_id, host->link[host->active_links[0]].priority); } else { log_debug(knet_h, KNET_SUB_HOST, "host: %u has %u active links", host->host_id, host->active_link_entries); } /* no active links, we can clean the circular buffers and indexes */ if ((!host->active_link_entries) || (clear_cbuffer) || (!host_has_remote)) { if (!host_has_remote) { log_debug(knet_h, KNET_SUB_HOST, "host: %u has no active remote links", host->host_id); } if (!host->active_link_entries) { log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); } if (clear_cbuffer) { log_debug(knet_h, KNET_SUB_HOST, "host: %u is coming back to life", host->host_id); } _clear_cbuffers(host); } if (send_link_idx) { int i; struct knet_hostinfo knet_hostinfo; knet_hostinfo.khi_type = KNET_HOSTINFO_TYPE_LINK_UP_DOWN; knet_hostinfo.khi_bcast = KNET_HOSTINFO_UCAST; knet_hostinfo.khi_dst_node_id = host->host_id; knet_hostinfo.khip_link_status_status = KNET_HOSTINFO_LINK_STATUS_UP; for (i=0; i < send_link_idx; i++) { knet_hostinfo.khip_link_status_link_id = send_link_status[i]; _send_host_info(knet_h, &knet_hostinfo, KNET_HOSTINFO_LINK_STATUS_SIZE); host->link[send_link_status[i]].host_info_up_sent = 1; host->link[send_link_status[i]].donnotremoteupdate = 0; } } if (host->active_link_entries) { reachable = 1; } if (host->status.reachable != reachable) { host->status.reachable = reachable; if (knet_h->host_status_change_notify_fn) { knet_h->host_status_change_notify_fn( knet_h->host_status_change_notify_fn_private_data, host->host_id, host->status.reachable, host->status.remote, host->status.external); } } return 0; } diff --git a/libknet/internals.h b/libknet/internals.h index 73f847b2..2d114704 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,447 +1,446 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __INTERNALS_H__ #define __INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include "libknet.h" #include "onwire.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned int latency_fix; /* precision */ uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ /* status */ struct knet_link_status status; /* internals */ uint8_t link_id; uint8_t transport_type; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ unsigned int remoteconnected:1; /* link is enabled for data (peer view) */ unsigned int donnotremoteupdate:1; /* define source of the update */ unsigned int host_info_up_sent:1; /* 0 if we need to notify remote that link is up */ unsigned int latency_exp; uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint8_t has_valid_mtu; }; #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { char buf[KNET_MAX_PACKET_SIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */ uint8_t last_first; /* special case if we receive the last fragment first */ uint16_t frag_size; /* normal frag size (not the last one) */ uint16_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ uint16_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* internals */ char bcast_circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t bcast_seq_num_rx; char ucast_circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t ucast_seq_num_tx; seq_num_t ucast_seq_num_rx; /* defrag/(reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK]; char bcast_circular_buffer_defrag[KNET_CBUFFER_SIZE]; char ucast_circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) */ uint8_t data_type; /* internal use for transport to define what data are associated * to this fd */ void *data; /* pointer to the data */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 struct knet_handle { uint16_t host_id; unsigned int enabled:1; struct knet_sock sockfd[KNET_DATAFD_MAX]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; unsigned int pmtud_interval; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; - struct knet_host *host_tail; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_transport_ops *transport_ops[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ uint16_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf[PCKT_FRAG_MAX]; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_FRAG_MAX]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; int lock_init_done; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_rwlock_t host_rwlock; /* send_host_info lock, can switch to mutex? */ pthread_mutex_t host_mutex; /* host mutex for cond wait on pckt send, switch to mutex/sync_send ? */ pthread_cond_t host_cond; /* conditional for above */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ pthread_mutex_t tx_mutex; struct crypto_instance *crypto_instance; uint16_t sec_header_size; uint16_t sec_block_size; uint16_t sec_hash_size; uint16_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char *recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; seq_num_t bcast_seq_num_tx; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_node_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, uint16_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); int fini_in_progress; }; /* * NOTE: every single operation must be implementend * for every protocol. */ typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. * * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct mmsghdr msg); } knet_transport_ops_t; /** * This is a kernel style list implementation. * * @author Steven Dake */ struct knet_list_head { struct knet_list_head *next; struct knet_list_head *prev; }; /** * @def KNET_LIST_DECLARE() * Declare and initialize a list head. */ #define KNET_LIST_DECLARE(name) \ struct knet_list_head name = { &(name), &(name) } #define KNET_INIT_LIST_HEAD(ptr) do { \ (ptr)->next = (ptr); (ptr)->prev = (ptr); \ } while (0) /** * Initialize the list entry. * * Points next and prev pointers to head. * @param head pointer to the list head */ static inline void knet_list_init(struct knet_list_head *head) { head->next = head; head->prev = head; } /** * Add this element to the list. * * @param element the new element to insert. * @param head pointer to the list head */ static inline void knet_list_add(struct knet_list_head *element, struct knet_list_head *head) { head->next->prev = element; element->next = head->next; element->prev = head; head->next = element; } /** * Add to the list (but at the end of the list). * * @param element pointer to the element to add * @param head pointer to the list head * @see knet_list_add() */ static inline void knet_list_add_tail(struct knet_list_head *element, struct knet_list_head *head) { head->prev->next = element; element->next = head; element->prev = head->prev; head->prev = element; } /** * Delete an entry from the list. * * @param _remove the list item to remove */ static inline void knet_list_del(struct knet_list_head *_remove) { _remove->next->prev = _remove->prev; _remove->prev->next = _remove->next; } /** * Replace old entry by new one * @param old: the element to be replaced * @param new: the new element to insert */ static inline void knet_list_replace(struct knet_list_head *old, struct knet_list_head *new) { new->next = old->next; new->next->prev = new; new->prev = old->prev; new->prev->next = new; } /** * Tests whether list is the last entry in list head * @param list: the entry to test * @param head: the head of the list * @return boolean true/false */ static inline int knet_list_is_last(const struct knet_list_head *list, const struct knet_list_head *head) { return list->next == head; } /** * A quick test to see if the list is empty (pointing to it's self). * @param head pointer to the list head * @return boolean true/false */ static inline int32_t knet_list_empty(const struct knet_list_head *head) { return head->next == head; } /** * Get the struct for this entry * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_entry(ptr,type,member)\ ((type *)((char *)(ptr)-(char*)(&((type *)0)->member))) /** * Get the first element from a list * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_first_entry(ptr, type, member) \ knet_list_entry((ptr)->next, type, member) /** * Iterate over a list * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) /** * Iterate over a list backwards * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each_reverse(pos, head) \ for (pos = (head)->prev; pos != (head); pos = pos->prev) /** * Iterate over a list safe against removal of list entry * @param pos: the &struct list_head to use as a loop counter. * @param n: another &struct list_head to use as temporary storage * @param head: the head for your list. */ #define knet_list_for_each_safe(pos, n, head) \ for (pos = (head)->next, n = pos->next; pos != (head); \ pos = n, n = pos->next) /** * Iterate over list of given type * @param pos: the type * to use as a loop counter. * @param head: the head for your list. * @param member: the name of the list_struct within the struct. */ #define knet_list_for_each_entry(pos, head, member) \ for (pos = knet_list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = knet_list_entry(pos->member.next, typeof(*pos), member)) #endif