diff --git a/libknet/compress.c b/libknet/compress.c index 3f69e9d6..80a30681 100644 --- a/libknet/compress.c +++ b/libknet/compress.c @@ -1,126 +1,137 @@ /* * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include "internals.h" #include "compress.h" #include "logging.h" #include "compress_zlib.h" #include "compress_lz4.h" /* * internal module switch data */ /* * DO NOT CHANGE ORDER HERE OR ONWIRE COMPATIBILITY * WILL BREAK! * * add after zlib and before NULL/NULL/NULL. */ compress_model_t compress_modules_cmds[] = { { "none", NULL, NULL, NULL }, { "zlib", zlib_val_level, zlib_compress, zlib_decompress }, { "lz4", lz4_val_level, lz4_compress, lz4_decompress }, { "lz4hc", lz4hc_val_level, lz4hc_compress, lz4_decompress }, { NULL, NULL, NULL, NULL }, }; static int get_model(const char *model) { int idx = 0; while (compress_modules_cmds[idx].model_name != NULL) { if (!strcmp(compress_modules_cmds[idx].model_name, model)) return idx; idx++; } return -1; } static int get_max_model(void) { int idx = 0; while (compress_modules_cmds[idx].model_name != NULL) { idx++; } return idx - 1; } static int val_level( knet_handle_t knet_h, int compress_model, int compress_level) { return compress_modules_cmds[compress_model].val_level(knet_h, compress_level); } int compress_init( knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg) { int cmp_model; knet_h->compress_max_model = get_max_model(); if (!knet_handle_compress_cfg) { return 0; } log_debug(knet_h, KNET_SUB_COMPRESS, - "Initizializing compress module [%s/%d]", - knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level); + "Initizializing compress module [%s/%d/%u]", + knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold); cmp_model = get_model(knet_handle_compress_cfg->compress_model); if (cmp_model < 0) { log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model); errno = EINVAL; return -1; } if (cmp_model > 0) { if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) { log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s", knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model); errno = EINVAL; return -1; } - + if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) { + log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).", + KNET_MAX_PACKET_SIZE); + errno = EINVAL; + return -1; + } + if (knet_handle_compress_cfg->compress_threshold == 0) { + knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD; + log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD); + } else { + knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold; + } } knet_h->compress_model = cmp_model; knet_h->compress_level = knet_handle_compress_cfg->compress_level; return 0; } int compress( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { return compress_modules_cmds[knet_h->compress_model].compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len); } int decompress( knet_handle_t knet_h, int compress_model, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { return compress_modules_cmds[compress_model].decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len); } diff --git a/libknet/internals.h b/libknet/internals.h index 129c39a3..77554802 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,481 +1,482 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __KNET_INTERNALS_H__ #define __KNET_INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include "libknet.h" #include "onwire.h" #include "compat.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_DATABUFSIZE_COMPRESS_PAD 1024 #define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define PCKT_RX_BUFS 512 #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_mmsghdr { struct msghdr msg_hdr; /* Message header */ unsigned int msg_len; /* Number of bytes transmitted */ }; struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned int latency_fix; /* precision */ uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ uint64_t flags; /* status */ struct knet_link_status status; /* internals */ uint8_t link_id; uint8_t transport_type; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ unsigned int latency_exp; uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint8_t has_valid_mtu; }; #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { char buf[KNET_DATABUFSIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */ uint8_t last_first; /* special case if we receive the last fragment first */ uint16_t frag_size; /* normal frag size (not the last one) */ uint16_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ knet_node_id_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* internals */ char circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t rx_seq_num; seq_num_t untimed_rx_seq_num; seq_num_t timed_rx_seq_num; uint8_t got_data; /* defrag/reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK]; char circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) */ uint8_t data_type; /* internal use for transport to define what data are associated * to this fd */ void *data; /* pointer to the data */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 struct knet_handle { knet_node_id_t host_id; unsigned int enabled:1; struct knet_sock sockfd[KNET_DATAFD_MAX]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; unsigned int pmtud_interval; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_transport_ops *transport_ops[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ uint32_t reconnect_int; knet_node_id_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_RX_BUFS]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; int lock_init_done; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */ pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */ struct crypto_instance *crypto_instance; uint16_t sec_header_size; uint16_t sec_block_size; uint16_t sec_hash_size; uint16_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char *recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; int compress_model; int compress_max_model; int compress_level; + uint32_t compress_threshold; unsigned char *recv_from_links_buf_decompress; unsigned char *send_to_links_buf_compress; seq_num_t tx_seq_num; pthread_mutex_t tx_seq_num_mutex; uint8_t has_loop_link; uint8_t loop_link; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_node_id, int8_t *channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); int fini_in_progress; }; /* * NOTE: every single operation must be implementend * for every protocol. */ typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; const uint8_t transport_id; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. * * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * transport callback for incoming dynamic connections * this is called in global read lock context */ int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rdlock */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * transport_tx_sock_error is invoked with global_rwlock and * it's invoked when sendto or sendmmsg returns =< 0 * * it should return: * -1 on internal error * 0 ignore error and continue * 1 retry * any sleep or wait action should happen inside the transport code */ int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg); } knet_transport_ops_t; socklen_t sockaddr_len(const struct sockaddr_storage *ss); /** * This is a kernel style list implementation. * * @author Steven Dake */ struct knet_list_head { struct knet_list_head *next; struct knet_list_head *prev; }; /** * @def KNET_LIST_DECLARE() * Declare and initialize a list head. */ #define KNET_LIST_DECLARE(name) \ struct knet_list_head name = { &(name), &(name) } #define KNET_INIT_LIST_HEAD(ptr) do { \ (ptr)->next = (ptr); (ptr)->prev = (ptr); \ } while (0) /** * Initialize the list entry. * * Points next and prev pointers to head. * @param head pointer to the list head */ static inline void knet_list_init(struct knet_list_head *head) { head->next = head; head->prev = head; } /** * Add this element to the list. * * @param element the new element to insert. * @param head pointer to the list head */ static inline void knet_list_add(struct knet_list_head *element, struct knet_list_head *head) { head->next->prev = element; element->next = head->next; element->prev = head; head->next = element; } /** * Add to the list (but at the end of the list). * * @param element pointer to the element to add * @param head pointer to the list head * @see knet_list_add() */ static inline void knet_list_add_tail(struct knet_list_head *element, struct knet_list_head *head) { head->prev->next = element; element->next = head; element->prev = head->prev; head->prev = element; } /** * Delete an entry from the list. * * @param _remove the list item to remove */ static inline void knet_list_del(struct knet_list_head *_remove) { _remove->next->prev = _remove->prev; _remove->prev->next = _remove->next; } /** * Replace old entry by new one * @param old: the element to be replaced * @param new: the new element to insert */ static inline void knet_list_replace(struct knet_list_head *old, struct knet_list_head *new) { new->next = old->next; new->next->prev = new; new->prev = old->prev; new->prev->next = new; } /** * Tests whether list is the last entry in list head * @param list: the entry to test * @param head: the head of the list * @return boolean true/false */ static inline int knet_list_is_last(const struct knet_list_head *list, const struct knet_list_head *head) { return list->next == head; } /** * A quick test to see if the list is empty (pointing to it's self). * @param head pointer to the list head * @return boolean true/false */ static inline int32_t knet_list_empty(const struct knet_list_head *head) { return head->next == head; } /** * Get the struct for this entry * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_entry(ptr,type,member)\ ((type *)((char *)(ptr)-(char*)(&((type *)0)->member))) /** * Get the first element from a list * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_first_entry(ptr, type, member) \ knet_list_entry((ptr)->next, type, member) /** * Iterate over a list * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) /** * Iterate over a list backwards * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each_reverse(pos, head) \ for (pos = (head)->prev; pos != (head); pos = pos->prev) /** * Iterate over a list safe against removal of list entry * @param pos: the &struct list_head to use as a loop counter. * @param n: another &struct list_head to use as temporary storage * @param head: the head for your list. */ #define knet_list_for_each_safe(pos, n, head) \ for (pos = (head)->next, n = pos->next; pos != (head); \ pos = n, n = pos->next) /** * Iterate over list of given type * @param pos: the type * to use as a loop counter. * @param head: the head for your list. * @param member: the name of the list_struct within the struct. */ #define knet_list_for_each_entry(pos, head, member) \ for (pos = knet_list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = knet_list_entry(pos->member.next, typeof(*pos), member)) #endif diff --git a/libknet/libknet.h b/libknet/libknet.h index 11f72a76..ebde18f1 100644 --- a/libknet/libknet.h +++ b/libknet/libknet.h @@ -1,1636 +1,1647 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __LIBKNET_H__ #define __LIBKNET_H__ #include #include #include /* * libknet limits */ /* * Maximum number of hosts */ typedef uint16_t knet_node_id_t; #define KNET_MAX_HOST 65536 /* * Maximum number of links between 2 hosts */ #define KNET_MAX_LINK 8 /* * Maximum packet size that should be written to datafd * see knet_handle_new for details */ #define KNET_MAX_PACKET_SIZE 65536 /* * Buffers used for pretty logging * host is used to store both ip addresses and hostnames */ #define KNET_MAX_HOST_LEN 256 #define KNET_MAX_PORT_LEN 6 /* * Some notifications can be generated either on TX or RX */ #define KNET_NOTIFY_TX 0 #define KNET_NOTIFY_RX 1 /* * Link flags */ /* * Where possible, set traffic priority to high. * On Linux this sets the TOS to INTERACTIVE (6), * see tc-prio(8) for more infomation */ #define KNET_LINK_FLAG_TRAFFICHIPRIO (1ULL << 0) typedef struct knet_handle *knet_handle_t; /* * Handle structs/API calls */ /* * knet_handle_new * * host_id - Each host in a knet is identified with a unique * ID. when creating a new handle local host_id * must be specified (0 to UINT16T_MAX are all valid). * It is the user's responsibility to check that the value * is unique, or bad things might happen. * * log_fd - Write file descriptor. If set to a value > 0, it will be used * to write log packets (see below) from libknet to the application. * Setting to 0 will disable logging from libknet. * It is possible to enable logging at any given time (see logging API * below). * Make sure to either read from this filedescriptor properly and/or * mark it O_NONBLOCK, otherwise if the fd becomes full, libknet could * block. * * default_log_level - * If logfd is specified, it will initialize all subsystems to log * at default_log_level value. (see logging API below) * * on success, a new knet_handle_t is returned. * on failure, NULL is returned and errno is set. */ knet_handle_t knet_handle_new(knet_node_id_t host_id, int log_fd, uint8_t default_log_level); /* * knet_handle_free * * knet_h - pointer to knet_handle_t * * Destroy a knet handle, free all resources * * knet_handle_free returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_free(knet_handle_t knet_h); /* * knet_handle_enable_sock_notify * * knet_h - pointer to knet_handle_t * * sock_notify_fn_private_data * void pointer to data that can be used to identify * the callback. * * sock_notify_fn * A callback function that is invoked every time * a socket in the datafd pool will report an error (-1) * or an end of read (0) (see socket.7). * This function MUST NEVER block or add substantial delays. * The callback is invoked in an internal unlocked area * to allow calls to knet_handle_add_datafd/knet_handle_remove_datafd * to swap/replace the bad fd. * if both err and errno are 0, it means that the socket * has received a 0 byte packet (EOF?). * The callback function must either remove the fd from knet * (by calling knet_handle_remove_fd()) or dup a new fd in its place. * Failure to do this can cause problems. * * knet_handle_enable_sock_notify returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_enable_sock_notify(knet_handle_t knet_h, void *sock_notify_fn_private_data, void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno)); /* sorry! can't call it errno ;) */ /* * knet_handle_add_datafd * * IMPORTANT: In order to add datafd to knet, knet_handle_enable_sock_notify * _MUST_ be set and be able to handle both errors (-1) and * 0 bytes read / write from the provided datafd. * On read error (< 0) from datafd, the socket is automatically * removed from polling to avoid spinning on dead sockets. * It is safe to call knet_handle_remove_datafd even on sockets * that have been removed. * * knet_h - pointer to knet_handle_t * * *datafd - read/write file descriptor. * knet will read data here to send to the other hosts * and will write data received from the network. * Each data packet can be of max size KNET_MAX_PACKET_SIZE! * Applications using knet_send/knet_recv will receive a * proper error if the packet size is not within boundaries. * Applications using their own functions to write to the * datafd should NOT write more than KNET_MAX_PACKET_SIZE. * * Please refer to handle.c on how to set up a socketpair. * * datafd can be 0, and knet_handle_add_datafd will create a properly * populated socket pair the same way as ping_test, or a value * higher than 0. A negative number will return an error. * On exit knet_handle_free will take care to cleanup the * socketpair only if they have been created by knet_handle_add_datafd. * * It is possible to pass either sockets or normal fds. * User provided datafd will be marked as non-blocking and close-on-exit. * * *channel - This value has the same effect of VLAN tagging. * A negative value will auto-allocate a channel. * Setting a value between 0 and 31 will try to allocate that * specific channel (unless already in use). * * It is possible to add up to 32 datafds but be aware that each * one of them must have a receiving end on the other host. * * Example: * hostA channel 0 will be delivered to datafd on hostB channel 0 * hostA channel 1 to hostB channel 1. * * Each channel must have a unique file descriptor. * * If your application could have 2 channels on one host and one * channel on another host, then you can use dst_host_filter * to manipulate channel values on TX and RX. * * knet_handle_add_datafd returns: * * 0 on success * *datafd will be populated with a socket if the original value was 0 * or if a specific fd was set, the value is untouched. * *channel will be populated with a channel number if the original value * was negative or the value is untouched if a specific channel * was requested. * * -1 on error and errno is set. * *datafd and *channel are untouched or empty. */ #define KNET_DATAFD_MAX 32 int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel); /* * knet_handle_remove_datafd * * knet_h - pointer to knet_handle_t * * datafd - file descriptor to remove. * NOTE that if the socket/fd was created by knet_handle_add_datafd, * the socket will be closed by libknet. * * knet_handle_remove_datafd returns: * * 0 on success * * -1 on error and errno is set. */ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd); /* * knet_handle_get_channel * * knet_h - pointer to knet_handle_t * * datafd - get the channel associated to this datafd * * *channel - will contain the result * * knet_handle_get_channel returns: * * 0 on success * and *channel will contain the result * * -1 on error and errno is set. * and *channel content is meaningless */ int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel); /* * knet_handle_get_datafd * * knet_h - pointer to knet_handle_t * * channel - get the datafd associated to this channel * * *datafd - will contain the result * * knet_handle_get_datafd returns: * * 0 on success * and *datafd will contain the results * * -1 on error and errno is set. * and *datafd content is meaningless */ int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd); /* * knet_recv * * knet_h - pointer to knet_handle_t * * buff - pointer to buffer to store the received data * * buff_len - buffer lenght * * knet_recv is a commodity function to wrap iovec operations * around a socket. It returns a call to readv(2). */ ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel); /* * knet_send * * knet_h - pointer to knet_handle_t * * buff - pointer to the buffer of data to send * * buff_len - length of data to send * * knet_send is a commodity function to wrap iovec operations * around a socket. It returns a call to writev(2). */ ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel); /* * knet_send_sync * * knet_h - pointer to knet_handle_t * * buff - pointer to the buffer of data to send * * buff_len - length of data to send * * channel - data channel to use (see knet_handle_add_datafd) * * All knet RX/TX operations are async for performance reasons. * There are applications that might need a sync version of data * transmission and receive errors in case of failure to deliver * to another host. * knet_send_sync bypasses the whole TX async layer and delivers * data directly to the link layer, and returns errors accordingly. * knet_send_sync allows to send only one packet to one host at * a time. It does NOT support multiple destinations or multicast * packets. Decision is still based on dst_host_filter_fn. * * knet_send_sync returns 0 on success and -1 on error. * * In addition to normal sendmmsg errors, knet_send_sync can fail * due to: * * ECANCELED - data forward is disabled * EFAULT - dst_host_filter fatal error * EINVAL - dst_host_filter did not provide * dst_host_ids_entries on unicast pckts * E2BIG - dst_host_filter did return more than one * dst_host_ids_entries on unicast pckts * ENOMSG - received unknown message type * EHOSTDOWN - unicast pckt cannot be delivered because * dest host is not connected yet * ECHILD - crypto failed * EAGAIN - sendmmsg was unable to send all messages and * there was no progress during retry */ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel); /* * knet_handle_enable_filter * * knet_h - pointer to knet_handle_t * * dst_host_filter_fn_private_data * void pointer to data that can be used to identify * the callback. * * dst_host_filter_fn - * is a callback function that is invoked every time * a packet hits datafd (see knet_handle_new). * the function allows users to tell libknet where the * packet has to be delivered. * * const unsigned char *outdata - is a pointer to the * current packet * ssize_t outdata_len - lenght of the above data * uint8_t tx_rx - filter is called on tx or rx * (see defines below) * knet_node_id_t this_host_id - host_id processing the packet * knet_node_id_t src_host_id - host_id that generated the * packet * knet_node_id_t *dst_host_ids - array of KNET_MAX_HOST knet_node_id_t * where to store the destinations * size_t *dst_host_ids_entries - number of hosts to send the message * * dst_host_filter_fn should return * -1 on error, packet is discarded. * 0 packet is unicast and should be sent to dst_host_ids and there are * dst_host_ids_entries in the buffer. * 1 packet is broadcast/multicast and is sent all hosts. * contents of dst_host_ids and dst_host_ids_entries are ignored. * (see also kronosnetd/etherfilter.* for an example that filters based * on ether protocol) * * knet_handle_enable_filter returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_enable_filter(knet_handle_t knet_h, void *dst_host_filter_fn_private_data, int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_host_id, int8_t *channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries)); /* * knet_handle_setfwd * * knet_h - pointer to knet_handle_t * * enable - set to 1 to allow data forwarding, 0 to disable data forwarding. * * knet_handle_setfwd returns: * * 0 on success * -1 on error and errno is set. * * By default data forwarding is off and no traffic will pass through knet until * it is set on. */ int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled); /* * knet_handle_pmtud_setfreq * * knet_h - pointer to knet_handle_t * * interval - define the interval in seconds between PMTUd scans * range from 1 to 86400 (24h) * * knet_handle_pmtud_setfreq returns: * * 0 on success * -1 on error and errno is set. * * default interval is 60. */ #define KNET_PMTUD_DEFAULT_INTERVAL 60 int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval); /* * knet_handle_pmtud_getfreq * * knet_h - pointer to knet_handle_t * * interval - pointer where to store the current interval value * * knet_handle_pmtud_setfreq returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval); /* * knet_handle_enable_pmtud_notify * * knet_h - pointer to knet_handle_t * * pmtud_notify_fn_private_data * void pointer to data that can be used to identify * the callback. * * pmtud_notify_fn * is a callback function that is invoked every time * a path MTU size change is detected. * The function allows libknet to notify the user * of data MTU, that's the max value that can be send * onwire without fragmentation. The data MTU will always * be lower than real link MTU because it accounts for * protocol overhead, knet packet header and (if configured) * crypto overhead, * This function MUST NEVER block or add substantial delays. * * knet_handle_enable_pmtud_notify returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)); /* * knet_handle_pmtud_get * * knet_h - pointer to knet_handle_t * * data_mtu - pointer where to store data_mtu (see above) * * knet_handle_pmtud_get returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu); /* * knet_handle_crypto * * knet_h - pointer to knet_handle_t * * knet_handle_crypto_cfg - * pointer to a knet_handle_crypto_cfg structure * * crypto_model should contain the model name. * Currently only "nss" is supported. * Setting to "none" will disable crypto. * * crypto_cipher_type * should contain the cipher algo name. * It can be set to "none" to disable * encryption. * Currently supported by "nss" model: * "3des", "aes128", "aes192" and "aes256". * * crypto_hash_type * should contain the hashing algo name. * It can be set to "none" to disable * hashing. * Currently supported by "nss" model: * "md5", "sha1", "sha256", "sha384" and "sha512". * * private_key will contain the private shared key. * It has to be at least KNET_MIN_KEY_LEN long. * * private_key_len * length of the provided private_key. * * Implementation notes/current limitations: * - enabling crypto, will increase latency as packets have * to processed. * - enabling crypto might reduce the overall throughtput * due to crypto data overhead. * - re-keying is not implemented yet. * - private/public key encryption/hashing is not currently * planned. * - crypto key must be the same for all hosts in the same * knet instance. * - it is safe to call knet_handle_crypto multiple times at runtime. * The last config will be used. * IMPORTANT: a call to knet_handle_crypto can fail due to: * 1) failure to obtain locking * 2) errors to initializing the crypto level. * This can happen even in subsequent calls to knet_handle_crypto. * A failure in crypto init, might leave your traffic unencrypted! * It's best to stop data forwarding (see above), change crypto config, * start forward again. * * knet_handle_crypto returns: * * 0 on success * -1 on error and errno is set. * -2 on crypto subsystem initialization error. No errno is provided at the moment (yet). */ #define KNET_MIN_KEY_LEN 256 #define KNET_MAX_KEY_LEN 4096 struct knet_handle_crypto_cfg { char crypto_model[16]; char crypto_cipher_type[16]; char crypto_hash_type[16]; unsigned char private_key[KNET_MAX_KEY_LEN]; unsigned int private_key_len; }; int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg); /* * knet_handle_compress * * knet_h - pointer to knet_handle_t * * knet_handle_compress_cfg - * pointer to a knet_handle_compress_cfg structure * * compress_model should contain the mode name. * Currently only "zlib" and "lz4" are supported. * Setting to "none" will disable compress. * + * compress_threshold + * tells the transmission thread to NOT compress + * any packets that are smaller than the value + * indicated. Default 100 bytes. + * Set to 0 to reset to the default. + * Set to 1 to compress everything. + * Max accepted value is KNET_MAX_PACKET_SIZE. + * * compress_level some compression libraries allows tuning of compression * parameters. * For example zlib value ranges from 0 to 9 where 0 is no * compression and 9 is max compression. * This value is passed pristine to the compression library. * zlib: 0 (no compression), 1 (minimal) .. 9 (max compression). * lz4: 1 (max compression)... 9 (fastest compression). * lz4hc: 1 (min compression) ... LZ4HC_MAX_CLEVEL (16) or LZ4HC_CLEVEL_MAX (12) * depends on the installed version of lz4hc. libknet can detects the max * value and will print an appropriate warning. * Please refere to the library man pages * on how to be set this value, as it is passed * unmodified to the compression algorithm where supported. * * Implementation notes: * - it is possible to enable/disable compression at any time. * - nodes can be using different compression algorithm at any time. * - knet does NOT implement compression algorithm directly. it relies * on external libraries for this functionality. Please read * the libraries man pages to figure out which algorithm/compression * level is best for the data you are planning to transmit. * * knet_handle_compress returns: * * 0 on success * -1 on error and errno is set. EINVAL means that either the model or the * level are not supported. */ +#define KNET_COMPRESS_THRESHOLD 100 + struct knet_handle_compress_cfg { - char compress_model[16]; - int compress_level; + char compress_model[16]; + uint32_t compress_threshold; + int compress_level; }; int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg); /* * host structs/API calls */ /* * knet_host_add * * knet_h - pointer to knet_handle_t * * host_id - each host in a knet is identified with a unique ID * (see also knet_handle_new documentation above) * * knet_host_add returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id); /* * knet_host_remove * * knet_h - pointer to knet_handle_t * * host_id - each host in a knet is identified with a unique ID * (see also knet_handle_new documentation above) * * knet_host_remove returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id); /* * knet_host_set_name * * knet_h - pointer to knet_handle_t * * host_id - see above * * name - this name will be used for pretty logging and eventually * search for hosts (see also get_name and get_id below). * Only up to KNET_MAX_HOST_LEN - 1 bytes will be accepted and * name has to be unique for each host. * * knet_host_set_name returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name); /* * knet_host_get_name_by_host_id * * knet_h - pointer to knet_handle_t * * host_id - see above * * name - pointer to a preallocated buffer of at least size KNET_MAX_HOST_LEN * where the current host name will be stored * (as set by knet_host_set_name or default by knet_host_add) * * knet_host_get_name_by_host_id returns: * * 0 on success * -1 on error and errno is set (name is left untouched) */ int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id, char *name); /* * knet_host_get_id_by_host_name * * knet_h - pointer to knet_handle_t * * name - name to lookup, max len KNET_MAX_HOST_LEN * * host_id - where to store the result * * knet_host_get_id_by_host_name returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, knet_node_id_t *host_id); /* * knet_host_get_host_list * * knet_h - pointer to knet_handle_t * * host_ids - array of at lest KNET_MAX_HOST size * * host_ids_entries - * number of entries writted in host_ids * * knet_host_get_host_list returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_host_list(knet_handle_t knet_h, knet_node_id_t *host_ids, size_t *host_ids_entries); /* * define switching policies */ #define KNET_LINK_POLICY_PASSIVE 0 #define KNET_LINK_POLICY_ACTIVE 1 #define KNET_LINK_POLICY_RR 2 /* * knet_host_set_policy * * knet_h - pointer to knet_handle_t * * host_id - see above * * policy - there are currently 3 kind of simple switching policies * as defined above, based on link configuration. * KNET_LINK_POLICY_PASSIVE - the active link with the lowest * priority will be used. * if one or more active links share * the same priority, the one with * lowest link_id will be used. * * KNET_LINK_POLICY_ACTIVE - all active links will be used * simultaneously to send traffic. * link priority is ignored. * * KNET_LINK_POLICY_RR - round-robin policy, every packet * will be send on a different active * link. * * knet_host_set_policy returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t policy); /* * knet_host_get_policy * * knet_h - pointer to knet_handle_t * * host_id - see above * * policy - will contain the current configured switching policy. * Default is passive when creating a new host. * * knet_host_get_policy returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *policy); /* * knet_host_enable_status_change_notify * * knet_h - pointer to knet_handle_t * * host_status_change_notify_fn_private_data * void pointer to data that can be used to identify * the callback. * * host_status_change_notify_fn * is a callback function that is invoked every time * there is a change in the host status. * host status is identified by: * - reachable, this host can send/receive data to/from host_id * - remote, 0 if the host_id is connected locally or 1 if * the there is one or more knet host(s) in between. * NOTE: re-switching is NOT currently implemented, * but this is ready for future and can avoid * an API/ABI breakage later on. * - external, 0 if the host_id is configured locally or 1 if * it has been added from remote nodes config. * NOTE: dynamic topology is NOT currently implemented, * but this is ready for future and can avoid * an API/ABI breakage later on. * This function MUST NEVER block or add substantial delays. * * knet_host_status_change_notify returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)); /* * define host status structure for quick lookup * struct is in flux as more stats will be added soon * * reachable host_id can be seen either directly connected * or via another host_id * * remote 0 = node is connected locally, 1 is visible via * via another host_id * * external 0 = node is configured/known locally, * 1 host_id has been received via another host_id */ struct knet_host_status { uint8_t reachable; uint8_t remote; uint8_t external; /* add host statistics */ }; /* * knet_host_status_get * * knet_h - pointer to knet_handle_t * * status - pointer to knet_host_status struct (see above) * * knet_handle_pmtud_get returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id, struct knet_host_status *status); /* * link structs/API calls * * every host allocated/managed by knet_host_* has * KNET_MAX_LINK structures to define the network * paths that connect 2 hosts. * * Each link is identified by a link_id that has a * values between 0 and KNET_MAX_LINK - 1. * * KNOWN LIMITATIONS: * * - let's assume the scenario where two hosts are connected * with any number of links. link_id must match on both sides. * If host_id 0 link_id 0 is configured to connect IP1 to IP2 and * host_id 0 link_id 1 is configured to connect IP3 to IP4, * host_id 1 link_id 0 _must_ connect IP2 to IP1 and likewise * host_id 1 link_id 1 _must_ connect IP4 to IP3. * We might be able to lift this restriction in future, by using * other data to determine src/dst link_id, but for now, deal with it. */ /* * commodity functions to convert strings to sockaddr and viceversa */ /* * knet_strtoaddr * * host - IPaddr/hostname to convert * be aware only the first IP address will be returned * in case a hostname resolves to multiple IP * * port - port to connect to * * ss - sockaddr_storage where to store the converted data * * sslen - len of the sockaddr_storage * * knet_strtoaddr returns same error codes as getaddrinfo * */ int knet_strtoaddr(const char *host, const char *port, struct sockaddr_storage *ss, socklen_t sslen); /* * knet_addrtostr * * ss - sockaddr_storage to convert * * sslen - len of the sockaddr_storage * * host - IPaddr/hostname where to store data * (recommended size: KNET_MAX_HOST_LEN) * * port - port buffer where to store data * (recommended size: KNET_MAX_PORT_LEN) * * knet_strtoaddr returns same error codes as getnameinfo */ int knet_addrtostr(const struct sockaddr_storage *ss, socklen_t sslen, char *addr_buf, size_t addr_buf_size, char *port_buf, size_t port_buf_size); /* * knet_handle_get_transport_list * * knet_h - pointer to knet_handle_t * * transport_list - an array of struct transport_info that must be * at least of size struct transport_info * KNET_MAX_TRANSPORTS * * transport_list_entries - pointer to a size_t where to store how many transports * are available in this build of libknet. * * knet_handle_get_transport_list returns: * * 0 on success * -1 on error and errno is set. */ #define KNET_TRANSPORT_LOOPBACK 0 #define KNET_TRANSPORT_UDP 1 #define KNET_TRANSPORT_SCTP 2 #define KNET_MAX_TRANSPORTS 3 /* * The Loopback transport is only valid for connections to localhost, the host * with the same node_id specified in knet_handle_new(). Only one link of this * type is allowed. Data sent down a LOOPBACK link will be copied directly from * the knet send datafd to the knet receive datafd so the application must be set * up to take data from that socket at least as often as it is sent or deadlocks * could occur. If used, a LOOPBACK link must be the only link configured to the * local host. */ struct transport_info { const char *name; /* UDP/SCTP/etc... */ uint8_t id; /* value that can be used for link_set_config */ uint8_t properties; /* currently unused */ }; int knet_handle_get_transport_list(knet_handle_t knet_h, struct transport_info *transport_list, size_t *transport_list_entries); /* * knet_handle_get_transport_name_by_id * * knet_h - pointer to knet_handle_t * * transport - one of the above KNET_TRANSPORT_xxx constants * * knet_handle_get_transport_name_by_id returns: * * pointer to the name on success or * NULL on error and errno is set. */ const char *knet_handle_get_transport_name_by_id(knet_handle_t knet_h, uint8_t transport); /* * knet_handle_get_transport_id_by_name * * knet_h - pointer to knet_handle_t * * name - transport name (UDP/SCTP/etc) * * knet_handle_get_transport_name_by_id returns: * * KNET_MAX_TRANSPORTS on error and errno is set accordingly * KNET_TRANSPORT_xxx on success. */ uint8_t knet_handle_get_transport_id_by_name(knet_handle_t knet_h, const char *name); /* * knet_handle_set_transport_reconnect_interval * * knet_h - pointer to knet_handle_t * * msecs - milliseconds * * knet_handle_set_transport_reconnect_interval returns: * * 0 on success * -1 on error and errno is set. */ #define KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL 1000 int knet_handle_set_transport_reconnect_interval(knet_handle_t knet_h, uint32_t msecs); /* * knet_handle_get_transport_reconnect_interval * * knet_h - pointer to knet_handle_t * * msecs - milliseconds * * knet_handle_get_transport_reconnect_interval returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_get_transport_reconnect_interval(knet_handle_t knet_h, uint32_t *msecs); /* * knet_link_set_config * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * transport - one of the above KNET_TRANSPORT_xxx constants * * src_addr - sockaddr_storage that can be either IPv4 or IPv6 * * dst_addr - sockaddr_storage that can be either IPv4 or IPv6 * this can be null if we don't know the incoming * IP address/port and the link will remain quiet * till the node on the other end will initiate a * connection * * flags - KNET_LINK_FLAG_* * * knet_link_set_config returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint64_t flags); /* * knet_link_get_config * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * transport - see above * * src_addr - sockaddr_storage that can be either IPv4 or IPv6 * * dst_addr - sockaddr_storage that can be either IPv4 or IPv6 * * dynamic - 0 if dst_addr is static or 1 if dst_addr is dynamic. * In case of 1, dst_addr can be NULL and it will be left * untouched. * * flags - KNET_LINK_FLAG_* * * knet_link_get_config returns: * * 0 on success. * -1 on error and errno is set. */ int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint8_t *dynamic, uint64_t *flags); /* * knet_link_clear_config * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * knet_link_clear_config returns: * * 0 on success. * -1 on error and errno is set. */ int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id); /* * knet_link_set_enable * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * enabled - 0 disable the link, 1 enable the link * * knet_link_set_enable returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, unsigned int enabled); /* * knet_link_get_enable * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * enabled - 0 disable the link, 1 enable the link * * knet_link_get_enable returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, unsigned int *enabled); /* * knet_link_set_ping_timers * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * interval - specify the ping interval * * timeout - if no pong is received within this time, * the link is declared dead * * precision - how many values of latency are used to calculate * the average link latency (see also get_status below) * * knet_link_set_ping_timers returns: * * 0 on success * -1 on error and errno is set. */ #define KNET_LINK_DEFAULT_PING_INTERVAL 1000 /* 1 second */ #define KNET_LINK_DEFAULT_PING_TIMEOUT 2000 /* 2 seconds */ #define KNET_LINK_DEFAULT_PING_PRECISION 2048 /* samples */ int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, time_t interval, time_t timeout, unsigned int precision); /* * knet_link_get_ping_timers * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * interval - ping intervall * * timeout - if no pong is received within this time, * the link is declared dead * * precision - how many values of latency are used to calculate * the average link latency (see also get_status below) * * knet_link_get_ping_timers returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, time_t *interval, time_t *timeout, unsigned int *precision); /* * knet_link_set_pong_count * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * pong_count - how many valid ping/pongs before a link is marked UP. * default: 5, value should be > 0 * * knet_link_set_pong_count returns: * * 0 on success * -1 on error and errno is set. */ #define KNET_LINK_DEFAULT_PONG_COUNT 5 int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t pong_count); /* * knet_link_get_pong_count * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * pong_count - see above * * knet_link_get_pong_count returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *pong_count); /* * knet_link_set_priority * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * priority - specify the switching priority for this link * see also knet_host_set_policy * * knet_link_set_priority returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t priority); /* * knet_link_get_priority * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * priority - gather the switching priority for this link * see also knet_host_set_policy * * knet_link_get_priority returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *priority); /* * knet_link_get_link_list * * knet_h - pointer to knet_handle_t * * link_ids - array of at lest KNET_MAX_LINK size * with the list of configured links for a certain host. * * link_ids_entries - * number of entries contained in link_ids * * knet_link_get_link_list returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *link_ids, size_t *link_ids_entries); /* * define link status structure for quick lookup * * src/dst_{ipaddr,port} strings are filled by * getnameinfo(3) when configuring the link. * if the link is dynamic (see knet_link_set_config) * dst_ipaddr/port will contain ipaddr/port of the currently * connected peer or "Unknown" if it was not possible * to determine the ipaddr/port at runtime. * * enabled see also knet_link_set/get_enable. * * connected the link is connected to a peer and ping/pong traffic * is flowing. * * dynconnected the link has dynamic ip on the other end, and * we can see the other host is sending pings to us. * * latency average latency of this link * see also knet_link_set/get_timeout. * * pong_last if the link is down, this value tells us how long * ago this link was active. A value of 0 means that the link * has never been active. * * knet_link_stats structure that contains details statistics for the link */ #define MAX_LINK_EVENTS 16 struct knet_link_stats { /* onwire values */ uint64_t tx_data_packets; uint64_t rx_data_packets; uint64_t tx_data_bytes; uint64_t rx_data_bytes; uint64_t rx_ping_packets; uint64_t tx_ping_packets; uint64_t rx_ping_bytes; uint64_t tx_ping_bytes; uint64_t rx_pong_packets; uint64_t tx_pong_packets; uint64_t rx_pong_bytes; uint64_t tx_pong_bytes; uint64_t rx_pmtu_packets; uint64_t tx_pmtu_packets; uint64_t rx_pmtu_bytes; uint64_t tx_pmtu_bytes; /* Only filled in when requested */ uint64_t tx_total_packets; uint64_t rx_total_packets; uint64_t tx_total_bytes; uint64_t rx_total_bytes; uint64_t tx_total_errors; uint64_t tx_total_retries; uint32_t tx_pmtu_errors; uint32_t tx_pmtu_retries; uint32_t tx_ping_errors; uint32_t tx_ping_retries; uint32_t tx_pong_errors; uint32_t tx_pong_retries; uint32_t tx_data_errors; uint32_t tx_data_retries; /* measured in usecs */ uint32_t latency_min; uint32_t latency_max; uint32_t latency_ave; uint32_t latency_samples; /* how many times the link has been going up/down */ uint32_t down_count; uint32_t up_count; /* * circular buffer of time_t structs collecting the history * of up/down events on this link. * the index indicates current/last event. * it is safe to walk back the history by decreasing the index */ time_t last_up_times[MAX_LINK_EVENTS]; time_t last_down_times[MAX_LINK_EVENTS]; int8_t last_up_time_index; int8_t last_down_time_index; /* Always add new stats at the end */ }; struct knet_link_status { size_t size; /* For ABI checking */ char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; char dst_ipaddr[KNET_MAX_HOST_LEN]; char dst_port[KNET_MAX_PORT_LEN]; uint8_t enabled; /* link is configured and admin enabled for traffic */ uint8_t connected; /* link is connected for data (local view) */ uint8_t dynconnected; /* link has been activated by remote dynip */ unsigned long long latency; /* average latency computed by fix/exp */ struct timespec pong_last; unsigned int mtu; /* current detected MTU on this link */ unsigned int proto_overhead; /* contains the size of the IP protocol, knet headers and * crypto headers (if configured). This value is filled in * ONLY after the first PMTUd run on that given link, * and can change if link configuration or crypto configuration * changes at runtime. * WARNING: in general mtu + proto_overhead might or might * not match the output of ifconfig mtu due to crypto * requirements to pad packets to some specific boundaries. */ /* Link statistics */ struct knet_link_stats stats; }; /* * knet_link_get_status * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * status - pointer to knet_link_status struct (see above) * * struct_size - max size of knet_link_status - allows library to * add fields without ABI change. Returned structure * will be truncated to this length and .size member * indicates the full size. * * knet_link_get_status returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, struct knet_link_status *status, size_t struct_size); /* * logging structs/API calls */ /* * libknet is composed of several subsystems. In order * to easily distinguish log messages coming from different * places, each subsystem has its own ID. * * 0-19 config/management * 20-39 internal threads * 40-59 transports * 60-69 crypto implementations */ #define KNET_SUB_COMMON 0 /* common.c */ #define KNET_SUB_HANDLE 1 /* handle.c alloc/dealloc config changes */ #define KNET_SUB_HOST 2 /* host add/del/modify */ #define KNET_SUB_LISTENER 3 /* listeners add/del/modify... */ #define KNET_SUB_LINK 4 /* link add/del/modify */ #define KNET_SUB_TRANSPORT 5 /* Transport common */ #define KNET_SUB_CRYPTO 6 /* crypto.c config generic layer */ #define KNET_SUB_COMPRESS 7 /* compress.c config generic layer */ #define KNET_SUB_FILTER 19 /* allocated for users to log from dst_filter */ #define KNET_SUB_DSTCACHE 20 /* switching thread (destination cache handling) */ #define KNET_SUB_HEARTBEAT 21 /* heartbeat thread */ #define KNET_SUB_PMTUD 22 /* Path MTU Discovery thread */ #define KNET_SUB_TX 23 /* send to link thread */ #define KNET_SUB_RX 24 /* recv from link thread */ #define KNET_SUB_TRANSP_BASE 40 /* Base log level for transports */ #define KNET_SUB_TRANSP_LOOPBACK (KNET_SUB_TRANSP_BASE + KNET_TRANSPORT_LOOPBACK) #define KNET_SUB_TRANSP_UDP (KNET_SUB_TRANSP_BASE + KNET_TRANSPORT_UDP) #define KNET_SUB_TRANSP_SCTP (KNET_SUB_TRANSP_BASE + KNET_TRANSPORT_SCTP) #define KNET_SUB_NSSCRYPTO 60 /* nsscrypto.c */ #define KNET_SUB_ZLIBCOMP 70 /* compress_zlib.c */ #define KNET_SUB_LZ4COMP 71 /* compress_lz4.c */ #define KNET_SUB_LZ4HCCOMP 72 /* compress_lz4.c */ #define KNET_SUB_UNKNOWN 254 #define KNET_MAX_SUBSYSTEMS KNET_SUB_UNKNOWN + 1 /* * Convert between subsystem IDs and names */ /* * knet_log_get_subsystem_name * * return internal name of the subsystem or "common" */ const char *knet_log_get_subsystem_name(uint8_t subsystem); /* * knet_log_get_subsystem_id * * return internal ID of the subsystem or KNET_SUB_COMMON */ uint8_t knet_log_get_subsystem_id(const char *name); /* * 4 log levels are enough for everybody */ #define KNET_LOG_ERR 0 /* unrecoverable errors/conditions */ #define KNET_LOG_WARN 1 /* recoverable errors/conditions */ #define KNET_LOG_INFO 2 /* info, link up/down, config changes.. */ #define KNET_LOG_DEBUG 3 /* * Convert between log level values and names */ /* * knet_log_get_loglevel_name * * return internal name of the log level or "ERROR" for unknown values */ const char *knet_log_get_loglevel_name(uint8_t level); /* * knet_log_get_loglevel_id * * return internal log level ID or KNET_LOG_ERR for invalid names */ uint8_t knet_log_get_loglevel_id(const char *name); /* * every log message is composed by a text message (including a trailing \n) * and message level/subsystem IDs. * In order to make debugging easier it is possible to send those packets * straight to stdout/stderr (see knet_bench.c stdout option). */ #define KNET_MAX_LOG_MSG_SIZE 256 struct knet_log_msg { char msg[KNET_MAX_LOG_MSG_SIZE - (sizeof(uint8_t)*2)]; uint8_t subsystem; /* KNET_SUB_* */ uint8_t msglevel; /* KNET_LOG_* */ }; /* * knet_log_set_log_level * * knet_h - same as above * * subsystem - same as above * * level - same as above * * knet_log_set_loglevel allows fine control of log levels by subsystem. * See also knet_handle_new for defaults. * * knet_log_set_loglevel returns: * * 0 on success * -1 on error and errno is set. */ int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t level); /* * knet_log_get_log_level * * knet_h - same as above * * subsystem - same as above * * level - same as above * * knet_log_get_loglevel returns: * * 0 on success * -1 on error and errno is set. */ int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t *level); #endif diff --git a/libknet/tests/api_knet_handle_compress.c b/libknet/tests/api_knet_handle_compress.c index 0a6e90f7..55bac3d2 100644 --- a/libknet/tests/api_knet_handle_compress.c +++ b/libknet/tests/api_knet_handle_compress.c @@ -1,151 +1,169 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h; int logfds[2]; struct knet_handle_compress_cfg knet_handle_compress_cfg; memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); printf("Test knet_handle_compress incorrect knet_h\n"); if ((!knet_handle_compress(NULL, &knet_handle_compress_cfg)) || (errno != EINVAL)) { printf("knet_handle_compress accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_new(1, logfds[1], KNET_LOG_DEBUG); if (!knet_h) { printf("knet_handle_new failed: %s\n", strerror(errno)); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_handle_compress with invalid cfg\n"); if ((!knet_handle_compress(knet_h, NULL)) || (errno != EINVAL)) { printf("knet_handle_compress accepted invalid cfg or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_handle_compress with un-initialized cfg\n"); memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { printf("knet_handle_compress accepted invalid un-initialized cfg\n"); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_handle_compress with none compress model (disable compress)\n"); memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); strncpy(knet_handle_compress_cfg.compress_model, "none", sizeof(knet_handle_compress_cfg.compress_model) - 1); if (knet_handle_compress(knet_h, &knet_handle_compress_cfg) != 0) { printf("knet_handle_compress did not accept none compress mode cfg\n"); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_handle_compress with zlib compress and negative level\n"); memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); knet_handle_compress_cfg.compress_level = -1; if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { printf("knet_handle_compress accepted invalid (-1) compress level for zlib\n"); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_handle_compress with zlib compress and excessive compress level\n"); memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); knet_handle_compress_cfg.compress_level = 10; if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { printf("knet_handle_compress accepted invalid (10) compress level for zlib\n"); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); - printf("Test knet_handle_compress with zlib compress model normal compress level)\n"); + printf("Test knet_handle_compress with zlib compress and excessive compress threshold\n"); memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); knet_handle_compress_cfg.compress_level = 1; + knet_handle_compress_cfg.compress_threshold = KNET_MAX_PACKET_SIZE +1; + + if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { + printf("knet_handle_compress accepted invalid compress threshold\n"); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with zlib compress model normal compress level and threshold\n"); + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); + knet_handle_compress_cfg.compress_level = 1; + knet_handle_compress_cfg.compress_threshold = 64; if (knet_handle_compress(knet_h, &knet_handle_compress_cfg) != 0) { printf("knet_handle_compress did not accept zlib compress mode with compress level 1 cfg\n"); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { need_root(); test(); return PASS; } diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index d2406914..90885c54 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -1,1025 +1,1026 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "internals.h" #include "netutils.h" #include "transports.h" #include "threads_common.h" #include "test-common.h" #define MAX_NODES 128 static int senderid = -1; static int thisnodeid = -1; static knet_handle_t knet_h; static int datafd = 0; static int8_t channel = 0; static int globallistener = 0; static int continous = 0; static struct sockaddr_storage allv4; static struct sockaddr_storage allv6; static int broadcast_test = 1; static pthread_t rx_thread = (pthread_t)NULL; static char *rx_buf[PCKT_FRAG_MAX]; static int wait_for_perf_rx = 0; static char *compresscfg = NULL; static int bench_shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; #define TEST_PING 0 #define TEST_PING_AND_DATA 1 #define TEST_PERF_BY_SIZE 2 #define TEST_PERF_BY_TIME 3 static int test_type = TEST_PING; #define TEST_START 2 #define TEST_STOP 4 #define TEST_COMPLETE 6 #define ONE_GIGABYTE 1073741824 static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE; static uint64_t perf_by_time_secs = 10; struct node { int nodeid; int links; struct sockaddr_storage address[KNET_MAX_LINK]; }; static void print_help(void) { printf("knet_bench usage:\n"); printf(" -h print this help (no really)\n"); printf(" -d enable debug logs (default INFO)\n"); printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. (default disabled)\n"); printf(" Example: -c nss:aes128:sha1\n"); - printf(" -z [implementation]:[level] compress configuration. (default disabled)\n"); + printf(" -z [implementation]:[level]:[threshold] compress configuration. (default disabled)\n"); printf(" Example: -z zlib:5\n"); printf(" -p [active|passive|rr] (default: passive)\n"); printf(" -P [udp|sctp] (default: udp) protocol (transport) to use\n"); printf(" -t [nodeid] This nodeid (required)\n"); printf(" -n [nodeid],[link1_ip_addr],[link2_..] Other nodes information (at least one required)\n"); printf(" Example: -t 1,192.168.8.1,3ffe::8:1,..\n"); printf(" can be repeated up to %d and should contain also the localnode info\n", MAX_NODES); printf(" -b [port] baseport (default: 50000)\n"); printf(" -l enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n"); printf(" -o enable baseport offset per nodeid\n"); printf(" -w dont wait for all nodes to be up before starting the test (default: wait)\n"); printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n"); printf(" test type (default: ping)\n"); printf(" ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n"); printf(" ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n"); printf(" perf-by-size: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size/quantity of packets and measuring the time, then quit\n"); printf(" perf-by-time: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size of packets for a given amount of time (10 seconds)\n"); printf(" and measuring the quantity of data transmitted, then quit\n"); printf(" -s nodeid that will generate traffic for benchmarks\n"); printf(" -S [size|seconds] when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n"); printf(" when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n"); printf(" -C repeat the test continously (default: off)\n"); } static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx) { int i; char *temp = NULL; char port_str[10]; memset(port_str, 0, sizeof(port_str)); sprintf(port_str, "%d", port); for (i = 0; i < onidx; i++) { nodes[i].nodeid = atoi(strtok(nodesinfo[i], ",")); if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) { printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST); exit(FAIL); } if (thisnodeid == nodes[i].nodeid) { *thisidx = i; } while((temp = strtok(NULL, ","))) { if (nodes[i].links == KNET_MAX_LINK) { printf("Too many links configured. Max %d\n", KNET_MAX_LINK); exit(FAIL); } if (knet_strtoaddr(temp, port_str, &nodes[i].address[nodes[i].links], sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert %s to sockaddress\n", temp); exit(FAIL); } nodes[i].links++; } } if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert 0.0.0.0 to sockaddress\n"); exit(FAIL); } if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert :: to sockaddress\n"); exit(FAIL); } for (i = 1; i < onidx; i++) { if (nodes[0].links != nodes[i].links) { printf("knet_bench does not support unbalanced link configuration\n"); exit(FAIL); } } return; } static int private_data; static void sock_notify(void *pvt_data, int local_datafd, int8_t local_channel, uint8_t tx_rx, int error, int errorno) { printf("Error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errno), local_datafd); return; } static int ping_dst_host_filter(void *pvt_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_host_id, int8_t *dst_channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries) { if (broadcast_test) { return 1; } if (tx_rx == KNET_NOTIFY_TX) { memmove(&dst_host_ids[0], outdata, 2); } else { dst_host_ids[0] = this_host_id; } *dst_host_ids_entries = 1; return 0; } static void setup_knet(int argc, char *argv[]) { int logfd; int rv; char *cryptocfg = NULL, *policystr = NULL, *protostr = NULL; char *othernodeinfo[MAX_NODES]; struct node nodes[MAX_NODES]; int thisidx = -1; int onidx = 0; int debug = KNET_LOG_INFO; int port = 50000, portoffset = 0; int thisport = 0, otherport = 0; int thisnewport = 0, othernewport = 0; struct sockaddr_in *so_in; struct sockaddr_in6 *so_in6; struct sockaddr_storage *src; int i, link_idx, allnodesup = 0; int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0; int protocol = KNET_TRANSPORT_UDP, protofound = 0; int wait = 1; struct knet_handle_crypto_cfg knet_handle_crypto_cfg; char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL; struct knet_handle_compress_cfg knet_handle_compress_cfg; memset(nodes, 0, sizeof(nodes)); while ((rv = getopt(argc, argv, "CT:S:s:ldowb:t:n:c:p:P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); exit(PASS); break; case 'd': debug = KNET_LOG_DEBUG; break; case 'c': if (cryptocfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } cryptocfg = optarg; break; case 'p': if (policystr) { printf("Error: -p can only be specified once\n"); exit(FAIL); } policystr = optarg; if (!strcmp(policystr, "active")) { policy = KNET_LINK_POLICY_ACTIVE; policyfound = 1; } /* * we can't use rr because clangs can't compile * an array of 3 strings, one of which is 2 bytes long */ if (!strcmp(policystr, "round-robin")) { policy = KNET_LINK_POLICY_RR; policyfound = 1; } if (!strcmp(policystr, "passive")) { policy = KNET_LINK_POLICY_PASSIVE; policyfound = 1; } if (!policyfound) { printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr); exit(FAIL); } break; case 'P': if (protostr) { printf("Error: -P can only be specified once\n"); exit(FAIL); } protostr = optarg; if (!strcmp(protostr, "udp")) { protocol = KNET_TRANSPORT_UDP; protofound = 1; } if (!strcmp(protostr, "sctp")) { protocol = KNET_TRANSPORT_SCTP; protofound = 1; } if (!protofound) { printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr); exit(FAIL); } break; case 't': if (thisnodeid >= 0) { printf("Error: -t can only be specified once\n"); exit(FAIL); } thisnodeid = atoi(optarg); if ((thisnodeid < 0) || (thisnodeid > 65536)) { printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid); exit(FAIL); } break; case 'n': if (onidx == MAX_NODES) { printf("Error: too many other nodes. Max %d\n", MAX_NODES); exit(FAIL); } othernodeinfo[onidx] = optarg; onidx++; break; case 'b': port = atoi(optarg); if ((port < 1) || (port > 65536)) { printf("Error: port %d out of range (1 - 65536)\n", port); exit(FAIL); } break; case 'o': if (globallistener) { printf("Error: -l cannot be used with -o\n"); exit(FAIL); } portoffset = 1; break; case 'l': if (portoffset) { printf("Error: -o cannot be used with -l\n"); exit(FAIL); } globallistener = 1; break; case 'w': wait = 0; break; case 's': if (senderid >= 0) { printf("Error: -s can only be specified once\n"); exit(FAIL); } senderid = atoi(optarg); if ((senderid < 0) || (senderid > 65536)) { printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid); exit(FAIL); } break; case 'T': if (!strcmp("ping", optarg)) { test_type = TEST_PING; } if (!strcmp("ping_data", optarg)) { test_type = TEST_PING_AND_DATA; } if (!strcmp("perf-by-size", optarg)) { test_type = TEST_PERF_BY_SIZE; } if (!strcmp("perf-by-time", optarg)) { test_type = TEST_PERF_BY_TIME; } break; case 'S': perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE; perf_by_time_secs = (uint64_t)atoi(optarg); break; case 'C': continous = 1; break; case 'z': if (compresscfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } compresscfg = optarg; break; default: break; } } if (thisnodeid < 0) { printf("Who am I?!? missing -t from command line?\n"); exit(FAIL); } if (onidx < 1) { printf("no other nodes configured?!? missing -n from command line\n"); exit(FAIL); } parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx); if (thisidx < 0) { printf("no config for this node found\n"); exit(FAIL); } if (senderid >= 0) { for (i=0; i < onidx; i++) { if (senderid == nodes[i].nodeid) { break; } } if (i == onidx) { printf("Unable to find senderid in nodelist\n"); exit(FAIL); } } if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) { printf("Error: performance test requires -s to be set (for now)\n"); exit(FAIL); } logfd = start_logging(stdout); knet_h = knet_handle_new(thisnodeid, logfd, debug); if (!knet_h) { printf("Unable to knet_handle_new: %s\n", strerror(errno)); exit(FAIL); } if (cryptocfg) { memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg)); cryptomodel = strtok(cryptocfg, ":"); cryptotype = strtok(NULL, ":"); cryptohash = strtok(NULL, ":"); if (cryptomodel) { strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1); } if (cryptotype) { strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1); } if (cryptohash) { strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1); } knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN; if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) { printf("Unable to init crypto\n"); exit(FAIL); } } if (compresscfg) { memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":")); knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":")); + knet_handle_compress_cfg.compress_threshold = atoi(strtok(NULL, ":")); if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) { printf("Unable to configure compress\n"); exit(FAIL); } } if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } datafd = 0; channel = -1; if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) { printf("knet_handle_add_datafd failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } if (knet_handle_pmtud_setfreq(knet_h, 60) < 0) { printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if (knet_host_add(knet_h, nodes[i].nodeid) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) { printf("knet_host_set_policy failed: %s\n", strerror(errno)); exit(FAIL); } for (link_idx = 0; link_idx < nodes[i].links; link_idx++) { if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in->sin_port); thisnewport = thisport + nodes[i].nodeid; so_in->sin_port = (htons(thisnewport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; otherport = ntohs(so_in->sin_port); othernewport = otherport + nodes[thisidx].nodeid; so_in->sin_port = (htons(othernewport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in6->sin6_port); thisnewport = thisport + nodes[i].nodeid; so_in6->sin6_port = (htons(thisnewport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; otherport = ntohs(so_in6->sin6_port); othernewport = otherport + nodes[thisidx].nodeid; so_in6->sin6_port = (htons(othernewport)); } } if (!globallistener) { src = &nodes[thisidx].address[link_idx]; } else { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { src = &allv4; } else { src = &allv6; } } if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx, protocol, src, &nodes[i].address[link_idx], 0) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); exit(FAIL); } if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; so_in->sin_port = (htons(thisport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; so_in->sin_port = (htons(otherport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; so_in6->sin6_port = (htons(thisport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; so_in6->sin6_port = (htons(otherport)); } } if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) { printf("knet_link_set_ping_timers failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) { printf("knet_link_set_pong_count failed: %s\n", strerror(errno)); exit(FAIL); } } } if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); exit(FAIL); } if (wait) { while(!allnodesup) { allnodesup = 1; for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if(knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) { printf("waiting host %d to be reachable\n", nodes[i].nodeid); allnodesup = 0; } } if (!allnodesup) { sleep(1); } } sleep(1); } } static void *_rx_thread(void *args) { int rx_epoll; struct epoll_event ev; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, msg_recv; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; uint64_t rx_pkts = 0; uint64_t rx_bytes = 0; unsigned int current_pckt_size = 0; for (i = 0; i < PCKT_FRAG_MAX; i++) { rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!rx_buf[i]) { printf("RXT: Unable to malloc!\n"); return NULL; } memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE); iov_in[i].iov_base = (void *)rx_buf[i]; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } rx_epoll = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (rx_epoll < 0) { printf("RXT: Unable to create epoll!\nHALTING RX THREAD!\n"); return NULL; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = datafd; if (epoll_ctl(rx_epoll, EPOLL_CTL_ADD, datafd, &ev)) { printf("RXT: Unable to add datafd to epoll\nHALTING RX THREAD!\n"); return NULL; } memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (!bench_shutdown_in_progress) { if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) { msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg_recv < 0) { printf("RXT: error from recvmmsg: %s\n", strerror(errno)); } switch(test_type) { case TEST_PING_AND_DATA: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len == 0) { printf("RXT: received 0 bytes message?\n"); } printf("received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base); } break; case TEST_PERF_BY_TIME: case TEST_PERF_BY_SIZE: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len < 64) { if (msg[i].msg_len == 0) { printf("RXT: received 0 bytes message?\n"); } if (msg[i].msg_len == TEST_START) { if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { printf("Unable to get start time!\n"); } } if (msg[i].msg_len == TEST_STOP) { double average_rx_mbytes; double average_rx_pkts; double time_diff_sec; if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { printf("Unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); /* * adjust for sleep(2) between sending the last data and TEST_STOP */ time_diff = time_diff - 2000000000llu; /* * convert to seconds */ time_diff_sec = (double)time_diff / 1000000000llu; average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024)); average_rx_pkts = (double)(rx_pkts / time_diff_sec); printf("Execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n", time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts); rx_pkts = 0; rx_bytes = 0; current_pckt_size = 0; } if (msg[i].msg_len == TEST_COMPLETE) { wait_for_perf_rx = 1; } continue; } rx_pkts++; rx_bytes = rx_bytes + msg[i].msg_len; current_pckt_size = msg[i].msg_len; } break; } } } epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev); close(rx_epoll); return NULL; } static void setup_data_txrx_common(void) { if (!rx_thread) { if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } printf("Setting up rx thread\n"); if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) { printf("Unable to start rx thread\n"); exit(FAIL); } } } static void stop_rx_thread(void) { void *retval; int i; if (rx_thread) { printf("Shutting down rx thread\n"); sleep(2); pthread_cancel(rx_thread); pthread_join(rx_thread, &retval); for (i = 0; i < PCKT_FRAG_MAX; i ++) { free(rx_buf[i]); } } } static void send_ping_data(void) { char buf[65535]; ssize_t len; memset(&buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "Hello world!"); if (compresscfg) { len = sizeof(buf); } else { len = strlen(buf); } if (knet_send(knet_h, buf, len, channel) != len) { printf("Error sending hello world: %s\n", strerror(errno)); } sleep(1); } static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send) { int sent_msgs, prev_sent, progress, total_sent; total_sent = 0; sent_msgs = 0; prev_sent = 0; progress = 1; retry: errno = 0; sent_msgs = _sendmmsg(datafd, &msg[0], msgs_to_send, MSG_NOSIGNAL); if (sent_msgs < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { usleep(KNET_THREADS_TIMERES / 16); goto retry; } printf("Unable to send messages: %s\n", strerror(errno)); return -1; } total_sent = total_sent + sent_msgs; if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) { if ((sent_msgs) || (progress)) { msgs_to_send = msgs_to_send - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { progress = 1; } else { progress = 0; } goto retry; } if (!progress) { printf("Unable to send more messages after retry\n"); } } return total_sent; } static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[]) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!tx_buf[i]) { printf("TXT: Unable to malloc!\n"); return -1; } memset(tx_buf[i], 0, KNET_MAX_PACKET_SIZE); iov_out[i].iov_base = (void *)tx_buf[i]; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_iov = &iov_out[i]; msg[i].msg_hdr.msg_iovlen = 1; } return 0; } static void send_perf_data_by_size(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; int i; uint64_t total_pkts_to_tx; uint64_t packets_to_send; uint32_t packetsize = 64; setup_send_buffers_common(msg, iov_out, tx_buf); while (packetsize <= KNET_MAX_PACKET_SIZE) { for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } total_pkts_to_tx = perf_by_size_size / packetsize; printf("Testing with %u packet size. Total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); while (total_pkts_to_tx > 0) { if (total_pkts_to_tx >= PCKT_FRAG_MAX) { packets_to_send = PCKT_FRAG_MAX; } else { packets_to_send = total_pkts_to_tx; } sent_msgs = send_messages(&msg[0], packets_to_send); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } total_pkts_to_tx = total_pkts_to_tx - sent_msgs; } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); if (packetsize == KNET_MAX_PACKET_SIZE) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } static void send_perf_data_by_time(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; int i; uint32_t packetsize = 65536; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; setup_send_buffers_common(msg, iov_out, tx_buf); memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (packetsize <= KNET_MAX_PACKET_SIZE) { for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } printf("Testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { printf("Unable to get start time!\n"); } time_diff = 0; while (time_diff < (perf_by_time_secs * 1000000000llu)) { sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { printf("Unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); if (packetsize == KNET_MAX_PACKET_SIZE) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } static void cleanup_all(void) { if (pthread_mutex_lock(&shutdown_mutex)) { return; } if (bench_shutdown_in_progress) { pthread_mutex_unlock(&shutdown_mutex); return; } bench_shutdown_in_progress = 1; pthread_mutex_unlock(&shutdown_mutex); if (rx_thread) { stop_rx_thread(); } knet_handle_stop(knet_h); } static void sigint_handler(int signum) { printf("Cleaning up... got signal: %d\n", signum); cleanup_all(); exit(PASS); } int main(int argc, char *argv[]) { if (signal(SIGINT, sigint_handler) == SIG_ERR) { printf("Unable to configure SIGINT handler\n"); exit(FAIL); } need_root(); setup_knet(argc, argv); setup_data_txrx_common(); sleep(5); restart: switch(test_type) { default: case TEST_PING: /* basic ping, no data */ sleep(5); break; case TEST_PING_AND_DATA: send_ping_data(); break; case TEST_PERF_BY_SIZE: if (senderid == thisnodeid) { send_perf_data_by_size(); } else { printf("Waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; case TEST_PERF_BY_TIME: if (senderid == thisnodeid) { send_perf_data_by_time(); } else { printf("Waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; } if (continous) { goto restart; } cleanup_all(); return PASS; } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 41724d4b..2f8b84cc 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,679 +1,679 @@ /* * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { sent_msgs = 0; prev_sent = 0; progress = 1; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport_type == KNET_TRANSPORT_LOOPBACK) { continue; } msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; for (i=0; istatus.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, ssize_t inlen, int8_t channel, int is_sync) { ssize_t outlen, frag_len; struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; uint8_t frag_idx; unsigned int temp_data_mtu; size_t host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; unsigned int i; int send_local = 0; int data_compressed = 0; inbuf = knet_h->recv_from_sock_buf; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } } /* Send to localhost if appropriate and enabled */ if (knet_h->has_loop_link) { send_local = 0; if (bcast) { send_local = 1; } else { for (i=0; i< dst_host_ids_entries_temp; i++) { if (dst_host_ids_temp[i] == knet_h->host_id) { send_local = 1; } } } if (send_local) { const unsigned char *buf = inbuf->khp_data_userdata; ssize_t buflen = inlen; struct knet_link *local_link; local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %ld\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; usleep(KNET_THREADS_TIMERES / 16); goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming mininum IPv4 mtu (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * compress data */ - if (knet_h->compress_model > 0) { + if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { ssize_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; err = compress(knet_h, (const unsigned char *)inbuf->khp_data_userdata, inlen, knet_h->send_to_links_buf_compress, &cmp_outlen); if (err < 0) { log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno)); } else { if (cmp_outlen < inlen) { memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); inlen = cmp_outlen; data_compressed = 1; } } } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (data_compressed) { inbuf->khp_data_compress = knet_h->compress_model; } else { inbuf->khp_data_compress = 0; } if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE; iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; frag_len = frag_len - temp_data_mtu; frag_idx++; } iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE; iovcnt_out = 1; } if (knet_h->crypto_instance) { frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], &outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } iovcnt_out = 1; } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg->msg_iov, 1); } else { inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL); } if (inlen == 0) { savederrno = 0; docallback = 1; goto out; } if (inlen < 0) { savederrno = errno; docallback = 1; goto out; } knet_h->recv_from_sock_buf->kh_type = type; _parse_recv_from_sock(knet_h, inlen, channel, 0); out: if (inlen < 0) { struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev, type; int8_t channel; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = sizeof(struct sockaddr_storage); msg.msg_iov = &iov_in; msg.msg_iovlen = 1; knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id); for (i = 0; i < PCKT_FRAG_MAX; i++) { knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = -1; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; }