diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 017e7191..d12900b1 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,985 +1,985 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" #include "onwire_v1.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0, locked = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; locked = 0; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, cur_link->link_id, strerror(savederrno)); continue; } locked = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; msg[msg_idx].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[cur_link->outsock].sockaddr_len; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } pthread_mutex_unlock(&cur_link->link_stats_mutex); locked = 0; } out_unlock: if (locked) { pthread_mutex_unlock(&cur_link->link_stats_mutex); } errno = savederrno; return err; } static int _dispatch_to_local(knet_handle_t knet_h, unsigned char *data, size_t inlen, int8_t channel) { int err = 0, savederrno = 0; const unsigned char *buf = data; ssize_t buflen = inlen; struct knet_link *local_link = knet_h->host_index[knet_h->host_id]->link; local_retry: err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); savederrno = errno; if (err < 0) { log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); local_link->status.stats.tx_data_errors++; goto out; } if (err > 0 && err < buflen) { log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); local_link->status.stats.tx_data_retries++; buf += err; buflen -= err; goto local_retry; } if (err == buflen) { local_link->status.stats.tx_data_packets++; local_link->status.stats.tx_data_bytes += inlen; } out: errno = savederrno; return err; } static int _prep_tx_bufs(knet_handle_t knet_h, struct knet_header *inbuf, uint8_t onwire_ver, unsigned char *data, size_t inlen, seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) { int err = 0, savederrno = 0; unsigned int temp_data_mtu; if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } if (knet_h->onwire_ver_remap) { prep_tx_bufs_v1(knet_h, inbuf, data, inlen, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); } else { switch (onwire_ver) { case 1: prep_tx_bufs_v1(knet_h, inbuf, data, inlen, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); break; default: /* this should never hit as filters are in place in the calling functions */ log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); savederrno = EINVAL; err = -1; goto out; break; } } out: errno = savederrno; return err; } static int _compress_data(knet_handle_t knet_h, unsigned char* data, size_t *inlen, int *data_compressed) { int err = 0, savederrno = 0; int stats_locked = 0, stats_err = 0; size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; /* * compress data */ if (knet_h->compress_model > 0) { if (*inlen > knet_h->compress_threshold) { clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, data, *inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); savederrno = errno; clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &compress_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out; } stats_locked = 1; /* Collect stats */ if (compress_time < knet_h->stats.tx_compress_time_min) { knet_h->stats.tx_compress_time_min = compress_time; } if (compress_time > knet_h->stats.tx_compress_time_max) { knet_h->stats.tx_compress_time_max = compress_time; } knet_h->stats.tx_compress_time_ave = (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + compress_time) / (knet_h->stats.tx_compressed_packets+1); if (err < 0) { knet_h->stats.tx_failed_to_compress++; log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); } else { knet_h->stats.tx_compressed_packets++; knet_h->stats.tx_compressed_original_bytes += *inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; if (cmp_outlen < *inlen) { memmove(data, knet_h->send_to_links_buf_compress, cmp_outlen); *inlen = cmp_outlen; *data_compressed = 1; } else { knet_h->stats.tx_unable_to_compress++; } } } if (!*data_compressed) { if (!stats_locked) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out; } stats_locked = 1; } knet_h->stats.tx_uncompressed_packets++; } if (stats_locked) { pthread_mutex_unlock(&knet_h->handle_stats_mutex); } } out: errno = savederrno; return err; } static int _encrypt_bufs(knet_handle_t knet_h, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) { int err = 0, savederrno = 0, stats_err = 0; struct timespec start_time; struct timespec end_time; uint64_t crypt_time; uint8_t frag_idx = 0; size_t outlen, uncrypted_frag_size; int j; if (knet_h->crypto_in_use_config) { while (frag_idx < msgs_to_send) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, iov_out[frag_idx], *iovcnt_out, knet_h->send_to_links_buf_crypt[frag_idx], (ssize_t *)&outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out; } clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &crypt_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; goto out; } if (crypt_time < knet_h->stats.tx_crypt_time_min) { knet_h->stats.tx_crypt_time_min = crypt_time; } if (crypt_time > knet_h->stats.tx_crypt_time_max) { knet_h->stats.tx_crypt_time_max = crypt_time; } knet_h->stats.tx_crypt_time_ave = (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets + crypt_time) / (knet_h->stats.tx_crypt_packets+1); uncrypted_frag_size = 0; for (j=0; j < *iovcnt_out; j++) { uncrypted_frag_size += iov_out[frag_idx][j].iov_len; } knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size); knet_h->stats.tx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx][0].iov_len = outlen; frag_idx++; } *iovcnt_out = 1; } out: errno = savederrno; return err; } static int _get_tx_seq_num(knet_handle_t knet_h, seq_num_t *tx_seq_num) { int savederrno = 0; savederrno = pthread_mutex_lock(&knet_h->tx_seq_num_mutex); if (savederrno) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); errno = savederrno; return -1; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ *tx_seq_num = knet_h->tx_seq_num; pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (*tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } return 0; } static int _get_data_dests(knet_handle_t knet_h, unsigned char* data, size_t inlen, int8_t *channel, int *bcast, int *send_local, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries, int is_sync) { int err = 0, savederrno = 0; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; /* store destinations from filter */ size_t dst_host_ids_entries_temp = 0; size_t dst_host_ids_entries_temp2 = 0; /* workaround gcc here */ struct knet_host *dst_host; size_t host_idx; if (knet_h->dst_host_filter_fn) { *bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, data, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (*bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", *bcast); savederrno = EFAULT; err = -1; goto out; } if ((!*bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out; } if ((!*bcast) && (dst_host_ids_entries_temp > KNET_MAX_HOST)) { log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); savederrno = EINVAL; err = -1; goto out; } if (is_sync) { if ((*bcast) || ((!*bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out; } } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unreachable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!*bcast) { *dst_host_ids_entries = dst_host_ids_entries_temp2; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if ((dst_host->host_id == knet_h->host_id) && (knet_h->has_loop_link)) { *send_local = 1; } if (!((dst_host->host_id == knet_h->host_id) && (knet_h->has_loop_link)) && dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries_temp2] = dst_host_ids_temp[host_idx]; dst_host_ids_entries_temp2++; } } if ((!dst_host_ids_entries_temp2) && (!*send_local)) { savederrno = EHOSTDOWN; err = -1; goto out; } *dst_host_ids_entries = dst_host_ids_entries_temp2; } else { *bcast = 0; *send_local = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if ((dst_host->host_id == knet_h->host_id) && (knet_h->has_loop_link)) { *send_local = 1; } if (!(dst_host->host_id == knet_h->host_id && knet_h->has_loop_link) && dst_host->status.reachable) { *bcast = 1; } } if ((!*bcast) && (!*send_local)) { savederrno = EHOSTDOWN; err = -1; goto out; } } out: errno = savederrno; return err; } static int _prep_and_send_msgs(knet_handle_t knet_h, int bcast, knet_node_id_t *dst_host_ids, size_t dst_host_ids_entries, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int iovcnt_out) { int err = 0, savederrno = 0; struct knet_host *dst_host; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msg_idx; size_t host_idx; memset(&msg, 0, sizeof(msg)); msg_idx = 0; while (msg_idx < msgs_to_send) { - msg[msg_idx].msg_hdr.msg_namelen = sockaddr_len((const struct sockaddr_storage *)&msg[msg_idx].msg_hdr.msg_name); + msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* this will set properly in _dispatch_to_links() */ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0]; msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out; } } } } out: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, uint8_t onwire_ver, int is_sync) { int err = 0, savederrno = 0; struct knet_header *inbuf = knet_h->recv_from_sock_buf; /* all TX packets are stored here regardless of the onwire */ unsigned char *data; /* onwire neutrual pointer to data to send */ int data_compressed = 0; /* track data compression to fill the header */ seq_num_t tx_seq_num; int bcast = 1; /* assume all packets are to be broadcasted unless filter tells us differently */ knet_node_id_t dst_host_ids[KNET_MAX_HOST]; /* store destinations from filter */ size_t dst_host_ids_entries = 0; int send_local = 0; /* send packets to loopback */ struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; int msgs_to_send = 0; if (knet_h->enabled != 1) { log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out; } if (knet_h->onwire_ver_remap) { data = get_data_v1(knet_h, inbuf); } else { switch (onwire_ver) { case 1: data = get_data_v1(knet_h, inbuf); break; default: /* this should never hit as filters are in place in the calling functions */ log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); savederrno = EINVAL; err = -1; goto out; break; } } err = _get_data_dests(knet_h, data, inlen, &channel, &bcast, &send_local, dst_host_ids, &dst_host_ids_entries, is_sync); if (err < 0) { savederrno = errno; goto out; } /* Send to localhost if appropriate and enabled */ if (send_local) { err = _dispatch_to_local(knet_h, data, inlen, channel); if (err < 0) { savederrno = errno; goto out; } } err = _compress_data(knet_h, data, &inlen, &data_compressed); if (err < 0) { savederrno = errno; goto out; } err = _get_tx_seq_num(knet_h, &tx_seq_num); if (err < 0) { savederrno = errno; goto out; } err = _prep_tx_bufs(knet_h, inbuf, onwire_ver, data, inlen, tx_seq_num, channel, bcast, data_compressed, &msgs_to_send, iov_out, &iovcnt_out); if (err < 0) { savederrno = errno; goto out; } err = _encrypt_bufs(knet_h, msgs_to_send, iov_out, &iovcnt_out); if (err < 0) { savederrno = errno; goto out; } err = _prep_and_send_msgs(knet_h, bcast, dst_host_ids, dst_host_ids_entries, msgs_to_send, iov_out, iovcnt_out); if (err < 0) { savederrno = errno; goto out; } out: errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, uint8_t onwire_ver, int8_t channel) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; struct iovec iov_in; struct msghdr msg; struct sockaddr_storage address; memset(&iov_in, 0, sizeof(iov_in)); if (knet_h->onwire_ver_remap) { iov_in.iov_base = (void *)get_data_v1(knet_h, knet_h->recv_from_sock_buf); iov_in.iov_len = KNET_MAX_PACKET_SIZE; } else { switch (onwire_ver) { case 1: iov_in.iov_base = (void *)get_data_v1(knet_h, knet_h->recv_from_sock_buf); iov_in.iov_len = KNET_MAX_PACKET_SIZE; break; default: log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); break; } } memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; msg.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len; msg.msg_iov = &iov_in; msg.msg_iovlen = 1; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { inlen = readv(sockfd, msg.msg_iov, 1); } else { inlen = recvmsg(sockfd, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg.msg_flags & MSG_TRUNC) { log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd); return; } } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } else { _parse_recv_from_sock(knet_h, inlen, channel, onwire_ver, 0); } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev; int flush, flush_queue_limit; int8_t channel; uint8_t onwire_ver; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); flush_queue_limit = 0; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000); flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { /* * ideally we want to communicate that we are done flushing * the queue when we have an epoll timeout event */ if (flush == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } continue; } /* * fall back in case the TX sockets will continue receive traffic * and we do not hit an epoll timeout. * * allow up to a 100 loops to flush queues, then we give up. * there might be more clean ways to do it by checking the buffer queue * on each socket, but we have tons of sockets and calculations can go wrong. * Also, why would you disable data forwarding and still send packets? */ if (flush == KNET_THREAD_QUEUE_FLUSH) { if (flush_queue_limit >= 100) { log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss"); set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } else { flush_queue_limit++; } } else { flush_queue_limit = 0; } if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } if (pthread_mutex_lock(&knet_h->onwire_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock"); goto out_unlock; } onwire_ver = knet_h->onwire_ver; pthread_mutex_unlock(&knet_h->onwire_mutex); for (i = 0; i < nev; i++) { for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } if (channel >= KNET_DATAFD_MAX) { log_debug(knet_h, KNET_SUB_TX, "No available channels"); continue; /* channel not found */ } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, events[i].data.fd, onwire_ver, channel); pthread_mutex_unlock(&knet_h->tx_mutex); } out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED); return NULL; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; uint8_t onwire_ver; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } if (pthread_mutex_lock(&knet_h->onwire_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock"); goto out; } onwire_ver = knet_h->onwire_ver; pthread_mutex_unlock(&knet_h->onwire_mutex); savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } if (knet_h->onwire_ver_remap) { memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len); } else { switch (onwire_ver) { case 1: memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len); break; default: log_warn(knet_h, KNET_SUB_TX, "preparing sync data onwire version %u not supported", onwire_ver); goto out_tx; break; } } err = _parse_recv_from_sock(knet_h, buff_len, channel, onwire_ver, 1); savederrno = errno; out_tx: pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_out[1]; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *)buff; iov_out[0].iov_len = buff_len; err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; }