diff --git a/libknet/internals.h b/libknet/internals.h index 2ff9891a..cb1be075 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,421 +1,419 @@ /* * Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_INTERNALS_H__ #define __KNET_INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include #include #include #include "libknet.h" #include "onwire.h" #include "compat.h" #include "threads_common.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_DATABUFSIZE_COMPRESS_PAD 1024 #define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define PCKT_RX_BUFS 512 #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1 #define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX /* * Size of threads stack. Value is choosen by experimenting, how much is needed * to sucesfully finish test suite, and at the time of writing patch it was * ~300KiB. To have some room for future enhancement it is increased * by factor of 3 and rounded. */ #define KNET_THREAD_STACK_SIZE (1024 * 1024) typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_mmsghdr { struct msghdr msg_hdr; /* Message header */ unsigned int msg_len; /* Number of bytes transmitted */ }; struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned long long pong_timeout_adj; /* timeout adjusted for latency */ uint8_t pong_timeout_backoff; /* see link.h for definition */ unsigned int latency_max_samples; /* precision */ uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ uint64_t flags; /* status */ struct knet_link_status status; /* internals */ pthread_mutex_t link_stats_mutex; /* used to update link stats */ uint8_t link_id; uint8_t transport; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused with stats.proto_overhead that includes also knet headers and crypto headers */ struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */ uint8_t has_valid_mtu; }; #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { char buf[KNET_DATABUFSIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */ uint8_t last_first; /* special case if we receive the last fragment first */ ssize_t frag_size; /* normal frag size (not the last one) */ ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ knet_node_id_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* internals */ char circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t rx_seq_num; seq_num_t untimed_rx_seq_num; seq_num_t timed_rx_seq_num; uint8_t got_data; /* defrag/reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK]; char circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) */ uint8_t data_type; /* internal use for transport to define what data are associated * with this fd */ void *data; /* pointer to the data */ void *access_list_match_entry_head; /* pointer to access list match_entry list head */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 #define KNET_MAX_COMPRESS_METHODS UINT8_MAX struct knet_handle_stats_extra { uint64_t tx_crypt_pmtu_packets; uint64_t tx_crypt_pmtu_reply_packets; uint64_t tx_crypt_ping_packets; uint64_t tx_crypt_pong_packets; }; struct knet_handle { knet_node_id_t host_id; unsigned int enabled:1; struct knet_sock sockfd[KNET_DATAFD_MAX + 1]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */ unsigned int pmtud_interval; unsigned int manual_mtu; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ struct knet_handle_stats stats; struct knet_handle_stats_extra stats_extra; pthread_mutex_t handle_stats_mutex; /* used to protect handle stats */ uint32_t reconnect_int; knet_node_id_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_RX_BUFS]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; uint8_t threads_status[KNET_THREAD_MAX]; uint8_t threads_flush_queue[KNET_THREAD_MAX]; useconds_t threads_timer_res; pthread_mutex_t threads_status_mutex; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */ pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */ pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */ pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */ uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */ int pmtud_waiting; int pmtud_running; int pmtud_forcerun; int pmtud_abort; struct crypto_instance *crypto_instance; size_t sec_block_size; size_t sec_hash_size; size_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char *recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; int compress_model; int compress_level; size_t compress_threshold; void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */ unsigned char *recv_from_links_buf_decompress; unsigned char *send_to_links_buf_compress; seq_num_t tx_seq_num; pthread_mutex_t tx_seq_num_mutex; uint8_t has_loop_link; uint8_t loop_link; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_node_id, int8_t *channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *link_status_change_notify_fn_private_data; void (*link_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t link_id, uint8_t connected, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); int fini_in_progress; uint64_t flags; }; extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */ /* * NOTE: every single operation must be implementend * for every protocol. */ /* * for now knet supports only IP protocols (udp/sctp) * in future there might be others like ARP * or TIPC. * keep this around as transport information * to use for access lists and other operations */ #define TRANSPORT_PROTO_LOOPBACK 0 #define TRANSPORT_PROTO_IP_PROTO 1 /* * some transports like SCTP can filter incoming * connections before knet has to process * any packets. * GENERIC_ACL -> packet has to be read and filterted * PROTO_ACL -> transport provides filtering at lower levels * and packet does not need to be processed */ typedef enum { USE_NO_ACL, USE_GENERIC_ACL, USE_PROTO_ACL } transport_acl; /* * make it easier to map values in transports.c */ #define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0 #define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1 typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; const uint8_t transport_id; const uint8_t built_in; uint8_t transport_protocol; transport_acl transport_acl_type; /* * connection oriented protocols like SCTP * donĀ“t need dst_addr in sendto calls and * on some OSes are considered EINVAL. */ uint8_t transport_is_connection_oriented; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. * * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * transport callback for incoming dynamic connections * this is called in global read lock context */ int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link); /* * return the fd to use for access lists */ int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rdlock */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * transport_tx_sock_error is invoked with global_rwlock and * it's invoked when sendto or sendmmsg returns =< 0 * * it should return: * -1 on internal error * 0 ignore error and continue * 1 retry * any sleep or wait action should happen inside the transport code */ int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg); } knet_transport_ops_t; -socklen_t sockaddr_len(const struct sockaddr_storage *ss); - struct pretty_names { const char *name; uint8_t val; }; #endif diff --git a/libknet/netutils.h b/libknet/netutils.h index e668ada6..ee10b2b1 100644 --- a/libknet/netutils.h +++ b/libknet/netutils.h @@ -1,19 +1,19 @@ /* * Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_NETUTILS_H__ #define __KNET_NETUTILS_H__ #include int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1, const struct sockaddr_storage *ss2, socklen_t sslen2); int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src); -socklen_t knet_sockaddr_len(const struct sockaddr_storage *ss); +socklen_t sockaddr_len(const struct sockaddr_storage *ss); #endif diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c index dba48952..eadffa93 100644 --- a/libknet/transport_sctp.c +++ b/libknet/transport_sctp.c @@ -1,1581 +1,1582 @@ /* * Copyright (C) 2016-2020 Red Hat, Inc. All rights reserved. * * Author: Christine Caulfield * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include "compat.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "links_acl_ip.h" #include "logging.h" +#include "netutils.h" #include "common.h" #include "transport_common.h" #include "transports.h" #include "threads_common.h" #ifdef HAVE_NETINET_SCTP_H #include #include "transport_sctp.h" typedef struct sctp_handle_info { struct qb_list_head listen_links_list; struct qb_list_head connect_links_list; int connect_epollfd; int connectsockfd[2]; int listen_epollfd; int listensockfd[2]; pthread_t connect_thread; pthread_t listen_thread; socklen_t event_subscribe_kernel_size; char *event_subscribe_buffer; } sctp_handle_info_t; /* * use by fd_tracker data type */ #define SCTP_NO_LINK_INFO 0 #define SCTP_LISTENER_LINK_INFO 1 #define SCTP_ACCEPTED_LINK_INFO 2 #define SCTP_CONNECT_LINK_INFO 3 /* * this value is per listener */ #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_listen_link_info { struct qb_list_head list; int listen_sock; int accepted_socks[MAX_ACCEPTED_SOCKS]; struct sockaddr_storage src_address; int on_listener_epoll; int on_rx_epoll; int sock_shutdown; } sctp_listen_link_info_t; typedef struct sctp_accepted_link_info { char mread_buf[KNET_DATABUFSIZE]; ssize_t mread_len; sctp_listen_link_info_t *link_info; } sctp_accepted_link_info_t ; typedef struct sctp_connect_link_info { struct qb_list_head list; sctp_listen_link_info_t *listener; struct knet_link *link; struct sockaddr_storage dst_address; int connect_sock; int on_rx_epoll; int close_sock; int sock_shutdown; } sctp_connect_link_info_t; /* * socket handling functions * * those functions do NOT perform locking. locking * should be handled in the right context from callers */ /* * sockets are removed from rx_epoll from callers * see also error handling functions */ static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; struct epoll_event ev; sctp_connect_link_info_t *info = kn_link->transport_link; if (info->connect_sock != -1) { if (info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_rx_epoll = 0; } if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); } else { close(info->connect_sock); info->connect_sock = -1; } } exit_error: errno = savederrno; return err; } static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type) { int err = 0, savederrno = 0; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, handle_info->event_subscribe_buffer, handle_info->event_subscribe_kernel_size) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s", type, strerror(savederrno)); } errno = savederrno; return err; } static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type) { int err = 0, savederrno = 0; int value; int level; #ifdef SOL_SCTP level = SOL_SCTP; #else level = IPPROTO_SCTP; #endif if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) { savederrno = errno; err = -1; goto exit_error; } value = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set reuseaddr on socket %d: %s", sock, strerror(savederrno)); goto exit_error; } value = 1; if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s", strerror(savederrno)); goto exit_error; } if (_enable_sctp_notifications(knet_h, sock, type) < 0) { savederrno = errno; err = -1; } exit_error: errno = savederrno; return err; } static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info = kn_link->transport_link; if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) { savederrno = errno; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP socket %d received error: %s", info->connect_sock, strerror(savederrno)); if ((savederrno != EALREADY) && (savederrno != EINPROGRESS) && (savederrno != EISCONN)) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s", info->connect_sock, strerror(savederrno)); } } errno = savederrno; return err; } static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; struct epoll_event ev; sctp_connect_link_info_t *info = kn_link->transport_link; int connect_sock; struct sockaddr_storage connect_addr; connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (connect_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) { savederrno = errno; err = -1; goto exit_error; } memset(&connect_addr, 0, sizeof(struct sockaddr_storage)); if (knet_strtoaddr(kn_link->status.src_ipaddr, "0", &connect_addr, sockaddr_len(&connect_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to resolve connecting socket: %s", strerror(savederrno)); goto exit_error; } if (bind(connect_sock, (struct sockaddr *)&connect_addr, sockaddr_len(&connect_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind connecting socket: %s", strerror(savederrno)); goto exit_error; } if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 1; info->connect_sock = connect_sock; info->close_sock = 0; kn_link->outsock = info->connect_sock; if (_reconnect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; goto exit_error; } exit_error: if (err) { if (connect_sock >= 0) { close(connect_sock); } } errno = savederrno; return err; } static void _lock_sleep_relock(knet_handle_t knet_h) { int i = 0; /* Don't hold onto the lock while sleeping */ pthread_rwlock_unlock(&knet_h->global_rwlock); while (i < 5) { usleep(knet_h->threads_timer_res / 16); if (!pthread_rwlock_rdlock(&knet_h->global_rwlock)) { /* * lock acquired, we can go out */ return; } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get read lock!"); i++; } } /* * time to crash! if we cannot re-acquire the lock * there is no easy way out of this one */ assert(0); } int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_listen_link_info_t *listen_info; if (recv_err < 0) { switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) { case SCTP_CONNECT_LINK_INFO: if (connect_info->link->transport_connected == 0) { return -1; } break; case SCTP_ACCEPTED_LINK_INFO: listen_info = accepted_info->link_info; if (listen_info->listen_sock != sockfd) { if (listen_info->on_rx_epoll == 0) { return -1; } } break; } if (recv_errno == EAGAIN) { #ifdef DEBUG log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd); #endif _lock_sleep_relock(knet_h); return 1; } return -1; } return 0; } /* * socket error management functions * * both called with global read lock. * * NOTE: we need to remove the fd from the epoll as soon as possible * even before we notify the respective thread to take care of it * because scheduling can make it so that this thread will overload * and the threads supposed to take care of the error will never * be able to take action. * we CANNOT handle FDs here directly (close/reconnect/etc) due * to locking context. We need to delegate that to their respective * management threads within the global write lock. * * this function is called from: * - RX thread with recv_err <= 0 directly on recvmmsg error * - transport_rx_is_data when msg_len == 0 (recv_err = 1) * - transport_rx_is_data on notification (recv_err = 2) * * basically this small abuse of recv_err is to detect notifications * generated by sockets created by listen(). */ int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { struct epoll_event ev; sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_listen_link_info_t *listen_info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) { case SCTP_CONNECT_LINK_INFO: /* * all connect link have notifications enabled * and we accept only data from notification and * generic recvmmsg errors. * * Errors generated by msg_len 0 can be ignored because * they follow a notification (double notification) */ if (recv_err != 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd); if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno)); } } break; case SCTP_ACCEPTED_LINK_INFO: listen_info = accepted_info->link_info; if (listen_info->listen_sock != sockfd) { if (recv_err != 1) { if (listen_info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); return -1; } listen_info->on_rx_epoll = 0; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd); if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno)); } } } else { /* * this means the listen() socket has generated * a notification. now what? :-) */ log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd); } break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd); break; } /* * Under RX pressure we need to give time to IPC to pick up the message */ _lock_sleep_relock(knet_h); return 0; } /* * NOTE: sctp_transport_rx_is_data is called with global rdlock * delegate any FD error management to sctp_transport_rx_sock_error * and keep this code to parsing incoming data only */ int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { size_t i; struct iovec *iov = msg->msg_hdr.msg_iov; size_t iovlen = msg->msg_hdr.msg_iovlen; struct sctp_assoc_change *sac; union sctp_notification *snp; sctp_accepted_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data; if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) { if (msg->msg_len == 0) { /* * NOTE: with event notification enabled, we receive error twice: * 1) from the event notification * 2) followed by a 0 byte msg_len * * the event handler should take care to avoid #2 by stopping * the rx thread from processing more packets than necessary. */ if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { if (connect_info->sock_shutdown) { return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE; } } else { if (listen_info->link_info->sock_shutdown) { return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE; } } /* * this is pretty much dead code and we should never hit it. * keep it for safety and avoid the rx thread to process * bad info / data. */ return KNET_TRANSPORT_RX_NOT_DATA_STOP; } /* * missing MSG_EOR has to be treated as a short read * from the socket and we need to fill in the mread buf * while we wait for MSG_EOR */ if (!(msg->msg_hdr.msg_flags & MSG_EOR)) { /* * copy the incoming data into mread_buf + mread_len (incremental) * and increase mread_len */ memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len); listen_info->mread_len = listen_info->mread_len + msg->msg_len; return KNET_TRANSPORT_RX_NOT_DATA_CONTINUE; } /* * got EOR. * if mread_len is > 0 we are completing a packet from short reads * complete reassembling the packet in mread_buf, copy it back in the iov * and set the iov/msg len numbers (size) correctly */ if (listen_info->mread_len) { /* * add last fragment to mread_buf */ memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len); listen_info->mread_len = listen_info->mread_len + msg->msg_len; /* * move all back into the iovec */ memmove(iov->iov_base, listen_info->mread_buf, listen_info->mread_len); msg->msg_len = listen_info->mread_len; listen_info->mread_len = 0; } return KNET_TRANSPORT_RX_IS_DATA; } if (!(msg->msg_hdr.msg_flags & MSG_EOR)) { return KNET_TRANSPORT_RX_NOT_DATA_STOP; } for (i = 0; i < iovlen; i++) { snp = iov[i].iov_base; switch (snp->sn_header.sn_type) { case SCTP_ASSOC_CHANGE: sac = &snp->sn_assoc_change; switch (sac->sac_state) { case SCTP_COMM_LOST: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_lost", sockfd); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); return KNET_TRANSPORT_RX_OOB_DATA_STOP; break; case SCTP_COMM_UP: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_up", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->link->transport_connected = 1; } break; case SCTP_RESTART: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: restart", sockfd); break; case SCTP_SHUTDOWN_COMP: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: shutdown comp", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->close_sock = 1; } sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); return KNET_TRANSPORT_RX_OOB_DATA_STOP; break; case SCTP_CANT_STR_ASSOC: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: cant str assoc", sockfd); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: unknown %d", sockfd, sac->sac_state); break; } break; case SCTP_SHUTDOWN_EVENT: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event socket %d", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->link->transport_connected = 0; connect_info->sock_shutdown = 1; } else { listen_info->link_info->sock_shutdown = 1; } break; case SCTP_SEND_FAILED: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed socket: %d", sockfd); break; case SCTP_PEER_ADDR_CHANGE: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change socket %d", sockfd); break; case SCTP_REMOTE_ERROR: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error socket %d", sockfd); break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event socket: %d type: %hu", sockfd, snp->sn_header.sn_type); break; } } return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE; } /* * connect / outgoing socket management thread */ /* * _handle_connected_sctp* are called with a global write lock * from the connect_thread */ static void _handle_connected_sctp_socket(knet_handle_t knet_h, int connect_sock) { int err; unsigned int status, len = sizeof(status); sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data; struct knet_link *kn_link = info->link; if (info->close_sock) { if (_close_connect_socket(knet_h, kn_link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp_socket: %s", connect_sock, strerror(errno)); return; } info->close_sock = 0; if (_create_connect_socket(knet_h, kn_link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno)); return; } } _reconnect_socket(knet_h, info->link); err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len); if (err) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s", connect_sock, strerror(errno)); return; } if (status) { log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s", connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port, strerror(status)); /* * No need to create a new socket if connect failed, * just retry connect */ return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s", connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port); } static void _handle_connected_sctp_notifications(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error"); return; } /* * revalidate sockfd */ if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) { return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd); _handle_connected_sctp_socket(knet_h, sockfd); } static void *_sctp_connect_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED); while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s", strerror(errno)); continue; } /* * Sort out which FD has a connection */ savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * minor optimization: deduplicate events * * in some cases we can receive multiple notifcations * of the same FD having issues or need handling. * It's enough to process it once even tho it's safe * to handle them multiple times. */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->connectsockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket"); _handle_connected_sctp_notifications(knet_h); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification on connected sockfd %d\n", events[i].data.fd); } } pthread_rwlock_unlock(&knet_h->global_rwlock); /* * this thread can generate events for itself. * we need to sleep in between loops to allow other threads * to be scheduled */ usleep(knet_h->reconnect_int * 1000); } set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STOPPED); return NULL; } /* * listen/incoming connections management thread */ /* * Listener received a new connection * called with a write lock from main thread */ static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock) { int err = 0, savederrno = 0; int new_fd; int i = -1; sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data; struct epoll_event ev; struct sockaddr_storage ss; socklen_t sock_len = sizeof(ss); char addr_str[KNET_MAX_HOST_LEN]; char port_str[KNET_MAX_PORT_LEN]; sctp_accepted_link_info_t *accept_info = NULL; new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len); if (new_fd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno)); goto exit_error; } if (knet_addrtostr(&ss, sizeof(ss), addr_str, KNET_MAX_HOST_LEN, port_str, KNET_MAX_PORT_LEN) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info"); goto exit_error; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s", addr_str, port_str); if (knet_h->use_access_lists) { if (!check_validate(knet_h, listen_sock, KNET_TRANSPORT_SCTP, &ss)) { savederrno = EINVAL; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Connection rejected from %s/%s", addr_str, port_str); close(new_fd); errno = savederrno; return; } } /* * Keep a track of all accepted FDs */ for (i=0; iaccepted_socks[i] == -1) { info->accepted_socks[i] = new_fd; break; } } if (i == MAX_ACCEPTED_SOCKS) { errno = EBUSY; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!"); goto exit_error; } if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */ savederrno = errno; err = -1; goto exit_error; } if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) { savederrno = errno; err = -1; goto exit_error; } accept_info = malloc(sizeof(sctp_accepted_link_info_t)); if (!accept_info) { savederrno = errno; err = -1; goto exit_error; } memset(accept_info, 0, sizeof(sctp_accepted_link_info_t)); accept_info->link_info = info; if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, accept_info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(errno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = new_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s", new_fd, strerror(errno)); goto exit_error; } info->on_rx_epoll = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d", new_fd, addr_str, port_str, info->listen_sock, i); exit_error: if (err) { if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) { info->accepted_socks[i] = -1; } _set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); free(accept_info); if (new_fd >= 0) { close(new_fd); } } errno = savederrno; return; } /* * Listen thread received a notification of a bad socket that needs closing * called with a write lock from main thread */ static void _handle_listen_sctp_errors(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_accepted_link_info_t *accept_info; sctp_listen_link_info_t *info; struct knet_host *host; int link_idx; int i; if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error"); return; } /* * revalidate sockfd */ if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) { return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd); accept_info = knet_h->knet_transport_fd_tracker[sockfd].data; info = accept_info->link_info; /* * clear all links using this accepted socket as * outbound dynamically connected socket */ for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) && (host->link[link_idx].outsock == sockfd)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)", host->host_id, link_idx, sockfd); host->link[link_idx].status.dynconnected = 0; host->link[link_idx].transport_connected = 0; host->link[link_idx].outsock = 0; memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage)); } } } for (i=0; iaccepted_socks[i]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd); _set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); info->accepted_socks[i] = -1; free(accept_info); close(sockfd); break; /* Keeps covscan happy */ } } } static void *_sctp_listen_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED); while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s", strerror(errno)); continue; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * Sort out which FD has an incoming connection */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->listensockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket"); _handle_listen_sctp_errors(knet_h); } else { if (_is_valid_fd(knet_h, events[i].data.fd) == 1) { _handle_incoming_sctp(knet_h, events[i].data.fd); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket"); } } } pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STOPPED); return NULL; } /* * sctp_link_listener_start/stop are called in global write lock * context from set_config and clear_config. */ static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int listen_sock = -1; struct epoll_event ev; sctp_listen_link_info_t *info = NULL; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * Only allocate a new listener if src address is different */ qb_list_for_each_entry(info, &handle_info->listen_links_list, list) { if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) { if ((check_add(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, -1, &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) { return NULL; } return info; } } info = malloc(sizeof(sctp_listen_link_info_t)); if (!info) { err = -1; goto exit_error; } memset(info, 0, sizeof(sctp_listen_link_info_t)); memset(info->accepted_socks, -1, sizeof(info->accepted_socks)); memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)); listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (listen_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) { savederrno = errno; err = -1; goto exit_error; } if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } if (listen(listen_sock, 5) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s", strerror(savederrno)); goto exit_error; } if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } if ((check_add(knet_h, listen_sock, KNET_TRANSPORT_SCTP, -1, &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to configure default access lists: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 1; info->listen_sock = listen_sock; qb_list_add(&info->list, &handle_info->listen_links_list); log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port); exit_error: if (err) { if ((info) && (info->on_listener_epoll)) { epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev); } if (listen_sock >= 0) { check_rmall(knet_h, listen_sock, KNET_TRANSPORT_SCTP); close(listen_sock); } if (info) { free(info); info = NULL; } } errno = savederrno; return info; } static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int found = 0, i; struct knet_host *host; int link_idx; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *this_link_info = kn_link->transport_link; sctp_listen_link_info_t *info = this_link_info->listener; sctp_connect_link_info_t *link_info; struct epoll_event ev; for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&host->link[link_idx] == kn_link) continue; link_info = host->link[link_idx].transport_link; if ((link_info) && (link_info->listener == info)) { found = 1; break; } } } if ((check_rm(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove default access lists for %d", info->listen_sock); } if (found) { this_link_info->listener = NULL; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock); savederrno = EBUSY; err = -1; goto exit_error; } if (info->on_listener_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 0; } if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } check_rmall(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP); close(info->listen_sock); for (i=0; i< MAX_ACCEPTED_SOCKS; i++) { if (info->accepted_socks[i] > -1) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->accepted_socks[i]; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 0; free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data); close(info->accepted_socks[i]); if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } info->accepted_socks[i] = -1; } } qb_list_del(&info->list); free(info); this_link_info->listener = NULL; exit_error: errno = savederrno; return err; } /* * Links config/clear. Both called with global wrlock from link_set_config/clear_config */ int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link) { int savederrno = 0, err = 0; sctp_connect_link_info_t *info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; info = malloc(sizeof(sctp_connect_link_info_t)); if (!info) { goto exit_error; } memset(info, 0, sizeof(sctp_connect_link_info_t)); kn_link->transport_link = info; info->link = kn_link; memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage)); info->connect_sock = -1; info->listener = sctp_link_listener_start(knet_h, kn_link); if (!info->listener) { savederrno = errno; err = -1; goto exit_error; } if (kn_link->dynamic == KNET_LINK_STATIC) { if (_create_connect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; goto exit_error; } kn_link->outsock = info->connect_sock; } qb_list_add(&info->list, &handle_info->connect_links_list); exit_error: if (err) { if (info) { if (info->connect_sock >= 0) { close(info->connect_sock); } if (info->listener) { sctp_link_listener_stop(knet_h, kn_link); } kn_link->transport_link = NULL; free(info); } } errno = savederrno; return err; } /* * called with global wrlock */ int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info; if (!kn_link) { errno = EINVAL; return -1; } info = kn_link->transport_link; if (!info) { errno = EINVAL; return -1; } if ((sctp_link_listener_stop(knet_h, kn_link) <0) && (errno != EBUSY)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener transport: %s", strerror(savederrno)); goto exit_error; } if (_close_connect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s", strerror(savederrno)); goto exit_error; } qb_list_del(&info->list); free(info); kn_link->transport_link = NULL; exit_error: errno = savederrno; return err; } /* * transport_free and transport_init are * called only from knet_handle_new and knet_handle_free. * all resources (hosts/links) should have been already freed at this point * and they are called in a write locked context, hence they * don't need their own locking. */ int sctp_transport_free(knet_handle_t knet_h) { sctp_handle_info_t *handle_info; void *thread_status; struct epoll_event ev; if (!knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EINVAL; return -1; } handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * keep it here while we debug list usage and such */ if (!qb_list_empty(&handle_info->listen_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty"); } if (!qb_list_empty(&handle_info->connect_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty"); } if (handle_info->listen_thread) { pthread_cancel(handle_info->listen_thread); pthread_join(handle_info->listen_thread, &thread_status); } if (handle_info->connect_thread) { pthread_cancel(handle_info->connect_thread); pthread_join(handle_info->connect_thread, &thread_status); } if (handle_info->listensockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev); } if (handle_info->connectsockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev); } _close_socketpair(knet_h, handle_info->connectsockfd); _close_socketpair(knet_h, handle_info->listensockfd); if (handle_info->listen_epollfd >= 0) { close(handle_info->listen_epollfd); } if (handle_info->connect_epollfd >= 0) { close(handle_info->connect_epollfd); } free(handle_info->event_subscribe_buffer); free(handle_info); knet_h->transports[KNET_TRANSPORT_SCTP] = NULL; return 0; } static int _sctp_subscribe_init(knet_handle_t knet_h) { int test_socket, savederrno; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; char dummy_events[100]; struct sctp_event_subscribe *events; /* Below we set the first 6 fields of this expanding struct. * SCTP_EVENTS is deprecated, but SCTP_EVENT is not available * on Linux; on the other hand, FreeBSD and old Linux does not * accept small transfers, so we can't simply use this minimum * everywhere. Thus we query and store the native size. */ const unsigned int subscribe_min = 6; test_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_SCTP); if (test_socket < 0) { if (errno == EPROTONOSUPPORT) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP not supported, skipping initialization"); return 0; } savederrno = errno; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create test socket: %s", strerror(savederrno)); return savederrno; } handle_info->event_subscribe_kernel_size = sizeof dummy_events; if (getsockopt(test_socket, IPPROTO_SCTP, SCTP_EVENTS, &dummy_events, &handle_info->event_subscribe_kernel_size)) { close(test_socket); savederrno = errno; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to query kernel size of struct sctp_event_subscribe: %s", strerror(savederrno)); return savederrno; } close(test_socket); if (handle_info->event_subscribe_kernel_size < subscribe_min) { savederrno = ERANGE; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "No kernel support for the necessary notifications: struct sctp_event_subscribe is %u bytes, %u needed", handle_info->event_subscribe_kernel_size, subscribe_min); return savederrno; } events = malloc(handle_info->event_subscribe_kernel_size); if (!events) { savederrno = errno; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Failed to allocate event subscribe buffer: %s", strerror(savederrno)); return savederrno; } memset(events, 0, handle_info->event_subscribe_kernel_size); events->sctp_data_io_event = 1; events->sctp_association_event = 1; events->sctp_address_event = 1; events->sctp_send_failure_event = 1; events->sctp_peer_error_event = 1; events->sctp_shutdown_event = 1; handle_info->event_subscribe_buffer = (char *)events; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Size of struct sctp_event_subscribe is %u in kernel, %zu in user space", handle_info->event_subscribe_kernel_size, sizeof(struct sctp_event_subscribe)); return 0; } int sctp_transport_init(knet_handle_t knet_h) { int err = 0, savederrno = 0; sctp_handle_info_t *handle_info; struct epoll_event ev; if (knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EEXIST; return -1; } handle_info = malloc(sizeof(sctp_handle_info_t)); if (!handle_info) { return -1; } memset(handle_info, 0,sizeof(sctp_handle_info_t)); knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info; savederrno = _sctp_subscribe_init(knet_h); if (savederrno) { err = -1; goto exit_fail; } qb_list_init(&handle_info->listen_links_list); qb_list_init(&handle_info->connect_links_list); handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->listen_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->listen_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s", strerror(savederrno)); goto exit_fail; } handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->connect_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->connect_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s", strerror(savederrno)); goto exit_fail; } /* * Start connect & listener threads */ set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED); savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED); savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s", strerror(savederrno)); goto exit_fail; } exit_fail: if (err < 0) { sctp_transport_free(knet_h); } errno = savederrno; return err; } int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link) { kn_link->outsock = sockfd; kn_link->status.dynconnected = 1; kn_link->transport_connected = 1; return 0; } int sctp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link) { sctp_connect_link_info_t *this_link_info = kn_link->transport_link; sctp_listen_link_info_t *info = this_link_info->listener; return info->listen_sock; } #endif diff --git a/libknet/transport_udp.c b/libknet/transport_udp.c index 37004dff..5cbb34f7 100644 --- a/libknet/transport_udp.c +++ b/libknet/transport_udp.c @@ -1,439 +1,440 @@ /* * Copyright (C) 2016-2020 Red Hat, Inc. All rights reserved. * * Author: Christine Caulfield * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #if defined (IP_RECVERR) || defined (IPV6_RECVERR) #include #endif #include "libknet.h" #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" +#include "netutils.h" #include "transport_common.h" #include "transport_udp.h" #include "transports.h" #include "threads_common.h" typedef struct udp_handle_info { struct qb_list_head links_list; } udp_handle_info_t; typedef struct udp_link_info { struct qb_list_head list; struct sockaddr_storage local_address; int socket_fd; int on_epoll; } udp_link_info_t; int udp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int sock = -1; struct epoll_event ev; udp_link_info_t *info; udp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_UDP]; #if defined (IP_RECVERR) || defined (IPV6_RECVERR) int value; #endif /* * Only allocate a new link if the local address is different */ qb_list_for_each_entry(info, &handle_info->links_list, list) { if (memcmp(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Re-using existing UDP socket for new link"); kn_link->outsock = info->socket_fd; kn_link->transport_link = info; kn_link->transport_connected = 1; return 0; } } info = malloc(sizeof(udp_link_info_t)); if (!info) { err = -1; goto exit_error; } memset(info, 0, sizeof(udp_link_info_t)); sock = socket(kn_link->src_addr.ss_family, SOCK_DGRAM, 0); if (sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_transport_socket(knet_h, sock, &kn_link->src_addr, kn_link->flags, "UDP") < 0) { savederrno = errno; err = -1; goto exit_error; } #ifdef IP_RECVERR if (kn_link->src_addr.ss_family == AF_INET) { value = 1; if (setsockopt(sock, SOL_IP, IP_RECVERR, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set RECVERR on socket: %s", strerror(savederrno)); goto exit_error; } log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IP_RECVERR enabled on socket: %i", sock); } #else log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IP_RECVERR not available in this build/platform"); #endif #ifdef IPV6_RECVERR if (kn_link->src_addr.ss_family == AF_INET6) { value = 1; if (setsockopt(sock, SOL_IPV6, IPV6_RECVERR, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set RECVERR on socket: %s", strerror(savederrno)); goto exit_error; } log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IPV6_RECVERR enabled on socket: %i", sock); } #else log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IPV6_RECVERR not available in this build/platform"); #endif if (bind(sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr))) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_epoll = 1; if (_set_fd_tracker(knet_h, sock, KNET_TRANSPORT_UDP, 0, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } memmove(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)); info->socket_fd = sock; qb_list_add(&info->list, &handle_info->links_list); kn_link->outsock = sock; kn_link->transport_link = info; kn_link->transport_connected = 1; exit_error: if (err) { if (info) { if (info->on_epoll) { epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sock, &ev); } free(info); } if (sock >= 0) { close(sock); } } errno = savederrno; return err; } int udp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int found = 0; struct knet_host *host; int link_idx; udp_link_info_t *info = kn_link->transport_link; struct epoll_event ev; for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&host->link[link_idx] == kn_link) continue; if (host->link[link_idx].transport_link == info) { found = 1; break; } } } if (found) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "UDP socket %d still in use", info->socket_fd); savederrno = EBUSY; err = -1; goto exit_error; } if (info->on_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->socket_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->socket_fd, &ev) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to remove UDP socket from epoll poll: %s", strerror(errno)); goto exit_error; } info->on_epoll = 0; } if (_set_fd_tracker(knet_h, info->socket_fd, KNET_MAX_TRANSPORTS, 0, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } close(info->socket_fd); qb_list_del(&info->list); free(kn_link->transport_link); exit_error: errno = savederrno; return err; } int udp_transport_free(knet_handle_t knet_h) { udp_handle_info_t *handle_info; if (!knet_h->transports[KNET_TRANSPORT_UDP]) { errno = EINVAL; return -1; } handle_info = knet_h->transports[KNET_TRANSPORT_UDP]; /* * keep it here while we debug list usage and such */ if (!qb_list_empty(&handle_info->links_list)) { log_err(knet_h, KNET_SUB_TRANSP_UDP, "Internal error. handle list is not empty"); return -1; } free(handle_info); knet_h->transports[KNET_TRANSPORT_UDP] = NULL; return 0; } int udp_transport_init(knet_handle_t knet_h) { udp_handle_info_t *handle_info; if (knet_h->transports[KNET_TRANSPORT_UDP]) { errno = EEXIST; return -1; } handle_info = malloc(sizeof(udp_handle_info_t)); if (!handle_info) { return -1; } memset(handle_info, 0, sizeof(udp_handle_info_t)); knet_h->transports[KNET_TRANSPORT_UDP] = handle_info; qb_list_init(&handle_info->links_list); return 0; } #if defined (IP_RECVERR) || defined (IPV6_RECVERR) static int read_errs_from_sock(knet_handle_t knet_h, int sockfd) { int err = 0, savederrno = 0; int got_err = 0; char buffer[1024]; struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; struct sock_extended_err *sock_err; struct icmphdr icmph; struct sockaddr_storage remote; struct sockaddr_storage *origin; char addr_str[KNET_MAX_HOST_LEN]; char port_str[KNET_MAX_PORT_LEN]; char addr_remote_str[KNET_MAX_HOST_LEN]; char port_remote_str[KNET_MAX_PORT_LEN]; iov.iov_base = &icmph; iov.iov_len = sizeof(icmph); msg.msg_name = (void*)&remote; msg.msg_namelen = sizeof(remote); msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_flags = 0; msg.msg_control = buffer; msg.msg_controllen = sizeof(buffer); for (;;) { err = recvmsg(sockfd, &msg, MSG_ERRQUEUE); savederrno = errno; if (err < 0) { if (!got_err) { errno = savederrno; return -1; } else { return 0; } } got_err = 1; for (cmsg = CMSG_FIRSTHDR(&msg);cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (((cmsg->cmsg_level == SOL_IP) && (cmsg->cmsg_type == IP_RECVERR)) || ((cmsg->cmsg_level == SOL_IPV6 && (cmsg->cmsg_type == IPV6_RECVERR)))) { sock_err = (struct sock_extended_err*)(void *)CMSG_DATA(cmsg); if (sock_err) { switch (sock_err->ee_origin) { case SO_EE_ORIGIN_NONE: /* no origin */ case SO_EE_ORIGIN_LOCAL: /* local source (EMSGSIZE) */ if (sock_err->ee_errno == EMSGSIZE) { if (pthread_mutex_lock(&knet_h->kmtu_mutex) != 0) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Unable to get mutex lock"); knet_h->kernel_mtu = 0; break; } else { knet_h->kernel_mtu = sock_err->ee_info; log_debug(knet_h, KNET_SUB_TRANSP_UDP, "detected kernel MTU: %u", knet_h->kernel_mtu); pthread_mutex_unlock(&knet_h->kmtu_mutex); } force_pmtud_run(knet_h, KNET_SUB_TRANSP_UDP, 0); } /* * those errors are way too noisy */ break; case SO_EE_ORIGIN_ICMP: /* ICMP */ case SO_EE_ORIGIN_ICMP6: /* ICMP6 */ origin = (struct sockaddr_storage *)(void *)SO_EE_OFFENDER(sock_err); if (knet_addrtostr(origin, sizeof(*origin), addr_str, KNET_MAX_HOST_LEN, port_str, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from unknown source: %s", strerror(sock_err->ee_errno)); } else { if (knet_addrtostr(&remote, sizeof(remote), addr_remote_str, KNET_MAX_HOST_LEN, port_remote_str, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from %s: %s destination unknown", addr_str, strerror(sock_err->ee_errno)); } else { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from %s: %s %s", addr_str, strerror(sock_err->ee_errno), addr_remote_str); } } break; } } else { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "No data in MSG_ERRQUEUE"); } } } } } #else static int read_errs_from_sock(knet_handle_t knet_h, int sockfd) { return 0; } #endif int udp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { if (recv_errno == EAGAIN) { read_errs_from_sock(knet_h, sockfd); } return 0; } int udp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { if (recv_err < 0) { if (recv_errno == EMSGSIZE) { read_errs_from_sock(knet_h, sockfd); return 0; } if ((recv_errno == EINVAL) || (recv_errno == EPERM) || (recv_errno == ENETUNREACH) || (recv_errno == ENETDOWN)) { #ifdef DEBUG if ((recv_errno == ENETUNREACH) || (recv_errno == ENETDOWN)) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Sock: %d is unreachable.", sockfd); } #endif return -1; } if ((recv_errno == ENOBUFS) || (recv_errno == EAGAIN)) { #ifdef DEBUG log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Sock: %d is overloaded. Slowing TX down", sockfd); #endif usleep(knet_h->threads_timer_res / 16); } else { read_errs_from_sock(knet_h, sockfd); } return 1; } return 0; } int udp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { if (msg->msg_len == 0) return KNET_TRANSPORT_RX_NOT_DATA_CONTINUE; return KNET_TRANSPORT_RX_IS_DATA; } int udp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link) { kn_link->status.dynconnected = 1; return 0; } int udp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link) { return kn_link->outsock; }