diff --git a/libknet/Makefile.am b/libknet/Makefile.am index 898132b8..6fd9306e 100644 --- a/libknet/Makefile.am +++ b/libknet/Makefile.am @@ -1,80 +1,82 @@ # # Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+, LGPL-2.0+ # MAINTAINERCLEANFILES = Makefile.in include $(top_srcdir)/build-aux/check.mk SYMFILE = libknet_exported_syms EXTRA_DIST = $(SYMFILE) SUBDIRS = . tests libversion = 0:0:0 # override global LIBS that pulls in lots of craft we don't need here LIBS = sources = \ common.c \ compat.c \ crypto.c \ handle.c \ host.c \ link.c \ logging.c \ netutils.c \ nsscrypto.c \ threads_common.c \ threads_dsthandler.c \ threads_heartbeat.c \ threads_pmtud.c \ - threads_send_recv.c \ + threads_rx.c \ + threads_tx.c \ transport_udp.c \ transport_sctp.c \ transport_common.c include_HEADERS = libknet.h pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libknet.pc noinst_HEADERS = \ common.h \ compat.h \ crypto.h \ host.h \ internals.h \ link.h \ logging.h \ netutils.h \ nsscrypto.h \ onwire.h \ threads_common.h \ threads_dsthandler.h \ threads_heartbeat.h \ threads_pmtud.h \ - threads_send_recv.h \ + threads_rx.h \ + threads_tx.h \ transports.h lib_LTLIBRARIES = libknet.la libknet_la_SOURCES = $(sources) libknet_la_CFLAGS = $(nss_CFLAGS) EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE) libknet_la_LDFLAGS = -Wl,--version-script=$(srcdir)/$(SYMFILE) \ --export-dynamic \ -version-number $(libversion) libknet_la_LIBADD = $(nss_LIBS) -lrt -lpthread -lm diff --git a/libknet/handle.c b/libknet/handle.c index 9a0150e2..d50c1f84 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,1413 +1,1414 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "internals.h" #include "crypto.h" #include "compat.h" #include "common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_dsthandler.h" -#include "threads_send_recv.h" +#include "threads_rx.h" +#include "threads_tx.h" #include "transports.h" #include "logging.h" static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER; static int _init_locks(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s", strerror(savederrno)); goto exit_fail; } knet_h->lock_init_done = 1; savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_locks(knet_handle_t knet_h) { knet_h->lock_init_done = 0; pthread_rwlock_destroy(&knet_h->global_rwlock); pthread_mutex_destroy(&knet_h->pmtud_mutex); pthread_cond_destroy(&knet_h->pmtud_cond); pthread_mutex_destroy(&knet_h->hb_mutex); pthread_mutex_destroy(&knet_h->tx_mutex); pthread_mutex_destroy(&knet_h->tx_seq_num_mutex); } static int _init_socks(knet_handle_t knet_h) { int savederrno = 0; if (_init_socketpair(knet_h, knet_h->hostsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, knet_h->dstsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_socks(knet_handle_t knet_h) { _close_socketpair(knet_h, knet_h->dstsockfd); _close_socketpair(knet_h, knet_h->hostsockfd); } static int _init_buffers(knet_handle_t knet_h) { int savederrno = 0; int i; size_t bufsize; for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE; knet_h->send_to_links_buf[i] = malloc(bufsize); if (!knet_h->send_to_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf[i], 0, bufsize); knet_h->recv_from_sock_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_sock_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_sock_buf[i], 0, KNET_DATABUFSIZE); knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE); } knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE); if (!knet_h->pingbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE); knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6); if (!knet_h->pmtudbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6); for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD; knet_h->send_to_links_buf_crypt[i] = malloc(bufsize); if (!knet_h->send_to_links_buf_crypt[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize); } knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_decrypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pingbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pmtudbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker)); return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_buffers(knet_handle_t knet_h) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { free(knet_h->send_to_links_buf[i]); free(knet_h->recv_from_sock_buf[i]); free(knet_h->send_to_links_buf_crypt[i]); free(knet_h->recv_from_links_buf[i]); } free(knet_h->recv_from_links_buf_decrypt); free(knet_h->recv_from_links_buf_crypt); free(knet_h->pingbuf); free(knet_h->pingbuf_crypt); free(knet_h->pmtudbuf); free(knet_h->pmtudbuf_crypt); } static int _init_epolls(knet_handle_t knet_h) { struct epoll_event ev; int savederrno = 0; /* * even if the kernel does dynamic allocation with epoll_ctl * we need to reserve one extra for host to host communication */ knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (knet_h->send_to_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->recv_from_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->dst_link_handler_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->send_to_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->hostsockfd[0]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->dstsockfd[0]; if (epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_epolls(knet_handle_t knet_h) { struct epoll_event ev; int i; memset(&ev, 0, sizeof(struct epoll_event)); for (i = 0; i < KNET_DATAFD_MAX; i++) { if (knet_h->sockfd[i].in_use) { epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev); if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) { _close_socketpair(knet_h, knet_h->sockfd[i].sockfd); } } } epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev); epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev); close(knet_h->send_to_links_epollfd); close(knet_h->recv_from_links_epollfd); close(knet_h->dst_link_handler_epollfd); } static int _start_transports(knet_handle_t knet_h) { int i, savederrno = 0, err = 0; for (i=0; itransport_ops[i] = get_udp_transport(); break; case KNET_TRANSPORT_SCTP: knet_h->transport_ops[i] = get_sctp_transport(); break; } if (knet_h->transport_ops[i]) { if (knet_h->transport_ops[i]->transport_init(knet_h) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Failed to allocate transport handle for %s: %s", knet_h->transport_ops[i]->transport_name, strerror(savederrno)); err = -1; goto out; } } } out: errno = savederrno; return err; } static void _stop_transports(knet_handle_t knet_h) { int i; for (i=0; itransport_ops[i]) { knet_h->transport_ops[i]->transport_free(knet_h); } } } static int _start_threads(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0, _handle_pmtud_link_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0, _handle_dst_link_handler_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->send_to_links_thread, 0, _handle_send_to_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->recv_from_links_thread, 0, _handle_recv_from_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->heartbt_thread, 0, _handle_heartbt_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _stop_threads(knet_handle_t knet_h) { void *retval; /* * allow threads to catch on shutdown request * and release locks before we stop them. * this isn't the most efficent way to handle it * but it works good enough for now */ sleep(1); if (knet_h->heartbt_thread) { pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); } if (knet_h->send_to_links_thread) { pthread_cancel(knet_h->send_to_links_thread); pthread_join(knet_h->send_to_links_thread, &retval); } if (knet_h->recv_from_links_thread) { pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); } if (knet_h->dst_link_handler_thread) { pthread_cancel(knet_h->dst_link_handler_thread); pthread_join(knet_h->dst_link_handler_thread, &retval); } pthread_mutex_lock(&knet_h->pmtud_mutex); pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); sleep(1); if (knet_h->pmtud_link_handler_thread) { pthread_cancel(knet_h->pmtud_link_handler_thread); pthread_join(knet_h->pmtud_link_handler_thread, &retval); } } knet_handle_t knet_handle_new(uint16_t host_id, int log_fd, uint8_t default_log_level) { knet_handle_t knet_h; int savederrno = 0; struct rlimit cur; if (getrlimit(RLIMIT_NOFILE, &cur) < 0) { return NULL; } if ((log_fd < 0) || (log_fd >= cur.rlim_max)) { errno = EINVAL; return NULL; } /* * validate incoming request */ if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) { errno = EINVAL; return NULL; } /* * allocate handle */ knet_h = malloc(sizeof(struct knet_handle)); if (!knet_h) { errno = ENOMEM; return NULL; } memset(knet_h, 0, sizeof(struct knet_handle)); savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); errno = savederrno; goto exit_fail; } /* * copy config in place */ knet_h->host_id = host_id; knet_h->logfd = log_fd; if (knet_h->logfd > 0) { memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS); } /* * set pmtud default timers */ knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL; /* * init main locking structures */ if (_init_locks(knet_h)) { savederrno = errno; goto exit_fail; } /* * init sockets */ if (_init_socks(knet_h)) { savederrno = errno; goto exit_fail; } /* * allocate packet buffers */ if (_init_buffers(knet_h)) { savederrno = errno; goto exit_fail; } /* * create epoll fds */ if (_init_epolls(knet_h)) { savederrno = errno; goto exit_fail; } /* * start transports */ if (_start_transports(knet_h)) { savederrno = errno; goto exit_fail; } /* * start internal threads */ if (_start_threads(knet_h)) { savederrno = errno; goto exit_fail; } pthread_mutex_unlock(&handle_config_mutex); return knet_h; exit_fail: pthread_mutex_unlock(&handle_config_mutex); knet_handle_free(knet_h); errno = savederrno; return NULL; } int knet_handle_free(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h) { pthread_mutex_unlock(&handle_config_mutex); errno = EINVAL; return -1; } if (!knet_h->lock_init_done) { goto exit_nolock; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); pthread_mutex_unlock(&handle_config_mutex); errno = savederrno; return -1; } if (knet_h->host_head != NULL) { savederrno = EBUSY; log_err(knet_h, KNET_SUB_HANDLE, "Unable to free handle: host(s) or listener(s) are still active: %s", strerror(savederrno)); pthread_rwlock_unlock(&knet_h->global_rwlock); pthread_mutex_unlock(&handle_config_mutex); errno = savederrno; return -1; } knet_h->fini_in_progress = 1; pthread_rwlock_unlock(&knet_h->global_rwlock); _stop_threads(knet_h); _stop_transports(knet_h); _close_epolls(knet_h); _destroy_buffers(knet_h); _close_socks(knet_h); crypto_fini(knet_h); _destroy_locks(knet_h); exit_nolock: free(knet_h); knet_h = NULL; pthread_mutex_unlock(&handle_config_mutex); return 0; } int knet_handle_enable_sock_notify(knet_handle_t knet_h, void *sock_notify_fn_private_data, void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno)) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!sock_notify_fn) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; knet_h->sock_notify_fn = sock_notify_fn; log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); pthread_rwlock_unlock(&knet_h->global_rwlock); return err; } int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) { int err = 0, savederrno = 0; int i; struct epoll_event ev; if (!knet_h) { errno = EINVAL; return -1; } if (datafd == NULL) { errno = EINVAL; return -1; } if (channel == NULL) { errno = EINVAL; return -1; } if (*channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sock_notify_fn) { log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!"); savederrno = EINVAL; err = -1; goto out_unlock; } if (*datafd > 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) { log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i); savederrno = EEXIST; err = -1; goto out_unlock; } } } /* * auto allocate a channel */ if (*channel < 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if (!knet_h->sockfd[i].in_use) { *channel = i; break; } } if (*channel < 0) { savederrno = EBUSY; err = -1; goto out_unlock; } } else { if (knet_h->sockfd[*channel].in_use) { savederrno = EBUSY; err = -1; goto out_unlock; } } knet_h->sockfd[*channel].is_created = 0; knet_h->sockfd[*channel].is_socket = 0; knet_h->sockfd[*channel].has_error = 0; if (*datafd > 0) { int sockopt; socklen_t sockoptlen = sizeof(sockopt); if (_fdset_cloexec(*datafd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s", strerror(savederrno)); goto out_unlock; } if (_fdset_nonblock(*datafd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s", strerror(savederrno)); goto out_unlock; } knet_h->sockfd[*channel].sockfd[0] = *datafd; knet_h->sockfd[*channel].sockfd[1] = 0; if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) { knet_h->sockfd[*channel].is_socket = 1; } } else { if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) { savederrno = errno; err = -1; goto out_unlock; } knet_h->sockfd[*channel].is_created = 1; knet_h->sockfd[*channel].is_socket = 1; *datafd = knet_h->sockfd[*channel].sockfd[0]; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s", knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno)); if (knet_h->sockfd[*channel].is_created) { _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd); } goto out_unlock; } knet_h->sockfd[*channel].in_use = 1; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) { int err = 0, savederrno = 0; int8_t channel = -1; int i; struct epoll_event ev; if (!knet_h) { errno = EINVAL; return -1; } if (datafd <= 0) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == datafd)) { channel = i; break; } } if (channel < 0) { savederrno = EINVAL; err = -1; goto out_unlock; } if (!knet_h->sockfd[channel].has_error) { memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); goto out_unlock; } } if (knet_h->sockfd[channel].is_created) { _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd); } memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock)); out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd) { int err = 0, savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) { errno = EINVAL; return -1; } if (datafd == NULL) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } *datafd = knet_h->sockfd[channel].sockfd[0]; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel) { int err = 0, savederrno = 0; int i; if (!knet_h) { errno = EINVAL; return -1; } if (datafd <= 0) { errno = EINVAL; return -1; } if (channel == NULL) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *channel = -1; for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == datafd)) { *channel = i; break; } } if (*channel < 0) { savederrno = EINVAL; err = -1; goto out_unlock; } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_enable_filter(knet_handle_t knet_h, void *dst_host_filter_fn_private_data, int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_node_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data; knet_h->dst_host_filter_fn = dst_host_filter_fn; if (knet_h->dst_host_filter_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((enabled < 0) || (enabled > 1)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->enabled = enabled; if (enabled) { log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *interval = knet_h->pmtud_interval; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!interval) || (interval > 86400)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_interval = interval; log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; knet_h->pmtud_notify_fn = pmtud_notify_fn; if (knet_h->pmtud_notify_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!data_mtu) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *data_mtu = knet_h->data_mtu; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) { int savederrno = 0; int err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!knet_handle_crypto_cfg) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } crypto_fini(knet_h); if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled"); err = 0; goto exit_unlock; } if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) { log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %u): %u", KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len); savederrno = EINVAL; err = -1; goto exit_unlock; } if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) { log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %u): %u", KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len); savederrno = EINVAL; err = -1; goto exit_unlock; } err = crypto_init(knet_h, knet_handle_crypto_cfg); if (err) { err = -2; } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 37375346..b3e177aa 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,1302 +1,727 @@ /* - * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include -#include #include #include -#include -#include -#include -#include -#include -#include "crypto.h" #include "compat.h" +#include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" -#include "threads_send_recv.h" +#include "threads_rx.h" #include "netutils.h" -/* - * SEND - */ - -static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out) -{ - int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress; - struct mmsghdr msg[PCKT_FRAG_MAX]; - int err = 0, savederrno = 0; - - memset(&msg, 0, sizeof(struct mmsghdr)); - - for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { - - msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num; - sent_msgs = 0; - prev_sent = 0; - progress = 1; - -retry: - msg_idx = 0; - - while (msg_idx < msgs_to_send) { - memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr)); - msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; - msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent]; - msg[msg_idx].msg_hdr.msg_iovlen = 1; - msg_idx++; - } - - sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, - msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL); - savederrno = errno; - - err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); - switch(err) { - case -1: /* unrecoverable error */ - goto out_unlock; - break; - case 0: /* ignore error and continue */ - break; - case 1: /* retry to send those same data */ - goto retry; - break; - } - - if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) { - if ((sent_msgs) || (progress)) { - msgs_to_send = msg_idx - sent_msgs; - prev_sent = prev_sent + sent_msgs; - if (sent_msgs) { - progress = 1; - } else { - progress = 0; - } - log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", - sent_msgs, msg_idx, - dst_host->name, dst_host->host_id, - dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, - dst_host->link[dst_host->active_links[link_idx]].status.dst_port, - dst_host->link[dst_host->active_links[link_idx]].link_id); - goto retry; - } - if (!progress) { - savederrno = EAGAIN; - err = -1; - goto out_unlock; - } - } - - if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && - (dst_host->active_link_entries > 1)) { - uint8_t cur_link_id = dst_host->active_links[0]; - - memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); - dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; - - break; - } - } - -out_unlock: - errno = savederrno; - return err; -} - -static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) -{ - ssize_t outlen, frag_len; - struct knet_host *dst_host; - uint16_t dst_host_ids_temp[KNET_MAX_HOST]; - size_t dst_host_ids_entries_temp = 0; - uint16_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct knet_hostinfo *knet_hostinfo; - struct iovec iov_out[PCKT_FRAG_MAX]; - uint8_t frag_idx; - unsigned int temp_data_mtu; - int host_idx; - int send_mcast = 0; - struct knet_header *inbuf; - int savederrno = 0; - int err = 0; - seq_num_t tx_seq_num; - - inbuf = knet_h->recv_from_sock_buf[buf_idx]; - - if ((knet_h->enabled != 1) && - (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ - log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); - savederrno = ECANCELED; - err = -1; - goto out_unlock; - } - - /* - * move this into a separate function to expand on - * extra switching rules - */ - switch(inbuf->kh_type) { - case KNET_HEADER_TYPE_DATA: - if (knet_h->dst_host_filter_fn) { - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - inlen, - KNET_NOTIFY_TX, - knet_h->host_id, - knet_h->host_id, - &channel, - dst_host_ids_temp, - &dst_host_ids_entries_temp); - if (bcast < 0) { - log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); - savederrno = EFAULT; - err = -1; - goto out_unlock; - } - - if ((!bcast) && (!dst_host_ids_entries_temp)) { - log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - } - break; - case KNET_HEADER_TYPE_HOST_INFO: - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - bcast = 0; - dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; - dst_host_ids_entries_temp = 1; - knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); - } - break; - default: - log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); - savederrno = ENOMSG; - err = -1; - goto out_unlock; - break; - } - - if (is_sync) { - if ((bcast) || - ((!bcast) && (dst_host_ids_entries_temp > 1))) { - log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); - savederrno = E2BIG; - err = -1; - goto out_unlock; - } - } - - /* - * check destinations hosts before spending time - * in fragmenting/encrypting packets to save - * time processing data for unrechable hosts. - * for unicast, also remap the destination data - * to skip unreachable hosts. - */ - - if (!bcast) { - dst_host_ids_entries = 0; - for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; - if (!dst_host) { - continue; - } - if (dst_host->status.reachable) { - dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; - dst_host_ids_entries++; - } - } - if (!dst_host_ids_entries) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } else { - send_mcast = 0; - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - if (dst_host->status.reachable) { - send_mcast = 1; - break; - } - } - if (!send_mcast) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } - - if (!knet_h->data_mtu) { - /* - * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough - */ - log_debug(knet_h, KNET_SUB_TX, - "Received data packet but data MTU is still unknown." - " Packet might not be delivered." - " Assuming mininum IPv4 mtu (%d)", - KNET_PMTUD_MIN_MTU_V4); - temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; - } else { - /* - * take a copy of the mtu to avoid value changing under - * our feet while we are sending a fragmented pckt - */ - temp_data_mtu = knet_h->data_mtu; - } - - /* - * prepare the outgoing buffers - */ - - frag_len = inlen; - frag_idx = 0; - - inbuf->khp_data_bcast = bcast; - inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); - inbuf->khp_data_channel = channel; - - if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { - log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); - goto out_unlock; - } - knet_h->tx_seq_num++; - /* - * force seq_num 0 to detect a node that has crashed and rejoining - * the knet instance. seq_num 0 will clear the buffers in the RX - * thread - */ - if (knet_h->tx_seq_num == 0) { - knet_h->tx_seq_num++; - } - /* - * cache the value in locked context - */ - tx_seq_num = knet_h->tx_seq_num; - knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num); - pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); - - /* - * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 - * pckts. - * this solves 2 problems: - * 1) on TX socket overloads we generate extra pings to keep links alive - * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, - * node 3+ will be able to keep in sync on the TX seq_num even without - * receiving traffic or pings in betweens. This avoids issues with - * rollover of the circular buffer - */ - - if (tx_seq_num % (SEQ_MAX / 8) == 0) { - _send_pings(knet_h, 0); - } - - while (frag_idx < inbuf->khp_data_frag_num) { - /* - * set the iov_base - */ - iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; - - /* - * set the len - */ - if (frag_len > temp_data_mtu) { - iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; - } else { - iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; - } - - /* - * copy the frag info on all buffers - */ - knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; - - knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; - knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; - knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; - knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; - - memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, - inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), - iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); - - frag_len = frag_len - temp_data_mtu; - frag_idx++; - } - - if (knet_h->crypto_instance) { - frag_idx = 0; - while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { - if (crypto_encrypt_and_sign( - knet_h, - (const unsigned char *)knet_h->send_to_links_buf[frag_idx], - iov_out[frag_idx].iov_len, - knet_h->send_to_links_buf_crypt[frag_idx], - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); - savederrno = ECHILD; - err = -1; - goto out_unlock; - } - iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; - iov_out[frag_idx].iov_len = outlen; - frag_idx++; - } - } - - if (!bcast) { - for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids[host_idx]]; - - err = _dispatch_to_links(knet_h, dst_host, iov_out); - savederrno = errno; - if (err) { - goto out_unlock; - } - } - } else { - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - if (dst_host->status.reachable) { - err = _dispatch_to_links(knet_h, dst_host, iov_out); - savederrno = errno; - if (err) { - goto out_unlock; - } - } - } - } - -out_unlock: - errno = savederrno; - return err; -} - -int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) -{ - int savederrno = 0, err = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (buff == NULL) { - errno = EINVAL; - return -1; - } - - if (buff_len <= 0) { - errno = EINVAL; - return -1; - } - - if (buff_len > KNET_MAX_PACKET_SIZE) { - errno = EINVAL; - return -1; - } - - if (channel < 0) { - errno = EINVAL; - return -1; - } - - if (channel >= KNET_DATAFD_MAX) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sockfd[channel].in_use) { - savederrno = EINVAL; - err = -1; - goto out; - } - - savederrno = pthread_mutex_lock(&knet_h->tx_mutex); - if (savederrno) { - log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", - strerror(savederrno)); - err = -1; - goto out; - } - - knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; - memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); - err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); - savederrno = errno; - - pthread_mutex_unlock(&knet_h->tx_mutex); - -out: - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = savederrno; - return err; -} - -static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type) -{ - ssize_t inlen = 0; - struct iovec iov_in; - int msg_recv, i; - int savederrno = 0, docallback = 0; - - if ((channel >= 0) && - (channel < KNET_DATAFD_MAX) && - (!knet_h->sockfd[channel].is_socket)) { - memset(&iov_in, 0, sizeof(iov_in)); - iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; - iov_in.iov_len = KNET_MAX_PACKET_SIZE; - - inlen = readv(sockfd, &iov_in, 1); - - if (inlen <= 0) { - savederrno = errno; - docallback = 1; - goto out; - } - - msg_recv = 1; - knet_h->recv_from_sock_buf[0]->kh_type = type; - _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); - } else { - msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); - if (msg_recv < 0) { - inlen = msg_recv; - savederrno = errno; - docallback = 1; - goto out; - } - for (i = 0; i < msg_recv; i++) { - inlen = msg[i].msg_len; - if (inlen == 0) { - savederrno = 0; - docallback = 1; - goto out; - break; - } - knet_h->recv_from_sock_buf[i]->kh_type = type; - _parse_recv_from_sock(knet_h, i, inlen, channel, 0); - } - } - -out: - if (inlen < 0) { - struct epoll_event ev; - - memset(&ev, 0, sizeof(struct epoll_event)); - - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { - log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); - } else { - knet_h->sockfd[channel].has_error = 1; - } - - } - - if (docallback) { - knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, - knet_h->sockfd[channel].sockfd[0], - channel, - KNET_NOTIFY_TX, - inlen, - savederrno); - } -} - -void *_handle_send_to_links_thread(void *data) -{ - knet_handle_t knet_h = (knet_handle_t) data; - struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; - struct sockaddr_storage address[PCKT_FRAG_MAX]; - struct mmsghdr msg[PCKT_FRAG_MAX]; - struct iovec iov_in[PCKT_FRAG_MAX]; - int i, nev, type; - int8_t channel; - - memset(&msg, 0, sizeof(struct mmsghdr)); - - /* preparing data buffer */ - for (i = 0; i < PCKT_FRAG_MAX; i++) { - iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; - iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; - - memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); - - msg[i].msg_hdr.msg_name = &address[i]; - msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - msg[i].msg_hdr.msg_iov = &iov_in[i]; - msg[i].msg_hdr.msg_iovlen = 1; - - knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; - knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; - knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); - - knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; - knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; - knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); - } - - while (!shutdown_in_progress(knet_h)) { - nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); - - if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); - continue; - } - - for (i = 0; i < nev; i++) { - if (events[i].data.fd == knet_h->hostsockfd[0]) { - type = KNET_HEADER_TYPE_HOST_INFO; - channel = -1; - } else { - type = KNET_HEADER_TYPE_DATA; - for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { - if ((knet_h->sockfd[channel].in_use) && - (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { - break; - } - } - } - if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); - continue; - } - _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type); - pthread_mutex_unlock(&knet_h->tx_mutex); - } - pthread_rwlock_unlock(&knet_h->global_rwlock); - } - - return NULL; -} - /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { if (errno == ETIME) { log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired"); } return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), inbuf->khp_data_userdata, *len); defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg) { int err = 0, savederrno = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct timespec recvtime; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; struct sockaddr_storage pckt_src; seq_num_t recv_seq_num; int wipe_bufs = 0; if (knet_h->crypto_instance) { if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); return; } len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; } if (len < (KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } src_link = NULL; if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { /* * cpyaddrport will only copy address and port of the incoming * packet and strip extra bits such as flow and scopeid */ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), &pckt_src, sockaddr_len(&pckt_src)) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link); } } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: /* * TODO: should we accept data even if we can't reply to the other node? * how would that work with SCTP and guaranteed delivery? */ if (!src_host->status.reachable) { log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id); //return; } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (knet_h->enabled != 1) /* data forward is disabled */ break; if (knet_h->dst_host_filter_fn) { int host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return; } } } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return; } if (outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); } } else { /* HOSTINFO */ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); } if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { return; } _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); switch(knet_hostinfo->khi_type) { case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: break; case KNET_HOSTINFO_TYPE_LINK_TABLE: break; default: log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); break; } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); recv_seq_num = ntohs(inbuf->khp_ping_seq_num); wipe_bufs = 0; if (!inbuf->khp_ping_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(src_host, recv_seq_num, 0, 1); } } } if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } retry_pong: len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); savederrno = errno; if (len != outlen) { err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry_pong; break; } } break; case KNET_HEADER_TYPE_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); src_link->status.latency = ((src_link->status.latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->status.latency < src_link->pong_timeout) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1); } else { src_link->received_pong++; log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } break; case KNET_HEADER_TYPE_PMTUD: outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } retry_pmtud: len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); if (len != outlen) { err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry_pmtud; break; } } break; case KNET_HEADER_TYPE_PMTUD_REPLY: if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); break; default: return; } } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_FRAG_MAX; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case -1: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case 0: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case 1: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case 2: /* packet is data and should be parsed as such */ _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; memset(&msg, 0, sizeof(msg)); for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } return NULL; } diff --git a/libknet/threads_rx.h b/libknet/threads_rx.h index fec6e5f1..54f3a6d8 100644 --- a/libknet/threads_rx.h +++ b/libknet/threads_rx.h @@ -1,16 +1,15 @@ /* - * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ -#ifndef __THREADS_SEND_RECV_H__ -#define __THREADS_SEND_RECV_H__ +#ifndef __THREADS_RX_H__ +#define __THREADS_RX_H__ -void *_handle_send_to_links_thread(void *data); void *_handle_recv_from_links_thread(void *data); #endif diff --git a/libknet/threads_send_recv.c b/libknet/threads_send_recv.c deleted file mode 100644 index 37375346..00000000 --- a/libknet/threads_send_recv.c +++ /dev/null @@ -1,1302 +0,0 @@ -/* - * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. - * - * Authors: Fabio M. Di Nitto - * Federico Simoncelli - * - * This software licensed under GPL-2.0+, LGPL-2.0+ - */ - -#include "config.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "crypto.h" -#include "compat.h" -#include "host.h" -#include "link.h" -#include "logging.h" -#include "transports.h" -#include "threads_common.h" -#include "threads_heartbeat.h" -#include "threads_send_recv.h" -#include "netutils.h" - -/* - * SEND - */ - -static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out) -{ - int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress; - struct mmsghdr msg[PCKT_FRAG_MAX]; - int err = 0, savederrno = 0; - - memset(&msg, 0, sizeof(struct mmsghdr)); - - for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { - - msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num; - sent_msgs = 0; - prev_sent = 0; - progress = 1; - -retry: - msg_idx = 0; - - while (msg_idx < msgs_to_send) { - memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr)); - msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; - msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent]; - msg[msg_idx].msg_hdr.msg_iovlen = 1; - msg_idx++; - } - - sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, - msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL); - savederrno = errno; - - err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); - switch(err) { - case -1: /* unrecoverable error */ - goto out_unlock; - break; - case 0: /* ignore error and continue */ - break; - case 1: /* retry to send those same data */ - goto retry; - break; - } - - if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) { - if ((sent_msgs) || (progress)) { - msgs_to_send = msg_idx - sent_msgs; - prev_sent = prev_sent + sent_msgs; - if (sent_msgs) { - progress = 1; - } else { - progress = 0; - } - log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", - sent_msgs, msg_idx, - dst_host->name, dst_host->host_id, - dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, - dst_host->link[dst_host->active_links[link_idx]].status.dst_port, - dst_host->link[dst_host->active_links[link_idx]].link_id); - goto retry; - } - if (!progress) { - savederrno = EAGAIN; - err = -1; - goto out_unlock; - } - } - - if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && - (dst_host->active_link_entries > 1)) { - uint8_t cur_link_id = dst_host->active_links[0]; - - memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); - dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; - - break; - } - } - -out_unlock: - errno = savederrno; - return err; -} - -static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) -{ - ssize_t outlen, frag_len; - struct knet_host *dst_host; - uint16_t dst_host_ids_temp[KNET_MAX_HOST]; - size_t dst_host_ids_entries_temp = 0; - uint16_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct knet_hostinfo *knet_hostinfo; - struct iovec iov_out[PCKT_FRAG_MAX]; - uint8_t frag_idx; - unsigned int temp_data_mtu; - int host_idx; - int send_mcast = 0; - struct knet_header *inbuf; - int savederrno = 0; - int err = 0; - seq_num_t tx_seq_num; - - inbuf = knet_h->recv_from_sock_buf[buf_idx]; - - if ((knet_h->enabled != 1) && - (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ - log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); - savederrno = ECANCELED; - err = -1; - goto out_unlock; - } - - /* - * move this into a separate function to expand on - * extra switching rules - */ - switch(inbuf->kh_type) { - case KNET_HEADER_TYPE_DATA: - if (knet_h->dst_host_filter_fn) { - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - inlen, - KNET_NOTIFY_TX, - knet_h->host_id, - knet_h->host_id, - &channel, - dst_host_ids_temp, - &dst_host_ids_entries_temp); - if (bcast < 0) { - log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); - savederrno = EFAULT; - err = -1; - goto out_unlock; - } - - if ((!bcast) && (!dst_host_ids_entries_temp)) { - log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - } - break; - case KNET_HEADER_TYPE_HOST_INFO: - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - bcast = 0; - dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; - dst_host_ids_entries_temp = 1; - knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); - } - break; - default: - log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); - savederrno = ENOMSG; - err = -1; - goto out_unlock; - break; - } - - if (is_sync) { - if ((bcast) || - ((!bcast) && (dst_host_ids_entries_temp > 1))) { - log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); - savederrno = E2BIG; - err = -1; - goto out_unlock; - } - } - - /* - * check destinations hosts before spending time - * in fragmenting/encrypting packets to save - * time processing data for unrechable hosts. - * for unicast, also remap the destination data - * to skip unreachable hosts. - */ - - if (!bcast) { - dst_host_ids_entries = 0; - for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; - if (!dst_host) { - continue; - } - if (dst_host->status.reachable) { - dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; - dst_host_ids_entries++; - } - } - if (!dst_host_ids_entries) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } else { - send_mcast = 0; - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - if (dst_host->status.reachable) { - send_mcast = 1; - break; - } - } - if (!send_mcast) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } - - if (!knet_h->data_mtu) { - /* - * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough - */ - log_debug(knet_h, KNET_SUB_TX, - "Received data packet but data MTU is still unknown." - " Packet might not be delivered." - " Assuming mininum IPv4 mtu (%d)", - KNET_PMTUD_MIN_MTU_V4); - temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; - } else { - /* - * take a copy of the mtu to avoid value changing under - * our feet while we are sending a fragmented pckt - */ - temp_data_mtu = knet_h->data_mtu; - } - - /* - * prepare the outgoing buffers - */ - - frag_len = inlen; - frag_idx = 0; - - inbuf->khp_data_bcast = bcast; - inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); - inbuf->khp_data_channel = channel; - - if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { - log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); - goto out_unlock; - } - knet_h->tx_seq_num++; - /* - * force seq_num 0 to detect a node that has crashed and rejoining - * the knet instance. seq_num 0 will clear the buffers in the RX - * thread - */ - if (knet_h->tx_seq_num == 0) { - knet_h->tx_seq_num++; - } - /* - * cache the value in locked context - */ - tx_seq_num = knet_h->tx_seq_num; - knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num); - pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); - - /* - * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 - * pckts. - * this solves 2 problems: - * 1) on TX socket overloads we generate extra pings to keep links alive - * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, - * node 3+ will be able to keep in sync on the TX seq_num even without - * receiving traffic or pings in betweens. This avoids issues with - * rollover of the circular buffer - */ - - if (tx_seq_num % (SEQ_MAX / 8) == 0) { - _send_pings(knet_h, 0); - } - - while (frag_idx < inbuf->khp_data_frag_num) { - /* - * set the iov_base - */ - iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; - - /* - * set the len - */ - if (frag_len > temp_data_mtu) { - iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; - } else { - iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; - } - - /* - * copy the frag info on all buffers - */ - knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; - - knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; - knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; - knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; - knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; - - memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, - inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), - iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); - - frag_len = frag_len - temp_data_mtu; - frag_idx++; - } - - if (knet_h->crypto_instance) { - frag_idx = 0; - while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { - if (crypto_encrypt_and_sign( - knet_h, - (const unsigned char *)knet_h->send_to_links_buf[frag_idx], - iov_out[frag_idx].iov_len, - knet_h->send_to_links_buf_crypt[frag_idx], - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); - savederrno = ECHILD; - err = -1; - goto out_unlock; - } - iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; - iov_out[frag_idx].iov_len = outlen; - frag_idx++; - } - } - - if (!bcast) { - for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids[host_idx]]; - - err = _dispatch_to_links(knet_h, dst_host, iov_out); - savederrno = errno; - if (err) { - goto out_unlock; - } - } - } else { - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - if (dst_host->status.reachable) { - err = _dispatch_to_links(knet_h, dst_host, iov_out); - savederrno = errno; - if (err) { - goto out_unlock; - } - } - } - } - -out_unlock: - errno = savederrno; - return err; -} - -int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) -{ - int savederrno = 0, err = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - if (buff == NULL) { - errno = EINVAL; - return -1; - } - - if (buff_len <= 0) { - errno = EINVAL; - return -1; - } - - if (buff_len > KNET_MAX_PACKET_SIZE) { - errno = EINVAL; - return -1; - } - - if (channel < 0) { - errno = EINVAL; - return -1; - } - - if (channel >= KNET_DATAFD_MAX) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - if (!knet_h->sockfd[channel].in_use) { - savederrno = EINVAL; - err = -1; - goto out; - } - - savederrno = pthread_mutex_lock(&knet_h->tx_mutex); - if (savederrno) { - log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", - strerror(savederrno)); - err = -1; - goto out; - } - - knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; - memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); - err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); - savederrno = errno; - - pthread_mutex_unlock(&knet_h->tx_mutex); - -out: - pthread_rwlock_unlock(&knet_h->global_rwlock); - - errno = savederrno; - return err; -} - -static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type) -{ - ssize_t inlen = 0; - struct iovec iov_in; - int msg_recv, i; - int savederrno = 0, docallback = 0; - - if ((channel >= 0) && - (channel < KNET_DATAFD_MAX) && - (!knet_h->sockfd[channel].is_socket)) { - memset(&iov_in, 0, sizeof(iov_in)); - iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; - iov_in.iov_len = KNET_MAX_PACKET_SIZE; - - inlen = readv(sockfd, &iov_in, 1); - - if (inlen <= 0) { - savederrno = errno; - docallback = 1; - goto out; - } - - msg_recv = 1; - knet_h->recv_from_sock_buf[0]->kh_type = type; - _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); - } else { - msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); - if (msg_recv < 0) { - inlen = msg_recv; - savederrno = errno; - docallback = 1; - goto out; - } - for (i = 0; i < msg_recv; i++) { - inlen = msg[i].msg_len; - if (inlen == 0) { - savederrno = 0; - docallback = 1; - goto out; - break; - } - knet_h->recv_from_sock_buf[i]->kh_type = type; - _parse_recv_from_sock(knet_h, i, inlen, channel, 0); - } - } - -out: - if (inlen < 0) { - struct epoll_event ev; - - memset(&ev, 0, sizeof(struct epoll_event)); - - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { - log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); - } else { - knet_h->sockfd[channel].has_error = 1; - } - - } - - if (docallback) { - knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, - knet_h->sockfd[channel].sockfd[0], - channel, - KNET_NOTIFY_TX, - inlen, - savederrno); - } -} - -void *_handle_send_to_links_thread(void *data) -{ - knet_handle_t knet_h = (knet_handle_t) data; - struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; - struct sockaddr_storage address[PCKT_FRAG_MAX]; - struct mmsghdr msg[PCKT_FRAG_MAX]; - struct iovec iov_in[PCKT_FRAG_MAX]; - int i, nev, type; - int8_t channel; - - memset(&msg, 0, sizeof(struct mmsghdr)); - - /* preparing data buffer */ - for (i = 0; i < PCKT_FRAG_MAX; i++) { - iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; - iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; - - memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); - - msg[i].msg_hdr.msg_name = &address[i]; - msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - msg[i].msg_hdr.msg_iov = &iov_in[i]; - msg[i].msg_hdr.msg_iovlen = 1; - - knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; - knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; - knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); - - knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; - knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; - knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); - } - - while (!shutdown_in_progress(knet_h)) { - nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); - - if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); - continue; - } - - for (i = 0; i < nev; i++) { - if (events[i].data.fd == knet_h->hostsockfd[0]) { - type = KNET_HEADER_TYPE_HOST_INFO; - channel = -1; - } else { - type = KNET_HEADER_TYPE_DATA; - for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { - if ((knet_h->sockfd[channel].in_use) && - (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { - break; - } - } - } - if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); - continue; - } - _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type); - pthread_mutex_unlock(&knet_h->tx_mutex); - } - pthread_rwlock_unlock(&knet_h->global_rwlock); - } - - return NULL; -} - -/* - * RECV - */ - -/* - * return 1 if a > b - * return -1 if b > a - * return 0 if they are equal - */ -static inline int timecmp(struct timespec a, struct timespec b) -{ - if (a.tv_sec != b.tv_sec) { - if (a.tv_sec > b.tv_sec) { - return 1; - } else { - return -1; - } - } else { - if (a.tv_nsec > b.tv_nsec) { - return 1; - } else if (a.tv_nsec < b.tv_nsec) { - return -1; - } else { - return 0; - } - } -} - -/* - * this functions needs to return an index (0 to 7) - * to a knet_host_defrag_buf. (-1 on errors) - */ - -static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) -{ - struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; - int i, oldest; - - /* - * check if there is a buffer already in use handling the same seq_num - */ - for (i = 0; i < KNET_MAX_LINK; i++) { - if (src_host->defrag_buf[i].in_use) { - if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { - return i; - } - } - } - - /* - * If there is no buffer that's handling the current seq_num - * either it's new or it's been reclaimed already. - * check if it's been reclaimed/seen before using the defrag circular - * buffer. If the pckt has been seen before, the buffer expired (ETIME) - * and there is no point to try to defrag it again. - */ - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { - errno = ETIME; - return -1; - } - - /* - * register the pckt as seen - */ - _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); - - /* - * see if there is a free buffer - */ - for (i = 0; i < KNET_MAX_LINK; i++) { - if (!src_host->defrag_buf[i].in_use) { - return i; - } - } - - /* - * at this point, there are no free buffers, the pckt is new - * and we need to reclaim a buffer, and we will take the one - * with the oldest timestamp. It's as good as any. - */ - - oldest = 0; - - for (i = 0; i < KNET_MAX_LINK; i++) { - if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { - oldest = i; - } - } - src_host->defrag_buf[oldest].in_use = 0; - return oldest; -} - -static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) -{ - struct knet_host_defrag_buf *defrag_buf; - int defrag_buf_idx; - - defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); - if (defrag_buf_idx < 0) { - if (errno == ETIME) { - log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired"); - } - return 1; - } - - defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; - - /* - * if the buf is not is use, then make sure it's clean - */ - if (!defrag_buf->in_use) { - memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); - defrag_buf->in_use = 1; - defrag_buf->pckt_seq = inbuf->khp_data_seq_num; - } - - /* - * update timestamp on the buffer - */ - clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); - - /* - * check if we already received this fragment - */ - if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { - /* - * if we have received this fragment and we didn't clear the buffer - * it means that we don't have all fragments yet - */ - return 1; - } - - /* - * we need to handle the last packet with gloves due to its different size - */ - - if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { - defrag_buf->last_frag_size = *len; - - /* - * in the event when the last packet arrives first, - * we still don't know the offset vs the other fragments (based on MTU), - * so we store the fragment at the end of the buffer where it's safe - * and take a copy of the len so that we can restore its offset later. - * remember we can't use the local MTU for this calculation because pMTU - * can be asymettric between the same hosts. - */ - if (!defrag_buf->frag_size) { - defrag_buf->last_first = 1; - memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), - inbuf->khp_data_userdata, - *len); - } - } else { - defrag_buf->frag_size = *len; - } - - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), - inbuf->khp_data_userdata, *len); - - defrag_buf->frag_recv++; - defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; - - /* - * check if we received all the fragments - */ - if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { - /* - * special case the last pckt - */ - - if (defrag_buf->last_first) { - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), - defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), - defrag_buf->last_frag_size); - } - - /* - * recalculate packet lenght - */ - - *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; - - /* - * copy the pckt back in the user data - */ - memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); - - /* - * free this buffer - */ - defrag_buf->in_use = 0; - return 0; - } - - return 1; -} - -static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg) -{ - int err = 0, savederrno = 0; - ssize_t outlen; - struct knet_host *src_host; - struct knet_link *src_link; - unsigned long long latency_last; - uint16_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct timespec recvtime; - struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; - unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; - ssize_t len = msg->msg_len; - struct knet_hostinfo *knet_hostinfo; - struct iovec iov_out[1]; - int8_t channel; - struct sockaddr_storage pckt_src; - seq_num_t recv_seq_num; - int wipe_bufs = 0; - - if (knet_h->crypto_instance) { - if (crypto_authenticate_and_decrypt(knet_h, - (unsigned char *)inbuf, - len, - knet_h->recv_from_links_buf_decrypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); - return; - } - len = outlen; - inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; - } - - if (len < (KNET_HEADER_SIZE + 1)) { - log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len); - return; - } - - if (inbuf->kh_version != KNET_HEADER_VERSION) { - log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); - return; - } - - inbuf->kh_node = ntohs(inbuf->kh_node); - src_host = knet_h->host_index[inbuf->kh_node]; - if (src_host == NULL) { /* host not found */ - log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); - return; - } - - src_link = NULL; - - if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { - src_link = src_host->link + - (inbuf->khp_ping_link % KNET_MAX_LINK); - if (src_link->dynamic == KNET_LINK_DYNIP) { - /* - * cpyaddrport will only copy address and port of the incoming - * packet and strip extra bits such as flow and scopeid - */ - cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); - - if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), - &pckt_src, sockaddr_len(&pckt_src)) != 0) { - log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", - src_host->host_id, src_link->link_id); - memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); - if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), - src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, - src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); - snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); - snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); - } else { - log_info(knet_h, KNET_SUB_RX, - "host: %u link: %u new connection established from: %s %s", - src_host->host_id, src_link->link_id, - src_link->status.dst_ipaddr, src_link->status.dst_port); - } - } - /* - * transport has already accepted the connection here - * otherwise we would not be receiving packets - */ - knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link); - } - } - - switch (inbuf->kh_type) { - case KNET_HEADER_TYPE_HOST_INFO: - case KNET_HEADER_TYPE_DATA: - /* - * TODO: should we accept data even if we can't reply to the other node? - * how would that work with SCTP and guaranteed delivery? - */ - - if (!src_host->status.reachable) { - log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id); - //return; - } - inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); - channel = inbuf->khp_data_channel; - src_host->got_data = 1; - - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { - log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); - } - return; - } - - if (inbuf->khp_data_frag_num > 1) { - /* - * len as received from the socket also includes extra stuff - * that the defrag code doesn't care about. So strip it - * here and readd only for repadding once we are done - * defragging - */ - len = len - KNET_HEADER_DATA_SIZE; - if (pckt_defrag(knet_h, inbuf, &len)) { - return; - } - len = len + KNET_HEADER_DATA_SIZE; - } - - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (knet_h->enabled != 1) /* data forward is disabled */ - break; - - if (knet_h->dst_host_filter_fn) { - int host_idx; - int found = 0; - - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, - KNET_NOTIFY_RX, - knet_h->host_id, - inbuf->kh_node, - &channel, - dst_host_ids, - &dst_host_ids_entries); - if (bcast < 0) { - log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); - return; - } - - if ((!bcast) && (!dst_host_ids_entries)) { - log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); - return; - } - - /* check if we are dst for this packet */ - if (!bcast) { - for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - if (dst_host_ids[host_idx] == knet_h->host_id) { - found = 1; - break; - } - } - if (!found) { - log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); - return; - } - } - } - } - - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (!knet_h->sockfd[channel].in_use) { - log_debug(knet_h, KNET_SUB_RX, - "received packet for channel %d but there is no local sock connected", - channel); - return; - } - - memset(iov_out, 0, sizeof(iov_out)); - iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; - iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; - - outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); - if (outlen <= 0) { - knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, - knet_h->sockfd[channel].sockfd[0], - channel, - KNET_NOTIFY_RX, - outlen, - errno); - return; - } - if (outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - } - } else { /* HOSTINFO */ - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - bcast = 0; - knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); - } - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - return; - } - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - switch(knet_hostinfo->khi_type) { - case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: - break; - case KNET_HOSTINFO_TYPE_LINK_TABLE: - break; - default: - log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); - break; - } - } - break; - case KNET_HEADER_TYPE_PING: - outlen = KNET_HEADER_PING_SIZE; - inbuf->kh_type = KNET_HEADER_TYPE_PONG; - inbuf->kh_node = htons(knet_h->host_id); - recv_seq_num = ntohs(inbuf->khp_ping_seq_num); - - wipe_bufs = 0; - - if (!inbuf->khp_ping_timed) { - /* - * we might be receiving this message from all links, but we want - * to process it only the first time - */ - if (recv_seq_num != src_host->untimed_rx_seq_num) { - /* - * cache the untimed seq num - */ - src_host->untimed_rx_seq_num = recv_seq_num; - /* - * if the host has received data in between - * untimed ping, then we don't need to wipe the bufs - */ - if (src_host->got_data) { - src_host->got_data = 0; - wipe_bufs = 0; - } else { - wipe_bufs = 1; - } - } - _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); - } else { - /* - * pings always arrives in bursts over all the link - * catch the first of them to cache the seq num and - * avoid duplicate processing - */ - if (recv_seq_num != src_host->timed_rx_seq_num) { - src_host->timed_rx_seq_num = recv_seq_num; - - if (recv_seq_num == 0) { - _seq_num_lookup(src_host, recv_seq_num, 0, 1); - } - } - } - - if (knet_h->crypto_instance) { - if (crypto_encrypt_and_sign(knet_h, - (const unsigned char *)inbuf, - len, - knet_h->recv_from_links_buf_crypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); - break; - } - outbuf = knet_h->recv_from_links_buf_crypt; - } - -retry_pong: - len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *) &src_link->dst_addr, - sizeof(struct sockaddr_storage)); - savederrno = errno; - if (len != outlen) { - err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); - switch(err) { - case -1: /* unrecoverable error */ - log_debug(knet_h, KNET_SUB_RX, - "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", - src_link->outsock, errno, strerror(errno), - src_link->status.src_ipaddr, src_link->status.src_port, - src_link->status.dst_ipaddr, src_link->status.dst_port); - break; - case 0: /* ignore error and continue */ - break; - case 1: /* retry to send those same data */ - goto retry_pong; - break; - } - } - break; - case KNET_HEADER_TYPE_PONG: - clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); - - memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); - timespec_diff(recvtime, - src_link->status.pong_last, &latency_last); - - src_link->status.latency = - ((src_link->status.latency * src_link->latency_exp) + - ((latency_last / 1000llu) * - (src_link->latency_fix - src_link->latency_exp))) / - src_link->latency_fix; - - if (src_link->status.latency < src_link->pong_timeout) { - if (!src_link->status.connected) { - if (src_link->received_pong >= src_link->pong_count) { - log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", - src_host->host_id, src_link->link_id); - _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1); - } else { - src_link->received_pong++; - log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", - src_host->host_id, src_link->link_id, src_link->received_pong); - } - } - } - - break; - case KNET_HEADER_TYPE_PMTUD: - outlen = KNET_HEADER_PMTUD_SIZE; - inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; - inbuf->kh_node = htons(knet_h->host_id); - - if (knet_h->crypto_instance) { - if (crypto_encrypt_and_sign(knet_h, - (const unsigned char *)inbuf, - len, - knet_h->recv_from_links_buf_crypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); - break; - } - outbuf = knet_h->recv_from_links_buf_crypt; - } - -retry_pmtud: - len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *) &src_link->dst_addr, - sizeof(struct sockaddr_storage)); - if (len != outlen) { - err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); - switch(err) { - case -1: /* unrecoverable error */ - log_debug(knet_h, KNET_SUB_RX, - "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", - src_link->outsock, errno, strerror(errno), - src_link->status.src_ipaddr, src_link->status.src_port, - src_link->status.dst_ipaddr, src_link->status.dst_port); - break; - case 0: /* ignore error and continue */ - break; - case 1: /* retry to send those same data */ - goto retry_pmtud; - break; - } - } - - break; - case KNET_HEADER_TYPE_PMTUD_REPLY: - if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); - break; - } - src_link->last_recv_mtu = inbuf->khp_pmtud_size; - pthread_cond_signal(&knet_h->pmtud_cond); - pthread_mutex_unlock(&knet_h->pmtud_mutex); - break; - default: - return; - } -} - -static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg) -{ - int err, savederrno; - int i, msg_recv, transport; - - if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); - return; - } - - if (_is_valid_fd(knet_h, sockfd) < 1) { - /* - * this is normal if a fd got an event and before we grab the read lock - * and the link is removed by another thread - */ - goto exit_unlock; - } - - transport = knet_h->knet_transport_fd_tracker[sockfd].transport; - - /* - * reset msg_namelen to buffer size because after recvmmsg - * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 - */ - - for (i = 0; i < PCKT_FRAG_MAX; i++) { - msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - } - - msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); - savederrno = errno; - - /* - * WARNING: man page for recvmmsg is wrong. Kernel implementation here: - * recvmmsg can return: - * -1 on error - * 0 if the previous run of recvmmsg recorded an error on the socket - * N number of messages (see exception below). - * - * If there is an error from recvmsg after receiving a frame or more, the recvmmsg - * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and - * it will be visibile in the next run. - * - * Need to be careful how we handle errors at this stage. - * - * error messages need to be handled on a per transport/protocol base - * at this point we have different layers of error handling - * - msg_recv < 0 -> error from this run - * msg_recv = 0 -> error from previous run and error on socket needs to be cleared - * - per-transport message data - * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, - * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure - * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no - * errno set. That means the error api needs to be able to abort the loop below. - */ - - if (msg_recv <= 0) { - knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno); - goto exit_unlock; - } - - for (i = 0; i < msg_recv; i++) { - err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]); - - /* - * TODO: make this section silent once we are confident - * all protocols packet handlers are good - */ - - switch(err) { - case -1: /* on error */ - log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); - goto exit_unlock; - break; - case 0: /* packet is not data and we should continue the packet process loop */ - log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); - break; - case 1: /* packet is not data and we should STOP the packet process loop */ - log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); - goto exit_unlock; - break; - case 2: /* packet is data and should be parsed as such */ - _parse_recv_from_links(knet_h, sockfd, &msg[i]); - break; - } - } - -exit_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); -} - -void *_handle_recv_from_links_thread(void *data) -{ - int i, nev; - knet_handle_t knet_h = (knet_handle_t) data; - struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; - struct sockaddr_storage address[PCKT_FRAG_MAX]; - struct mmsghdr msg[PCKT_FRAG_MAX]; - struct iovec iov_in[PCKT_FRAG_MAX]; - - memset(&msg, 0, sizeof(msg)); - - for (i = 0; i < PCKT_FRAG_MAX; i++) { - iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; - iov_in[i].iov_len = KNET_DATABUFSIZE; - - memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); - - msg[i].msg_hdr.msg_name = &address[i]; - msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - msg[i].msg_hdr.msg_iov = &iov_in[i]; - msg[i].msg_hdr.msg_iovlen = 1; - } - - while (!shutdown_in_progress(knet_h)) { - nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); - - for (i = 0; i < nev; i++) { - _handle_recv_from_links(knet_h, events[i].data.fd, msg); - } - } - - return NULL; -} diff --git a/libknet/threads_send_recv.h b/libknet/threads_send_recv.h deleted file mode 100644 index fec6e5f1..00000000 --- a/libknet/threads_send_recv.h +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. - * - * Authors: Fabio M. Di Nitto - * Federico Simoncelli - * - * This software licensed under GPL-2.0+, LGPL-2.0+ - */ - -#ifndef __THREADS_SEND_RECV_H__ -#define __THREADS_SEND_RECV_H__ - -void *_handle_send_to_links_thread(void *data); -void *_handle_recv_from_links_thread(void *data); - -#endif diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 37375346..efafd9d8 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,1302 +1,595 @@ /* - * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" -#include +#include #include -#include -#include #include -#include -#include -#include -#include -#include +#include -#include "crypto.h" #include "compat.h" +#include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" -#include "threads_send_recv.h" +#include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out) { int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress; struct mmsghdr msg[PCKT_FRAG_MAX]; int err = 0, savederrno = 0; memset(&msg, 0, sizeof(struct mmsghdr)); for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num; sent_msgs = 0; prev_sent = 0; progress = 1; retry: msg_idx = 0; while (msg_idx < msgs_to_send) { memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr)); msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent]; msg[msg_idx].msg_hdr.msg_iovlen = 1; msg_idx++; } sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry; break; } if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) { if ((sent_msgs) || (progress)) { msgs_to_send = msg_idx - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { progress = 1; } else { progress = 0; } log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) { ssize_t outlen, frag_len; struct knet_host *dst_host; uint16_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX]; uint8_t frag_idx; unsigned int temp_data_mtu; int host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; inbuf = knet_h->recv_from_sock_buf[buf_idx]; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unrechable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming mininum IPv4 mtu (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; } else { iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); frag_len = frag_len - temp_data_mtu; frag_idx++; } if (knet_h->crypto_instance) { frag_idx = 0; while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { if (crypto_encrypt_and_sign( knet_h, (const unsigned char *)knet_h->send_to_links_buf[frag_idx], iov_out[frag_idx].iov_len, knet_h->send_to_links_buf_crypt[frag_idx], &outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); savederrno = ECHILD; err = -1; goto out_unlock; } iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx].iov_len = outlen; frag_idx++; } } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, iov_out); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, iov_out); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type) { ssize_t inlen = 0; struct iovec iov_in; int msg_recv, i; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; inlen = readv(sockfd, &iov_in, 1); if (inlen <= 0) { savederrno = errno; docallback = 1; goto out; } msg_recv = 1; knet_h->recv_from_sock_buf[0]->kh_type = type; _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); } else { msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); if (msg_recv < 0) { inlen = msg_recv; savederrno = errno; docallback = 1; goto out; } for (i = 0; i < msg_recv; i++) { inlen = msg[i].msg_len; if (inlen == 0) { savederrno = 0; docallback = 1; goto out; break; } knet_h->recv_from_sock_buf[i]->kh_type = type; _parse_recv_from_sock(knet_h, i, inlen, channel, 0); } } out: if (inlen < 0) { struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, nev, type; int8_t channel; memset(&msg, 0, sizeof(struct mmsghdr)); /* preparing data buffer */ for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = -1; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } - -/* - * RECV - */ - -/* - * return 1 if a > b - * return -1 if b > a - * return 0 if they are equal - */ -static inline int timecmp(struct timespec a, struct timespec b) -{ - if (a.tv_sec != b.tv_sec) { - if (a.tv_sec > b.tv_sec) { - return 1; - } else { - return -1; - } - } else { - if (a.tv_nsec > b.tv_nsec) { - return 1; - } else if (a.tv_nsec < b.tv_nsec) { - return -1; - } else { - return 0; - } - } -} - -/* - * this functions needs to return an index (0 to 7) - * to a knet_host_defrag_buf. (-1 on errors) - */ - -static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) -{ - struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; - int i, oldest; - - /* - * check if there is a buffer already in use handling the same seq_num - */ - for (i = 0; i < KNET_MAX_LINK; i++) { - if (src_host->defrag_buf[i].in_use) { - if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { - return i; - } - } - } - - /* - * If there is no buffer that's handling the current seq_num - * either it's new or it's been reclaimed already. - * check if it's been reclaimed/seen before using the defrag circular - * buffer. If the pckt has been seen before, the buffer expired (ETIME) - * and there is no point to try to defrag it again. - */ - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { - errno = ETIME; - return -1; - } - - /* - * register the pckt as seen - */ - _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); - - /* - * see if there is a free buffer - */ - for (i = 0; i < KNET_MAX_LINK; i++) { - if (!src_host->defrag_buf[i].in_use) { - return i; - } - } - - /* - * at this point, there are no free buffers, the pckt is new - * and we need to reclaim a buffer, and we will take the one - * with the oldest timestamp. It's as good as any. - */ - - oldest = 0; - - for (i = 0; i < KNET_MAX_LINK; i++) { - if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { - oldest = i; - } - } - src_host->defrag_buf[oldest].in_use = 0; - return oldest; -} - -static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) -{ - struct knet_host_defrag_buf *defrag_buf; - int defrag_buf_idx; - - defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); - if (defrag_buf_idx < 0) { - if (errno == ETIME) { - log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired"); - } - return 1; - } - - defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; - - /* - * if the buf is not is use, then make sure it's clean - */ - if (!defrag_buf->in_use) { - memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); - defrag_buf->in_use = 1; - defrag_buf->pckt_seq = inbuf->khp_data_seq_num; - } - - /* - * update timestamp on the buffer - */ - clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); - - /* - * check if we already received this fragment - */ - if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { - /* - * if we have received this fragment and we didn't clear the buffer - * it means that we don't have all fragments yet - */ - return 1; - } - - /* - * we need to handle the last packet with gloves due to its different size - */ - - if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { - defrag_buf->last_frag_size = *len; - - /* - * in the event when the last packet arrives first, - * we still don't know the offset vs the other fragments (based on MTU), - * so we store the fragment at the end of the buffer where it's safe - * and take a copy of the len so that we can restore its offset later. - * remember we can't use the local MTU for this calculation because pMTU - * can be asymettric between the same hosts. - */ - if (!defrag_buf->frag_size) { - defrag_buf->last_first = 1; - memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), - inbuf->khp_data_userdata, - *len); - } - } else { - defrag_buf->frag_size = *len; - } - - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), - inbuf->khp_data_userdata, *len); - - defrag_buf->frag_recv++; - defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; - - /* - * check if we received all the fragments - */ - if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { - /* - * special case the last pckt - */ - - if (defrag_buf->last_first) { - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), - defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), - defrag_buf->last_frag_size); - } - - /* - * recalculate packet lenght - */ - - *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; - - /* - * copy the pckt back in the user data - */ - memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); - - /* - * free this buffer - */ - defrag_buf->in_use = 0; - return 0; - } - - return 1; -} - -static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg) -{ - int err = 0, savederrno = 0; - ssize_t outlen; - struct knet_host *src_host; - struct knet_link *src_link; - unsigned long long latency_last; - uint16_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct timespec recvtime; - struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; - unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; - ssize_t len = msg->msg_len; - struct knet_hostinfo *knet_hostinfo; - struct iovec iov_out[1]; - int8_t channel; - struct sockaddr_storage pckt_src; - seq_num_t recv_seq_num; - int wipe_bufs = 0; - - if (knet_h->crypto_instance) { - if (crypto_authenticate_and_decrypt(knet_h, - (unsigned char *)inbuf, - len, - knet_h->recv_from_links_buf_decrypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); - return; - } - len = outlen; - inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; - } - - if (len < (KNET_HEADER_SIZE + 1)) { - log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len); - return; - } - - if (inbuf->kh_version != KNET_HEADER_VERSION) { - log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); - return; - } - - inbuf->kh_node = ntohs(inbuf->kh_node); - src_host = knet_h->host_index[inbuf->kh_node]; - if (src_host == NULL) { /* host not found */ - log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); - return; - } - - src_link = NULL; - - if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { - src_link = src_host->link + - (inbuf->khp_ping_link % KNET_MAX_LINK); - if (src_link->dynamic == KNET_LINK_DYNIP) { - /* - * cpyaddrport will only copy address and port of the incoming - * packet and strip extra bits such as flow and scopeid - */ - cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); - - if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), - &pckt_src, sockaddr_len(&pckt_src)) != 0) { - log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", - src_host->host_id, src_link->link_id); - memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); - if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), - src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, - src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); - snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); - snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); - } else { - log_info(knet_h, KNET_SUB_RX, - "host: %u link: %u new connection established from: %s %s", - src_host->host_id, src_link->link_id, - src_link->status.dst_ipaddr, src_link->status.dst_port); - } - } - /* - * transport has already accepted the connection here - * otherwise we would not be receiving packets - */ - knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link); - } - } - - switch (inbuf->kh_type) { - case KNET_HEADER_TYPE_HOST_INFO: - case KNET_HEADER_TYPE_DATA: - /* - * TODO: should we accept data even if we can't reply to the other node? - * how would that work with SCTP and guaranteed delivery? - */ - - if (!src_host->status.reachable) { - log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id); - //return; - } - inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); - channel = inbuf->khp_data_channel; - src_host->got_data = 1; - - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { - log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); - } - return; - } - - if (inbuf->khp_data_frag_num > 1) { - /* - * len as received from the socket also includes extra stuff - * that the defrag code doesn't care about. So strip it - * here and readd only for repadding once we are done - * defragging - */ - len = len - KNET_HEADER_DATA_SIZE; - if (pckt_defrag(knet_h, inbuf, &len)) { - return; - } - len = len + KNET_HEADER_DATA_SIZE; - } - - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (knet_h->enabled != 1) /* data forward is disabled */ - break; - - if (knet_h->dst_host_filter_fn) { - int host_idx; - int found = 0; - - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, - KNET_NOTIFY_RX, - knet_h->host_id, - inbuf->kh_node, - &channel, - dst_host_ids, - &dst_host_ids_entries); - if (bcast < 0) { - log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); - return; - } - - if ((!bcast) && (!dst_host_ids_entries)) { - log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); - return; - } - - /* check if we are dst for this packet */ - if (!bcast) { - for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - if (dst_host_ids[host_idx] == knet_h->host_id) { - found = 1; - break; - } - } - if (!found) { - log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); - return; - } - } - } - } - - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (!knet_h->sockfd[channel].in_use) { - log_debug(knet_h, KNET_SUB_RX, - "received packet for channel %d but there is no local sock connected", - channel); - return; - } - - memset(iov_out, 0, sizeof(iov_out)); - iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; - iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; - - outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); - if (outlen <= 0) { - knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, - knet_h->sockfd[channel].sockfd[0], - channel, - KNET_NOTIFY_RX, - outlen, - errno); - return; - } - if (outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - } - } else { /* HOSTINFO */ - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - bcast = 0; - knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); - } - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - return; - } - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - switch(knet_hostinfo->khi_type) { - case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: - break; - case KNET_HOSTINFO_TYPE_LINK_TABLE: - break; - default: - log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); - break; - } - } - break; - case KNET_HEADER_TYPE_PING: - outlen = KNET_HEADER_PING_SIZE; - inbuf->kh_type = KNET_HEADER_TYPE_PONG; - inbuf->kh_node = htons(knet_h->host_id); - recv_seq_num = ntohs(inbuf->khp_ping_seq_num); - - wipe_bufs = 0; - - if (!inbuf->khp_ping_timed) { - /* - * we might be receiving this message from all links, but we want - * to process it only the first time - */ - if (recv_seq_num != src_host->untimed_rx_seq_num) { - /* - * cache the untimed seq num - */ - src_host->untimed_rx_seq_num = recv_seq_num; - /* - * if the host has received data in between - * untimed ping, then we don't need to wipe the bufs - */ - if (src_host->got_data) { - src_host->got_data = 0; - wipe_bufs = 0; - } else { - wipe_bufs = 1; - } - } - _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); - } else { - /* - * pings always arrives in bursts over all the link - * catch the first of them to cache the seq num and - * avoid duplicate processing - */ - if (recv_seq_num != src_host->timed_rx_seq_num) { - src_host->timed_rx_seq_num = recv_seq_num; - - if (recv_seq_num == 0) { - _seq_num_lookup(src_host, recv_seq_num, 0, 1); - } - } - } - - if (knet_h->crypto_instance) { - if (crypto_encrypt_and_sign(knet_h, - (const unsigned char *)inbuf, - len, - knet_h->recv_from_links_buf_crypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); - break; - } - outbuf = knet_h->recv_from_links_buf_crypt; - } - -retry_pong: - len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *) &src_link->dst_addr, - sizeof(struct sockaddr_storage)); - savederrno = errno; - if (len != outlen) { - err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); - switch(err) { - case -1: /* unrecoverable error */ - log_debug(knet_h, KNET_SUB_RX, - "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", - src_link->outsock, errno, strerror(errno), - src_link->status.src_ipaddr, src_link->status.src_port, - src_link->status.dst_ipaddr, src_link->status.dst_port); - break; - case 0: /* ignore error and continue */ - break; - case 1: /* retry to send those same data */ - goto retry_pong; - break; - } - } - break; - case KNET_HEADER_TYPE_PONG: - clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); - - memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); - timespec_diff(recvtime, - src_link->status.pong_last, &latency_last); - - src_link->status.latency = - ((src_link->status.latency * src_link->latency_exp) + - ((latency_last / 1000llu) * - (src_link->latency_fix - src_link->latency_exp))) / - src_link->latency_fix; - - if (src_link->status.latency < src_link->pong_timeout) { - if (!src_link->status.connected) { - if (src_link->received_pong >= src_link->pong_count) { - log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", - src_host->host_id, src_link->link_id); - _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1); - } else { - src_link->received_pong++; - log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", - src_host->host_id, src_link->link_id, src_link->received_pong); - } - } - } - - break; - case KNET_HEADER_TYPE_PMTUD: - outlen = KNET_HEADER_PMTUD_SIZE; - inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; - inbuf->kh_node = htons(knet_h->host_id); - - if (knet_h->crypto_instance) { - if (crypto_encrypt_and_sign(knet_h, - (const unsigned char *)inbuf, - len, - knet_h->recv_from_links_buf_crypt, - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); - break; - } - outbuf = knet_h->recv_from_links_buf_crypt; - } - -retry_pmtud: - len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *) &src_link->dst_addr, - sizeof(struct sockaddr_storage)); - if (len != outlen) { - err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); - switch(err) { - case -1: /* unrecoverable error */ - log_debug(knet_h, KNET_SUB_RX, - "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", - src_link->outsock, errno, strerror(errno), - src_link->status.src_ipaddr, src_link->status.src_port, - src_link->status.dst_ipaddr, src_link->status.dst_port); - break; - case 0: /* ignore error and continue */ - break; - case 1: /* retry to send those same data */ - goto retry_pmtud; - break; - } - } - - break; - case KNET_HEADER_TYPE_PMTUD_REPLY: - if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); - break; - } - src_link->last_recv_mtu = inbuf->khp_pmtud_size; - pthread_cond_signal(&knet_h->pmtud_cond); - pthread_mutex_unlock(&knet_h->pmtud_mutex); - break; - default: - return; - } -} - -static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg) -{ - int err, savederrno; - int i, msg_recv, transport; - - if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); - return; - } - - if (_is_valid_fd(knet_h, sockfd) < 1) { - /* - * this is normal if a fd got an event and before we grab the read lock - * and the link is removed by another thread - */ - goto exit_unlock; - } - - transport = knet_h->knet_transport_fd_tracker[sockfd].transport; - - /* - * reset msg_namelen to buffer size because after recvmmsg - * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 - */ - - for (i = 0; i < PCKT_FRAG_MAX; i++) { - msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - } - - msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); - savederrno = errno; - - /* - * WARNING: man page for recvmmsg is wrong. Kernel implementation here: - * recvmmsg can return: - * -1 on error - * 0 if the previous run of recvmmsg recorded an error on the socket - * N number of messages (see exception below). - * - * If there is an error from recvmsg after receiving a frame or more, the recvmmsg - * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and - * it will be visibile in the next run. - * - * Need to be careful how we handle errors at this stage. - * - * error messages need to be handled on a per transport/protocol base - * at this point we have different layers of error handling - * - msg_recv < 0 -> error from this run - * msg_recv = 0 -> error from previous run and error on socket needs to be cleared - * - per-transport message data - * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, - * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure - * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no - * errno set. That means the error api needs to be able to abort the loop below. - */ - - if (msg_recv <= 0) { - knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno); - goto exit_unlock; - } - - for (i = 0; i < msg_recv; i++) { - err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]); - - /* - * TODO: make this section silent once we are confident - * all protocols packet handlers are good - */ - - switch(err) { - case -1: /* on error */ - log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); - goto exit_unlock; - break; - case 0: /* packet is not data and we should continue the packet process loop */ - log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); - break; - case 1: /* packet is not data and we should STOP the packet process loop */ - log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); - goto exit_unlock; - break; - case 2: /* packet is data and should be parsed as such */ - _parse_recv_from_links(knet_h, sockfd, &msg[i]); - break; - } - } - -exit_unlock: - pthread_rwlock_unlock(&knet_h->global_rwlock); -} - -void *_handle_recv_from_links_thread(void *data) -{ - int i, nev; - knet_handle_t knet_h = (knet_handle_t) data; - struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; - struct sockaddr_storage address[PCKT_FRAG_MAX]; - struct mmsghdr msg[PCKT_FRAG_MAX]; - struct iovec iov_in[PCKT_FRAG_MAX]; - - memset(&msg, 0, sizeof(msg)); - - for (i = 0; i < PCKT_FRAG_MAX; i++) { - iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; - iov_in[i].iov_len = KNET_DATABUFSIZE; - - memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); - - msg[i].msg_hdr.msg_name = &address[i]; - msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - msg[i].msg_hdr.msg_iov = &iov_in[i]; - msg[i].msg_hdr.msg_iovlen = 1; - } - - while (!shutdown_in_progress(knet_h)) { - nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); - - for (i = 0; i < nev; i++) { - _handle_recv_from_links(knet_h, events[i].data.fd, msg); - } - } - - return NULL; -} diff --git a/libknet/threads_tx.h b/libknet/threads_tx.h index fec6e5f1..bf8c1d8f 100644 --- a/libknet/threads_tx.h +++ b/libknet/threads_tx.h @@ -1,16 +1,15 @@ /* - * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ -#ifndef __THREADS_SEND_RECV_H__ -#define __THREADS_SEND_RECV_H__ +#ifndef __THREADS_TX_H__ +#define __THREADS_TX_H__ void *_handle_send_to_links_thread(void *data); -void *_handle_recv_from_links_thread(void *data); #endif