diff --git a/libknet/handle.c b/libknet/handle.c index 68641d6e..d4bd86c2 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,808 +1,787 @@ /* * Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include "internals.h" #include "crypto.h" #include "links.h" #include "compress.h" #include "compat.h" #include "common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_dsthandler.h" #include "threads_rx.h" #include "threads_tx.h" #include "transports.h" #include "transport_common.h" #include "logging.h" static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_rwlock_t shlib_rwlock; static uint8_t shlib_wrlock_init = 0; static uint32_t knet_ref = 0; static int _init_shlib_tracker(knet_handle_t knet_h) { int savederrno = 0; if (!shlib_wrlock_init) { savederrno = pthread_rwlock_init(&shlib_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s", strerror(savederrno)); errno = savederrno; return -1; } shlib_wrlock_init = 1; } return 0; } static void _fini_shlib_tracker(void) { if (knet_ref == 0) { pthread_rwlock_destroy(&shlib_rwlock); shlib_wrlock_init = 0; } return; } static int _init_locks(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->handle_stats_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize handle stats mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->threads_status_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize threads status mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->kmtu_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize kernel_mtu mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->backoff_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_locks(knet_handle_t knet_h) { pthread_rwlock_destroy(&knet_h->global_rwlock); pthread_mutex_destroy(&knet_h->pmtud_mutex); pthread_mutex_destroy(&knet_h->kmtu_mutex); pthread_cond_destroy(&knet_h->pmtud_cond); pthread_mutex_destroy(&knet_h->hb_mutex); pthread_mutex_destroy(&knet_h->tx_mutex); pthread_mutex_destroy(&knet_h->backoff_mutex); pthread_mutex_destroy(&knet_h->tx_seq_num_mutex); pthread_mutex_destroy(&knet_h->threads_status_mutex); pthread_mutex_destroy(&knet_h->handle_stats_mutex); } static int _init_socks(knet_handle_t knet_h) { int savederrno = 0; - if (_init_socketpair(knet_h, knet_h->hostsockfd)) { - savederrno = errno; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s", - strerror(savederrno)); - goto exit_fail; - } - if (_init_socketpair(knet_h, knet_h->dstsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_socks(knet_handle_t knet_h) { _close_socketpair(knet_h, knet_h->dstsockfd); - _close_socketpair(knet_h, knet_h->hostsockfd); } static int _init_buffers(knet_handle_t knet_h) { int savederrno = 0; int i; size_t bufsize; for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE; knet_h->send_to_links_buf[i] = malloc(bufsize); if (!knet_h->send_to_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf[i], 0, bufsize); } for (i = 0; i < PCKT_RX_BUFS; i++) { knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE); } knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_sock_buf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE); knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE); if (!knet_h->pingbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE); knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6 + KNET_HEADER_ALL_SIZE); if (!knet_h->pmtudbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6 + KNET_HEADER_ALL_SIZE); for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD; knet_h->send_to_links_buf_crypt[i] = malloc(bufsize); if (!knet_h->send_to_links_buf_crypt[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize); } knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_decrypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pingbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pmtudbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS); if (!knet_h->recv_from_links_buf_decompress) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS); knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS); if (!knet_h->send_to_links_buf_compress) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS); memset(knet_h->knet_transport_fd_tracker, 0, sizeof(knet_h->knet_transport_fd_tracker)); for (i = 0; i < KNET_MAX_FDS; i++) { knet_h->knet_transport_fd_tracker[i].transport = KNET_MAX_TRANSPORTS; } return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_buffers(knet_handle_t knet_h) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { free(knet_h->send_to_links_buf[i]); free(knet_h->send_to_links_buf_crypt[i]); } for (i = 0; i < PCKT_RX_BUFS; i++) { free(knet_h->recv_from_links_buf[i]); } free(knet_h->recv_from_links_buf_decompress); free(knet_h->send_to_links_buf_compress); free(knet_h->recv_from_sock_buf); free(knet_h->recv_from_links_buf_decrypt); free(knet_h->recv_from_links_buf_crypt); free(knet_h->pingbuf); free(knet_h->pingbuf_crypt); free(knet_h->pmtudbuf); free(knet_h->pmtudbuf_crypt); } static int _init_epolls(knet_handle_t knet_h) { struct epoll_event ev; int savederrno = 0; /* * even if the kernel does dynamic allocation with epoll_ctl * we need to reserve one extra for host to host communication */ knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (knet_h->send_to_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->recv_from_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->dst_link_handler_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->send_to_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s", strerror(savederrno)); goto exit_fail; } - memset(&ev, 0, sizeof(struct epoll_event)); - ev.events = EPOLLIN; - ev.data.fd = knet_h->hostsockfd[0]; - - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) { - savederrno = errno; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s", - strerror(savederrno)); - goto exit_fail; - } - memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->dstsockfd[0]; if (epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_epolls(knet_handle_t knet_h) { struct epoll_event ev; int i; memset(&ev, 0, sizeof(struct epoll_event)); for (i = 0; i < KNET_DATAFD_MAX; i++) { if (knet_h->sockfd[i].in_use) { epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev); if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) { _close_socketpair(knet_h, knet_h->sockfd[i].sockfd); } } } - epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev); epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev); close(knet_h->send_to_links_epollfd); close(knet_h->recv_from_links_epollfd); close(knet_h->dst_link_handler_epollfd); } static int _start_threads(knet_handle_t knet_h) { int savederrno = 0; pthread_attr_t attr; set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_REGISTERED); savederrno = pthread_attr_init(&attr); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to init pthread attributes: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_attr_setstacksize(&attr, KNET_THREAD_STACK_SIZE); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to set stack size attribute: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, &attr, _handle_pmtud_link_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->dst_link_handler_thread, &attr, _handle_dst_link_handler_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->send_to_links_thread, &attr, _handle_send_to_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->recv_from_links_thread, &attr, _handle_recv_from_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_REGISTERED); savederrno = pthread_create(&knet_h->heartbt_thread, &attr, _handle_heartbt_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_attr_destroy(&attr); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to destroy pthread attributes: %s", strerror(savederrno)); /* * Do not return error code. Error is not critical. */ } return 0; exit_fail: errno = savederrno; return -1; } static void _stop_threads(knet_handle_t knet_h) { void *retval; wait_all_threads_status(knet_h, KNET_THREAD_STOPPED); if (knet_h->heartbt_thread) { pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); } if (knet_h->send_to_links_thread) { pthread_cancel(knet_h->send_to_links_thread); pthread_join(knet_h->send_to_links_thread, &retval); } if (knet_h->recv_from_links_thread) { pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); } if (knet_h->dst_link_handler_thread) { pthread_cancel(knet_h->dst_link_handler_thread); pthread_join(knet_h->dst_link_handler_thread, &retval); } if (knet_h->pmtud_link_handler_thread) { pthread_cancel(knet_h->pmtud_link_handler_thread); pthread_join(knet_h->pmtud_link_handler_thread, &retval); } } knet_handle_t knet_handle_new_ex(knet_node_id_t host_id, int log_fd, uint8_t default_log_level, uint64_t flags) { knet_handle_t knet_h; int savederrno = 0; struct rlimit cur; if (getrlimit(RLIMIT_NOFILE, &cur) < 0) { return NULL; } if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) { errno = EINVAL; return NULL; } /* * validate incoming request */ if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) { errno = EINVAL; return NULL; } if (flags > KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) { errno = EINVAL; return NULL; } /* * allocate handle */ knet_h = malloc(sizeof(struct knet_handle)); if (!knet_h) { errno = ENOMEM; return NULL; } memset(knet_h, 0, sizeof(struct knet_handle)); /* * setting up some handle data so that we can use logging * also when initializing the library global locks * and trackers */ knet_h->flags = flags; /* * copy config in place */ knet_h->host_id = host_id; knet_h->logfd = log_fd; if (knet_h->logfd > 0) { memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS); } /* * set pmtud default timers */ knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL; /* * set transports reconnect default timers */ knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL; /* * Set 'min' stats to the maximum value so the * first value we get is always less */ knet_h->stats.tx_compress_time_min = UINT64_MAX; knet_h->stats.rx_compress_time_min = UINT64_MAX; knet_h->stats.tx_crypt_time_min = UINT64_MAX; knet_h->stats.rx_crypt_time_min = UINT64_MAX; /* * init global shlib tracker */ savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); free(knet_h); knet_h = NULL; errno = savederrno; return NULL; } knet_ref++; if (_init_shlib_tracker(knet_h) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handle tracker: %s", strerror(savederrno)); errno = savederrno; pthread_mutex_unlock(&handle_config_mutex); goto exit_fail; } pthread_mutex_unlock(&handle_config_mutex); /* * init main locking structures */ if (_init_locks(knet_h)) { savederrno = errno; goto exit_fail; } /* * init sockets */ if (_init_socks(knet_h)) { savederrno = errno; goto exit_fail; } /* * allocate packet buffers */ if (_init_buffers(knet_h)) { savederrno = errno; goto exit_fail; } if (compress_init(knet_h)) { savederrno = errno; goto exit_fail; } /* * create epoll fds */ if (_init_epolls(knet_h)) { savederrno = errno; goto exit_fail; } /* * start transports */ if (start_all_transports(knet_h)) { savederrno = errno; goto exit_fail; } /* * start internal threads */ if (_start_threads(knet_h)) { savederrno = errno; goto exit_fail; } wait_all_threads_status(knet_h, KNET_THREAD_STARTED); errno = 0; return knet_h; exit_fail: knet_handle_free(knet_h); errno = savederrno; return NULL; } knet_handle_t knet_handle_new(knet_node_id_t host_id, int log_fd, uint8_t default_log_level) { return knet_handle_new_ex(host_id, log_fd, default_log_level, KNET_HANDLE_FLAG_PRIVILEGED); } int knet_handle_free(knet_handle_t knet_h) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_head != NULL) { savederrno = EBUSY; log_err(knet_h, KNET_SUB_HANDLE, "Unable to free handle: host(s) or listener(s) are still active: %s", strerror(savederrno)); pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return -1; } knet_h->fini_in_progress = 1; pthread_rwlock_unlock(&knet_h->global_rwlock); _stop_threads(knet_h); stop_all_transports(knet_h); _close_epolls(knet_h); _destroy_buffers(knet_h); _close_socks(knet_h); crypto_fini(knet_h, KNET_MAX_CRYPTO_INSTANCES + 1); /* values above MAX_CRYPTO will release all crypto resources */ compress_fini(knet_h, 1); _destroy_locks(knet_h); free(knet_h); knet_h = NULL; (void)pthread_mutex_lock(&handle_config_mutex); knet_ref--; _fini_shlib_tracker(); pthread_mutex_unlock(&handle_config_mutex); errno = 0; return 0; } diff --git a/libknet/host.c b/libknet/host.c index cf351798..e9e86eb8 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,742 +1,721 @@ /* * Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "host.h" #include "internals.h" #include "logging.h" #include "threads_common.h" static void _host_list_update(knet_handle_t knet_h) { struct knet_host *host; knet_h->host_ids_entries = 0; for (host = knet_h->host_head; host != NULL; host = host->next) { knet_h->host_ids[knet_h->host_ids_entries] = host->host_id; knet_h->host_ids_entries++; } } int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host = NULL; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_index[host_id]) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } host = malloc(sizeof(struct knet_host)); if (!host) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memset(host, 0, sizeof(struct knet_host)); /* * set host_id */ host->host_id = host_id; /* * set default host->name to host_id for logging */ snprintf(host->name, KNET_MAX_HOST_LEN, "%u", host_id); /* * initialize links internal data */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { host->link[link_idx].link_id = link_idx; host->link[link_idx].status.stats.latency_min = UINT32_MAX; } /* * add new host to the index */ knet_h->host_index[host_id] = host; /* * add new host to host list */ if (knet_h->host_head) { host->next = knet_h->host_head; } knet_h->host_head = host; _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (err < 0) { free(host); } errno = err ? savederrno : 0; return err; } int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id) { int savederrno = 0, err = 0; struct knet_host *host, *removed; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } /* * if links are configured we cannot release the host */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].configured) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s", host_id, strerror(savederrno)); goto exit_unlock; } } removed = NULL; /* * removing host from list */ if (knet_h->host_head->host_id == host_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->host_id == host_id) { removed = host->next; host->next = removed->next; break; } } } knet_h->host_index[host_id] = NULL; free(removed); _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (!name) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (strlen(name) >= KNET_MAX_HOST_LEN) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(host->name, name, KNET_MAX_HOST_LEN)) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u", host->host_id); goto exit_unlock; } } snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN, "%s", name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id, char *name) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { savederrno = EINVAL; err = -1; log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id); goto exit_unlock; } snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, knet_node_id_t *host_id) { int savederrno = 0, err = 0, found = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } if (!host_id) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) { found = 1; *host_id = host->host_id; break; } } if (!found) { savederrno = ENOENT; err = -1; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_host_list(knet_handle_t knet_h, knet_node_id_t *host_ids, size_t *host_ids_entries) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!host_ids) || (!host_ids_entries)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids)); *host_ids_entries = knet_h->host_ids_entries; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t policy) { int savederrno = 0, err = 0; uint8_t old_policy; if (!knet_h) { errno = EINVAL; return -1; } if (policy > KNET_LINK_POLICY_RR) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } old_policy = knet_h->host_index[host_id]->link_handler_policy; knet_h->host_index[host_id]->link_handler_policy = policy; if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) { savederrno = errno; err = -1; knet_h->host_index[host_id]->link_handler_policy = old_policy; log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s", host_id, strerror(savederrno)); } log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *policy) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!policy) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } *policy = knet_h->host_index[host_id]->link_handler_policy; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id, struct knet_host_status *status) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &host->status, sizeof(struct knet_host_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data; knet_h->host_status_change_notify_fn = host_status_change_notify_fn; if (knet_h->host_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } -int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen) -{ - ssize_t ret = 0; - - if (knet_h->fini_in_progress) { - return 0; - } - - ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); - if (ret < 0) { - log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(errno)); - return -1; - } - if ((size_t)ret != datalen) { - log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret); - return -1; - } - - return 0; -} - static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) { int i; memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE); host->rx_seq_num = rx_seq_num; memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); for (i = 0; i < KNET_MAX_LINK; i++) { memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); } } static void _reclaim_old_defrag_bufs(struct knet_host *host, seq_num_t seq_num) { seq_num_t head, tail; /* seq_num boundaries */ int i; head = seq_num + 1; tail = seq_num - (KNET_MAX_LINK + 1); /* * expire old defrag buffers */ for (i = 0; i < KNET_MAX_LINK; i++) { if (host->defrag_buf[i].in_use) { /* * head has done a rollover to 0+ */ if (tail > head) { if ((host->defrag_buf[i].pckt_seq >= head) && (host->defrag_buf[i].pckt_seq <= tail)) { host->defrag_buf[i].in_use = 0; } } else { if ((host->defrag_buf[i].pckt_seq >= head) || (host->defrag_buf[i].pckt_seq <= tail)){ host->defrag_buf[i].in_use = 0; } } } } } /* * check if a given packet seq num is in the circular buffers * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup */ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) { size_t head, tail; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = host->circular_buffer; char *dst_cbuf_defrag = host->circular_buffer_defrag; seq_num_t *dst_seq_num = &host->rx_seq_num; if (clear_buf) { _clear_cbuffers(host, seq_num); } _reclaim_old_defrag_bufs(host, seq_num); if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { seq_dist = *dst_seq_num - seq_num; } head = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { return (dst_cbuf[head] == 0) ? 1 : 0; } else { return (dst_cbuf_defrag[head] == 0) ? 1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ tail = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; if (tail > head) { memset(dst_cbuf + tail, 0, KNET_CBUFFER_SIZE - tail); memset(dst_cbuf, 0, head + 1); memset(dst_cbuf_defrag + tail, 0, KNET_CBUFFER_SIZE - tail); memset(dst_cbuf_defrag, 0, head + 1); } else { memset(dst_cbuf + tail, 0, head - tail + 1); memset(dst_cbuf_defrag + tail, 0, head - tail + 1); } *dst_seq_num = seq_num; return 1; } void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf) { if (!defrag_buf) { host->circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { host->circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } return; } int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host) { int savederrno = 0; knet_node_id_t host_id = host->host_id; if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) { savederrno = errno; log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s", strerror(savederrno)); errno = savederrno; return -1; } return 0; } int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host) { int link_idx; int best_priority = -1; int reachable = 0; if (knet_h->host_id == host->host_id && knet_h->has_loop_link) { host->active_link_entries = 1; return 0; } host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].status.enabled != 1) /* link is not enabled */ continue; if (host->link[link_idx].status.connected != 1) /* link is not enabled */ continue; if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */ continue; if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if (host->link[link_idx].priority > best_priority) { host->active_links[0] = link_idx; best_priority = host->link[link_idx].priority; } host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ host->active_links[host->active_link_entries] = link_idx; host->active_link_entries++; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { log_info(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)", host->host_id, host->link[host->active_links[0]].link_id, host->link[host->active_links[0]].priority); } else { log_info(knet_h, KNET_SUB_HOST, "host: %u has %u active links", host->host_id, host->active_link_entries); } /* no active links, we can clean the circular buffers and indexes */ if (!host->active_link_entries) { log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); _clear_cbuffers(host, 0); } else { reachable = 1; } if (host->status.reachable != reachable) { host->status.reachable = reachable; if (knet_h->host_status_change_notify_fn) { knet_h->host_status_change_notify_fn( knet_h->host_status_change_notify_fn_private_data, host->host_id, host->status.reachable, host->status.remote, host->status.external); } } return 0; } diff --git a/libknet/host.h b/libknet/host.h index bd2e8a70..3312c8ba 100644 --- a/libknet/host.h +++ b/libknet/host.h @@ -1,22 +1,21 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_HOST_H__ #define __KNET_HOST_H__ #include "internals.h" int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf); void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf); -int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen); int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host); int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host); #endif diff --git a/libknet/internals.h b/libknet/internals.h index cda58a6d..22f654bb 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,422 +1,419 @@ /* * Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_INTERNALS_H__ #define __KNET_INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include #include #include #include "libknet.h" #include "onwire.h" #include "compat.h" #include "threads_common.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_DATABUFSIZE_COMPRESS_PAD 1024 #define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define PCKT_RX_BUFS 512 #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1 -#define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX - /* * Size of threads stack. Value is choosen by experimenting, how much is needed * to sucesfully finish test suite, and at the time of writing patch it was * ~300KiB. To have some room for future enhancement it is increased * by factor of 3 and rounded. */ #define KNET_THREAD_STACK_SIZE (1024 * 1024) typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_mmsghdr { struct msghdr msg_hdr; /* Message header */ unsigned int msg_len; /* Number of bytes transmitted */ }; struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned long long pong_timeout_adj; /* timeout adjusted for latency */ uint8_t pong_timeout_backoff; /* see link.h for definition */ unsigned int latency_max_samples; /* precision */ unsigned int latency_cur_samples; uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ uint64_t flags; /* status */ struct knet_link_status status; /* internals */ pthread_mutex_t link_stats_mutex; /* used to update link stats */ uint8_t link_id; uint8_t transport; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused with stats.proto_overhead that includes also knet headers and crypto headers */ struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */ uint8_t has_valid_mtu; }; #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { char buf[KNET_DATABUFSIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */ uint8_t last_first; /* special case if we receive the last fragment first */ ssize_t frag_size; /* normal frag size (not the last one) */ ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ knet_node_id_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* internals */ char circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t rx_seq_num; seq_num_t untimed_rx_seq_num; seq_num_t timed_rx_seq_num; uint8_t got_data; /* defrag/reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK]; char circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) */ uint8_t data_type; /* internal use for transport to define what data are associated * with this fd */ void *data; /* pointer to the data */ void *access_list_match_entry_head; /* pointer to access list match_entry list head */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 #define KNET_MAX_COMPRESS_METHODS UINT8_MAX #define KNET_MAX_CRYPTO_INSTANCES 2 struct knet_handle_stats_extra { uint64_t tx_crypt_pmtu_packets; uint64_t tx_crypt_pmtu_reply_packets; uint64_t tx_crypt_ping_packets; uint64_t tx_crypt_pong_packets; }; struct knet_handle { knet_node_id_t host_id; unsigned int enabled:1; struct knet_sock sockfd[KNET_DATAFD_MAX + 1]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; - int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */ unsigned int pmtud_interval; unsigned int manual_mtu; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ struct knet_handle_stats stats; struct knet_handle_stats_extra stats_extra; pthread_mutex_t handle_stats_mutex; /* used to protect handle stats */ uint32_t reconnect_int; knet_node_id_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_RX_BUFS]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; uint8_t threads_status[KNET_THREAD_MAX]; uint8_t threads_flush_queue[KNET_THREAD_MAX]; pthread_mutex_t threads_status_mutex; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */ pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */ pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */ pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */ uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */ int pmtud_waiting; int pmtud_running; int pmtud_forcerun; int pmtud_abort; struct crypto_instance *crypto_instance[KNET_MAX_CRYPTO_INSTANCES + 1]; /* store an extra pointer to allow 0|1|2 values without too much magic in the code */ uint8_t crypto_in_use_config; /* crypto config to use for TX */ uint8_t crypto_only; /* allow only crypto (1) or also clear (0) traffic */ size_t sec_block_size; size_t sec_hash_size; size_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char *recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; int compress_model; int compress_level; size_t compress_threshold; void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */ unsigned char *recv_from_links_buf_decompress; unsigned char *send_to_links_buf_compress; seq_num_t tx_seq_num; pthread_mutex_t tx_seq_num_mutex; uint8_t has_loop_link; uint8_t loop_link; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_node_id, int8_t *channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); int fini_in_progress; uint64_t flags; }; extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */ /* * NOTE: every single operation must be implementend * for every protocol. */ /* * for now knet supports only IP protocols (udp/sctp) * in future there might be others like ARP * or TIPC. * keep this around as transport information * to use for access lists and other operations */ #define TRANSPORT_PROTO_LOOPBACK 0 #define TRANSPORT_PROTO_IP_PROTO 1 /* * some transports like SCTP can filter incoming * connections before knet has to process * any packets. * GENERIC_ACL -> packet has to be read and filterted * PROTO_ACL -> transport provides filtering at lower levels * and packet does not need to be processed */ typedef enum { USE_NO_ACL, USE_GENERIC_ACL, USE_PROTO_ACL } transport_acl; /* * make it easier to map values in transports.c */ #define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0 #define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1 typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; const uint8_t transport_id; const uint8_t built_in; uint8_t transport_protocol; transport_acl transport_acl_type; /* * connection oriented protocols like SCTP * donĀ“t need dst_addr in sendto calls and * on some OSes are considered EINVAL. */ uint8_t transport_is_connection_oriented; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. * * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * transport callback for incoming dynamic connections * this is called in global read lock context */ int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link); /* * return the fd to use for access lists */ int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rdlock */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * transport_tx_sock_error is invoked with global_rwlock and * it's invoked when sendto or sendmmsg returns =< 0 * * it should return: * -1 on internal error * 0 ignore error and continue * 1 retry * any sleep or wait action should happen inside the transport code */ int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg); /* * this function is called by links.c when a link down event is recorded * to notify the transport that packets are not going through, and give * transport the opportunity to take actions. */ int (*transport_link_is_down)(knet_handle_t knet_h, struct knet_link *link); } knet_transport_ops_t; struct pretty_names { const char *name; uint8_t val; }; #endif diff --git a/libknet/onwire.h b/libknet/onwire.h index e00ad915..1040ea07 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -1,208 +1,135 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_ONWIRE_H__ #define __KNET_ONWIRE_H__ /* * data structures to define network packets. * Start from knet_header at the bottom */ #include #include "libknet.h" -#if 0 - -/* - * for future protocol extension (re-switching table calculation) - */ - -struct knet_hinfo_link { - uint8_t khl_link_id; - uint8_t khl_link_dynamic; - uint8_t khl_link_priority; - uint64_t khl_link_latency; - char khl_link_dst_ipaddr[KNET_MAX_HOST_LEN]; - char khl_link_dst_port[KNET_MAX_PORT_LEN]; -} __attribute__((packed)); - -struct knet_hinfo_link_table { - knet_node_id_t khlt_node_id; - uint8_t khlt_local; /* we have this node connected locally */ - struct knet_hinfo_link khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */ -} __attribute__((packed)); - -struct link_table { - knet_node_id_t khdt_host_entries; - uint8_t khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */ -} __attribute__((packed)); -#endif - -#define KNET_HOSTINFO_LINK_STATUS_DOWN 0 -#define KNET_HOSTINFO_LINK_STATUS_UP 1 - -struct knet_hostinfo_payload_link_status { - uint8_t khip_link_status_link_id; /* link id */ - uint8_t khip_link_status_status; /* up/down status */ -} __attribute__((packed)); - -/* - * union to reference possible individual payloads - */ - -union knet_hostinfo_payload { - struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status; -} __attribute__((packed)); - -/* - * due to the nature of knet_hostinfo, we are currently - * sending those data as part of knet_header_payload_data.khp_data_userdata - * and avoid a union that increses knet_header_payload_data size - * unnecessarely. - * This might change later on depending on how we implement - * host info exchange - */ - -#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 // UNUSED -#define KNET_HOSTINFO_TYPE_LINK_TABLE 1 // NOT IMPLEMENTED - -#define KNET_HOSTINFO_UCAST 0 /* send info to a specific host */ -#define KNET_HOSTINFO_BCAST 1 /* send info to all known / connected hosts */ - -struct knet_hostinfo { - uint8_t khi_type; /* type of hostinfo we are sending */ - uint8_t khi_bcast; /* hostinfo destination bcast/ucast */ - knet_node_id_t khi_dst_node_id;/* used only if in ucast mode */ - union knet_hostinfo_payload khi_payload; -} __attribute__((packed)); - -#define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo) -#define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload)) -#define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status)) - -#define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status -#define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id - /* * typedef uint64_t seq_num_t; * #define SEQ_MAX UINT64_MAX */ typedef uint16_t seq_num_t; #define SEQ_MAX UINT16_MAX struct knet_header_payload_data { seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pkcts */ uint8_t khp_data_compress; /* identify if user data are compressed */ uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */ uint8_t khp_data_bcast; /* data destination bcast/ucast */ uint8_t khp_data_frag_num; /* number of fragments of this pckt. 1 is not fragmented */ uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */ int8_t khp_data_channel; /* transport channel data for localsock <-> knet <-> localsock mapping */ uint8_t khp_data_userdata[0]; /* pointer to the real user data */ } __attribute__((packed)); struct knet_header_payload_ping { uint8_t khp_ping_link; /* source link id */ uint32_t khp_ping_time[4]; /* ping timestamp */ seq_num_t khp_ping_seq_num; /* transport host seq_num */ uint8_t khp_ping_timed; /* timed pinged (1) or forced by seq_num (0) */ } __attribute__((packed)); /* taken from tracepath6 */ #define KNET_PMTUD_SIZE_V4 65535 #define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4 /* * IPv4/IPv6 header size */ #define KNET_PMTUD_OVERHEAD_V4 20 #define KNET_PMTUD_OVERHEAD_V6 40 #define KNET_PMTUD_MIN_MTU_V4 576 #define KNET_PMTUD_MIN_MTU_V6 1280 struct knet_header_payload_pmtud { uint8_t khp_pmtud_link; /* source link id */ uint16_t khp_pmtud_size; /* size of the current packet */ uint8_t khp_pmtud_data[0]; /* pointer to empty/random data/fill buffer */ } __attribute__((packed)); /* * union to reference possible individual payloads */ union knet_header_payload { struct knet_header_payload_data khp_data; /* pure data packet struct */ struct knet_header_payload_ping khp_ping; /* heartbeat packet struct */ struct knet_header_payload_pmtud khp_pmtud; /* Path MTU discovery packet struct */ } __attribute__((packed)); /* * starting point */ #define KNET_HEADER_VERSION 0x01 /* we currently support only one version */ #define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */ -#define KNET_HEADER_TYPE_HOST_INFO 0x01 /* host status information pckt */ #define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */ #define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */ #define KNET_HEADER_TYPE_PONG 0x82 /* reply to heartbeat */ #define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */ #define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */ struct knet_header { uint8_t kh_version; /* pckt format/version */ uint8_t kh_type; /* from above defines. Tells what kind of pckt it is */ knet_node_id_t kh_node; /* host id of the source host for this pckt */ uint8_t kh_pad1; /* make sure to have space in the header to grow features */ uint8_t kh_pad2; union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */ } __attribute__((packed)); /* * commodoty defines to hide structure nesting * (needs review and cleanup) */ #define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num #define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num #define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq #define khp_data_userdata kh_payload.khp_data.khp_data_userdata #define khp_data_bcast kh_payload.khp_data.khp_data_bcast #define khp_data_channel kh_payload.khp_data.khp_data_channel #define khp_data_compress kh_payload.khp_data.khp_data_compress #define khp_ping_link kh_payload.khp_ping.khp_ping_link #define khp_ping_time kh_payload.khp_ping.khp_ping_time #define khp_ping_seq_num kh_payload.khp_ping.khp_ping_seq_num #define khp_ping_timed kh_payload.khp_ping.khp_ping_timed #define khp_pmtud_link kh_payload.khp_pmtud.khp_pmtud_link #define khp_pmtud_size kh_payload.khp_pmtud.khp_pmtud_size #define khp_pmtud_data kh_payload.khp_pmtud.khp_pmtud_data /* * extra defines to avoid mingling with sizeof() too much */ #define KNET_HEADER_ALL_SIZE sizeof(struct knet_header) #define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload)) #define KNET_HEADER_PING_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping)) #define KNET_HEADER_PMTUD_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud)) #define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data)) size_t calc_data_outlen(knet_handle_t knet_h, size_t inlen); size_t calc_max_data_outlen(knet_handle_t knet_h, size_t inlen); size_t calc_min_mtu(knet_handle_t knet_h); #endif diff --git a/libknet/tests/pckt_test.c b/libknet/tests/pckt_test.c index 9522c187..30798f3c 100644 --- a/libknet/tests/pckt_test.c +++ b/libknet/tests/pckt_test.c @@ -1,27 +1,23 @@ /* * Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include #include "onwire.h" int main(void) { printf("\nKronosnet network header size printout:\n\n"); printf("KNET_HEADER_ALL_SIZE: %zu\n", KNET_HEADER_ALL_SIZE); printf("KNET_HEADER_SIZE: %zu\n", KNET_HEADER_SIZE); printf("KNET_HEADER_PING_SIZE: %zu (%zu)\n", KNET_HEADER_PING_SIZE, sizeof(struct knet_header_payload_ping)); printf("KNET_HEADER_PMTUD_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_SIZE, sizeof(struct knet_header_payload_pmtud)); printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data)); - printf("\n"); - printf("KNET_HOSTINFO_ALL_SIZE: %zu\n", KNET_HOSTINFO_ALL_SIZE); - printf("KNET_HOSTINFO_SIZE: %zu\n", KNET_HOSTINFO_SIZE); - printf("KNET_HOSTINFO_LINK_STATUS_SIZE: %zu (%zu)\n", KNET_HOSTINFO_LINK_STATUS_SIZE, sizeof(struct knet_hostinfo_payload_link_status)); return 0; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 644c0ad5..ac2f46fc 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,1074 +1,1048 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_rx.h" #include "netutils.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), inbuf->khp_data_userdata, *len); } defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int err = 0, savederrno = 0, stats_err = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; uint64_t decrypt_time = 0; struct timespec recvtime; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; - struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; seq_num_t recv_seq_num; int wipe_bufs = 0; int try_decrypt = 0, decrypted = 0, i, found_link = 0; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); return; } if (try_decrypt) { struct timespec start_time; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { return; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &decrypt_time); len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; decrypted = 1; } } if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { /* be aware this works only for PING / PONG and PMTUd packets! */ src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } else { /* data packet */ for (i = 0; i < KNET_MAX_LINK; i++) { src_link = &src_host->link[i]; if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { found_link = 1; break; } } if (!found_link) { log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); return; } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { - case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: /* data stats at the top for consistency with TX */ src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (decrypted) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } if (!src_host->status.reachable) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { pthread_mutex_unlock(&src_link->link_stats_mutex); if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { pthread_mutex_unlock(&src_link->link_stats_mutex); return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->khp_data_compress) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, inbuf->khp_data_compress, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, knet_h->recv_from_links_buf_decompress, &decmp_outlen); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (!err) { /* Collect stats */ if (compress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = compress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + compress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE; memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); len = decmp_outlen + KNET_HEADER_DATA_SIZE; } else { pthread_mutex_unlock(&knet_h->handle_stats_mutex); pthread_mutex_unlock(&src_link->link_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { + if (knet_h->enabled != 1) /* data forward is disabled */ + break; - if (knet_h->enabled != 1) /* data forward is disabled */ - break; + if (knet_h->dst_host_filter_fn) { + size_t host_idx; + int found = 0; + + bcast = knet_h->dst_host_filter_fn( + knet_h->dst_host_filter_fn_private_data, + (const unsigned char *)inbuf->khp_data_userdata, + len - KNET_HEADER_DATA_SIZE, + KNET_NOTIFY_RX, + knet_h->host_id, + inbuf->kh_node, + &channel, + dst_host_ids, + &dst_host_ids_entries); + if (bcast < 0) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); + return; + } - if (knet_h->dst_host_filter_fn) { - size_t host_idx; - int found = 0; - - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, - KNET_NOTIFY_RX, - knet_h->host_id, - inbuf->kh_node, - &channel, - dst_host_ids, - &dst_host_ids_entries); - if (bcast < 0) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); - return; - } + if ((!bcast) && (!dst_host_ids_entries)) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); + return; + } - if ((!bcast) && (!dst_host_ids_entries)) { + /* check if we are dst for this packet */ + if (!bcast) { + if (dst_host_ids_entries > KNET_MAX_HOST) { pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); + log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return; } - - /* check if we are dst for this packet */ - if (!bcast) { - if (dst_host_ids_entries > KNET_MAX_HOST) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); - return; - } - for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - if (dst_host_ids[host_idx] == knet_h->host_id) { - found = 1; - break; - } - } - if (!found) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); - return; + for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { + if (dst_host_ids[host_idx] == knet_h->host_id) { + found = 1; + break; } } + if (!found) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); + return; + } } } - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (!knet_h->sockfd[channel].in_use) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, - "received packet for channel %d but there is no local sock connected", - channel); - return; - } + if (!knet_h->sockfd[channel].in_use) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, + "received packet for channel %d but there is no local sock connected", + channel); + return; + } - outlen = 0; - memset(iov_out, 0, sizeof(iov_out)); + outlen = 0; + memset(iov_out, 0, sizeof(iov_out)); retry: - iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; - iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); - - outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); - if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { - log_debug(knet_h, KNET_SUB_RX, - "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", - iov_out[0].iov_len, outlen); - goto retry; - } + iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; + iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); - if (outlen <= 0) { - knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, - knet_h->sockfd[channel].sockfd[0], - channel, - KNET_NOTIFY_RX, - outlen, - errno); - pthread_mutex_unlock(&src_link->link_stats_mutex); - return; - } - if ((size_t)outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - } - } else { /* HOSTINFO */ - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); - } - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - return; - } + outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); + if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { + log_debug(knet_h, KNET_SUB_RX, + "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", + iov_out[0].iov_len, outlen); + goto retry; + } + + if (outlen <= 0) { + knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, + knet_h->sockfd[channel].sockfd[0], + channel, + KNET_NOTIFY_RX, + outlen, + errno); + pthread_mutex_unlock(&src_link->link_stats_mutex); + return; + } + if ((size_t)outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - switch(knet_hostinfo->khi_type) { - case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: - break; - case KNET_HOSTINFO_TYPE_LINK_TABLE: - break; - default: - log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); - break; - } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); recv_seq_num = ntohs(inbuf->khp_ping_seq_num); src_link->status.stats.rx_ping_packets++; src_link->status.stats.rx_ping_bytes += len; wipe_bufs = 0; if (!inbuf->khp_ping_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(src_host, recv_seq_num, 0, 1); } } } if (knet_h->crypto_in_use_config) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); break; } knet_h->stats_extra.tx_crypt_pong_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } retry_pong: if (src_link->transport_connected) { if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pong_errors++; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ src_link->status.stats.tx_pong_retries++; goto retry_pong; break; } } src_link->status.stats.tx_pong_packets++; src_link->status.stats.tx_pong_bytes += outlen; } break; case KNET_HEADER_TYPE_PONG: src_link->status.stats.rx_pong_packets++; src_link->status.stats.rx_pong_bytes += len; clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); if ((latency_last / 1000llu) > src_link->pong_timeout) { log_debug(knet_h, KNET_SUB_RX, "Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding", src_host->host_id, src_link->link_id); } else { /* * in words : ('previous mean' * '(count -1)') + 'new value') / 'count' */ src_link->latency_cur_samples++; /* * limit to max_samples (precision) */ if (src_link->latency_cur_samples >= src_link->latency_max_samples) { src_link->latency_cur_samples = src_link->latency_max_samples; } src_link->status.latency = (((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples); if (src_link->status.latency < src_link->pong_timeout_adj) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0); } else { src_link->received_pong++; log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } /* Calculate latency stats */ if (src_link->status.latency > src_link->status.stats.latency_max) { src_link->status.stats.latency_max = src_link->status.latency; } if (src_link->status.latency < src_link->status.stats.latency_min) { src_link->status.stats.latency_min = src_link->status.latency; } /* * those 2 lines below make all latency average calculations consistent and capped to * link precision. In future we will kill the one above to keep only this one in * the stats structure, but for now we leave it around to avoid API/ABI * breakage as we backport the fixes to stable */ src_link->status.stats.latency_ave = src_link->status.latency; src_link->status.stats.latency_samples = src_link->latency_cur_samples; } break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_in_use_config) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, outlen, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); break; } knet_h->stats_extra.tx_crypt_pmtu_reply_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno)); goto out_pmtud; } retry_pmtud: if (src_link->transport_connected) { if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); } else { len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); } savederrno = errno; if (len != outlen) { err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno); stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); break; } switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); src_link->status.stats.tx_pmtu_errors++; break; case 0: /* ignore error and continue */ src_link->status.stats.tx_pmtu_errors++; break; case 1: /* retry to send those same data */ src_link->status.stats.tx_pmtu_retries++; pthread_mutex_unlock(&src_link->link_stats_mutex); goto retry_pmtud; break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } } pthread_mutex_unlock(&knet_h->tx_mutex); out_pmtud: return; /* Don't need to unlock link_stats_mutex */ case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); return; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ /* * processing incoming packets vs access lists */ if ((knet_h->use_access_lists) && (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } /* * continue processing the other packets */ continue; } } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); memset(&events, 0, sizeof(events)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 615c4266..2f69991b 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,902 +1,880 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0, locked = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; locked = 0; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, cur_link->link_id, strerror(savederrno)); continue; } locked = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } pthread_mutex_unlock(&cur_link->link_stats_mutex); locked = 0; } out_unlock: if (locked) { pthread_mutex_unlock(&cur_link->link_stats_mutex); } errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync) { size_t outlen, frag_len; struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; - struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; uint8_t frag_idx; unsigned int temp_data_mtu; size_t host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; unsigned int i; int j; int send_local = 0; int data_compressed = 0; size_t uncrypted_frag_size; int stats_locked = 0, stats_err = 0; inbuf = knet_h->recv_from_sock_buf; - if ((knet_h->enabled != 1) && - (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ + if (knet_h->enabled != 1) { log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } if ((!bcast) && (dst_host_ids_entries_temp > KNET_MAX_HOST)) { log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); savederrno = EINVAL; err = -1; goto out_unlock; } } /* Send to localhost if appropriate and enabled */ if (knet_h->has_loop_link) { send_local = 0; if (bcast) { send_local = 1; } else { for (i=0; i< dst_host_ids_entries_temp; i++) { if (dst_host_ids_temp[i] == knet_h->host_id) { send_local = 1; } } } if (send_local) { const unsigned char *buf = inbuf->khp_data_userdata; ssize_t buflen = inlen; struct knet_link *local_link; local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } } } break; - case KNET_HEADER_TYPE_HOST_INFO: - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - bcast = 0; - dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; - dst_host_ids_entries_temp = 1; - knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); - } - break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * compress data */ if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, (const unsigned char *)inbuf->khp_data_userdata, inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); savederrno = errno; stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } stats_locked = 1; /* Collect stats */ clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); if (compress_time < knet_h->stats.tx_compress_time_min) { knet_h->stats.tx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.tx_compress_time_max) { knet_h->stats.tx_compress_time_max = compress_time; } knet_h->stats.tx_compress_time_ave = (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + compress_time) / (knet_h->stats.tx_compressed_packets+1); if (err < 0) { log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); } else { knet_h->stats.tx_compressed_packets++; knet_h->stats.tx_compressed_original_bytes += inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; if (cmp_outlen < inlen) { memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); inlen = cmp_outlen; data_compressed = 1; } } } if (!stats_locked) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } } if (knet_h->compress_model > 0 && !data_compressed) { knet_h->stats.tx_uncompressed_packets++; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); stats_locked = 0; /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (data_compressed) { inbuf->khp_data_compress = knet_h->compress_model; } else { inbuf->khp_data_compress = 0; } if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE; iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx); /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx][1].iov_len = temp_data_mtu; } else { iov_out[frag_idx][1].iov_len = frag_len; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; frag_len = frag_len - temp_data_mtu; frag_idx++; } iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE; iovcnt_out = 1; } if (knet_h->crypto_in_use_config) { struct timespec start_time; struct timespec end_time; uint64_t crypt_time; frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], (ssize_t *)&outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out_unlock; } if (crypt_time < knet_h->stats.tx_crypt_time_min) { knet_h->stats.tx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.tx_crypt_time_max) { knet_h->stats.tx_crypt_time_max = crypt_time; } knet_h->stats.tx_crypt_time_ave = (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets + crypt_time) / (knet_h->stats.tx_crypt_packets+1); uncrypted_frag_size = 0; for (j=0; j < iovcnt_out; j++) { uncrypted_frag_size += iov_out[frag_idx][j].iov_len; } knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size); knet_h->stats.tx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } iovcnt_out = 1; } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg->msg_iov, 1); } else { inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg->msg_flags & MSG_TRUNC) { log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd); return; } } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); - if (channel != KNET_INTERNAL_DATA_CHANNEL) { - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { - log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); - } else { - knet_h->sockfd[channel].has_error = 1; - } + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { + log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", + knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); + } else { + knet_h->sockfd[channel].has_error = 1; } - /* - * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL - * once we add support for internal knet communication - */ } else { knet_h->recv_from_sock_buf->kh_type = type; _parse_recv_from_sock(knet_h, inlen, channel, 0); } - if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) { + if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev, type; int flush, flush_queue_limit; int8_t channel; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = sizeof(struct sockaddr_storage); msg.msg_iov = &iov_in; msg.msg_iovlen = 1; knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id); for (i = 0; i < PCKT_FRAG_MAX; i++) { knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } flush_queue_limit = 0; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000); flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { /* * ideally we want to communicate that we are done flushing * the queue when we have an epoll timeout event */ if (flush == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } continue; } /* * fall back in case the TX sockets will continue receive traffic * and we do not hit an epoll timeout. * * allow up to a 100 loops to flush queues, then we give up. * there might be more clean ways to do it by checking the buffer queue * on each socket, but we have tons of sockets and calculations can go wrong. * Also, why would you disable data forwarding and still send packets? */ if (flush == KNET_THREAD_QUEUE_FLUSH) { if (flush_queue_limit >= 100) { log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss"); set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } else { flush_queue_limit++; } } else { flush_queue_limit = 0; } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { - if (events[i].data.fd == knet_h->hostsockfd[0]) { - type = KNET_HEADER_TYPE_HOST_INFO; - channel = KNET_INTERNAL_DATA_CHANNEL; - } else { - type = KNET_HEADER_TYPE_DATA; - for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { - if ((knet_h->sockfd[channel].in_use) && - (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { - break; - } - } - if (channel >= KNET_DATAFD_MAX) { - log_debug(knet_h, KNET_SUB_TX, "No available channels"); - continue; /* channel not found */ + type = KNET_HEADER_TYPE_DATA; + for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { + if ((knet_h->sockfd[channel].in_use) && + (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { + break; } } + if (channel >= KNET_DATAFD_MAX) { + log_debug(knet_h, KNET_SUB_TX, "No available channels"); + continue; /* channel not found */ + } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED); return NULL; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; }