diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 422086f1..84545490 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,1001 +1,1000 @@
/*
* Copyright (C) 2012-2024 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
#include "onwire_v1.h"
/*
* SEND
*/
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0, locked = 0;
unsigned int i;
struct knet_mmsghdr *cur;
struct knet_link *cur_link;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
prev_sent = 0;
progress = 1;
locked = 0;
cur_link = &dst_host->link[dst_host->active_links[link_idx]];
if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
continue;
}
savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
dst_host->host_id, cur_link->link_id, strerror(savederrno));
continue;
}
locked = 1;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
msg[msg_idx].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[cur_link->outsock].sockaddr_len;
/* Cast for Linux/BSD compatibility */
for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
}
cur_link->status.stats.tx_data_packets++;
msg_idx++;
}
retry:
cur = &msg[prev_sent];
sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport),
&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, KNET_SUB_TX, sent_msgs, savederrno);
switch(err) {
case KNET_TRANSPORT_SOCK_ERROR_INTERNAL:
cur_link->status.stats.tx_data_errors++;
goto out_unlock;
break;
case KNET_TRANSPORT_SOCK_ERROR_IGNORE:
break;
case KNET_TRANSPORT_SOCK_ERROR_RETRY:
cur_link->status.stats.tx_data_retries++;
goto retry;
break;
}
prev_sent = prev_sent + sent_msgs;
if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
if ((sent_msgs) || (progress)) {
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
log_trace(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
goto retry;
}
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
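/*
 * with round-robin link policy, rotate the active links list so the
 * next packet egresses from a different link
 */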
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
break;
}
pthread_mutex_unlock(&cur_link->link_stats_mutex);
locked = 0;
}
out_unlock:
if (locked) {
pthread_mutex_unlock(&cur_link->link_stats_mutex);
}
errno = savederrno;
return err;
}
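
The retry loop above follows a standard partial-batch resend pattern: prev_sent records how far into the batch the send got, and the progress flag allows exactly one zero-progress retry before bailing out with EAGAIN. A minimal standalone sketch of the same pattern, written against the kernel's native sendmmsg() rather than knet's _sendmmsg() wrapper (names are illustrative, and the progress check is simplified to a bounded retry count):

#define _GNU_SOURCE
#include <sys/socket.h>
#include <errno.h>

/* Sketch only: resend the unsent tail of a batch, resuming from the
 * first message that did not go out. Not part of libknet. */
static int send_batch(int sock, struct mmsghdr *msgs, unsigned int count)
{
	unsigned int sent = 0;
	int stalls = 0;

	while (sent < count) {
		int n = sendmmsg(sock, &msgs[sent], count - sent,
				 MSG_DONTWAIT | MSG_NOSIGNAL);
		if (n < 0) {
			if ((errno == EAGAIN) && (++stalls < 2)) {
				continue;	/* allow one zero-progress retry */
			}
			return -1;
		}
		stalls = 0;
		sent += n;	/* resume from the first unsent message */
	}
	return 0;
}
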
static int _dispatch_to_local(knet_handle_t knet_h, unsigned char *data, size_t inlen, int8_t channel)
{
int err = 0, savederrno = 0;
const unsigned char *buf = data;
ssize_t buflen = inlen;
struct knet_link *local_link = knet_h->host_index[knet_h->host_id]->link;
struct iovec iov_out[2];
uint32_t cur_iov = 0;
struct knet_datafd_header datafd_hdr;
if (knet_h->sockfd[channel].flags & KNET_DATAFD_FLAG_RX_RETURN_INFO) {
- log_debug(knet_h, KNET_SUB_RX,
- "Adding header to local packet");
+ memset(&datafd_hdr, 0, sizeof(datafd_hdr));
datafd_hdr.size = sizeof(datafd_hdr);
datafd_hdr.src_nodeid = knet_h->host_id;
iov_out[0].iov_base = &datafd_hdr;
iov_out[0].iov_len = sizeof(datafd_hdr);
cur_iov++;
}
iov_out[cur_iov].iov_base = (void *)buf;
iov_out[cur_iov].iov_len = buflen;
err = writev_all(knet_h, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, cur_iov+1, local_link, KNET_SUB_TRANSP_LOOPBACK);
savederrno = errno;
if (err < 0) {
log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
local_link->status.stats.tx_data_errors++;
goto out;
}
if (err == buflen) {
local_link->status.stats.tx_data_packets++;
local_link->status.stats.tx_data_bytes += inlen;
}
out:
errno = savederrno;
return err;
}
static int _prep_tx_bufs(knet_handle_t knet_h,
struct knet_header *inbuf, uint8_t onwire_ver,
unsigned char *data, size_t inlen, uint32_t data_checksum,
seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed,
int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out)
{
int err = 0, savederrno = 0;
unsigned int temp_data_mtu;
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming minimum IPv4 MTU (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid the value changing under
* our feet while we are sending a fragmented packet
*/
temp_data_mtu = knet_h->data_mtu;
}
if (knet_h->onwire_ver_remap) {
prep_tx_bufs_v1(knet_h, inbuf, data, inlen, data_checksum, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out);
} else {
switch (onwire_ver) {
case 1:
prep_tx_bufs_v1(knet_h, inbuf, data, inlen, data_checksum, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out);
break;
default: /* this should never hit as filters are in place in the calling functions */
log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver);
savederrno = EINVAL;
err = -1;
goto out;
break;
}
}
out:
errno = savederrno;
return err;
}
static int _compress_data(knet_handle_t knet_h, unsigned char* data, size_t *inlen, int *data_compressed)
{
int err = 0, savederrno = 0;
int stats_locked = 0, stats_err = 0;
size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
/*
* compress data
*/
if (knet_h->compress_model > 0) {
if (*inlen > knet_h->compress_threshold) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = compress(knet_h,
data, *inlen,
knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
savederrno = errno;
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out;
}
stats_locked = 1;
/* Collect stats */
if (compress_time < knet_h->stats.tx_compress_time_min) {
knet_h->stats.tx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.tx_compress_time_max) {
knet_h->stats.tx_compress_time_max = compress_time;
}
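/* running average: new_ave = (old_ave * n + sample) / (n + 1) */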
knet_h->stats.tx_compress_time_ave =
(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
compress_time) / (knet_h->stats.tx_compressed_packets+1);
if (err < 0) {
knet_h->stats.tx_failed_to_compress++;
log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
} else {
knet_h->stats.tx_compressed_packets++;
knet_h->stats.tx_compressed_original_bytes += *inlen;
knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
if (cmp_outlen < *inlen) {
memmove(data, knet_h->send_to_links_buf_compress, cmp_outlen);
*inlen = cmp_outlen;
*data_compressed = 1;
} else {
knet_h->stats.tx_unable_to_compress++;
}
}
}
if (!*data_compressed) {
if (!stats_locked) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out;
}
stats_locked = 1;
}
knet_h->stats.tx_uncompressed_packets++;
}
if (stats_locked) {
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
}
out:
errno = savederrno;
return err;
}
static int _encrypt_bufs(knet_handle_t knet_h, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out)
{
int err = 0, savederrno = 0, stats_err = 0;
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
uint8_t frag_idx = 0;
size_t outlen, uncrypted_frag_size;
int j;
if (knet_h->crypto_in_use_config) {
while (frag_idx < msgs_to_send) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_encrypt_and_signv(
knet_h,
iov_out[frag_idx], *iovcnt_out,
knet_h->send_to_links_buf_crypt[frag_idx],
(ssize_t *)&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out;
}
if (crypt_time < knet_h->stats.tx_crypt_time_min) {
knet_h->stats.tx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.tx_crypt_time_max) {
knet_h->stats.tx_crypt_time_max = crypt_time;
}
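/* running average, same as for the compress stats above */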
knet_h->stats.tx_crypt_time_ave =
(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
crypt_time) / (knet_h->stats.tx_crypt_packets+1);
uncrypted_frag_size = 0;
for (j=0; j < *iovcnt_out; j++) {
uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
}
knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
knet_h->stats.tx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx][0].iov_len = outlen;
frag_idx++;
}
*iovcnt_out = 1;
}
out:
errno = savederrno;
return err;
}
static int _get_tx_seq_num(knet_handle_t knet_h, seq_num_t *tx_seq_num)
{
int savederrno = 0;
savederrno = pthread_mutex_lock(&knet_h->tx_seq_num_mutex);
if (savederrno) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
errno = savederrno;
return -1;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and is rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
*tx_seq_num = knet_h->tx_seq_num;
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* packets.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
* node 3+ will be able to keep in sync on the TX seq_num even without
* receiving traffic or pings in between. This avoids issues with
* rollover of the circular buffer
*/
if (*tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
return 0;
}
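
The skip-zero rule is compact enough to state on its own. A sketch, assuming seq_num_t is a 16-bit unsigned counter with SEQ_MAX as its maximum value (as the SEQ_MAX / 8 arithmetic above suggests):

#include <stdint.h>

typedef uint16_t seq_num_t;	/* assumption: matches knet's onwire type */

/* Sketch: advance a TX sequence counter, skipping 0 on wraparound since
 * seq_num 0 is reserved to signal a crashed/rejoining node. */
static seq_num_t next_seq(seq_num_t cur)
{
	cur++;
	if (cur == 0) {
		cur++;
	}
	return cur;
}
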
static int _get_data_dests(knet_handle_t knet_h, unsigned char* data, size_t inlen,
int8_t *channel, int *bcast, int *send_local,
knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries,
int is_sync)
{
int err = 0, savederrno = 0;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; /* store destinations from filter */
size_t dst_host_ids_entries_temp = 0;
size_t dst_host_ids_entries_temp2 = 0; /* workaround gcc here */
struct knet_host *dst_host;
size_t host_idx;
if (knet_h->dst_host_filter_fn) {
*bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
data,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (*bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", *bcast);
savederrno = EFAULT;
err = -1;
goto out;
}
if ((!*bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out;
}
if ((!*bcast) &&
(dst_host_ids_entries_temp > KNET_MAX_HOST)) {
log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
savederrno = EINVAL;
err = -1;
goto out;
}
if (is_sync) {
if ((*bcast) ||
((!*bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out;
}
}
}
/*
* check destination hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!*bcast) {
*dst_host_ids_entries = dst_host_ids_entries_temp2;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
if ((dst_host->host_id == knet_h->host_id) &&
(knet_h->has_loop_link)) {
*send_local = 1;
}
if (!((dst_host->host_id == knet_h->host_id) &&
(knet_h->has_loop_link)) &&
dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries_temp2] = dst_host_ids_temp[host_idx];
dst_host_ids_entries_temp2++;
}
}
if ((!dst_host_ids_entries_temp2) && (!*send_local)) {
savederrno = EHOSTDOWN;
err = -1;
goto out;
}
*dst_host_ids_entries = dst_host_ids_entries_temp2;
} else {
*bcast = 0;
*send_local = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if ((dst_host->host_id == knet_h->host_id) &&
(knet_h->has_loop_link)) {
*send_local = 1;
}
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
*bcast = 1;
}
}
if ((!*bcast) && (!*send_local)) {
savederrno = EHOSTDOWN;
err = -1;
goto out;
}
}
out:
errno = savederrno;
return err;
}
static int _prep_and_send_msgs(knet_handle_t knet_h, int bcast, knet_node_id_t *dst_host_ids, size_t dst_host_ids_entries, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int iovcnt_out)
{
int err = 0, savederrno = 0;
struct knet_host *dst_host;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msg_idx;
size_t host_idx;
memset(&msg, 0, sizeof(msg));
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* this will be set properly in _dispatch_to_links() */
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
msg_idx++;
}
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out;
}
}
}
}
out:
errno = savederrno;
return err;
}
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, uint8_t onwire_ver, int is_sync)
{
int err = 0, savederrno = 0;
struct knet_header *inbuf = knet_h->recv_from_sock_buf; /* all TX packets are stored here regardless of the onwire version */
unsigned char *data; /* onwire-neutral pointer to the data to send */
int data_compressed = 0; /* track data compression to fill the header */
seq_num_t tx_seq_num;
uint32_t data_checksum = 0; /* used only for debugging at the moment */
int bcast = 1; /* assume all packets are to be broadcast unless the filter tells us differently */
knet_node_id_t dst_host_ids[KNET_MAX_HOST]; /* store destinations from filter */
size_t dst_host_ids_entries = 0;
int send_local = 0; /* send packets to loopback */
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
int msgs_to_send = 0;
if (knet_h->enabled != 1) {
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out;
}
if (knet_h->onwire_ver_remap) {
data = get_data_v1(knet_h, inbuf);
} else {
switch (onwire_ver) {
case 1:
data = get_data_v1(knet_h, inbuf);
break;
default: /* this should never hit as filters are in place in the calling functions */
log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver);
savederrno = EINVAL;
err = -1;
goto out;
break;
}
}
#ifdef ONWIRE_V1_EXTRA_DEBUG
data_checksum = compute_chksum(data, inlen);
#endif
err = _get_data_dests(knet_h, data, inlen,
&channel, &bcast, &send_local,
dst_host_ids, &dst_host_ids_entries,
is_sync);
if (err < 0) {
savederrno = errno;
goto out;
}
/* Send to localhost if appropriate and enabled */
if (send_local) {
err = _dispatch_to_local(knet_h, data, inlen, channel);
if (err < 0) {
savederrno = errno;
goto out;
}
}
err = _compress_data(knet_h, data, &inlen, &data_compressed);
if (err < 0) {
savederrno = errno;
goto out;
}
err = _get_tx_seq_num(knet_h, &tx_seq_num);
if (err < 0) {
savederrno = errno;
goto out;
}
err = _prep_tx_bufs(knet_h, inbuf, onwire_ver, data, inlen, data_checksum, tx_seq_num, channel, bcast, data_compressed, &msgs_to_send, iov_out, &iovcnt_out);
if (err < 0) {
savederrno = errno;
goto out;
}
err = _encrypt_bufs(knet_h, msgs_to_send, iov_out, &iovcnt_out);
if (err < 0) {
savederrno = errno;
goto out;
}
err = _prep_and_send_msgs(knet_h, bcast, dst_host_ids, dst_host_ids_entries, msgs_to_send, iov_out, iovcnt_out);
if (err < 0) {
savederrno = errno;
goto out;
}
out:
errno = savederrno;
return err;
}
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, uint8_t onwire_ver, int8_t channel)
{
ssize_t inlen = 0;
int savederrno = 0, docallback = 0;
struct iovec iov_in;
struct msghdr msg;
struct sockaddr_storage address;
memset(&iov_in, 0, sizeof(iov_in));
if (knet_h->onwire_ver_remap) {
iov_in.iov_base = (void *)get_data_v1(knet_h, knet_h->recv_from_sock_buf);
iov_in.iov_len = KNET_MAX_PACKET_SIZE;
} else {
switch (onwire_ver) {
case 1:
iov_in.iov_base = (void *)get_data_v1(knet_h, knet_h->recv_from_sock_buf);
iov_in.iov_len = KNET_MAX_PACKET_SIZE;
break;
default:
log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver);
break;
}
}
memset(&msg, 0, sizeof(struct msghdr));
msg.msg_name = &address;
msg.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;
msg.msg_iov = &iov_in;
msg.msg_iovlen = 1;
if ((channel >= 0) &&
(channel < KNET_DATAFD_MAX) &&
(!knet_h->sockfd[channel].is_socket)) {
inlen = readv(sockfd, msg.msg_iov, 1);
} else {
inlen = recvmsg(sockfd, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
if (msg.msg_flags & MSG_TRUNC) {
log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
return;
}
}
if (inlen == 0) {
savederrno = 0;
docallback = 1;
} else if (inlen < 0) {
struct epoll_event ev;
savederrno = errno;
docallback = 1;
memset(&ev, 0, sizeof(struct epoll_event));
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
} else {
knet_h->sockfd[channel].has_error = 1;
}
} else {
_parse_recv_from_sock(knet_h, inlen, channel, onwire_ver, 0);
}
if (docallback) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_TX,
inlen,
savederrno);
}
}
void *_handle_send_to_links_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS + 1]; /* see _init_epolls for + 1 */
int i, nev;
int flush, flush_queue_limit;
int8_t channel;
uint8_t onwire_ver;
set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);
memset(&events, 0, sizeof(events));
flush_queue_limit = 0;
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);
flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
/*
* ideally we want to communicate that we are done flushing
* the queue when we have an epoll timeout event
*/
if (flush == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
flush_queue_limit = 0;
}
continue;
}
/*
* fallback in case the TX sockets continue to receive traffic
* and we never hit an epoll timeout.
*
* allow up to 100 loops to flush the queues, then give up.
* there might be cleaner ways to do it by checking the buffer queue
* on each socket, but we have tons of sockets and calculations can go wrong.
* Also, why would you disable data forwarding and still send packets?
*/
if (flush == KNET_THREAD_QUEUE_FLUSH) {
if (flush_queue_limit >= 100) {
log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
flush_queue_limit = 0;
} else {
flush_queue_limit++;
}
} else {
flush_queue_limit = 0;
}
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
continue;
}
if (pthread_mutex_lock(&knet_h->onwire_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock");
goto out_unlock;
}
onwire_ver = knet_h->onwire_ver;
pthread_mutex_unlock(&knet_h->onwire_mutex);
for (i = 0; i < nev; i++) {
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
if ((knet_h->sockfd[channel].in_use) &&
(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
break;
}
}
if (channel >= KNET_DATAFD_MAX) {
log_debug(knet_h, KNET_SUB_TX, "No available channels");
continue; /* channel not found */
}
if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, events[i].data.fd, onwire_ver, channel);
pthread_mutex_unlock(&knet_h->tx_mutex);
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);
return NULL;
}
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0, err = 0;
uint8_t onwire_ver;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->dst_host_filter_fn) {
savederrno = ENETDOWN;
err = -1;
goto out;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out;
}
savederrno = pthread_mutex_lock(&knet_h->onwire_mutex);
if (savederrno) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock");
err = -1;
goto out;
}
onwire_ver = knet_h->onwire_ver;
pthread_mutex_unlock(&knet_h->onwire_mutex);
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
strerror(savederrno));
err = -1;
goto out;
}
if (knet_h->onwire_ver_remap) {
memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len);
} else {
switch (onwire_ver) {
case 1:
memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len);
break;
default:
log_warn(knet_h, KNET_SUB_TX, "preparing sync data onwire version %u not supported", onwire_ver);
savederrno = EINVAL;
err = -1;
goto out_tx;
break;
}
}
err = _parse_recv_from_sock(knet_h, buff_len, channel, onwire_ver, 1);
savederrno = errno;
out_tx:
pthread_mutex_unlock(&knet_h->tx_mutex);
out:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_out[1];
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *)buff;
iov_out[0].iov_len = buff_len;
err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
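
knet_send() above only validates its arguments and writev()s the buffer into the channel datafd; the TX thread dequeues it asynchronously, while knet_send_sync() pushes the packet through the TX path inline. A minimal caller sketch (assumes an initialized handle and a channel previously obtained from knet_handle_add_datafd(); error handling abbreviated):

#include <libknet.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Sketch only: knet_h must be a fully configured handle and channel a
 * datafd channel registered on it. */
static int send_payload(knet_handle_t knet_h, int8_t channel)
{
	const char msg[] = "hello cluster";
	ssize_t n = knet_send(knet_h, msg, sizeof(msg), channel);

	if (n < 0) {
		fprintf(stderr, "knet_send: %s\n", strerror(errno));
		return -1;
	}
	return 0;
}
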
diff --git a/libknet/transport_common.c b/libknet/transport_common.c
index 17298f71..3d62031c 100644
--- a/libknet/transport_common.c
+++ b/libknet/transport_common.c
@@ -1,500 +1,493 @@
/*
* Copyright (C) 2016-2024 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/uio.h>
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transport_common.h"
/*
* reuse Jan Friesse's compat layer as a wrapper to drop usage of sendmmsg
*
* TODO: kill these wrappers once we work on packet delivery guarantees
*/
int _recvmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
{
int savederrno = 0, err = 0;
unsigned int i;
for (i = 0; i < vlen; i++) {
err = recvmsg(sockfd, &msgvec[i].msg_hdr, flags);
savederrno = errno;
if (err >= 0) {
msgvec[i].msg_len = err;
if (err == 0) {
/* No point in reading anything more until we know this has been dealt with,
or we'll just get a vector full of them. Several, in fact. */
i++;
break;
}
} else {
if ((i > 0) &&
((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
savederrno = 0;
}
break;
}
}
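/*
 * mimic recvmmsg(): return the number of messages received, if any,
 * otherwise the last recvmsg() error
 */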
errno = savederrno;
return ((i > 0) ? (int)i : err);
}
int _sendmmsg(int sockfd, int connection_oriented, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
{
int savederrno = 0, err = 0;
unsigned int i;
struct msghdr temp_msg;
struct msghdr *use_msghdr;
for (i = 0; i < vlen; i++) {
if (connection_oriented == TRANSPORT_PROTO_IS_CONNECTION_ORIENTED) {
memcpy(&temp_msg, &msgvec[i].msg_hdr, sizeof(struct msghdr));
temp_msg.msg_name = NULL;
temp_msg.msg_namelen = 0;
use_msghdr = &temp_msg;
} else {
use_msghdr = &msgvec[i].msg_hdr;
}
err = sendmsg(sockfd, use_msghdr, flags);
savederrno = errno;
if (err < 0) {
break;
}
}
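/*
 * mimic sendmmsg(): return the number of messages sent, if any,
 * otherwise the sendmsg() error
 */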
errno = savederrno;
return ((i > 0) ? (int)i : err);
}
/* Assume neither of these constants can ever be zero */
#ifndef SO_RCVBUFFORCE
#define SO_RCVBUFFORCE 0
#endif
#ifndef SO_SNDBUFFORCE
#define SO_SNDBUFFORCE 0
#endif
static int _configure_sockbuf(knet_handle_t knet_h, int sock, int option, int force, int target)
{
int savederrno = 0;
int new_value;
socklen_t value_len = sizeof new_value;
if (setsockopt(sock, SOL_SOCKET, option, &target, sizeof target) != 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSPORT,
"Error setting socket buffer via option %d to value %d: %s\n",
option, target, strerror(savederrno));
errno = savederrno;
return -1;
}
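/*
 * the kernel may silently cap the requested size, so read the value
 * back to see what we actually got
 */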
if (getsockopt(sock, SOL_SOCKET, option, &new_value, &value_len) != 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSPORT,
"Error getting socket buffer via option %d: %s\n",
option, strerror(savederrno));
errno = savederrno;
return -1;
}
if (value_len != sizeof new_value) {
log_err(knet_h, KNET_SUB_TRANSPORT,
"Socket option %d returned unexpected size %u\n",
option, value_len);
errno = ERANGE;
return -1;
}
if (target <= new_value) {
return 0;
}
if (!force || !(knet_h->flags & KNET_HANDLE_FLAG_PRIVILEGED)) {
log_err(knet_h, KNET_SUB_TRANSPORT,
"Failed to set socket buffer via option %d to value %d: capped at %d",
option, target, new_value);
if (!(knet_h->flags & KNET_HANDLE_FLAG_PRIVILEGED)) {
log_err(knet_h, KNET_SUB_TRANSPORT,
"Continuing regardless, as the handle is not privileged."
" Expect poor performance!");
return 0;
} else {
errno = ENAMETOOLONG;
return -1;
}
}
if (setsockopt(sock, SOL_SOCKET, force, &target, sizeof target) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSPORT,
"Failed to set socket buffer via force option %d: %s",
force, strerror(savederrno));
if (savederrno == EPERM) {
errno = ENAMETOOLONG;
} else {
errno = savederrno;
}
return -1;
}
return 0;
}
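
The function above follows a set/verify/force pattern: request the size, read back what the kernel granted, and only reach for the privileged FORCE option when the request was capped. A condensed Linux-only sketch (illustrative; SO_RCVBUFFORCE needs CAP_NET_ADMIN, and Linux reports back twice the requested value to account for bookkeeping overhead):

#include <sys/socket.h>
#include <errno.h>

/* Sketch only: request a receive buffer, verify it, escalate if capped. */
static int set_rcvbuf(int sock, int want)
{
	int got = 0;
	socklen_t len = sizeof(got);

	if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &want, sizeof(want)) < 0) {
		return -1;
	}
	if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &got, &len) < 0) {
		return -1;
	}
	if (got >= want) {
		return 0;	/* kernel honored (or doubled) the request */
	}
#ifdef SO_RCVBUFFORCE
	/* privileged: bypass the net.core.rmem_max cap */
	return setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &want, sizeof(want));
#else
	errno = ERANGE;
	return -1;
#endif
}
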
int _configure_common_socket(knet_handle_t knet_h, int sock, uint64_t flags, const char *type)
{
int err = 0, savederrno = 0;
int value;
if (_fdset_cloexec(sock)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s",
type, strerror(savederrno));
goto exit_error;
}
if (_fdset_nonblock(sock)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s",
type, strerror(savederrno));
goto exit_error;
}
if (_configure_sockbuf(knet_h, sock, SO_RCVBUF, SO_RCVBUFFORCE, KNET_RING_RCVBUFF)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s",
type, strerror(savederrno));
goto exit_error;
}
if (_configure_sockbuf(knet_h, sock, SO_SNDBUF, SO_SNDBUFFORCE, KNET_RING_RCVBUFF)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s",
type, strerror(savederrno));
goto exit_error;
}
if (flags & KNET_LINK_FLAG_TRAFFICHIPRIO) {
#ifdef KNET_LINUX
#ifdef SO_PRIORITY
value = 6; /* TC_PRIO_INTERACTIVE */
if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s priority: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "TC_PRIO_INTERACTIVE enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "TC_PRIO_INTERACTIVE not available in this build/platform");
#endif
#endif
#if defined(IP_TOS) && defined(IPTOS_LOWDELAY)
value = IPTOS_LOWDELAY;
if (setsockopt(sock, IPPROTO_IP, IP_TOS, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s priority: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "IPTOS_LOWDELAY enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "IPTOS_LOWDELAY not available in this build/platform");
#endif
}
exit_error:
errno = savederrno;
return err;
}
int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
int err = 0, savederrno = 0;
int value;
if (_configure_common_socket(knet_h, sock, flags, type) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
#ifdef KNET_LINUX
#ifdef IP_FREEBIND
value = 1;
if (setsockopt(sock, SOL_IP, IP_FREEBIND, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "FREEBIND enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "FREEBIND not available in this build/platform");
#endif
#endif
#ifdef KNET_BSD
#ifdef IP_BINDANY /* BSD */
value = 1;
if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set BINDANY on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "BINDANY enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "BINDANY not available in this build/platform");
#endif
#endif
if (address->ss_family == AF_INET6) {
value = 1;
if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
&value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s",
type, strerror(savederrno));
goto exit_error;
}
#ifdef KNET_LINUX
#ifdef IPV6_MTU_DISCOVER
value = IPV6_PMTUDISC_PROBE;
if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_MTU_DISCOVER enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_MTU_DISCOVER not available in this build/platform");
#endif
#endif
#ifdef IPV6_DONTFRAG
value = 1;
if (setsockopt(sock, IPPROTO_IPV6, IPV6_DONTFRAG, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_DONTFRAG enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "IPV6_DONTFRAG not available in this build/platform");
#endif
} else {
#ifdef KNET_LINUX
#ifdef IP_MTU_DISCOVER
value = IP_PMTUDISC_PROBE;
if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "PMTUDISC enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "PMTUDISC not available in this build/platform");
#endif
#endif
#ifdef KNET_BSD
#ifdef IP_DONTFRAG
value = 1;
if (setsockopt(sock, IPPROTO_IP, IP_DONTFRAG, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSPORT, "DONTFRAG enabled on socket: %i", sock);
#else
log_debug(knet_h, KNET_SUB_TRANSPORT, "DONTFRAG not available in this build/platform");
#endif
#endif
}
exit_error:
errno = savederrno;
return err;
}
int _init_socketpair(knet_handle_t knet_h, int *sock)
{
int err = 0, savederrno = 0;
int i;
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
for (i = 0; i < 2; i++) {
if (_configure_common_socket(knet_h, sock[i], 0, "local socketpair") < 0) {
savederrno = errno;
err = -1;
goto exit_fail;
}
}
exit_fail:
errno = savederrno;
return err;
}
void _close_socketpair(knet_handle_t knet_h, int *sock)
{
int i;
for (i = 0; i < 2; i++) {
if (sock[i]) {
close(sock[i]);
sock[i] = 0;
}
}
}
/*
* must be called with global read lock
*
* return -1 on error
* return 0 if fd is invalid
* return 1 if fd is valid
*/
int _is_valid_fd(knet_handle_t knet_h, int sockfd)
{
int ret = 0;
if (sockfd < 0) {
errno = EINVAL;
return -1;
}
if (sockfd >= KNET_MAX_FDS) {
errno = EINVAL;
return -1;
}
if (knet_h->knet_transport_fd_tracker[sockfd].transport >= KNET_MAX_TRANSPORTS) {
ret = 0;
} else {
ret = 1;
}
return ret;
}
/*
* must be called with global write lock
*/
int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, socklen_t socklen, void *data, int ifindex)
{
if (sockfd < 0) {
errno = EINVAL;
return -1;
}
if (sockfd >= KNET_MAX_FDS) {
errno = EINVAL;
return -1;
}
knet_h->knet_transport_fd_tracker[sockfd].transport = transport;
knet_h->knet_transport_fd_tracker[sockfd].data_type = data_type;
knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len = socklen;
knet_h->knet_transport_fd_tracker[sockfd].data = data;
knet_h->knet_transport_fd_tracker[sockfd].ifindex = ifindex;
return 0;
}
/*
* Wrapper function for writev that retries until all data is written.
*/
ssize_t writev_all(knet_handle_t knet_h, int fd, struct iovec *iov, int iovcnt, struct knet_link *local_link, uint8_t log_subsys)
{
- ssize_t total_written = 0; // Total bytes written
- ssize_t result;
- size_t total_bytes = 0;
- int i;
-
- for (i=0; i<iovcnt; i++) {
- total_bytes += iov[i].iov_len;
- }
-
- while (iovcnt > 0) {
- result = writev(fd, iov, iovcnt);
- if (result < 0) {
- /* retry on signal */
- if (errno == EINTR) {
- continue;
- }
- /* Other errors */
- return -1;
- }
-
- total_written += result;
-
- /* Adjust iovec array to account for the bytes already written */
- size_t bytes_left = result;
- int old_iovcnt = iovcnt;
- for (int i = 0; i < old_iovcnt; i++) {
- if (bytes_left >= iov[i].iov_len) {
- bytes_left -= iov[i].iov_len;
- iov++;
- iovcnt--;
- if (local_link != NULL) {
- local_link->status.stats.tx_data_retries++;
- }
- log_debug(knet_h, log_subsys, "writev incomplete=%zd bytes of %zu\n", result, total_bytes);
- } else {
- /* Adjust the current iovec to start at the remaining data */
- iov[i].iov_base = (char *)iov[i].iov_base + bytes_left;
- iov[i].iov_len -= bytes_left;
- break;
- }
- }
- }
-
- return total_written;
+ ssize_t total_written = 0; /* Total bytes written */
+ ssize_t written; /* Bytes written by single writev */
+ int iov_index = 0;
+
+ for (;;) {
+ written = writev(fd, &iov[iov_index], iovcnt - iov_index);
+
+ if (written < 0) {
+ /* retry on signal */
+ if (errno == EINTR) {
+ continue;
+ }
+ /* Other errors */
+ return -1;
+ }
+
+ total_written += written;
+
+ while ((size_t)written >= iov[iov_index].iov_len) {
+ written -= iov[iov_index].iov_len;
+ iov_index++;
+ if (iov_index >= iovcnt) {
+ /* Everything written */
+ goto out;
+ }
+ }
+
+ iov[iov_index].iov_base = (char *)iov[iov_index].iov_base + written;
+ iov[iov_index].iov_len -= written;
+
+ if (local_link != NULL) {
+ local_link->status.stats.tx_data_retries++;
+ }
+ }
+
+out:
+ return total_written;
}
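
The rewrite above replaces the old version's per-call rescan of the whole iovec array with a single advancing index, so each writev() resumes at the first unwritten byte. The core pattern, stripped of knet's handle and stats plumbing (self-contained and illustrative):

#include <sys/uio.h>
#include <errno.h>

/* Sketch only: loop writev() until all iovcnt entries are fully written.
 * Mutates iov, as the knet version does. */
static ssize_t write_all_iov(int fd, struct iovec *iov, int iovcnt)
{
	ssize_t total = 0;
	int idx = 0;

	for (;;) {
		ssize_t n = writev(fd, &iov[idx], iovcnt - idx);

		if (n < 0) {
			if (errno == EINTR) {
				continue;	/* interrupted before writing, retry */
			}
			return -1;
		}
		total += n;

		/* skip entries that are now completely written */
		while ((idx < iovcnt) && ((size_t)n >= iov[idx].iov_len)) {
			n -= iov[idx].iov_len;
			idx++;
		}
		if (idx >= iovcnt) {
			return total;
		}
		/* trim the partially written entry */
		iov[idx].iov_base = (char *)iov[idx].iov_base + n;
		iov[idx].iov_len -= n;
	}
}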
