diff --git a/libknet/internals.h b/libknet/internals.h index 4f9db0fe..ce1d0f28 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,568 +1,570 @@ /* * Copyright (C) 2010-2019 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_INTERNALS_H__ #define __KNET_INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include #include "libknet.h" #include "onwire.h" #include "compat.h" #include "threads_common.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_DATABUFSIZE_COMPRESS_PAD 1024 #define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define PCKT_RX_BUFS 512 -#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX +#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1 + +#define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_mmsghdr { struct msghdr msg_hdr; /* Message header */ unsigned int msg_len; /* Number of bytes transmitted */ }; struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned long long pong_timeout_adj; /* timeout adjusted for latency */ uint8_t pong_timeout_backoff; /* see link.h for definition */ unsigned int latency_fix; /* precision */ uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ uint64_t flags; /* status */ struct knet_link_status status; /* internals */ uint8_t link_id; uint8_t transport; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ unsigned int latency_exp; uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused with stats.proto_overhead that includes also knet headers and crypto headers */ struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */ uint8_t has_valid_mtu; }; #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { char buf[KNET_DATABUFSIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */ uint8_t last_first; /* special case if we receive the last fragment first */ ssize_t frag_size; /* normal frag size (not the last one) */ ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ knet_node_id_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* internals */ char circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t rx_seq_num; seq_num_t untimed_rx_seq_num; seq_num_t timed_rx_seq_num; uint8_t got_data; /* defrag/reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK]; char circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) */ uint8_t data_type; /* internal use for transport to define what data are associated * with this fd */ void *data; /* pointer to the data */ void *access_list_match_entry_head; /* pointer to access list match_entry list head */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 #define KNET_MAX_COMPRESS_METHODS UINT8_MAX struct knet_handle_stats_extra { uint64_t tx_crypt_pmtu_packets; uint64_t tx_crypt_pmtu_reply_packets; uint64_t tx_crypt_ping_packets; uint64_t tx_crypt_pong_packets; }; struct knet_handle { knet_node_id_t host_id; unsigned int enabled:1; - struct knet_sock sockfd[KNET_DATAFD_MAX]; + struct knet_sock sockfd[KNET_DATAFD_MAX + 1]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */ unsigned int pmtud_interval; unsigned int manual_mtu; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ struct knet_handle_stats stats; struct knet_handle_stats_extra stats_extra; uint32_t reconnect_int; knet_node_id_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_RX_BUFS]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; uint8_t threads_status[KNET_THREAD_MAX]; uint8_t threads_flush_queue[KNET_THREAD_MAX]; pthread_mutex_t threads_status_mutex; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */ pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */ pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */ pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */ uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */ int pmtud_waiting; int pmtud_running; int pmtud_forcerun; int pmtud_abort; struct crypto_instance *crypto_instance; size_t sec_block_size; size_t sec_hash_size; size_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char *recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; int compress_model; int compress_level; size_t compress_threshold; void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */ unsigned char *recv_from_links_buf_decompress; unsigned char *send_to_links_buf_compress; seq_num_t tx_seq_num; pthread_mutex_t tx_seq_num_mutex; uint8_t has_loop_link; uint8_t loop_link; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_node_id, int8_t *channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); int fini_in_progress; uint64_t flags; }; extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */ /* * NOTE: every single operation must be implementend * for every protocol. */ /* * for now knet supports only IP protocols (udp/sctp) * in future there might be others like ARP * or TIPC. * keep this around as transport information * to use for access lists and other operations */ #define TRANSPORT_PROTO_LOOPBACK 0 #define TRANSPORT_PROTO_IP_PROTO 1 /* * some transports like SCTP can filter incoming * connections before knet has to process * any packets. * GENERIC_ACL -> packet has to be read and filterted * PROTO_ACL -> transport provides filtering at lower levels * and packet does not need to be processed */ typedef enum { USE_NO_ACL, USE_GENERIC_ACL, USE_PROTO_ACL } transport_acl; /* * make it easier to map values in transports.c */ #define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0 #define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1 typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; const uint8_t transport_id; const uint8_t built_in; uint8_t transport_protocol; transport_acl transport_acl_type; /* * connection oriented protocols like SCTP * donĀ“t need dst_addr in sendto calls and * on some OSes are considered EINVAL. */ uint8_t transport_is_connection_oriented; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. * * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * transport callback for incoming dynamic connections * this is called in global read lock context */ int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link); /* * return the fd to use for access lists */ int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rdlock */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * transport_tx_sock_error is invoked with global_rwlock and * it's invoked when sendto or sendmmsg returns =< 0 * * it should return: * -1 on internal error * 0 ignore error and continue * 1 retry * any sleep or wait action should happen inside the transport code */ int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg); } knet_transport_ops_t; socklen_t sockaddr_len(const struct sockaddr_storage *ss); struct pretty_names { const char *name; uint8_t val; }; /** * This is a kernel style list implementation. * * @author Steven Dake */ struct knet_list_head { struct knet_list_head *next; struct knet_list_head *prev; }; /** * @def KNET_LIST_DECLARE() * Declare and initialize a list head. */ #define KNET_LIST_DECLARE(name) \ struct knet_list_head name = { &(name), &(name) } #define KNET_INIT_LIST_HEAD(ptr) do { \ (ptr)->next = (ptr); (ptr)->prev = (ptr); \ } while (0) /** * Initialize the list entry. * * Points next and prev pointers to head. * @param head pointer to the list head */ static inline void knet_list_init(struct knet_list_head *head) { head->next = head; head->prev = head; } /** * Add this element to the list. * * @param element the new element to insert. * @param head pointer to the list head */ static inline void knet_list_add(struct knet_list_head *element, struct knet_list_head *head) { head->next->prev = element; element->next = head->next; element->prev = head; head->next = element; } /** * Add to the list (but at the end of the list). * * @param element pointer to the element to add * @param head pointer to the list head * @see knet_list_add() */ static inline void knet_list_add_tail(struct knet_list_head *element, struct knet_list_head *head) { head->prev->next = element; element->next = head; element->prev = head->prev; head->prev = element; } /** * Delete an entry from the list. * * @param _remove the list item to remove */ static inline void knet_list_del(struct knet_list_head *_remove) { _remove->next->prev = _remove->prev; _remove->prev->next = _remove->next; } /** * Replace old entry by new one * @param old: the element to be replaced * @param new: the new element to insert */ static inline void knet_list_replace(struct knet_list_head *old, struct knet_list_head *new) { new->next = old->next; new->next->prev = new; new->prev = old->prev; new->prev->next = new; } /** * Tests whether list is the last entry in list head * @param list: the entry to test * @param head: the head of the list * @return boolean true/false */ static inline int knet_list_is_last(const struct knet_list_head *list, const struct knet_list_head *head) { return list->next == head; } /** * A quick test to see if the list is empty (pointing to it's self). * @param head pointer to the list head * @return boolean true/false */ static inline int32_t knet_list_empty(const struct knet_list_head *head) { return head->next == head; } /** * Get the struct for this entry * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_entry(ptr,type,member)\ ((type *)((char *)(ptr)-(char*)(&((type *)0)->member))) /** * Get the first element from a list * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_first_entry(ptr, type, member) \ knet_list_entry((ptr)->next, type, member) /** * Iterate over a list * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) /** * Iterate over a list backwards * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each_reverse(pos, head) \ for (pos = (head)->prev; pos != (head); pos = pos->prev) /** * Iterate over a list safe against removal of list entry * @param pos: the &struct list_head to use as a loop counter. * @param n: another &struct list_head to use as temporary storage * @param head: the head for your list. */ #define knet_list_for_each_safe(pos, n, head) \ for (pos = (head)->next, n = pos->next; pos != (head); \ pos = n, n = pos->next) /** * Iterate over list of given type * @param pos: the type * to use as a loop counter. * @param head: the head for your list. * @param member: the name of the list_struct within the struct. */ #define knet_list_for_each_entry(pos, head, member) \ for (pos = knet_list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = knet_list_entry(pos->member.next, typeof(*pos), member)) #endif diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 3462cf70..1d954d6e 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,782 +1,788 @@ /* * Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync) { size_t outlen, frag_len; struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; uint8_t frag_idx; unsigned int temp_data_mtu; size_t host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; unsigned int i; int j; int send_local = 0; int data_compressed = 0; size_t uncrypted_frag_size; inbuf = knet_h->recv_from_sock_buf; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } if ((!bcast) && (dst_host_ids_entries_temp > KNET_MAX_HOST)) { log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); savederrno = EINVAL; err = -1; goto out_unlock; } } /* Send to localhost if appropriate and enabled */ if (knet_h->has_loop_link) { send_local = 0; if (bcast) { send_local = 1; } else { for (i=0; i< dst_host_ids_entries_temp; i++) { if (dst_host_ids_temp[i] == knet_h->host_id) { send_local = 1; } } } if (send_local) { const unsigned char *buf = inbuf->khp_data_userdata; ssize_t buflen = inlen; struct knet_link *local_link; local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * compress data */ if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, (const unsigned char *)inbuf->khp_data_userdata, inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); if (err < 0) { log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno)); } else { /* Collect stats */ clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (compress_time < knet_h->stats.tx_compress_time_min) { knet_h->stats.tx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.tx_compress_time_max) { knet_h->stats.tx_compress_time_max = compress_time; } knet_h->stats.tx_compress_time_ave = (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + compress_time) / (knet_h->stats.tx_compressed_packets+1); knet_h->stats.tx_compressed_packets++; knet_h->stats.tx_compressed_original_bytes += inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; if (cmp_outlen < inlen) { memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); inlen = cmp_outlen; data_compressed = 1; } } } if (knet_h->compress_model > 0 && !data_compressed) { knet_h->stats.tx_uncompressed_packets++; } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (data_compressed) { inbuf->khp_data_compress = knet_h->compress_model; } else { inbuf->khp_data_compress = 0; } if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE; iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; frag_len = frag_len - temp_data_mtu; frag_idx++; } iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE; iovcnt_out = 1; } if (knet_h->crypto_instance) { struct timespec start_time; struct timespec end_time; uint64_t crypt_time; frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], (ssize_t *)&outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); if (crypt_time < knet_h->stats.tx_crypt_time_min) { knet_h->stats.tx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.tx_crypt_time_max) { knet_h->stats.tx_crypt_time_max = crypt_time; } knet_h->stats.tx_crypt_time_ave = (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets + crypt_time) / (knet_h->stats.tx_crypt_packets+1); uncrypted_frag_size = 0; for (j=0; j < iovcnt_out; j++) { uncrypted_frag_size += iov_out[frag_idx][j].iov_len; } knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size); knet_h->stats.tx_crypt_packets++; iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } iovcnt_out = 1; } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg->msg_iov, 1); } else { inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL); } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { - log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); - } else { - knet_h->sockfd[channel].has_error = 1; + if (channel != KNET_INTERNAL_DATA_CHANNEL) { + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { + log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", + knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); + } else { + knet_h->sockfd[channel].has_error = 1; + } } + /* + * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL + * once we add support for internal knet communication + */ } else { knet_h->recv_from_sock_buf->kh_type = type; _parse_recv_from_sock(knet_h, inlen, channel, 0); } - if (docallback) { + if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev, type; int flush, flush_queue_limit; int8_t channel; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = sizeof(struct sockaddr_storage); msg.msg_iov = &iov_in; msg.msg_iovlen = 1; knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id); for (i = 0; i < PCKT_FRAG_MAX; i++) { knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } flush_queue_limit = 0; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000); flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { /* * ideally we want to communicate that we are done flushing * the queue when we have an epoll timeout event */ if (flush == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } continue; } /* * fall back in case the TX sockets will continue receive traffic * and we do not hit an epoll timeout. * * allow up to a 100 loops to flush queues, then we give up. * there might be more clean ways to do it by checking the buffer queue * on each socket, but we have tons of sockets and calculations can go wrong. * Also, why would you disable data forwarding and still send packets? */ if (flush == KNET_THREAD_QUEUE_FLUSH) { if (flush_queue_limit >= 100) { log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss"); set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } else { flush_queue_limit++; } } else { flush_queue_limit = 0; } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; - channel = -1; + channel = KNET_INTERNAL_DATA_CHANNEL; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } if (channel >= KNET_DATAFD_MAX) { log_debug(knet_h, KNET_SUB_TX, "No available channels"); continue; /* channel not found */ } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED); return NULL; }