diff --git a/TODO b/TODO index 21d37107..de7f5716 100644 --- a/TODO +++ b/TODO @@ -1,101 +1,104 @@ distro/packaging: - (issue) check why systemd doesn't let me set scheduler priority fixed for systemd unit file, LSB init + systemd still fails https://bugzilla.redhat.com/show_bug.cgi?id=893015 link/host level: + - (issue) review RX thread error code related to guaranteed delivery + (for ex when using SCTP) - (issue) investigate sock_notify_fn in thread_send_recv.c data delivery to app to better handle socket failures. In some cases we need/want to retry delivering to the application (sock overload?) and others the app needs to close/remove the socket. I don't think we can automatically do the right thing without asking the app (sock_notify return code). - (issue) transports: review all error handling code failures, especially on clear_config, to not leak and better clean in case of errors - (issue) improve test suite to cover for all transports (needs transport list API) - (issue) change knet_bench to allow protocol specification per link - (issue) review memory locking across - (issue) simplify handling of DATA and HOSTINFO code paths in send/recv code - (issue) need bind to interface for dynamic ip local interfaces vs src ip address or find a way to autodetect the new ip on that interface (listen to kernel netlink?) - (issue) must implement link auth via user/passwd. This is necessary in case key is leaked. - (issue) standardize exit labels (ex out_unlock / exit_unlock) and variable/function names. - (issue) review how TX onwire pckt info are filled in between inbuf/socket/frags + - (rfe) continue improving multinode TX code - (rfe) add fd_tracker error exit check and perhaps use it for local sockets too to make it easier to identify leaks and fd abuse - (rfe) link status callback notification - (rfe) compress: should only compress user data, we will add a bit in the data header to indicate if the pckt is compressed or not (save time). 
this approach allows runtime change of compress. open questions are: methods? level? zlib? lzo? bz? lzma? xz? how much do we save by compressing our header? compress must happen before encrypt we can express compress data in packet type without adding extra flags to the headers. DATA -> BZ/GZDATA and we can change that right before encrypting. Using a similar approach to PING_MASK - (rfe) crypto: expand API to support dual key for rekey process - (rfe) link id made optional? right now we need the link id to match on both sides of the connection. this is somewhat annoying from a user perspective. Evaluate if we can make it optional. - (rfe) make hostid autogenerated in a consistent way? - (rfe) Check IPV6_NEXTHOP for v6 sockets and find equivalent for v4 (Jesper?) this would allow using one IP address as destination via multiple links - (rfe) add statistics at different levels (pckt per host/link, bytes, crypto overhead, frame overhead, pure data...) - (rfe) link connection access-list (chrissie has working generic code for this one, needs merging and API) - (rfe) improve host-to-host communication. Right now I am not satisfied with the current implementation, even if it works. - (rfe) implement link switching via scoring system based on: 1) latency 2) priority (auto/manual) 3) usage (over XX% traffic start RR) 4) flapping of the links (time/sec) this requires complex rules setting and a super efficient way to look up destination links 5) if links are stable, reduce the number of links in a-a min 2 - (rfe) benchmark tests for all critical paths in switching threads - (rfe) network convergence protocol (host exchange) - (rfe) reswitching of packets - (rfe) look into UDP+ECN bit set to avoid overloading sockets? - (rfe) add openssl support? 
- (rfe) consider adding threadpools to process data packets in parallel libknet: - (issue) review logging policy/levels in public api call example is scanning for active links in a host that would return a half gazillion useless log entries - (issue) add .3 man pages libtap: - (issue) add .3 man pages - (issue) improve tests to cover thread safety and better error codes especially from the up/down handling. - (rfe) consider adding dhcp support for tap device it can be done now via up.d/ scripts, but it's not intuitive kronostnetd: - (issue) besides the code that is as bad as it can possibly be and will make you wish to have a teaspoon handy to carve your eyeballs out, the vty needs a good clean/rewrite - (issue) fix config file format. current one will make you scream - (issue) missing output from several command execution failures in vty mode - (issue) fix check_param for ip/prefix/crypto (this is part of the rewrite as it needs more clever arg parsing code/method) - (rfe) add logging config (per subsystem/global) - (rfe) split vty_cmd_files to be smaller. it's just too big to handle nicely. - (rfe) add equivalent of "description: ...." to various levels - (rfe) add optional options. right now it's necessary to specify everything all the time. - (rfe) implement tab completion on options general: - (issue) missing unit tests on many many bits - (issue) missing docs of all kind, devel, users, admin guide. - (rfe) check code with coverity diff --git a/libknet/handle.c b/libknet/handle.c index bbe74665..9a0150e2 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -1,1397 +1,1413 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "internals.h" #include "crypto.h" #include "compat.h" #include "common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_dsthandler.h" #include "threads_send_recv.h" #include "transports.h" #include "logging.h" static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER; static int _init_locks(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s", strerror(savederrno)); goto exit_fail; } knet_h->lock_init_done = 1; savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s", strerror(savederrno)); goto exit_fail; } + savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s", + strerror(savederrno)); + goto exit_fail; + } + savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL); if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s", + log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s", + strerror(savederrno)); + goto exit_fail; + } + + savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void 
_destroy_locks(knet_handle_t knet_h) { knet_h->lock_init_done = 0; pthread_rwlock_destroy(&knet_h->global_rwlock); pthread_mutex_destroy(&knet_h->pmtud_mutex); pthread_cond_destroy(&knet_h->pmtud_cond); + pthread_mutex_destroy(&knet_h->hb_mutex); pthread_mutex_destroy(&knet_h->tx_mutex); + pthread_mutex_destroy(&knet_h->tx_seq_num_mutex); } static int _init_socks(knet_handle_t knet_h) { int savederrno = 0; if (_init_socketpair(knet_h, knet_h->hostsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, knet_h->dstsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_socks(knet_handle_t knet_h) { _close_socketpair(knet_h, knet_h->dstsockfd); _close_socketpair(knet_h, knet_h->hostsockfd); } static int _init_buffers(knet_handle_t knet_h) { int savederrno = 0; int i; size_t bufsize; for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE; knet_h->send_to_links_buf[i] = malloc(bufsize); if (!knet_h->send_to_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf[i], 0, bufsize); knet_h->recv_from_sock_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_sock_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_sock_buf[i], 0, KNET_DATABUFSIZE); knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE); if (!knet_h->recv_from_links_buf[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd 
buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE); } knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE); if (!knet_h->pingbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE); knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6); if (!knet_h->pmtudbuf) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6); for (i = 0; i < PCKT_FRAG_MAX; i++) { bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD; knet_h->send_to_links_buf_crypt[i] = malloc(bufsize); if (!knet_h->send_to_links_buf_crypt[i]) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize); } knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_decrypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->recv_from_links_buf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pingbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s", 
strerror(savederrno)); goto exit_fail; } memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT); if (!knet_h->pmtudbuf_crypt) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s", strerror(savederrno)); goto exit_fail; } memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker)); return 0; exit_fail: errno = savederrno; return -1; } static void _destroy_buffers(knet_handle_t knet_h) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { free(knet_h->send_to_links_buf[i]); free(knet_h->recv_from_sock_buf[i]); free(knet_h->send_to_links_buf_crypt[i]); free(knet_h->recv_from_links_buf[i]); } free(knet_h->recv_from_links_buf_decrypt); free(knet_h->recv_from_links_buf_crypt); free(knet_h->pingbuf); free(knet_h->pingbuf_crypt); free(knet_h->pmtudbuf); free(knet_h->pmtudbuf_crypt); } static int _init_epolls(knet_handle_t knet_h) { struct epoll_event ev; int savederrno = 0; /* * even if the kernel does dynamic allocation with epoll_ctl * we need to reserve one extra for host to host communication */ knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (knet_h->send_to_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->recv_from_links_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s", strerror(savederrno)); goto exit_fail; } knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS); if (knet_h->dst_link_handler_epollfd < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s", strerror(savederrno)); goto exit_fail; } if 
(_fdset_cloexec(knet_h->send_to_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->hostsockfd[0]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->dstsockfd[0]; if (epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _close_epolls(knet_handle_t knet_h) { struct epoll_event ev; int i; memset(&ev, 0, sizeof(struct epoll_event)); for (i = 0; i < KNET_DATAFD_MAX; i++) { if (knet_h->sockfd[i].in_use) { epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev); if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) { _close_socketpair(knet_h, knet_h->sockfd[i].sockfd); } } } epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev); epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev); close(knet_h->send_to_links_epollfd); 
close(knet_h->recv_from_links_epollfd); close(knet_h->dst_link_handler_epollfd); } static int _start_transports(knet_handle_t knet_h) { int i, savederrno = 0, err = 0; for (i=0; itransport_ops[i] = get_udp_transport(); break; case KNET_TRANSPORT_SCTP: knet_h->transport_ops[i] = get_sctp_transport(); break; } if (knet_h->transport_ops[i]) { if (knet_h->transport_ops[i]->transport_init(knet_h) < 0) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Failed to allocate transport handle for %s: %s", knet_h->transport_ops[i]->transport_name, strerror(savederrno)); err = -1; goto out; } } } out: errno = savederrno; return err; } static void _stop_transports(knet_handle_t knet_h) { int i; for (i=0; itransport_ops[i]) { knet_h->transport_ops[i]->transport_free(knet_h); } } } static int _start_threads(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0, _handle_pmtud_link_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0, _handle_dst_link_handler_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->send_to_links_thread, 0, _handle_send_to_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->recv_from_links_thread, 0, _handle_recv_from_links_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&knet_h->heartbt_thread, 0, _handle_heartbt_thread, (void *) knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable 
to start heartbeat thread: %s", strerror(savederrno)); goto exit_fail; } return 0; exit_fail: errno = savederrno; return -1; } static void _stop_threads(knet_handle_t knet_h) { void *retval; /* * allow threads to catch on shutdown request * and release locks before we stop them. * this isn't the most efficent way to handle it * but it works good enough for now */ sleep(1); if (knet_h->heartbt_thread) { pthread_cancel(knet_h->heartbt_thread); pthread_join(knet_h->heartbt_thread, &retval); } if (knet_h->send_to_links_thread) { pthread_cancel(knet_h->send_to_links_thread); pthread_join(knet_h->send_to_links_thread, &retval); } if (knet_h->recv_from_links_thread) { pthread_cancel(knet_h->recv_from_links_thread); pthread_join(knet_h->recv_from_links_thread, &retval); } if (knet_h->dst_link_handler_thread) { pthread_cancel(knet_h->dst_link_handler_thread); pthread_join(knet_h->dst_link_handler_thread, &retval); } pthread_mutex_lock(&knet_h->pmtud_mutex); pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); sleep(1); if (knet_h->pmtud_link_handler_thread) { pthread_cancel(knet_h->pmtud_link_handler_thread); pthread_join(knet_h->pmtud_link_handler_thread, &retval); } } knet_handle_t knet_handle_new(uint16_t host_id, int log_fd, uint8_t default_log_level) { knet_handle_t knet_h; int savederrno = 0; struct rlimit cur; if (getrlimit(RLIMIT_NOFILE, &cur) < 0) { return NULL; } if ((log_fd < 0) || (log_fd >= cur.rlim_max)) { errno = EINVAL; return NULL; } /* * validate incoming request */ if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) { errno = EINVAL; return NULL; } /* * allocate handle */ knet_h = malloc(sizeof(struct knet_handle)); if (!knet_h) { errno = ENOMEM; return NULL; } memset(knet_h, 0, sizeof(struct knet_handle)); savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); errno = savederrno; goto exit_fail; } /* * copy 
config in place */ knet_h->host_id = host_id; knet_h->logfd = log_fd; if (knet_h->logfd > 0) { memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS); } /* * set pmtud default timers */ knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL; /* * init main locking structures */ if (_init_locks(knet_h)) { savederrno = errno; goto exit_fail; } /* * init sockets */ if (_init_socks(knet_h)) { savederrno = errno; goto exit_fail; } /* * allocate packet buffers */ if (_init_buffers(knet_h)) { savederrno = errno; goto exit_fail; } /* * create epoll fds */ if (_init_epolls(knet_h)) { savederrno = errno; goto exit_fail; } /* * start transports */ if (_start_transports(knet_h)) { savederrno = errno; goto exit_fail; } /* * start internal threads */ if (_start_threads(knet_h)) { savederrno = errno; goto exit_fail; } pthread_mutex_unlock(&handle_config_mutex); return knet_h; exit_fail: pthread_mutex_unlock(&handle_config_mutex); knet_handle_free(knet_h); errno = savederrno; return NULL; } int knet_handle_free(knet_handle_t knet_h) { int savederrno = 0; savederrno = pthread_mutex_lock(&handle_config_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h) { pthread_mutex_unlock(&handle_config_mutex); errno = EINVAL; return -1; } if (!knet_h->lock_init_done) { goto exit_nolock; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); pthread_mutex_unlock(&handle_config_mutex); errno = savederrno; return -1; } if (knet_h->host_head != NULL) { savederrno = EBUSY; log_err(knet_h, KNET_SUB_HANDLE, "Unable to free handle: host(s) or listener(s) are still active: %s", strerror(savederrno)); pthread_rwlock_unlock(&knet_h->global_rwlock); pthread_mutex_unlock(&handle_config_mutex); errno = savederrno; return -1; } knet_h->fini_in_progress = 1; 
pthread_rwlock_unlock(&knet_h->global_rwlock); _stop_threads(knet_h); _stop_transports(knet_h); _close_epolls(knet_h); _destroy_buffers(knet_h); _close_socks(knet_h); crypto_fini(knet_h); _destroy_locks(knet_h); exit_nolock: free(knet_h); knet_h = NULL; pthread_mutex_unlock(&handle_config_mutex); return 0; } int knet_handle_enable_sock_notify(knet_handle_t knet_h, void *sock_notify_fn_private_data, void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno)) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!sock_notify_fn) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; knet_h->sock_notify_fn = sock_notify_fn; log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); pthread_rwlock_unlock(&knet_h->global_rwlock); return err; } int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) { int err = 0, savederrno = 0; int i; struct epoll_event ev; if (!knet_h) { errno = EINVAL; return -1; } if (datafd == NULL) { errno = EINVAL; return -1; } if (channel == NULL) { errno = EINVAL; return -1; } if (*channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sock_notify_fn) { log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!"); savederrno = EINVAL; err = -1; goto out_unlock; } if (*datafd > 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) { log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist 
in index: %d", *datafd, i); savederrno = EEXIST; err = -1; goto out_unlock; } } } /* * auto allocate a channel */ if (*channel < 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if (!knet_h->sockfd[i].in_use) { *channel = i; break; } } if (*channel < 0) { savederrno = EBUSY; err = -1; goto out_unlock; } } else { if (knet_h->sockfd[*channel].in_use) { savederrno = EBUSY; err = -1; goto out_unlock; } } knet_h->sockfd[*channel].is_created = 0; knet_h->sockfd[*channel].is_socket = 0; knet_h->sockfd[*channel].has_error = 0; if (*datafd > 0) { int sockopt; socklen_t sockoptlen = sizeof(sockopt); if (_fdset_cloexec(*datafd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s", strerror(savederrno)); goto out_unlock; } if (_fdset_nonblock(*datafd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s", strerror(savederrno)); goto out_unlock; } knet_h->sockfd[*channel].sockfd[0] = *datafd; knet_h->sockfd[*channel].sockfd[1] = 0; if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) { knet_h->sockfd[*channel].is_socket = 1; } } else { if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) { savederrno = errno; err = -1; goto out_unlock; } knet_h->sockfd[*channel].is_created = 1; knet_h->sockfd[*channel].is_socket = 1; *datafd = knet_h->sockfd[*channel].sockfd[0]; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created]; if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s", knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno)); if (knet_h->sockfd[*channel].is_created) { _close_socketpair(knet_h, 
knet_h->sockfd[*channel].sockfd); } goto out_unlock; } knet_h->sockfd[*channel].in_use = 1; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) { int err = 0, savederrno = 0; int8_t channel = -1; int i; struct epoll_event ev; if (!knet_h) { errno = EINVAL; return -1; } if (datafd <= 0) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == datafd)) { channel = i; break; } } if (channel < 0) { savederrno = EINVAL; err = -1; goto out_unlock; } if (!knet_h->sockfd[channel].has_error) { memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); goto out_unlock; } } if (knet_h->sockfd[channel].is_created) { _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd); } memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock)); out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd) { int err = 0, savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) { errno = EINVAL; return -1; } if (datafd == NULL) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } 
if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } *datafd = knet_h->sockfd[channel].sockfd[0]; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel) { int err = 0, savederrno = 0; int i; if (!knet_h) { errno = EINVAL; return -1; } if (datafd <= 0) { errno = EINVAL; return -1; } if (channel == NULL) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *channel = -1; for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == datafd)) { *channel = i; break; } } if (*channel < 0) { savederrno = EINVAL; err = -1; goto out_unlock; } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_handle_enable_filter(knet_handle_t knet_h, void *dst_host_filter_fn_private_data, int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_node_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data; knet_h->dst_host_filter_fn = dst_host_filter_fn; if (knet_h->dst_host_filter_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_setfwd(knet_handle_t 
knet_h, unsigned int enabled) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((enabled < 0) || (enabled > 1)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->enabled = enabled; if (enabled) { log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *interval = knet_h->pmtud_interval; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!interval) || (interval > 86400)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_interval = interval; log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval); pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, 
KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data; knet_h->pmtud_notify_fn = pmtud_notify_fn; if (knet_h->pmtud_notify_fn) { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!data_mtu) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *data_mtu = knet_h->data_mtu; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) { int savederrno = 0; int err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!knet_handle_crypto_cfg) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } crypto_fini(knet_h); if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) && (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) { log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled"); err = 0; goto exit_unlock; } if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) { log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %u): %u", KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len); savederrno = EINVAL; err = -1; goto exit_unlock; } if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) { 
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %u): %u", KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len); savederrno = EINVAL; err = -1; goto exit_unlock; } err = crypto_init(knet_h, knet_handle_crypto_cfg); if (err) { err = -2; } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to 
get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/host.c b/libknet/host.c index c3d9edbf..0370b467 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -1,759 +1,698 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "host.h" #include "internals.h" #include "logging.h" static void _host_list_update(knet_handle_t knet_h) { struct knet_host *host; knet_h->host_ids_entries = 0; for (host = knet_h->host_head; host != NULL; host = host->next) { knet_h->host_ids[knet_h->host_ids_entries] = host->host_id; knet_h->host_ids_entries++; } } int knet_host_add(knet_handle_t knet_h, uint16_t host_id) { int savederrno = 0, err = 0; struct knet_host *host = NULL; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (knet_h->host_index[host_id]) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } host = malloc(sizeof(struct knet_host)); if (!host) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memset(host, 0, sizeof(struct knet_host)); /* * set host_id */ host->host_id = 
host_id; /* * set default host->name to host_id for logging */ snprintf(host->name, KNET_MAX_HOST_LEN - 1, "%u", host_id); /* * initialize links internal data */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { host->link[link_idx].link_id = link_idx; } /* * add new host to the index */ knet_h->host_index[host_id] = host; /* * add new host to host list */ if (knet_h->host_head) { host->next = knet_h->host_head; } knet_h->host_head = host; _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); if (err < 0) { free(host); } errno = savederrno; return err; } int knet_host_remove(knet_handle_t knet_h, uint16_t host_id) { int savederrno = 0, err = 0; struct knet_host *host, *removed; uint8_t link_idx; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } /* * if links are configured we cannot release the host */ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].configured) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s", host_id, strerror(savederrno)); goto exit_unlock; } } removed = NULL; /* * removing host from list */ if (knet_h->host_head->host_id == host_id) { removed = knet_h->host_head; knet_h->host_head = removed->next; } else { for (host = knet_h->host_head; host->next != NULL; host = host->next) { if (host->next->host_id == host_id) { removed = host->next; host->next = removed->next; break; } } } knet_h->host_index[host_id] = NULL; free(removed); _host_list_update(knet_h); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 
savederrno; return err; } int knet_host_set_name(knet_handle_t knet_h, uint16_t host_id, const char *name) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (!name) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (strlen(name) >= KNET_MAX_HOST_LEN) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(host->name, name, KNET_MAX_HOST_LEN - 1)) { err = -1; savederrno = EEXIST; log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u", host->host_id); goto exit_unlock; } } snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN - 1, "%s", name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_name_by_host_id(knet_handle_t knet_h, uint16_t host_id, char *name) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { savederrno = EINVAL; err = -1; log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id); goto exit_unlock; } snprintf(name, KNET_MAX_HOST_LEN - 1, "%s", 
knet_h->host_index[host_id]->name); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, uint16_t *host_id) { int savederrno = 0, err = 0, found = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!name) { errno = EINVAL; return -1; } if (!host_id) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } for (host = knet_h->host_head; host != NULL; host = host->next) { if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) { found = 1; *host_id = host->host_id; break; } } if (!found) { savederrno = ENOENT; err = -1; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_host_list(knet_handle_t knet_h, uint16_t *host_ids, size_t *host_ids_entries) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if ((!host_ids) || (!host_ids_entries)) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids)); *host_ids_entries = knet_h->host_ids_entries; pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_set_policy(knet_handle_t knet_h, uint16_t host_id, uint8_t policy) { int savederrno = 0, err = 0; uint8_t old_policy; if (!knet_h) { errno = EINVAL; return -1; } if (policy > KNET_LINK_POLICY_RR) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if 
(!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } old_policy = knet_h->host_index[host_id]->link_handler_policy; knet_h->host_index[host_id]->link_handler_policy = policy; if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) { savederrno = errno; err = -1; knet_h->host_index[host_id]->link_handler_policy = old_policy; log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s", host_id, strerror(savederrno)); } + log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy); + exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_policy(knet_handle_t knet_h, uint16_t host_id, uint8_t *policy) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (!policy) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->host_index[host_id]) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } *policy = knet_h->host_index[host_id]->link_handler_policy; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_get_status(knet_handle_t knet_h, uint16_t host_id, struct knet_host_status *status) { int savederrno = 0, err = 0; struct knet_host *host; if (!knet_h) { errno = EINVAL; return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) 
{ err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &host->status, sizeof(struct knet_host_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, uint16_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data; knet_h->host_status_change_notify_fn = host_status_change_notify_fn; if (knet_h->host_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen) { if (knet_h->fini_in_progress) { return 0; } if (sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != datalen) { log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe"); return -1; } return 0; } +static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) +{ + int i; + + memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE); + host->rx_seq_num = rx_seq_num; + + memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); + + for (i = 0; i < KNET_MAX_LINK; i++) { + memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); + } +} + /* * check if a given packet seq num is in the circular buffers - * 
bcast = 0 -> unicast packet | 1 -> broadcast|mcast * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup */ -int _seq_num_lookup(struct knet_host *host, int bcast, seq_num_t seq_num, int defrag_buf) +int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) { size_t i, j; /* circular buffer indexes */ seq_num_t seq_dist; - char *dst_cbuf = NULL; - char *dst_cbuf_defrag = NULL; - seq_num_t *dst_seq_num; - - if (bcast) { - dst_cbuf = host->bcast_circular_buffer; - dst_cbuf_defrag = host->bcast_circular_buffer_defrag; - dst_seq_num = &host->bcast_seq_num_rx; - } else { - dst_cbuf = host->ucast_circular_buffer; - dst_cbuf_defrag = host->ucast_circular_buffer_defrag; - dst_seq_num = &host->ucast_seq_num_rx; + char *dst_cbuf = host->circular_buffer; + char *dst_cbuf_defrag = host->circular_buffer_defrag; + seq_num_t *dst_seq_num = &host->rx_seq_num; + + if (clear_buf) { + _clear_cbuffers(host, seq_num); } if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { seq_dist = *dst_seq_num - seq_num; } j = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { return (dst_cbuf[j] == 0) ? 1 : 0; } else { return (dst_cbuf_defrag[j] == 0) ? 
1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE); *dst_seq_num = seq_num; } /* cleaning up circular buffer */ i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; if (i > j) { memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i); memset(dst_cbuf, 0, j + 1); memset(dst_cbuf_defrag + i, 0, KNET_CBUFFER_SIZE - i); memset(dst_cbuf_defrag, 0, j + 1); } else { memset(dst_cbuf + i, 0, j - i + 1); memset(dst_cbuf_defrag + i, 0, j - i + 1); } *dst_seq_num = seq_num; return 1; } -void _seq_num_set(struct knet_host *host, int bcast, seq_num_t seq_num, int defrag_buf) +void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf) { if (!defrag_buf) { - if (bcast) { - host->bcast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; - } else { - host->ucast_circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; - } + host->circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1; } else { - if (bcast) { - host->bcast_circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; - } else { - host->ucast_circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; - } + host->circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1; } return; } int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host) { int savederrno = 0; uint16_t host_id = host->host_id; if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) { savederrno = errno; log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s", strerror(savederrno)); errno = savederrno; return -1; } return 0; } -static void _clear_cbuffers(struct knet_host *host) -{ - int i; - - memset(host->bcast_circular_buffer, 0, KNET_CBUFFER_SIZE); - memset(host->ucast_circular_buffer, 0, KNET_CBUFFER_SIZE); - host->bcast_seq_num_rx = 0; - host->ucast_seq_num_rx = 0; - - memset(host->bcast_circular_buffer_defrag, 0, KNET_CBUFFER_SIZE); - memset(host->ucast_circular_buffer_defrag, 
0, KNET_CBUFFER_SIZE); - - for (i = 0; i < KNET_MAX_LINK; i++) { - memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf)); - } -} - int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host) { int link_idx; int best_priority = -1; - int send_link_idx = 0; - uint8_t send_link_status[KNET_MAX_LINK]; - int clear_cbuffer = 0; - int host_has_remote = 0; int reachable = 0; host->active_link_entries = 0; for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (host->link[link_idx].status.enabled != 1) /* link is not enabled */ continue; - if (host->link[link_idx].remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) /* track if remote is connected */ - host_has_remote = 1; if (host->link[link_idx].status.connected != 1) /* link is not enabled */ continue; if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */ continue; - if ((!host->link[link_idx].host_info_up_sent) && - (!host->link[link_idx].donnotremoteupdate)) { - send_link_status[send_link_idx] = link_idx; - send_link_idx++; - /* - * detect node coming back to life and reset the buffers - */ - if (host->link[link_idx].remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) { - clear_cbuffer = 1; - } - } - if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { /* for passive we look for the only active link with higher priority */ if (host->link[link_idx].priority > best_priority) { host->active_links[0] = link_idx; best_priority = host->link[link_idx].priority; } host->active_link_entries = 1; } else { /* for RR and ACTIVE we need to copy all available links */ host->active_links[host->active_link_entries] = link_idx; host->active_link_entries++; } } if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) { log_debug(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)", host->host_id, host->link[host->active_links[0]].link_id, host->link[host->active_links[0]].priority); } else { log_debug(knet_h, KNET_SUB_HOST, "host: %u has %u active links", 
host->host_id, host->active_link_entries); } /* no active links, we can clean the circular buffers and indexes */ - if ((!host->active_link_entries) || (clear_cbuffer) || (!host_has_remote)) { - if (!host_has_remote) { - log_debug(knet_h, KNET_SUB_HOST, "host: %u has no active remote links", host->host_id); - } - if (!host->active_link_entries) { - log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); - } - if (clear_cbuffer) { - log_debug(knet_h, KNET_SUB_HOST, "host: %u is coming back to life", host->host_id); - } - _clear_cbuffers(host); - } - - if (send_link_idx) { - int i; - struct knet_hostinfo knet_hostinfo; - - knet_hostinfo.khi_type = KNET_HOSTINFO_TYPE_LINK_UP_DOWN; - knet_hostinfo.khi_bcast = KNET_HOSTINFO_UCAST; - knet_hostinfo.khi_dst_node_id = host->host_id; - knet_hostinfo.khip_link_status_status = KNET_HOSTINFO_LINK_STATUS_UP; - - for (i=0; i < send_link_idx; i++) { - knet_hostinfo.khip_link_status_link_id = send_link_status[i]; - _send_host_info(knet_h, &knet_hostinfo, KNET_HOSTINFO_LINK_STATUS_SIZE); - host->link[send_link_status[i]].host_info_up_sent = 1; - host->link[send_link_status[i]].donnotremoteupdate = 0; - } - } - - if (host->active_link_entries) { + if (!host->active_link_entries) { + log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id); + _clear_cbuffers(host, 0); + } else { reachable = 1; } if (host->status.reachable != reachable) { host->status.reachable = reachable; if (knet_h->host_status_change_notify_fn) { knet_h->host_status_change_notify_fn( knet_h->host_status_change_notify_fn_private_data, host->host_id, host->status.reachable, host->status.remote, host->status.external); } } return 0; } diff --git a/libknet/host.h b/libknet/host.h index ce58c23f..2d6aa007 100644 --- a/libknet/host.h +++ b/libknet/host.h @@ -1,22 +1,22 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __HOST_H__ #define __HOST_H__ #include "internals.h" -int _seq_num_lookup(struct knet_host *host, int bcast, seq_num_t seq_num, int defrag_buf); -void _seq_num_set(struct knet_host *host, int bcast, seq_num_t seq_num, int defrag_buf); +int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf); +void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf); int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen); int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host); int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host); #endif diff --git a/libknet/internals.h b/libknet/internals.h index 5d8a2ef2..60cb0512 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,464 +1,462 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __INTERNALS_H__ #define __INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include "libknet.h" #include "onwire.h" #include "compat.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for 
A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned int latency_fix; /* precision */ uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ /* status */ struct knet_link_status status; /* internals */ uint8_t link_id; uint8_t transport_type; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ - unsigned int remoteconnected:1; /* link is enabled for data (peer view) */ - unsigned int donnotremoteupdate:1; /* define source of the update */ - unsigned int host_info_up_sent:1; /* 0 if we need to notify remote that link is up */ unsigned int latency_exp; uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint8_t has_valid_mtu; }; #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { - char buf[KNET_MAX_PACKET_SIZE]; + char buf[KNET_DATABUFSIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? 
*/ uint8_t last_first; /* special case if we receive the last fragment first */ uint16_t frag_size; /* normal frag size (not the last one) */ uint16_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ uint16_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* internals */ - char bcast_circular_buffer[KNET_CBUFFER_SIZE]; - seq_num_t bcast_seq_num_rx; - char ucast_circular_buffer[KNET_CBUFFER_SIZE]; - seq_num_t ucast_seq_num_tx; - seq_num_t ucast_seq_num_rx; - /* defrag/(reassembly buffers */ + char circular_buffer[KNET_CBUFFER_SIZE]; + seq_num_t rx_seq_num; + seq_num_t untimed_rx_seq_num; + seq_num_t timed_rx_seq_num; + uint8_t got_data; + /* defrag/reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK]; - char bcast_circular_buffer_defrag[KNET_CBUFFER_SIZE]; - char ucast_circular_buffer_defrag[KNET_CBUFFER_SIZE]; + char circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) 
*/ uint8_t data_type; /* internal use for transport to define what data are associated * to this fd */ void *data; /* pointer to the data */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 struct knet_handle { uint16_t host_id; unsigned int enabled:1; struct knet_sock sockfd[KNET_DATAFD_MAX]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; unsigned int pmtud_interval; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_transport_ops *transport_ops[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ uint16_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf[PCKT_FRAG_MAX]; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_FRAG_MAX]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; int lock_init_done; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ - pthread_mutex_t tx_mutex; + pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */ + pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */ struct crypto_instance *crypto_instance; uint16_t sec_header_size; uint16_t sec_block_size; uint16_t sec_hash_size; uint16_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char 
*recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; - seq_num_t bcast_seq_num_tx; + seq_num_t tx_seq_num; + pthread_mutex_t tx_seq_num_mutex; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_node_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, uint16_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); int fini_in_progress; }; /* * NOTE: every single operation must be implementend * for every protocol. */ typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; const uint8_t transport_id; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. 
* * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * transport callback for incoming dynamic connections * this is called in global read lock context */ int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rdlock */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * transport_tx_sock_error is invoked with global_rwlock and * it's invoked when sendto or sendmmsg returns =< 0 * * it should return: * -1 on internal error * 0 ignore error and continue * 1 retry * any sleep or wait action should happen inside the transport code */ int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg); } knet_transport_ops_t; socklen_t sockaddr_len(const struct sockaddr_storage *ss); /** * This is a kernel style list implementation. * * @author Steven Dake */ struct knet_list_head { struct knet_list_head *next; struct knet_list_head *prev; }; /** * @def KNET_LIST_DECLARE() * Declare and initialize a list head. 
*/ #define KNET_LIST_DECLARE(name) \ struct knet_list_head name = { &(name), &(name) } #define KNET_INIT_LIST_HEAD(ptr) do { \ (ptr)->next = (ptr); (ptr)->prev = (ptr); \ } while (0) /** * Initialize the list entry. * * Points next and prev pointers to head. * @param head pointer to the list head */ static inline void knet_list_init(struct knet_list_head *head) { head->next = head; head->prev = head; } /** * Add this element to the list. * * @param element the new element to insert. * @param head pointer to the list head */ static inline void knet_list_add(struct knet_list_head *element, struct knet_list_head *head) { head->next->prev = element; element->next = head->next; element->prev = head; head->next = element; } /** * Add to the list (but at the end of the list). * * @param element pointer to the element to add * @param head pointer to the list head * @see knet_list_add() */ static inline void knet_list_add_tail(struct knet_list_head *element, struct knet_list_head *head) { head->prev->next = element; element->next = head; element->prev = head->prev; head->prev = element; } /** * Delete an entry from the list. * * @param _remove the list item to remove */ static inline void knet_list_del(struct knet_list_head *_remove) { _remove->next->prev = _remove->prev; _remove->prev->next = _remove->next; } /** * Replace old entry by new one * @param old: the element to be replaced * @param new: the new element to insert */ static inline void knet_list_replace(struct knet_list_head *old, struct knet_list_head *new) { new->next = old->next; new->next->prev = new; new->prev = old->prev; new->prev->next = new; } /** * Tests whether list is the last entry in list head * @param list: the entry to test * @param head: the head of the list * @return boolean true/false */ static inline int knet_list_is_last(const struct knet_list_head *list, const struct knet_list_head *head) { return list->next == head; } /** * A quick test to see if the list is empty (pointing to it's self). 
* @param head pointer to the list head * @return boolean true/false */ static inline int32_t knet_list_empty(const struct knet_list_head *head) { return head->next == head; } /** * Get the struct for this entry * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_entry(ptr,type,member)\ ((type *)((char *)(ptr)-(char*)(&((type *)0)->member))) /** * Get the first element from a list * @param ptr: the &struct list_head pointer. * @param type: the type of the struct this is embedded in. * @param member: the name of the list_struct within the struct. */ #define knet_list_first_entry(ptr, type, member) \ knet_list_entry((ptr)->next, type, member) /** * Iterate over a list * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) /** * Iterate over a list backwards * @param pos: the &struct list_head to use as a loop counter. * @param head: the head for your list. */ #define knet_list_for_each_reverse(pos, head) \ for (pos = (head)->prev; pos != (head); pos = pos->prev) /** * Iterate over a list safe against removal of list entry * @param pos: the &struct list_head to use as a loop counter. * @param n: another &struct list_head to use as temporary storage * @param head: the head for your list. */ #define knet_list_for_each_safe(pos, n, head) \ for (pos = (head)->next, n = pos->next; pos != (head); \ pos = n, n = pos->next) /** * Iterate over list of given type * @param pos: the type * to use as a loop counter. * @param head: the head for your list. * @param member: the name of the list_struct within the struct. 
*/ #define knet_list_for_each_entry(pos, head, member) \ for (pos = knet_list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = knet_list_entry(pos->member.next, typeof(*pos), member)) #endif diff --git a/libknet/link.c b/libknet/link.c index 1437afaa..a0ae60d9 100644 --- a/libknet/link.c +++ b/libknet/link.c @@ -1,993 +1,973 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "internals.h" #include "logging.h" #include "link.h" #include "transports.h" #include "host.h" int _link_updown(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, unsigned int enabled, unsigned int connected) { struct knet_link *link = &knet_h->host_index[host_id]->link[link_id]; if ((link->status.enabled == enabled) && (link->status.connected == connected)) return 0; link->status.enabled = enabled; link->status.connected = connected; - _host_dstcache_update_sync(knet_h, knet_h->host_index[host_id]); + _host_dstcache_update_async(knet_h, knet_h->host_index[host_id]); if ((link->status.dynconnected) && (!link->status.connected)) link->status.dynconnected = 0; return 0; } int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!src_addr) { errno = EINVAL; return -1; } if (transport >= KNET_MAX_TRANSPORTS) { errno = EINVAL; return -1; } if (!knet_h->transport_ops[transport]) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = 
savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (link->configured != 0) { err =-1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled != 0) { err =-1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } memmove(&link->src_addr, src_addr, sizeof(struct sockaddr_storage)); err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage), link->status.src_ipaddr, KNET_MAX_HOST_LEN, link->status.src_port, KNET_MAX_PORT_LEN); if (err) { if (err == EAI_SYSTEM) { savederrno = errno; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u source addr/port: %s", host_id, link_id, strerror(savederrno)); } else { savederrno = EINVAL; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u source addr/port: %s", host_id, link_id, gai_strerror(err)); } err = -1; goto exit_unlock; } if (!dst_addr) { link->dynamic = KNET_LINK_DYNIP; } else { link->dynamic = KNET_LINK_STATIC; memmove(&link->dst_addr, dst_addr, sizeof(struct sockaddr_storage)); err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage), link->status.dst_ipaddr, KNET_MAX_HOST_LEN, link->status.dst_port, KNET_MAX_PORT_LEN); if (err) { if (err == EAI_SYSTEM) { savederrno = errno; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u destination addr/port: %s", host_id, link_id, strerror(savederrno)); } else { savederrno = EINVAL; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u destination addr/port: %s", host_id, link_id, gai_strerror(err)); } err = -1; goto exit_unlock; } } link->transport_type = transport; 
link->transport_connected = 0; link->proto_overhead = knet_h->transport_ops[link->transport_type]->transport_mtu_overhead; link->configured = 1; link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT; link->has_valid_mtu = 0; link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */ link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */ link->latency_fix = KNET_LINK_DEFAULT_PING_PRECISION; link->latency_exp = KNET_LINK_DEFAULT_PING_PRECISION - \ ((link->ping_interval * KNET_LINK_DEFAULT_PING_PRECISION) / 8000000); if (knet_h->transport_ops[link->transport_type]->transport_link_set_config(knet_h, link) < 0) { savederrno = errno; err = -1; goto exit_unlock; } log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured", host_id, link_id); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_get_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t *transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint8_t *dynamic) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!src_addr) { errno = EINVAL; return -1; } if (!dynamic) { errno = EINVAL; return -1; } if (!transport) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto 
exit_unlock; } if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) { savederrno = EINVAL; err = -1; goto exit_unlock; } memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage)); *transport = link->transport_type; if (link->dynamic == KNET_LINK_STATIC) { *dynamic = 0; memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage)); } else { *dynamic = 1; } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_clear_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (link->configured != 1) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled != 0) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if ((knet_h->transport_ops[link->transport_type]->transport_link_clear_config(knet_h, link) < 0) && (errno != EBUSY)) { savederrno = errno; err = -1; goto exit_unlock; } memset(link, 0, sizeof(struct knet_link)); link->link_id = link_id; log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped", host_id, link_id); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int 
knet_link_set_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, unsigned int enabled) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (enabled > 1) { errno = EINVAL; return -1; } - /* - * this read lock might appear as an API violation, but be - * very careful because we cannot use a write lock (yet). - * the _send_host_info requires threads to be operational. - * a write lock here would deadlock. - * a read lock is sufficient as all functions invoked by - * this code are already thread safe. - */ - savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); + savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled == enabled) { err = 0; goto exit_unlock; } - if (!enabled) { - struct knet_hostinfo knet_hostinfo; - - knet_hostinfo.khi_type = KNET_HOSTINFO_TYPE_LINK_UP_DOWN; - knet_hostinfo.khi_bcast = KNET_HOSTINFO_UCAST; - knet_hostinfo.khi_dst_node_id = host_id; - knet_hostinfo.khip_link_status_link_id = link_id; - knet_hostinfo.khip_link_status_status = KNET_HOSTINFO_LINK_STATUS_DOWN; - _send_host_info(knet_h, &knet_hostinfo, KNET_HOSTINFO_LINK_STATUS_SIZE); - } - err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected); savederrno = errno; if (enabled) { goto exit_unlock; } log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u 
is disabled", host_id, link_id); - link->host_info_up_sent = 0; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_get_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, unsigned int *enabled) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!enabled) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *enabled = link->status.enabled; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_set_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t pong_count) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (pong_count < 1) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if 
(!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } link->pong_count = pong_count; log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u pong count update: %u", host_id, link_id, link->pong_count); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_get_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t *pong_count) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!pong_count) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *pong_count = link->pong_count; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_set_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, time_t interval, time_t timeout, unsigned int precision) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } if (!timeout) { errno = EINVAL; return -1; } if (!precision) { errno = EINVAL; return -1; } savederrno = 
pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } link->ping_interval = interval * 1000; /* microseconds */ link->pong_timeout = timeout * 1000; /* microseconds */ link->latency_fix = precision; link->latency_exp = precision - \ ((link->ping_interval * precision) / 8000000); log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %d", host_id, link_id, link->ping_interval, link->pong_timeout, precision); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_get_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, time_t *interval, time_t *timeout, unsigned int *precision) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } if (!timeout) { errno = EINVAL; return -1; } if (!precision) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if 
(!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *interval = link->ping_interval / 1000; /* microseconds */ *timeout = link->pong_timeout / 1000; *precision = link->latency_fix; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_set_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t priority) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; uint8_t old_priority; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } old_priority = link->priority; if (link->priority == priority) { err = 0; goto exit_unlock; } link->priority = priority; - if (_host_dstcache_update_async(knet_h, host)) { + if (_host_dstcache_update_sync(knet_h, host)) { savederrno = errno; log_debug(knet_h, KNET_SUB_LINK, "Unable to update link priority (host: %u link: %u priority: %u): %s", host_id, link_id, link->priority, strerror(savederrno)); link->priority = old_priority; err = -1; goto exit_unlock; } log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u priority set to: %u", host_id, link_id, link->priority); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int 
knet_link_get_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t *priority) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!priority) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *priority = link->priority; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_get_link_list(knet_handle_t knet_h, uint16_t host_id, uint8_t *link_ids, size_t *link_ids_entries) { int savederrno = 0, err = 0, i, count = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (!link_ids) { errno = EINVAL; return -1; } if (!link_ids_entries) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (i = 0; i < KNET_MAX_LINK; i++) { link = &host->link[i]; if (!link->configured) { continue; } link_ids[count] = i; count++; } *link_ids_entries = count; exit_unlock: 
pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_get_status(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, struct knet_link_status *status) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!knet_h) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } memmove(status, &link->status, sizeof(struct knet_link_status)); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/onwire.h b/libknet/onwire.h index e2b8ec6f..f0c07f08 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -1,195 +1,199 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __ONWIRE_H__ #define __ONWIRE_H__ /* * data structures to define network packets. 
* Start from knet_header at the bottom */ #include #if 0 /* * for future protocol extension (re-switching table calculation) */ struct knet_hinfo_link { uint8_t khl_link_id; uint8_t khl_link_dynamic; uint8_t khl_link_priority; uint64_t khl_link_latency; char khl_link_dst_ipaddr[KNET_MAX_HOST_LEN]; char khl_link_dst_port[KNET_MAX_PORT_LEN]; } __attribute__((packed)); struct knet_hinfo_link_table { uint16_t khlt_node_id; uint8_t khlt_local; /* we have this node connected locally */ struct knet_hinfo_link khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */ } __attribute__((packed)); struct link_table { uint16_t khdt_host_entries; uint8_t khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */ } __attribute__((packed)); #endif #define KNET_HOSTINFO_LINK_STATUS_DOWN 0 #define KNET_HOSTINFO_LINK_STATUS_UP 1 struct knet_hostinfo_payload_link_status { uint8_t khip_link_status_link_id; /* link id */ uint8_t khip_link_status_status; /* up/down status */ } __attribute__((packed)); /* * union to reference possible individual payloads */ union knet_hostinfo_payload { struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status; } __attribute__((packed)); /* * due to the nature of knet_hostinfo, we are currently * sending those data as part of knet_header_payload_data.khp_data_userdata * and avoid a union that increses knet_header_payload_data size * unnecessarely. 
* This might change later on depending on how we implement * host info exchange */ -#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 +#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 // UNUSED #define KNET_HOSTINFO_TYPE_LINK_TABLE 1 // NOT IMPLEMENTED #define KNET_HOSTINFO_UCAST 0 /* send info to a specific host */ #define KNET_HOSTINFO_BCAST 1 /* send info to all known / connected hosts */ struct knet_hostinfo { uint8_t khi_type; /* type of hostinfo we are sending */ uint8_t khi_bcast; /* hostinfo destination bcast/ucast */ uint16_t khi_dst_node_id;/* used only if in ucast mode */ union knet_hostinfo_payload khi_payload; } __attribute__((packed)); #define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo) #define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload)) #define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status)) #define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status #define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id /* * typedef uint64_t seq_num_t; * #define SEQ_MAX UINT64_MAX */ typedef uint16_t seq_num_t; #define SEQ_MAX UINT16_MAX struct knet_header_payload_data { seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pkcts */ uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */ uint8_t khp_data_pad2; uint8_t khp_data_bcast; /* data destination bcast/ucast */ uint8_t khp_data_frag_num; /* number of fragments of this pckt. 
1 is not fragmented */ uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */ int8_t khp_data_channel; /* transport channel data for localsock <-> knet <-> localsock mapping */ uint8_t khp_data_userdata[0]; /* pointer to the real user data */ } __attribute__((packed)); struct knet_header_payload_ping { uint8_t khp_ping_link; /* source link id */ uint32_t khp_ping_time[4]; /* ping timestamp */ + seq_num_t khp_ping_seq_num; /* transport host seq_num */ + uint8_t khp_ping_timed; /* timed pinged (1) or forced by seq_num (0) */ } __attribute__((packed)); /* taken from tracepath6 */ #define KNET_PMTUD_SIZE_V4 65535 #define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4 /* These two get the protocol-specific overheads added to them */ #define KNET_PMTUD_OVERHEAD_V4 20 #define KNET_PMTUD_OVERHEAD_V6 40 #define KNET_PMTUD_MIN_MTU_V4 576 #define KNET_PMTUD_MIN_MTU_V6 1280 struct knet_header_payload_pmtud { uint8_t khp_pmtud_link; /* source link id */ uint16_t khp_pmtud_size; /* size of the current packet */ uint8_t khp_pmtud_data[0]; /* pointer to empty/random data/fill buffer */ } __attribute__((packed)); /* * union to reference possible individual payloads */ union knet_header_payload { struct knet_header_payload_data khp_data; /* pure data packet struct */ struct knet_header_payload_ping khp_ping; /* heartbeat packet struct */ struct knet_header_payload_pmtud khp_pmtud; /* Path MTU discovery packet struct */ } __attribute__((packed)); /* * starting point */ #define KNET_HEADER_VERSION 0x01 /* we currently support only one version */ #define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */ #define KNET_HEADER_TYPE_HOST_INFO 0x01 /* host status information pckt */ #define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */ #define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */ #define KNET_HEADER_TYPE_PONG 0x82 /* reply to heartbeat */ #define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */ #define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host 
*/ struct knet_header { uint8_t kh_version; /* pckt format/version */ uint8_t kh_type; /* from above defines. Tells what kind of pckt it is */ uint16_t kh_node; /* host id of the source host for this pckt */ uint8_t kh_pad1; /* make sure to have space in the header to grow features */ uint8_t kh_pad2; union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */ } __attribute__((packed)); /* * commodoty defines to hide structure nesting * (needs review and cleanup) */ #define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num #define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num #define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq #define khp_data_userdata kh_payload.khp_data.khp_data_userdata #define khp_data_bcast kh_payload.khp_data.khp_data_bcast #define khp_data_channel kh_payload.khp_data.khp_data_channel #define khp_ping_link kh_payload.khp_ping.khp_ping_link #define khp_ping_time kh_payload.khp_ping.khp_ping_time +#define khp_ping_seq_num kh_payload.khp_ping.khp_ping_seq_num +#define khp_ping_timed kh_payload.khp_ping.khp_ping_timed #define khp_pmtud_link kh_payload.khp_pmtud.khp_pmtud_link #define khp_pmtud_size kh_payload.khp_pmtud.khp_pmtud_size #define khp_pmtud_data kh_payload.khp_pmtud.khp_pmtud_data /* * extra defines to avoid mingling with sizeof() too much */ #define KNET_HEADER_ALL_SIZE sizeof(struct knet_header) #define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload)) #define KNET_HEADER_PING_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping)) #define KNET_HEADER_PMTUD_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud)) #define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data)) #endif diff --git a/libknet/threads_heartbeat.c b/libknet/threads_heartbeat.c index 63fcc242..77727245 100644 --- a/libknet/threads_heartbeat.c +++ b/libknet/threads_heartbeat.c @@ -1,146 +1,167 @@ /* * Copyright (C) 2010-2015 Red 
Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "crypto.h" #include "link.h" #include "logging.h" #include "threads_common.h" #include "threads_heartbeat.h" static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) { memset(&dst_link->pmtud_last, 0, sizeof(struct timespec)); dst_link->received_pong = 0; dst_link->status.pong_last.tv_nsec = 0; if (dst_link->status.connected == 1) { log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down", dst_host->host_id, dst_link->link_id); _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0); } } -static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link) +static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed) { int err = 0, savederrno = 0; int len; ssize_t outlen = KNET_HEADER_PING_SIZE; struct timespec clock_now, pong_last; unsigned long long diff_ping; unsigned char *outbuf = (unsigned char *)knet_h->pingbuf; if (dst_link->transport_connected == 0) { _link_down(knet_h, dst_host, dst_link); return; } /* caching last pong to avoid race conditions */ pong_last = dst_link->status.pong_last; if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get monotonic clock"); return; } timespec_diff(dst_link->ping_last, clock_now, &diff_ping); - if (diff_ping >= (dst_link->ping_interval * 1000llu)) { + if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) { memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec)); knet_h->pingbuf->khp_ping_link = dst_link->link_id; + if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { + log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock"); + 
return; + } + knet_h->pingbuf->khp_ping_seq_num = htons(knet_h->tx_seq_num); + pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); + knet_h->pingbuf->khp_ping_timed = timed; if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)knet_h->pingbuf, outlen, knet_h->pingbuf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to crypto ping packet"); return; } outbuf = knet_h->pingbuf_crypt; } retry: len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage)); dst_link->ping_last = clock_now; if (len != outlen) { err = knet_h->transport_ops[dst_link->transport_type]->transport_tx_sock_error(knet_h, dst_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to send ping (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", dst_link->outsock, errno, strerror(errno), dst_link->status.src_ipaddr, dst_link->status.src_port, dst_link->status.dst_ipaddr, dst_link->status.dst_port); break; case 0: break; case 1: goto retry; break; } } else { dst_link->last_ping_size = outlen; } } timespec_diff(pong_last, clock_now, &diff_ping); if ((pong_last.tv_nsec) && (diff_ping >= (dst_link->pong_timeout * 1000llu))) { _link_down(knet_h, dst_host, dst_link); } } -void *_handle_heartbt_thread(void *data) +void _send_pings(knet_handle_t knet_h, int timed) { - knet_handle_t knet_h = (knet_handle_t) data; struct knet_host *dst_host; int link_idx; + if (pthread_mutex_lock(&knet_h->hb_mutex)) { + log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock"); + return; + } + + for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { + for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { + if ((dst_host->link[link_idx].status.enabled != 1) || + ((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) && + 
(dst_host->link[link_idx].status.dynconnected != 1))) + continue; + + _handle_check_each(knet_h, dst_host, &dst_host->link[link_idx], timed); + } + } + + pthread_mutex_unlock(&knet_h->hb_mutex); +} + +void *_handle_heartbt_thread(void *data) +{ + knet_handle_t knet_h = (knet_handle_t) data; + /* preparing ping buffer */ knet_h->pingbuf->kh_version = KNET_HEADER_VERSION; knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING; knet_h->pingbuf->kh_node = htons(knet_h->host_id); while (!shutdown_in_progress(knet_h)) { usleep(KNET_THREADS_TIMERES); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock"); continue; } - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { - if ((dst_host->link[link_idx].status.enabled != 1) || - ((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) && - (dst_host->link[link_idx].status.dynconnected != 1))) - continue; - _handle_check_each(knet_h, dst_host, &dst_host->link[link_idx]); - } - } + _send_pings(knet_h, 1); pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } diff --git a/libknet/threads_heartbeat.h b/libknet/threads_heartbeat.h index 3ca437b5..eb763622 100644 --- a/libknet/threads_heartbeat.h +++ b/libknet/threads_heartbeat.h @@ -1,15 +1,16 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __THREADS_HEARTBEAT_H__ #define __THREADS_HEARTBEAT_H__ +void _send_pings(knet_handle_t knet_h, int timed); void *_handle_heartbt_thread(void *data); #endif diff --git a/libknet/threads_send_recv.c b/libknet/threads_send_recv.c index a24c7093..37375346 100644 --- a/libknet/threads_send_recv.c +++ b/libknet/threads_send_recv.c @@ -1,1288 +1,1302 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include "crypto.h" #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" +#include "threads_heartbeat.h" #include "threads_send_recv.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out) { int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress; struct mmsghdr msg[PCKT_FRAG_MAX]; int err = 0, savederrno = 0; memset(&msg, 0, sizeof(struct mmsghdr)); for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num; sent_msgs = 0; prev_sent = 0; progress = 1; retry: msg_idx = 0; while (msg_idx < msgs_to_send) { memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr)); msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent]; msg[msg_idx].msg_hdr.msg_iovlen = 1; msg_idx++; } sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry; break; } if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) { if ((sent_msgs) || (progress)) { msgs_to_send = msg_idx - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { 
progress = 1; } else { progress = 0; } log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) { ssize_t outlen, frag_len; struct knet_host *dst_host; uint16_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX]; uint8_t frag_idx; unsigned int temp_data_mtu; int host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; + seq_num_t tx_seq_num; inbuf = knet_h->recv_from_sock_buf[buf_idx]; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, 
inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unrechable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. 
*/ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming mininum IPv4 mtu (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; + if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { + log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); + goto out_unlock; + } + knet_h->tx_seq_num++; + /* + * force seq_num 0 to detect a node that has crashed and rejoining + * the knet instance. 
seq_num 0 will clear the buffers in the RX + * thread + */ + if (knet_h->tx_seq_num == 0) { + knet_h->tx_seq_num++; + } + /* + * cache the value in locked context + */ + tx_seq_num = knet_h->tx_seq_num; + knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num); + pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); + + /* + * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 + * pckts. + * this solves 2 problems: + * 1) on TX socket overloads we generate extra pings to keep links alive + * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, + * node 3+ will be able to keep in sync on the TX seq_num even without + * receiving traffic or pings in betweens. This avoids issues with + * rollover of the circular buffer + */ + + if (tx_seq_num % (SEQ_MAX / 8) == 0) { + _send_pings(knet_h, 0); + } + while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; } else { iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; + knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); frag_len = frag_len - temp_data_mtu; - frag_idx++; } - if (!bcast) { + if (knet_h->crypto_instance) { + frag_idx = 0; + while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) 
{ + if (crypto_encrypt_and_sign( + knet_h, + (const unsigned char *)knet_h->send_to_links_buf[frag_idx], + iov_out[frag_idx].iov_len, + knet_h->send_to_links_buf_crypt[frag_idx], + &outlen) < 0) { + log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); + savederrno = ECHILD; + err = -1; + goto out_unlock; + } + iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; + iov_out[frag_idx].iov_len = outlen; + frag_idx++; + } + } + if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids[host_idx]]; - knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx); - - frag_idx = 0; - - while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { - - knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; - - if (knet_h->crypto_instance) { - if (crypto_encrypt_and_sign( - knet_h, - (const unsigned char *)knet_h->send_to_links_buf[frag_idx], - iov_out[frag_idx].iov_len, - knet_h->send_to_links_buf_crypt[frag_idx], - &outlen) < 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); - savederrno = ECHILD; - err = -1; - goto out_unlock; - } - iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; - iov_out[frag_idx].iov_len = outlen; - } - - frag_idx++; - } - err = _dispatch_to_links(knet_h, dst_host, iov_out); savederrno = errno; if (err) { goto out_unlock; } - } - } else { - - knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx); - - frag_idx = 0; - - while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { - - knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; - - if (knet_h->crypto_instance) { - if (crypto_encrypt_and_sign( - knet_h, - (const unsigned char *)knet_h->send_to_links_buf[frag_idx], - iov_out[frag_idx].iov_len, - knet_h->send_to_links_buf_crypt[frag_idx], - 
&outlen) < 0) { - log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); - savederrno = ECHILD; - err = -1; - goto out_unlock; - } - iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; - iov_out[frag_idx].iov_len = outlen; - } - - frag_idx++; - } - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, iov_out); savederrno = errno; if (err) { goto out_unlock; } } } - } out_unlock: errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type) { ssize_t inlen = 0; struct iovec iov_in; int msg_recv, i; int 
savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; inlen = readv(sockfd, &iov_in, 1); if (inlen <= 0) { savederrno = errno; docallback = 1; goto out; } msg_recv = 1; knet_h->recv_from_sock_buf[0]->kh_type = type; _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); } else { msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); if (msg_recv < 0) { inlen = msg_recv; savederrno = errno; docallback = 1; goto out; } for (i = 0; i < msg_recv; i++) { inlen = msg[i].msg_len; if (inlen == 0) { savederrno = 0; docallback = 1; goto out; break; } knet_h->recv_from_sock_buf[i]->kh_type = type; _parse_recv_from_sock(knet_h, i, inlen, channel, 0); } } out: if (inlen < 0) { struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, nev, type; int8_t channel; memset(&msg, 0, sizeof(struct mmsghdr)); /* preparing data buffer */ for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; iov_in[i].iov_len = 
KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = -1; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. 
(-1 on errors) */ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) { struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ - if (!_seq_num_lookup(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 1)) { + if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ - _seq_num_set(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 1); + _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. 
*/ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); if (defrag_buf_idx < 0) { if (errno == ETIME) { log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired"); } return 1; } defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = inbuf->khp_data_seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. 
*/ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), inbuf->khp_data_userdata, *len); } } else { defrag_buf->frag_size = *len; } memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), inbuf->khp_data_userdata, *len); defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg) { int err = 0, savederrno = 0; ssize_t outlen; struct knet_host *src_host; struct knet_link *src_link; unsigned long long latency_last; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct timespec recvtime; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; struct sockaddr_storage pckt_src; + seq_num_t recv_seq_num; + int wipe_bufs = 0; if (knet_h->crypto_instance) { if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); return; } len = outlen; inbuf = (struct knet_header 
*)knet_h->recv_from_links_buf_decrypt; } if (len < (KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } src_link = NULL; if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { /* * cpyaddrport will only copy address and port of the incoming * packet and strip extra bits such as flow and scopeid */ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), &pckt_src, sockaddr_len(&pckt_src)) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link); } } switch (inbuf->kh_type) { case 
KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: + /* + * TODO: should we accept data even if we can't reply to the other node? + * how would that work with SCTP and guaranteed delivery? + */ + + if (!src_host->status.reachable) { + log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id); + //return; + } inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; + src_host->got_data = 1; - if (!_seq_num_lookup(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 0)) { + if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (knet_h->enabled != 1) /* data forward is disabled */ break; if (knet_h->dst_host_filter_fn) { int host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, 
KNET_SUB_RX, "Packet is not for us"); return; } } } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return; } if (outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0); + _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); } } else { /* HOSTINFO */ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); } - if (!_seq_num_lookup(src_host, bcast, inbuf->khp_data_seq_num, 0)) { + if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { return; } - _seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0); + _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); switch(knet_hostinfo->khi_type) { case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: - src_link = src_host->link + - (knet_hostinfo->khip_link_status_link_id % KNET_MAX_LINK); - /* - * basically if the node is coming back to life from a crash - * we should receive a host info where local previous status == remote current status - * and so we can detect that node is showing up again - * we need to clear cbuffers and notify the node of our status by resending our host info - */ - if ((src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) && - (src_link->remoteconnected == knet_hostinfo->khip_link_status_status)) { - src_link->host_info_up_sent = 0; - } - src_link->remoteconnected = 
knet_hostinfo->khip_link_status_status; - if (src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_DOWN) { - /* - * if a host is disconnecting clean, we note that in donnotremoteupdate - * so that we don't send host info back immediately but we wait - * for the node to send an update when it's alive again - */ - src_link->host_info_up_sent = 0; - src_link->donnotremoteupdate = 1; - } else { - src_link->donnotremoteupdate = 0; - } - log_debug(knet_h, KNET_SUB_RX, "host message up/down. from host: %u link: %u remote connected: %u", - src_host->host_id, - src_link->link_id, - src_link->remoteconnected); - if (_host_dstcache_update_async(knet_h, src_host)) { - log_debug(knet_h, KNET_SUB_RX, - "Unable to update switch cache for host: %u link: %u remote connected: %u)", - src_host->host_id, - src_link->link_id, - src_link->remoteconnected); - } break; case KNET_HOSTINFO_TYPE_LINK_TABLE: break; default: log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); break; } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); + recv_seq_num = ntohs(inbuf->khp_ping_seq_num); + + wipe_bufs = 0; + + if (!inbuf->khp_ping_timed) { + /* + * we might be receiving this message from all links, but we want + * to process it only the first time + */ + if (recv_seq_num != src_host->untimed_rx_seq_num) { + /* + * cache the untimed seq num + */ + src_host->untimed_rx_seq_num = recv_seq_num; + /* + * if the host has received data in between + * untimed ping, then we don't need to wipe the bufs + */ + if (src_host->got_data) { + src_host->got_data = 0; + wipe_bufs = 0; + } else { + wipe_bufs = 1; + } + } + _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); + } else { + /* + * pings always arrives in bursts over all the link + * catch the first of them to cache the seq num and + * avoid duplicate processing + */ + if (recv_seq_num != 
src_host->timed_rx_seq_num) { + src_host->timed_rx_seq_num = recv_seq_num; + + if (recv_seq_num == 0) { + _seq_num_lookup(src_host, recv_seq_num, 0, 1); + } + } + } if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } retry_pong: len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); savederrno = errno; if (len != outlen) { err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry_pong; break; } } break; case KNET_HEADER_TYPE_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); src_link->status.latency = ((src_link->status.latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->status.latency < src_link->pong_timeout) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 
1); } else { src_link->received_pong++; log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } break; case KNET_HEADER_TYPE_PMTUD: outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } retry_pmtud: len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)); if (len != outlen) { err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno); switch(err) { case -1: /* unrecoverable error */ log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. 
recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry_pmtud; break; } } break; case KNET_HEADER_TYPE_PMTUD_REPLY: if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); break; default: return; } } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_FRAG_MAX; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. 
* * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case -1: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case 0: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case 1: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case 2: /* packet is data and should be parsed as such */ _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; memset(&msg, 0, sizeof(msg)); for (i = 0; i < 
PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } return NULL; }