diff --git a/libknet/crypto_openssl.c b/libknet/crypto_openssl.c index f584c910..6ffc043e 100644 --- a/libknet/crypto_openssl.c +++ b/libknet/crypto_openssl.c @@ -1,616 +1,602 @@ /* * Copyright (C) 2017 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #define KNET_MODULE #include "config.h" #include #include #include #include #include #include #include #include #include "logging.h" #include "crypto_model.h" /* * 1.0.2 requires at least 120 bytes * 1.1.0 requires at least 256 bytes */ #define SSLERR_BUF_SIZE 512 /* * crypto definitions and conversion tables */ #define SALT_SIZE 16 struct opensslcrypto_instance { void *private_key; int private_key_len; const EVP_CIPHER *crypto_cipher_type; const EVP_MD *crypto_hash_type; }; /* * crypt/decrypt functions openssl1.0 */ #ifdef BUILDCRYPTOOPENSSL10 static int encrypt_openssl( knet_handle_t knet_h, const struct iovec *iov, int iovcnt, unsigned char *buf_out, ssize_t *buf_out_len) { struct opensslcrypto_instance *instance = knet_h->crypto_instance->model_instance; EVP_CIPHER_CTX ctx; int tmplen = 0, offset = 0; unsigned char *salt = buf_out; unsigned char *data = buf_out + SALT_SIZE; int err = 0; int i; char sslerr[SSLERR_BUF_SIZE]; EVP_CIPHER_CTX_init(&ctx); /* * contribute to PRNG for each packet we send/receive */ RAND_seed((unsigned char *)iov[iovcnt - 1].iov_base, iov[iovcnt - 1].iov_len); if (!RAND_bytes(salt, SALT_SIZE)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to get random salt data: %s", sslerr); err = -1; goto out; } /* * add warning re keylength */ EVP_EncryptInit_ex(&ctx, instance->crypto_cipher_type, NULL, instance->private_key, salt); for (i=0; icrypto_instance->model_instance; EVP_CIPHER_CTX ctx; int tmplen1 = 0, tmplen2 = 0; unsigned char *salt = (unsigned char *)buf_in; unsigned char *data = salt + SALT_SIZE; int datalen = buf_in_len - SALT_SIZE; int err = 0; char sslerr[SSLERR_BUF_SIZE]; EVP_CIPHER_CTX_init(&ctx); /* * contribute to PRNG for each packet we send/receive */ RAND_seed(buf_in, buf_in_len); /* * add warning re keylength */ EVP_DecryptInit_ex(&ctx, instance->crypto_cipher_type, NULL, instance->private_key, salt); if (!EVP_DecryptUpdate(&ctx, buf_out, &tmplen1, data, datalen)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to decrypt: %s", sslerr); err = -1; goto out; } if (!EVP_DecryptFinal_ex(&ctx, buf_out + tmplen1, &tmplen2)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to finalize decrypt: %s", sslerr); err = -1; goto out; } *buf_out_len = tmplen1 + tmplen2; out: EVP_CIPHER_CTX_cleanup(&ctx); return err; } #endif #ifdef BUILDCRYPTOOPENSSL11 static int encrypt_openssl( knet_handle_t knet_h, const struct iovec *iov, int iovcnt, unsigned char *buf_out, ssize_t *buf_out_len) { struct opensslcrypto_instance *instance = knet_h->crypto_instance->model_instance; EVP_CIPHER_CTX *ctx; int tmplen = 0, offset = 0; unsigned char *salt = buf_out; unsigned char *data = buf_out + SALT_SIZE; int err = 0; int i; char sslerr[SSLERR_BUF_SIZE]; ctx = EVP_CIPHER_CTX_new(); /* * contribute to PRNG for each packet we send/receive */ RAND_seed((unsigned char *)iov[iovcnt - 1].iov_base, iov[iovcnt - 1].iov_len); if (!RAND_bytes(salt, SALT_SIZE)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to get random salt data: %s", sslerr); err = -1; goto out; } /* * add warning re keylength */ EVP_EncryptInit_ex(ctx, instance->crypto_cipher_type, NULL, instance->private_key, salt); for (i=0; icrypto_instance->model_instance; EVP_CIPHER_CTX *ctx; int tmplen1 = 0, tmplen2 = 0; unsigned char *salt = (unsigned char *)buf_in; unsigned char *data = salt + SALT_SIZE; int datalen = buf_in_len - SALT_SIZE; int err = 0; char sslerr[SSLERR_BUF_SIZE]; ctx = EVP_CIPHER_CTX_new(); /* * contribute to PRNG for each packet we send/receive */ RAND_seed(buf_in, buf_in_len); /* * add warning re keylength */ EVP_DecryptInit_ex(ctx, instance->crypto_cipher_type, NULL, instance->private_key, salt); if (!EVP_DecryptUpdate(ctx, buf_out, &tmplen1, data, datalen)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to decrypt: %s", sslerr); err = -1; goto out; } if (!EVP_DecryptFinal_ex(ctx, buf_out + tmplen1, &tmplen2)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to finalize decrypt: %s", sslerr); err = -1; goto out; } *buf_out_len = tmplen1 + tmplen2; out: EVP_CIPHER_CTX_free(ctx); return err; } #endif /* * hash/hmac/digest functions */ static int calculate_openssl_hash( knet_handle_t knet_h, const unsigned char *buf, const size_t buf_len, unsigned char *hash) { struct opensslcrypto_instance *instance = knet_h->crypto_instance->model_instance; unsigned int hash_len = 0; unsigned char *hash_out = NULL; char sslerr[SSLERR_BUF_SIZE]; hash_out = HMAC(instance->crypto_hash_type, instance->private_key, instance->private_key_len, buf, buf_len, hash, &hash_len); if ((!hash_out) || (hash_len != knet_h->sec_hash_size)) { ERR_error_string_n(ERR_get_error(), sslerr, sizeof(sslerr)); log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to calculate hash: %s", sslerr); return -1; } return 0; } /* * exported API */ static int opensslcrypto_encrypt_and_signv ( knet_handle_t knet_h, const struct iovec *iov_in, int iovcnt_in, unsigned char *buf_out, ssize_t *buf_out_len) { struct opensslcrypto_instance *instance = knet_h->crypto_instance->model_instance; int i; if (instance->crypto_cipher_type) { if (encrypt_openssl(knet_h, iov_in, iovcnt_in, buf_out, buf_out_len) < 0) { return -1; } } else { *buf_out_len = 0; for (i=0; icrypto_hash_type) { if (calculate_openssl_hash(knet_h, buf_out, *buf_out_len, buf_out + *buf_out_len) < 0) { return -1; } *buf_out_len = *buf_out_len + knet_h->sec_hash_size; } return 0; } static int opensslcrypto_encrypt_and_sign ( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { struct iovec iov_in; memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (unsigned char *)buf_in; iov_in.iov_len = buf_in_len; return opensslcrypto_encrypt_and_signv(knet_h, &iov_in, 1, buf_out, buf_out_len); } static int opensslcrypto_authenticate_and_decrypt ( knet_handle_t knet_h, const unsigned char *buf_in, const ssize_t buf_in_len, unsigned char *buf_out, ssize_t *buf_out_len) { struct opensslcrypto_instance *instance = knet_h->crypto_instance->model_instance; ssize_t temp_len = buf_in_len; if (instance->crypto_hash_type) { unsigned char tmp_hash[knet_h->sec_hash_size]; ssize_t temp_buf_len = buf_in_len - knet_h->sec_hash_size; if ((temp_buf_len < 0) || (temp_buf_len > KNET_MAX_PACKET_SIZE)) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Incorrect packet size."); return -1; } if (calculate_openssl_hash(knet_h, buf_in, temp_buf_len, tmp_hash) < 0) { return -1; } if (memcmp(tmp_hash, buf_in + temp_buf_len, knet_h->sec_hash_size) != 0) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Digest does not match"); return -1; } temp_len = temp_len - knet_h->sec_hash_size; *buf_out_len = temp_len; } if (instance->crypto_cipher_type) { if (decrypt_openssl(knet_h, buf_in, temp_len, buf_out, buf_out_len) < 0) { return -1; } } else { memmove(buf_out, buf_in, temp_len); *buf_out_len = temp_len; } return 0; } #ifdef BUILDCRYPTOOPENSSL10 static pthread_mutex_t *openssl_internal_lock; -static long *openssl_internal_lock_count; static void openssl_internal_locking_callback(int mode, int type, char *file, int line) { if (mode & CRYPTO_LOCK) { pthread_mutex_lock(&(openssl_internal_lock[type])); - openssl_internal_lock_count[type]++; } else { pthread_mutex_unlock(&(openssl_internal_lock[type])); } } static unsigned long openssl_internal_thread_id(void) { return (unsigned long)pthread_self(); } static void openssl_internal_lock_cleanup(void) { int i; CRYPTO_set_locking_callback(NULL); CRYPTO_set_id_callback(NULL); for (i = 0; i < CRYPTO_num_locks(); i++) { pthread_mutex_destroy(&(openssl_internal_lock[i])); } - if (openssl_internal_lock_count) { - free(openssl_internal_lock_count); - } - if (openssl_internal_lock) { free(openssl_internal_lock); } return; } static int openssl_internal_lock_setup(void) { int savederrno = 0, err = 0; int i; openssl_internal_lock = malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t)); if (!openssl_internal_lock) { savederrno = errno; err = -1; goto out; } - openssl_internal_lock_count = malloc(CRYPTO_num_locks() * sizeof(long)); - if (!openssl_internal_lock_count) { - savederrno = errno; - err = -1; - goto out; - } - for (i = 0; i < CRYPTO_num_locks(); i++) { - openssl_internal_lock_count[i] = 0; savederrno = pthread_mutex_init(&(openssl_internal_lock[i]), NULL); if (savederrno) { err = -1; goto out; } } CRYPTO_set_id_callback((unsigned long (*)(void))openssl_internal_thread_id); CRYPTO_set_locking_callback((void *)&openssl_internal_locking_callback); out: if (err) { openssl_internal_lock_cleanup(); } errno = savederrno; return err; } #endif static void opensslcrypto_fini( knet_handle_t knet_h) { struct opensslcrypto_instance *opensslcrypto_instance = knet_h->crypto_instance->model_instance; if (opensslcrypto_instance) { #ifdef BUILDCRYPTOOPENSSL10 openssl_internal_lock_cleanup(); #endif if (opensslcrypto_instance->private_key) { free(opensslcrypto_instance->private_key); opensslcrypto_instance->private_key = NULL; } free(opensslcrypto_instance); knet_h->crypto_instance->model_instance = NULL; knet_h->sec_header_size = 0; } return; } static int opensslcrypto_init( knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg) { static int openssl_is_init = 0; struct opensslcrypto_instance *opensslcrypto_instance = NULL; log_debug(knet_h, KNET_SUB_OPENSSLCRYPTO, "Initizializing openssl crypto module [%s/%s]", knet_handle_crypto_cfg->crypto_cipher_type, knet_handle_crypto_cfg->crypto_hash_type); if (!openssl_is_init) { #ifdef BUILDCRYPTOOPENSSL10 ERR_load_crypto_strings(); OPENSSL_add_all_algorithms_noconf(); #endif #ifdef BUILDCRYPTOOPENSSL11 if (!OPENSSL_init_crypto(OPENSSL_INIT_ADD_ALL_CIPHERS \ | OPENSSL_INIT_ADD_ALL_DIGESTS, NULL)) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to init openssl"); errno = EAGAIN; return -1; } #endif openssl_is_init = 1; } #ifdef BUILDCRYPTOOPENSSL10 if (openssl_internal_lock_setup() < 0) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to init openssl"); errno = EAGAIN; return -1; } #endif knet_h->crypto_instance->model_instance = malloc(sizeof(struct opensslcrypto_instance)); if (!knet_h->crypto_instance->model_instance) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to allocate memory for openssl model instance"); return -1; } opensslcrypto_instance = knet_h->crypto_instance->model_instance; memset(opensslcrypto_instance, 0, sizeof(struct opensslcrypto_instance)); if (strcmp(knet_handle_crypto_cfg->crypto_cipher_type, "none") == 0) { opensslcrypto_instance->crypto_cipher_type = NULL; } else { opensslcrypto_instance->crypto_cipher_type = EVP_get_cipherbyname(knet_handle_crypto_cfg->crypto_cipher_type); if (!opensslcrypto_instance->crypto_cipher_type) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "unknown crypto cipher type requested"); goto out_err; } } if (strcmp(knet_handle_crypto_cfg->crypto_hash_type, "none") == 0) { opensslcrypto_instance->crypto_hash_type = NULL; } else { opensslcrypto_instance->crypto_hash_type = EVP_get_digestbyname(knet_handle_crypto_cfg->crypto_hash_type); if (!opensslcrypto_instance->crypto_hash_type) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "unknown crypto hash type requested"); goto out_err; } } if ((opensslcrypto_instance->crypto_cipher_type) && (!opensslcrypto_instance->crypto_hash_type)) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "crypto communication requires hash specified"); goto out_err; } opensslcrypto_instance->private_key = malloc(knet_handle_crypto_cfg->private_key_len); if (!opensslcrypto_instance->private_key) { log_err(knet_h, KNET_SUB_OPENSSLCRYPTO, "Unable to allocate memory for openssl private key"); goto out_err; } memmove(opensslcrypto_instance->private_key, knet_handle_crypto_cfg->private_key, knet_handle_crypto_cfg->private_key_len); opensslcrypto_instance->private_key_len = knet_handle_crypto_cfg->private_key_len; knet_h->sec_header_size = 0; if (opensslcrypto_instance->crypto_hash_type) { knet_h->sec_hash_size = EVP_MD_size(opensslcrypto_instance->crypto_hash_type); knet_h->sec_header_size += knet_h->sec_hash_size; } if (opensslcrypto_instance->crypto_cipher_type) { int block_size; block_size = EVP_CIPHER_block_size(opensslcrypto_instance->crypto_cipher_type); if (block_size < 0) { goto out_err; } knet_h->sec_header_size += (block_size * 2); knet_h->sec_header_size += SALT_SIZE; knet_h->sec_salt_size = SALT_SIZE; knet_h->sec_block_size = block_size; } return 0; out_err: opensslcrypto_fini(knet_h); return -1; } crypto_ops_t crypto_model = { KNET_CRYPTO_MODEL_ABI, opensslcrypto_init, opensslcrypto_fini, opensslcrypto_encrypt_and_sign, opensslcrypto_encrypt_and_signv, opensslcrypto_authenticate_and_decrypt }; diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index dc540d77..d7796c10 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -1,1252 +1,1296 @@ /* * Copyright (C) 2016-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "internals.h" #include "netutils.h" #include "transport_common.h" #include "threads_common.h" #include "test-common.h" #define MAX_NODES 128 static int senderid = -1; static int thisnodeid = -1; static knet_handle_t knet_h; static int datafd = 0; static int8_t channel = 0; static int globallistener = 0; static int continous = 0; static int show_stats = 0; static struct sockaddr_storage allv4; static struct sockaddr_storage allv6; static int broadcast_test = 1; static pthread_t rx_thread = (pthread_t)NULL; static char *rx_buf[PCKT_FRAG_MAX]; static int wait_for_perf_rx = 0; static char *compresscfg = NULL; static char *cryptocfg = NULL; +static int machine_output = 0; static int bench_shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; #define TEST_PING 0 #define TEST_PING_AND_DATA 1 #define TEST_PERF_BY_SIZE 2 #define TEST_PERF_BY_TIME 3 static int test_type = TEST_PING; #define TEST_START 2 #define TEST_STOP 4 #define TEST_COMPLETE 6 #define ONE_GIGABYTE 1073741824 static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE; static uint64_t perf_by_time_secs = 10; struct node { int nodeid; int links; + uint8_t transport[KNET_MAX_LINK]; struct sockaddr_storage address[KNET_MAX_LINK]; }; static void print_help(void) { printf("knet_bench usage:\n"); printf(" -h print this help (no really)\n"); printf(" -d enable debug logs (default INFO)\n"); printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. (default disabled)\n"); printf(" Example: -c nss:aes128:sha1\n"); printf(" -z [implementation]:[level]:[threshold] compress configuration. (default disabled)\n"); printf(" Example: -z zlib:5:100\n"); printf(" -p [active|passive|rr] (default: passive)\n"); - printf(" -P [udp|sctp] (default: udp) protocol (transport) to use\n"); + printf(" -P [UDP|SCTP] (default: UDP) protocol (transport) to use for all links\n"); printf(" -t [nodeid] This nodeid (required)\n"); - printf(" -n [nodeid],[link1_ip_addr],[link2_..] Other nodes information (at least one required)\n"); - printf(" Example: -t 1,192.168.8.1,3ffe::8:1,..\n"); + printf(" -n [nodeid],[proto]/[link1_ip],[link2_..] Other nodes information (at least one required)\n"); + printf(" Example: -t 1,192.168.8.1,SCTP/3ffe::8:1,UDP/172...\n"); printf(" can be repeated up to %d and should contain also the localnode info\n", MAX_NODES); printf(" -b [port] baseport (default: 50000)\n"); printf(" -l enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n"); printf(" -o enable baseport offset per nodeid\n"); + printf(" -m change PMTUd interval in seconds (default: 60)\n"); printf(" -w dont wait for all nodes to be up before starting the test (default: wait)\n"); printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n"); printf(" test type (default: ping)\n"); printf(" ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n"); printf(" ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n"); printf(" perf-by-size: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size/quantity of packets and measuring the time, then quit\n"); printf(" perf-by-time: will wait for all hosts to join the knet network,\n"); printf(" perform a series of benchmarks by transmitting a known\n"); printf(" size of packets for a given amount of time (10 seconds)\n"); printf(" and measuring the quantity of data transmitted, then quit\n"); printf(" -s nodeid that will generate traffic for benchmarks\n"); printf(" -S [size|seconds] when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n"); printf(" when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n"); printf(" -C repeat the test continously (default: off)\n"); printf(" -X[XX] show stats at the end of the run (default: 1)\n"); printf(" 1: show handle stats, 2: show summary link stats\n"); printf(" 3: show detailed link stats\n"); + printf(" -a enable machine parsable output (default: off).\n"); } static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx) { int i; char *temp = NULL; char port_str[10]; memset(port_str, 0, sizeof(port_str)); sprintf(port_str, "%d", port); for (i = 0; i < onidx; i++) { nodes[i].nodeid = atoi(strtok(nodesinfo[i], ",")); if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) { printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST); exit(FAIL); } if (thisnodeid == nodes[i].nodeid) { *thisidx = i; } while((temp = strtok(NULL, ","))) { + char *slash = NULL; + uint8_t transport; + if (nodes[i].links == KNET_MAX_LINK) { printf("Too many links configured. Max %d\n", KNET_MAX_LINK); exit(FAIL); } + + slash = strstr(temp, "/"); + if (slash) { + memset(slash, 0, 1); + transport = knet_get_transport_id_by_name(temp); + if (transport == KNET_MAX_TRANSPORTS) { + printf("Unknown transport: %s\n", temp); + exit(FAIL); + } + nodes[i].transport[nodes[i].links] = transport; + temp = slash + 1; + } else { + nodes[i].transport[nodes[i].links] = KNET_TRANSPORT_UDP; + } + if (knet_strtoaddr(temp, port_str, &nodes[i].address[nodes[i].links], sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert %s to sockaddress\n", temp); exit(FAIL); } nodes[i].links++; } } if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert 0.0.0.0 to sockaddress\n"); exit(FAIL); } if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert :: to sockaddress\n"); exit(FAIL); } for (i = 1; i < onidx; i++) { if (nodes[0].links != nodes[i].links) { printf("knet_bench does not support unbalanced link configuration\n"); exit(FAIL); } } return; } static int private_data; static void sock_notify(void *pvt_data, int local_datafd, int8_t local_channel, uint8_t tx_rx, int error, int errorno) { - printf("Error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errno), local_datafd); + printf("[info]: error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errno), local_datafd); return; } static int ping_dst_host_filter(void *pvt_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_host_id, int8_t *dst_channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries) { if (broadcast_test) { return 1; } if (tx_rx == KNET_NOTIFY_TX) { memmove(&dst_host_ids[0], outdata, 2); } else { dst_host_ids[0] = this_host_id; } *dst_host_ids_entries = 1; return 0; } static void setup_knet(int argc, char *argv[]) { - int logfd; + int logfd = 0; int rv; char *policystr = NULL, *protostr = NULL; char *othernodeinfo[MAX_NODES]; struct node nodes[MAX_NODES]; int thisidx = -1; int onidx = 0; int debug = KNET_LOG_INFO; int port = 50000, portoffset = 0; int thisport = 0, otherport = 0; int thisnewport = 0, othernewport = 0; struct sockaddr_in *so_in; struct sockaddr_in6 *so_in6; struct sockaddr_storage *src; int i, link_idx, allnodesup = 0; int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0; int protocol = KNET_TRANSPORT_UDP, protofound = 0; int wait = 1; + int pmtud_interval = 60; struct knet_handle_crypto_cfg knet_handle_crypto_cfg; char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL; struct knet_handle_compress_cfg knet_handle_compress_cfg; memset(nodes, 0, sizeof(nodes)); - while ((rv = getopt(argc, argv, "CT:S:s:ldowb:t:n:c:p:X::P:z:h")) != EOF) { + while ((rv = getopt(argc, argv, "aCT:S:s:ldom:wb:t:n:c:p:X::P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); exit(PASS); break; + case 'a': + machine_output = 1; + break; case 'd': debug = KNET_LOG_DEBUG; break; case 'c': if (cryptocfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } cryptocfg = optarg; break; case 'p': if (policystr) { printf("Error: -p can only be specified once\n"); exit(FAIL); } policystr = optarg; if (!strcmp(policystr, "active")) { policy = KNET_LINK_POLICY_ACTIVE; policyfound = 1; } /* * we can't use rr because clangs can't compile * an array of 3 strings, one of which is 2 bytes long */ if (!strcmp(policystr, "round-robin")) { policy = KNET_LINK_POLICY_RR; policyfound = 1; } if (!strcmp(policystr, "passive")) { policy = KNET_LINK_POLICY_PASSIVE; policyfound = 1; } if (!policyfound) { printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr); exit(FAIL); } break; case 'P': if (protostr) { printf("Error: -P can only be specified once\n"); exit(FAIL); } protostr = optarg; - if (!strcmp(protostr, "udp")) { + if (!strcmp(protostr, "UDP")) { protocol = KNET_TRANSPORT_UDP; protofound = 1; } - if (!strcmp(protostr, "sctp")) { + if (!strcmp(protostr, "SCTP")) { protocol = KNET_TRANSPORT_SCTP; protofound = 1; } if (!protofound) { printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr); exit(FAIL); } break; case 't': if (thisnodeid >= 0) { printf("Error: -t can only be specified once\n"); exit(FAIL); } thisnodeid = atoi(optarg); if ((thisnodeid < 0) || (thisnodeid > 65536)) { printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid); exit(FAIL); } break; case 'n': if (onidx == MAX_NODES) { printf("Error: too many other nodes. Max %d\n", MAX_NODES); exit(FAIL); } othernodeinfo[onidx] = optarg; onidx++; break; case 'b': port = atoi(optarg); if ((port < 1) || (port > 65536)) { printf("Error: port %d out of range (1 - 65536)\n", port); exit(FAIL); } break; case 'o': if (globallistener) { printf("Error: -l cannot be used with -o\n"); exit(FAIL); } portoffset = 1; break; + case 'm': + pmtud_interval = atoi(optarg); + if (pmtud_interval < 1) { + printf("Error: pmtud interval %d out of range (> 0)\n", pmtud_interval); + exit(FAIL); + } + break; case 'l': if (portoffset) { printf("Error: -o cannot be used with -l\n"); exit(FAIL); } globallistener = 1; break; case 'w': wait = 0; break; case 's': if (senderid >= 0) { printf("Error: -s can only be specified once\n"); exit(FAIL); } senderid = atoi(optarg); if ((senderid < 0) || (senderid > 65536)) { printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid); exit(FAIL); } break; case 'T': if (!strcmp("ping", optarg)) { test_type = TEST_PING; } if (!strcmp("ping_data", optarg)) { test_type = TEST_PING_AND_DATA; } if (!strcmp("perf-by-size", optarg)) { test_type = TEST_PERF_BY_SIZE; } if (!strcmp("perf-by-time", optarg)) { test_type = TEST_PERF_BY_TIME; } break; case 'S': perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE; perf_by_time_secs = (uint64_t)atoi(optarg); break; case 'C': continous = 1; break; case 'X': if (optarg) { show_stats = atoi(optarg); } else { show_stats = 1; } break; case 'z': if (compresscfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } compresscfg = optarg; break; default: break; } } if (thisnodeid < 0) { printf("Who am I?!? missing -t from command line?\n"); exit(FAIL); } if (onidx < 1) { printf("no other nodes configured?!? missing -n from command line\n"); exit(FAIL); } parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx); if (thisidx < 0) { printf("no config for this node found\n"); exit(FAIL); } if (senderid >= 0) { for (i=0; i < onidx; i++) { if (senderid == nodes[i].nodeid) { break; } } if (i == onidx) { printf("Unable to find senderid in nodelist\n"); exit(FAIL); } } if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) { printf("Error: performance test requires -s to be set (for now)\n"); exit(FAIL); } logfd = start_logging(stdout); knet_h = knet_handle_new(thisnodeid, logfd, debug); if (!knet_h) { printf("Unable to knet_handle_new: %s\n", strerror(errno)); exit(FAIL); } if (cryptocfg) { memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg)); cryptomodel = strtok(cryptocfg, ":"); cryptotype = strtok(NULL, ":"); cryptohash = strtok(NULL, ":"); if (cryptomodel) { strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1); } if (cryptotype) { strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1); } if (cryptohash) { strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1); } knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN; if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) { printf("Unable to init crypto\n"); exit(FAIL); } } if (compresscfg) { memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":")); knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":")); knet_handle_compress_cfg.compress_threshold = atoi(strtok(NULL, ":")); if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) { printf("Unable to configure compress\n"); exit(FAIL); } } if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } datafd = 0; channel = -1; if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) { printf("knet_handle_add_datafd failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } - if (knet_handle_pmtud_setfreq(knet_h, 60) < 0) { + if (knet_handle_pmtud_setfreq(knet_h, pmtud_interval) < 0) { printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if (knet_host_add(knet_h, nodes[i].nodeid) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) { printf("knet_host_set_policy failed: %s\n", strerror(errno)); exit(FAIL); } for (link_idx = 0; link_idx < nodes[i].links; link_idx++) { if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in->sin_port); thisnewport = thisport + nodes[i].nodeid; so_in->sin_port = (htons(thisnewport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; otherport = ntohs(so_in->sin_port); othernewport = otherport + nodes[thisidx].nodeid; so_in->sin_port = (htons(othernewport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in6->sin6_port); thisnewport = thisport + nodes[i].nodeid; so_in6->sin6_port = (htons(thisnewport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; otherport = ntohs(so_in6->sin6_port); othernewport = otherport + nodes[thisidx].nodeid; so_in6->sin6_port = (htons(othernewport)); } } if (!globallistener) { src = &nodes[thisidx].address[link_idx]; } else { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { src = &allv4; } else { src = &allv6; } } + /* + * -P overrides per link protocol configuration + */ + if (protofound) { + nodes[i].transport[link_idx] = protocol; + } if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx, - protocol, src, + nodes[i].transport[link_idx], src, &nodes[i].address[link_idx], 0) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); exit(FAIL); } if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; so_in->sin_port = (htons(thisport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; so_in->sin_port = (htons(otherport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; so_in6->sin6_port = (htons(thisport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; so_in6->sin6_port = (htons(otherport)); } } if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) { printf("knet_link_set_ping_timers failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) { printf("knet_link_set_pong_count failed: %s\n", strerror(errno)); exit(FAIL); } } } if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); exit(FAIL); } if (wait) { while(!allnodesup) { allnodesup = 1; for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } - if(knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) { - printf("waiting host %d to be reachable\n", nodes[i].nodeid); + if (knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) { + printf("[info]: waiting host %d to be reachable\n", nodes[i].nodeid); allnodesup = 0; } } if (!allnodesup) { sleep(1); } } sleep(1); } } static void *_rx_thread(void *args) { int rx_epoll; struct epoll_event ev; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, msg_recv; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; uint64_t rx_pkts = 0; uint64_t rx_bytes = 0; unsigned int current_pckt_size = 0; for (i = 0; i < PCKT_FRAG_MAX; i++) { rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!rx_buf[i]) { - printf("RXT: Unable to malloc!\n"); + printf("RXT: Unable to malloc!\nHALTING RX THREAD!\n"); return NULL; } memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE); iov_in[i].iov_base = (void *)rx_buf[i]; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } rx_epoll = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (rx_epoll < 0) { printf("RXT: Unable to create epoll!\nHALTING RX THREAD!\n"); return NULL; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = datafd; if (epoll_ctl(rx_epoll, EPOLL_CTL_ADD, datafd, &ev)) { printf("RXT: Unable to add datafd to epoll\nHALTING RX THREAD!\n"); return NULL; } memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (!bench_shutdown_in_progress) { if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) { msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL); if (msg_recv < 0) { - printf("RXT: error from recvmmsg: %s\n", strerror(errno)); + printf("[info]: RXT: error from recvmmsg: %s\n", strerror(errno)); } switch(test_type) { case TEST_PING_AND_DATA: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len == 0) { - printf("RXT: received 0 bytes message?\n"); + printf("[info]: RXT: received 0 bytes message?\n"); } - printf("received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base); + printf("[info]: received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base); } break; case TEST_PERF_BY_TIME: case TEST_PERF_BY_SIZE: for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len < 64) { if (msg[i].msg_len == 0) { - printf("RXT: received 0 bytes message?\n"); + printf("[info]: RXT: received 0 bytes message?\n"); } if (msg[i].msg_len == TEST_START) { if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { - printf("Unable to get start time!\n"); + printf("[info]: unable to get start time!\n"); } } if (msg[i].msg_len == TEST_STOP) { double average_rx_mbytes; double average_rx_pkts; double time_diff_sec; if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { - printf("Unable to get end time!\n"); + printf("[info]: unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); /* * adjust for sleep(2) between sending the last data and TEST_STOP */ time_diff = time_diff - 2000000000llu; /* * convert to seconds */ time_diff_sec = (double)time_diff / 1000000000llu; average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024)); average_rx_pkts = (double)(rx_pkts / time_diff_sec); - printf("Execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n", time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts); + if (!machine_output) { + printf("[perf] execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n", + time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts); + } else { + printf("[perf],%.4f,%u,%" PRIu64 ",%.4f,%.4f\n", time_diff_sec, current_pckt_size, rx_pkts, average_rx_mbytes, average_rx_pkts); + } rx_pkts = 0; rx_bytes = 0; current_pckt_size = 0; } if (msg[i].msg_len == TEST_COMPLETE) { wait_for_perf_rx = 1; } continue; } rx_pkts++; rx_bytes = rx_bytes + msg[i].msg_len; current_pckt_size = msg[i].msg_len; } break; } } } epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev); close(rx_epoll); return NULL; } static void setup_data_txrx_common(void) { if (!rx_thread) { if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } - printf("Setting up rx thread\n"); + printf("[info]: setting up rx thread\n"); if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) { printf("Unable to start rx thread\n"); exit(FAIL); } } } static void stop_rx_thread(void) { void *retval; int i; if (rx_thread) { - printf("Shutting down rx thread\n"); + printf("[info]: shutting down rx thread\n"); sleep(2); pthread_cancel(rx_thread); pthread_join(rx_thread, &retval); for (i = 0; i < PCKT_FRAG_MAX; i ++) { free(rx_buf[i]); } } } static void send_ping_data(void) { char buf[65535]; ssize_t len; memset(&buf, 0, sizeof(buf)); snprintf(buf, sizeof(buf), "Hello world!"); if (compresscfg) { len = sizeof(buf); } else { len = strlen(buf); } if (knet_send(knet_h, buf, len, channel) != len) { - printf("Error sending hello world: %s\n", strerror(errno)); + printf("[info]: Error sending hello world: %s\n", strerror(errno)); } sleep(1); } static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send) { int sent_msgs, prev_sent, progress, total_sent; total_sent = 0; sent_msgs = 0; prev_sent = 0; progress = 1; retry: errno = 0; sent_msgs = _sendmmsg(datafd, &msg[0], msgs_to_send, MSG_NOSIGNAL); if (sent_msgs < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { usleep(KNET_THREADS_TIMERES / 16); goto retry; } - printf("Unable to send messages: %s\n", strerror(errno)); + printf("[info]: Unable to send messages: %s\n", strerror(errno)); return -1; } total_sent = total_sent + sent_msgs; if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) { if ((sent_msgs) || (progress)) { msgs_to_send = msgs_to_send - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { progress = 1; } else { progress = 0; } goto retry; } if (!progress) { - printf("Unable to send more messages after retry\n"); + printf("[info]: Unable to send more messages after retry\n"); } } return total_sent; } static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[]) { int i; for (i = 0; i < PCKT_FRAG_MAX; i++) { tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!tx_buf[i]) { printf("TXT: Unable to malloc!\n"); return -1; } memset(tx_buf[i], 0, KNET_MAX_PACKET_SIZE); iov_out[i].iov_base = (void *)tx_buf[i]; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_iov = &iov_out[i]; msg[i].msg_hdr.msg_iovlen = 1; } return 0; } static void send_perf_data_by_size(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; int i; uint64_t total_pkts_to_tx; uint64_t packets_to_send; uint32_t packetsize = 64; setup_send_buffers_common(msg, iov_out, tx_buf); while (packetsize <= KNET_MAX_PACKET_SIZE) { for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } total_pkts_to_tx = perf_by_size_size / packetsize; - printf("Testing with %u packet size. Total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx); + printf("[info]: testing with %u packet size. total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); while (total_pkts_to_tx > 0) { if (total_pkts_to_tx >= PCKT_FRAG_MAX) { packets_to_send = PCKT_FRAG_MAX; } else { packets_to_send = total_pkts_to_tx; } sent_msgs = send_messages(&msg[0], packets_to_send); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } total_pkts_to_tx = total_pkts_to_tx - sent_msgs; } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); if (packetsize == KNET_MAX_PACKET_SIZE) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } /* For sorting the node list into order */ static int node_compare(const void *aptr, const void *bptr) { uint16_t a,b; a = *(uint16_t *)aptr; b = *(uint16_t *)bptr; return a > b; } static void display_stats(int level) { struct knet_handle_stats handle_stats; struct knet_link_status link_status; struct knet_link_stats total_link_stats; knet_node_id_t host_list[KNET_MAX_HOST]; uint8_t link_list[KNET_MAX_LINK]; int res; unsigned int i,j; size_t num_hosts, num_links; res = knet_handle_get_stats(knet_h, &handle_stats, sizeof(handle_stats)); if (res) { - perror("Failed to get knet handle stats"); + perror("[info]: failed to get knet handle stats"); return; } if (compresscfg || cryptocfg) { printf("\n"); - printf("handle stats\n"); - printf("------------\n"); + printf("[stat]: handle stats\n"); + printf("[stat]: ------------\n"); if (compresscfg) { - printf(" tx_uncompressed_packets: %" PRIu64 "\n", handle_stats.tx_uncompressed_packets); - printf(" tx_compressed_packets: %" PRIu64 "\n", handle_stats.tx_compressed_packets); - printf(" tx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_original_bytes); - printf(" tx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_size_bytes ); - printf(" tx_compress_time_ave: %" PRIu64 "\n", handle_stats.tx_compress_time_ave); - printf(" tx_compress_time_min: %" PRIu64 "\n", handle_stats.tx_compress_time_min); - printf(" tx_compress_time_max: %" PRIu64 "\n", handle_stats.tx_compress_time_max); - printf(" rx_compressed_packets: %" PRIu64 "\n", handle_stats.rx_compressed_packets); - printf(" rx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_original_bytes); - printf(" rx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_size_bytes); - printf(" rx_compress_time_ave: %" PRIu64 "\n", handle_stats.rx_compress_time_ave); - printf(" rx_compress_time_min: %" PRIu64 "\n", handle_stats.rx_compress_time_min); - printf(" rx_compress_time_max: %" PRIu64 "\n", handle_stats.rx_compress_time_max); + printf("[stat]: tx_uncompressed_packets: %" PRIu64 "\n", handle_stats.tx_uncompressed_packets); + printf("[stat]: tx_compressed_packets: %" PRIu64 "\n", handle_stats.tx_compressed_packets); + printf("[stat]: tx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_original_bytes); + printf("[stat]: tx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_size_bytes ); + printf("[stat]: tx_compress_time_ave: %" PRIu64 "\n", handle_stats.tx_compress_time_ave); + printf("[stat]: tx_compress_time_min: %" PRIu64 "\n", handle_stats.tx_compress_time_min); + printf("[stat]: tx_compress_time_max: %" PRIu64 "\n", handle_stats.tx_compress_time_max); + printf("[stat]: rx_compressed_packets: %" PRIu64 "\n", handle_stats.rx_compressed_packets); + printf("[stat]: rx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_original_bytes); + printf("[stat]: rx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_size_bytes); + printf("[stat]: rx_compress_time_ave: %" PRIu64 "\n", handle_stats.rx_compress_time_ave); + printf("[stat]: rx_compress_time_min: %" PRIu64 "\n", handle_stats.rx_compress_time_min); + printf("[stat]: rx_compress_time_max: %" PRIu64 "\n", handle_stats.rx_compress_time_max); printf("\n"); } if (cryptocfg) { - printf(" tx_crypt_packets: %" PRIu64 "\n", handle_stats.tx_crypt_packets); - printf(" tx_crypt_byte_overhead: %" PRIu64 "\n", handle_stats.tx_crypt_byte_overhead); - printf(" tx_crypt_time_ave: %" PRIu64 "\n", handle_stats.tx_crypt_time_ave); - printf(" tx_crypt_time_min: %" PRIu64 "\n", handle_stats.tx_crypt_time_min); - printf(" tx_crypt_time_max: %" PRIu64 "\n", handle_stats.tx_crypt_time_max); - printf(" rx_crypt_packets: %" PRIu64 "\n", handle_stats.rx_crypt_packets); - printf(" rx_crypt_time_ave: %" PRIu64 "\n", handle_stats.rx_crypt_time_ave); - printf(" rx_crypt_time_min: %" PRIu64 "\n", handle_stats.rx_crypt_time_min); - printf(" rx_crypt_time_max: %" PRIu64 "\n", handle_stats.rx_crypt_time_max); + printf("[stat]: tx_crypt_packets: %" PRIu64 "\n", handle_stats.tx_crypt_packets); + printf("[stat]: tx_crypt_byte_overhead: %" PRIu64 "\n", handle_stats.tx_crypt_byte_overhead); + printf("[stat]: tx_crypt_time_ave: %" PRIu64 "\n", handle_stats.tx_crypt_time_ave); + printf("[stat]: tx_crypt_time_min: %" PRIu64 "\n", handle_stats.tx_crypt_time_min); + printf("[stat]: tx_crypt_time_max: %" PRIu64 "\n", handle_stats.tx_crypt_time_max); + printf("[stat]: rx_crypt_packets: %" PRIu64 "\n", handle_stats.rx_crypt_packets); + printf("[stat]: rx_crypt_time_ave: %" PRIu64 "\n", handle_stats.rx_crypt_time_ave); + printf("[stat]: rx_crypt_time_min: %" PRIu64 "\n", handle_stats.rx_crypt_time_min); + printf("[stat]: rx_crypt_time_max: %" PRIu64 "\n", handle_stats.rx_crypt_time_max); printf("\n"); } } if (level < 2) { return; } memset(&total_link_stats, 0, sizeof(struct knet_link_stats)); res = knet_host_get_host_list(knet_h, host_list, &num_hosts); if (res) { - perror("Cannot get host list for stats"); + perror("[info]: cannot get host list for stats"); return; } /* Print in host ID order */ qsort(host_list, num_hosts, sizeof(uint16_t), node_compare); for (j=0; j 2) { printf("\n"); - printf("Node %d Link %d\n", host_list[j], link_list[i]); - - printf(" tx_data_packets: %" PRIu64 "\n", link_status.stats.tx_data_packets); - printf(" rx_data_packets: %" PRIu64 "\n", link_status.stats.rx_data_packets); - printf(" tx_data_bytes: %" PRIu64 "\n", link_status.stats.tx_data_bytes); - printf(" rx_data_bytes: %" PRIu64 "\n", link_status.stats.rx_data_bytes); - printf(" rx_ping_packets: %" PRIu64 "\n", link_status.stats.rx_ping_packets); - printf(" tx_ping_packets: %" PRIu64 "\n", link_status.stats.tx_ping_packets); - printf(" rx_ping_bytes: %" PRIu64 "\n", link_status.stats.rx_ping_bytes); - printf(" tx_ping_bytes: %" PRIu64 "\n", link_status.stats.tx_ping_bytes); - printf(" rx_pong_packets: %" PRIu64 "\n", link_status.stats.rx_pong_packets); - printf(" tx_pong_packets: %" PRIu64 "\n", link_status.stats.tx_pong_packets); - printf(" rx_pong_bytes: %" PRIu64 "\n", link_status.stats.rx_pong_bytes); - printf(" tx_pong_bytes: %" PRIu64 "\n", link_status.stats.tx_pong_bytes); - printf(" rx_pmtu_packets: %" PRIu64 "\n", link_status.stats.rx_pmtu_packets); - printf(" tx_pmtu_packets: %" PRIu64 "\n", link_status.stats.tx_pmtu_packets); - printf(" rx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.rx_pmtu_bytes); - printf(" tx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.tx_pmtu_bytes); - - printf(" tx_total_packets: %" PRIu64 "\n", link_status.stats.tx_total_packets); - printf(" rx_total_packets: %" PRIu64 "\n", link_status.stats.rx_total_packets); - printf(" tx_total_bytes: %" PRIu64 "\n", link_status.stats.tx_total_bytes); - printf(" rx_total_bytes: %" PRIu64 "\n", link_status.stats.rx_total_bytes); - printf(" tx_total_errors: %" PRIu64 "\n", link_status.stats.tx_total_errors); - printf(" tx_total_retries: %" PRIu64 "\n", link_status.stats.tx_total_retries); - - printf(" tx_pmtu_errors: %" PRIu32 "\n", link_status.stats.tx_pmtu_errors); - printf(" tx_pmtu_retries: %" PRIu32 "\n", link_status.stats.tx_pmtu_retries); - printf(" tx_ping_errors: %" PRIu32 "\n", link_status.stats.tx_ping_errors); - printf(" tx_ping_retries: %" PRIu32 "\n", link_status.stats.tx_ping_retries); - printf(" tx_pong_errors: %" PRIu32 "\n", link_status.stats.tx_pong_errors); - printf(" tx_pong_retries: %" PRIu32 "\n", link_status.stats.tx_pong_retries); - printf(" tx_data_errors: %" PRIu32 "\n", link_status.stats.tx_data_errors); - printf(" tx_data_retries: %" PRIu32 "\n", link_status.stats.tx_data_retries); - - printf(" latency_min: %" PRIu32 "\n", link_status.stats.latency_min); - printf(" latency_max: %" PRIu32 "\n", link_status.stats.latency_max); - printf(" latency_ave: %" PRIu32 "\n", link_status.stats.latency_ave); - printf(" latency_samples: %" PRIu32 "\n", link_status.stats.latency_samples); - - printf(" down_count: %" PRIu32 "\n", link_status.stats.down_count); - printf(" up_count: %" PRIu32 "\n", link_status.stats.up_count); + printf("[stat]: Node %d Link %d\n", host_list[j], link_list[i]); + + printf("[stat]: tx_data_packets: %" PRIu64 "\n", link_status.stats.tx_data_packets); + printf("[stat]: rx_data_packets: %" PRIu64 "\n", link_status.stats.rx_data_packets); + printf("[stat]: tx_data_bytes: %" PRIu64 "\n", link_status.stats.tx_data_bytes); + printf("[stat]: rx_data_bytes: %" PRIu64 "\n", link_status.stats.rx_data_bytes); + printf("[stat]: rx_ping_packets: %" PRIu64 "\n", link_status.stats.rx_ping_packets); + printf("[stat]: tx_ping_packets: %" PRIu64 "\n", link_status.stats.tx_ping_packets); + printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", link_status.stats.rx_ping_bytes); + printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", link_status.stats.tx_ping_bytes); + printf("[stat]: rx_pong_packets: %" PRIu64 "\n", link_status.stats.rx_pong_packets); + printf("[stat]: tx_pong_packets: %" PRIu64 "\n", link_status.stats.tx_pong_packets); + printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", link_status.stats.rx_pong_bytes); + printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", link_status.stats.tx_pong_bytes); + printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", link_status.stats.rx_pmtu_packets); + printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", link_status.stats.tx_pmtu_packets); + printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.rx_pmtu_bytes); + printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.tx_pmtu_bytes); + + printf("[stat]: tx_total_packets: %" PRIu64 "\n", link_status.stats.tx_total_packets); + printf("[stat]: rx_total_packets: %" PRIu64 "\n", link_status.stats.rx_total_packets); + printf("[stat]: tx_total_bytes: %" PRIu64 "\n", link_status.stats.tx_total_bytes); + printf("[stat]: rx_total_bytes: %" PRIu64 "\n", link_status.stats.rx_total_bytes); + printf("[stat]: tx_total_errors: %" PRIu64 "\n", link_status.stats.tx_total_errors); + printf("[stat]: tx_total_retries: %" PRIu64 "\n", link_status.stats.tx_total_retries); + + printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", link_status.stats.tx_pmtu_errors); + printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", link_status.stats.tx_pmtu_retries); + printf("[stat]: tx_ping_errors: %" PRIu32 "\n", link_status.stats.tx_ping_errors); + printf("[stat]: tx_ping_retries: %" PRIu32 "\n", link_status.stats.tx_ping_retries); + printf("[stat]: tx_pong_errors: %" PRIu32 "\n", link_status.stats.tx_pong_errors); + printf("[stat]: tx_pong_retries: %" PRIu32 "\n", link_status.stats.tx_pong_retries); + printf("[stat]: tx_data_errors: %" PRIu32 "\n", link_status.stats.tx_data_errors); + printf("[stat]: tx_data_retries: %" PRIu32 "\n", link_status.stats.tx_data_retries); + + printf("[stat]: latency_min: %" PRIu32 "\n", link_status.stats.latency_min); + printf("[stat]: latency_max: %" PRIu32 "\n", link_status.stats.latency_max); + printf("[stat]: latency_ave: %" PRIu32 "\n", link_status.stats.latency_ave); + printf("[stat]: latency_samples: %" PRIu32 "\n", link_status.stats.latency_samples); + + printf("[stat]: down_count: %" PRIu32 "\n", link_status.stats.down_count); + printf("[stat]: up_count: %" PRIu32 "\n", link_status.stats.up_count); } } } printf("\n"); - printf("Total link stats\n"); - printf("----------------\n"); - printf("tx_data_packets: %" PRIu64 "\n", total_link_stats.tx_data_packets); - printf("rx_data_packets: %" PRIu64 "\n", total_link_stats.rx_data_packets); - printf("tx_data_bytes: %" PRIu64 "\n", total_link_stats.tx_data_bytes); - printf("rx_data_bytes: %" PRIu64 "\n", total_link_stats.rx_data_bytes); - printf("rx_ping_packets: %" PRIu64 "\n", total_link_stats.rx_ping_packets); - printf("tx_ping_packets: %" PRIu64 "\n", total_link_stats.tx_ping_packets); - printf("rx_ping_bytes: %" PRIu64 "\n", total_link_stats.rx_ping_bytes); - printf("tx_ping_bytes: %" PRIu64 "\n", total_link_stats.tx_ping_bytes); - printf("rx_pong_packets: %" PRIu64 "\n", total_link_stats.rx_pong_packets); - printf("tx_pong_packets: %" PRIu64 "\n", total_link_stats.tx_pong_packets); - printf("rx_pong_bytes: %" PRIu64 "\n", total_link_stats.rx_pong_bytes); - printf("tx_pong_bytes: %" PRIu64 "\n", total_link_stats.tx_pong_bytes); - printf("rx_pmtu_packets: %" PRIu64 "\n", total_link_stats.rx_pmtu_packets); - printf("tx_pmtu_packets: %" PRIu64 "\n", total_link_stats.tx_pmtu_packets); - printf("rx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.rx_pmtu_bytes); - printf("tx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.tx_pmtu_bytes); - - printf("tx_total_packets: %" PRIu64 "\n", total_link_stats.tx_total_packets); - printf("rx_total_packets: %" PRIu64 "\n", total_link_stats.rx_total_packets); - printf("tx_total_bytes: %" PRIu64 "\n", total_link_stats.tx_total_bytes); - printf("rx_total_bytes: %" PRIu64 "\n", total_link_stats.rx_total_bytes); - printf("tx_total_errors: %" PRIu64 "\n", total_link_stats.tx_total_errors); - printf("tx_total_retries: %" PRIu64 "\n", total_link_stats.tx_total_retries); - - printf("tx_pmtu_errors: %" PRIu32 "\n", total_link_stats.tx_pmtu_errors); - printf("tx_pmtu_retries: %" PRIu32 "\n", total_link_stats.tx_pmtu_retries); - printf("tx_ping_errors: %" PRIu32 "\n", total_link_stats.tx_ping_errors); - printf("tx_ping_retries: %" PRIu32 "\n", total_link_stats.tx_ping_retries); - printf("tx_pong_errors: %" PRIu32 "\n", total_link_stats.tx_pong_errors); - printf("tx_pong_retries: %" PRIu32 "\n", total_link_stats.tx_pong_retries); - printf("tx_data_errors: %" PRIu32 "\n", total_link_stats.tx_data_errors); - printf("tx_data_retries: %" PRIu32 "\n", total_link_stats.tx_data_retries); - - printf("down_count: %" PRIu32 "\n", total_link_stats.down_count); - printf("up_count: %" PRIu32 "\n", total_link_stats.up_count); + printf("[stat]: Total link stats\n"); + printf("[stat]: ----------------\n"); + printf("[stat]: tx_data_packets: %" PRIu64 "\n", total_link_stats.tx_data_packets); + printf("[stat]: rx_data_packets: %" PRIu64 "\n", total_link_stats.rx_data_packets); + printf("[stat]: tx_data_bytes: %" PRIu64 "\n", total_link_stats.tx_data_bytes); + printf("[stat]: rx_data_bytes: %" PRIu64 "\n", total_link_stats.rx_data_bytes); + printf("[stat]: rx_ping_packets: %" PRIu64 "\n", total_link_stats.rx_ping_packets); + printf("[stat]: tx_ping_packets: %" PRIu64 "\n", total_link_stats.tx_ping_packets); + printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", total_link_stats.rx_ping_bytes); + printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", total_link_stats.tx_ping_bytes); + printf("[stat]: rx_pong_packets: %" PRIu64 "\n", total_link_stats.rx_pong_packets); + printf("[stat]: tx_pong_packets: %" PRIu64 "\n", total_link_stats.tx_pong_packets); + printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", total_link_stats.rx_pong_bytes); + printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", total_link_stats.tx_pong_bytes); + printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", total_link_stats.rx_pmtu_packets); + printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", total_link_stats.tx_pmtu_packets); + printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.rx_pmtu_bytes); + printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.tx_pmtu_bytes); + + printf("[stat]: tx_total_packets: %" PRIu64 "\n", total_link_stats.tx_total_packets); + printf("[stat]: rx_total_packets: %" PRIu64 "\n", total_link_stats.rx_total_packets); + printf("[stat]: tx_total_bytes: %" PRIu64 "\n", total_link_stats.tx_total_bytes); + printf("[stat]: rx_total_bytes: %" PRIu64 "\n", total_link_stats.rx_total_bytes); + printf("[stat]: tx_total_errors: %" PRIu64 "\n", total_link_stats.tx_total_errors); + printf("[stat]: tx_total_retries: %" PRIu64 "\n", total_link_stats.tx_total_retries); + + printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", total_link_stats.tx_pmtu_errors); + printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", total_link_stats.tx_pmtu_retries); + printf("[stat]: tx_ping_errors: %" PRIu32 "\n", total_link_stats.tx_ping_errors); + printf("[stat]: tx_ping_retries: %" PRIu32 "\n", total_link_stats.tx_ping_retries); + printf("[stat]: tx_pong_errors: %" PRIu32 "\n", total_link_stats.tx_pong_errors); + printf("[stat]: tx_pong_retries: %" PRIu32 "\n", total_link_stats.tx_pong_retries); + printf("[stat]: tx_data_errors: %" PRIu32 "\n", total_link_stats.tx_data_errors); + printf("[stat]: tx_data_retries: %" PRIu32 "\n", total_link_stats.tx_data_retries); + + printf("[stat]: down_count: %" PRIu32 "\n", total_link_stats.down_count); + printf("[stat]: up_count: %" PRIu32 "\n", total_link_stats.up_count); } static void send_perf_data_by_time(void) { char *tx_buf[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_out[PCKT_FRAG_MAX]; char ctrl_message[16]; int sent_msgs; int i; - uint32_t packetsize = 65536; + uint32_t packetsize = 64; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; setup_send_buffers_common(msg, iov_out, tx_buf); memset(&clock_start, 0, sizeof(clock_start)); memset(&clock_end, 0, sizeof(clock_start)); while (packetsize <= KNET_MAX_PACKET_SIZE) { for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } - printf("Testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs); + printf("[info]: testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs); memset(ctrl_message, 0, sizeof(ctrl_message)); knet_send(knet_h, ctrl_message, TEST_START, channel); if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) { - printf("Unable to get start time!\n"); + printf("[info]: unable to get start time!\n"); } time_diff = 0; while (time_diff < (perf_by_time_secs * 1000000000llu)) { sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX); if (sent_msgs < 0) { printf("Something went wrong, aborting\n"); exit(FAIL); } if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) { - printf("Unable to get end time!\n"); + printf("[info]: unable to get end time!\n"); } timespec_diff(clock_start, clock_end, &time_diff); } sleep(2); knet_send(knet_h, ctrl_message, TEST_STOP, channel); if (packetsize == KNET_MAX_PACKET_SIZE) { break; } /* * Use a multiplier that can always divide properly a GB * into smaller chunks without worry about boundaries */ packetsize *= 4; if (packetsize > KNET_MAX_PACKET_SIZE) { packetsize = KNET_MAX_PACKET_SIZE; } } knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel); for (i = 0; i < PCKT_FRAG_MAX; i++) { free(tx_buf[i]); } } static void cleanup_all(void) { if (pthread_mutex_lock(&shutdown_mutex)) { return; } if (bench_shutdown_in_progress) { pthread_mutex_unlock(&shutdown_mutex); return; } bench_shutdown_in_progress = 1; pthread_mutex_unlock(&shutdown_mutex); if (rx_thread) { stop_rx_thread(); } knet_handle_stop(knet_h); } static void sigint_handler(int signum) { - printf("Cleaning up... got signal: %d\n", signum); + printf("[info]: cleaning up... got signal: %d\n", signum); cleanup_all(); exit(PASS); } int main(int argc, char *argv[]) { if (signal(SIGINT, sigint_handler) == SIG_ERR) { printf("Unable to configure SIGINT handler\n"); exit(FAIL); } setup_knet(argc, argv); setup_data_txrx_common(); sleep(5); restart: switch(test_type) { default: case TEST_PING: /* basic ping, no data */ sleep(5); break; case TEST_PING_AND_DATA: send_ping_data(); break; case TEST_PERF_BY_SIZE: if (senderid == thisnodeid) { send_perf_data_by_size(); } else { - printf("Waiting for perf rx thread to finish\n"); + printf("[info]: waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; case TEST_PERF_BY_TIME: if (senderid == thisnodeid) { send_perf_data_by_time(); } else { - printf("Waiting for perf rx thread to finish\n"); + printf("[info]: waiting for perf rx thread to finish\n"); while(!wait_for_perf_rx) { sleep(1); } } break; } if (continous) { goto restart; } if (show_stats) { display_stats(show_stats); } cleanup_all(); return PASS; } diff --git a/libknet/tests/test-common.c b/libknet/tests/test-common.c index 36599baf..3f6746b8 100644 --- a/libknet/tests/test-common.c +++ b/libknet/tests/test-common.c @@ -1,513 +1,513 @@ /* * Copyright (C) 2016-2017 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "test-common.h" static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; static int log_init = 0; static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_t log_thread; static int log_thread_init = 0; static int log_fds[2]; struct log_thread_data { int logfd; FILE *std; }; static struct log_thread_data data; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; static int shutdown_in_progress = 0; static int _read_pipe(int fd, char **file, size_t *length) { char buf[4096]; int n; int done = 0; *file = NULL; *length = 0; memset(buf, 0, sizeof(buf)); while (!done) { n = read(fd, buf, sizeof(buf)); if (n < 0) { if (errno == EINTR) continue; if (*file) free(*file); return n; } if (n == 0 && (!*length)) return 0; if (n == 0) done = 1; if (*file) *file = realloc(*file, (*length) + n + done); else *file = malloc(n + done); if (!*file) return -1; memmove((*file) + (*length), buf, n); *length += (done + n); } /* Null terminator */ (*file)[(*length) - 1] = 0; return 0; } int execute_shell(const char *command, char **error_string) { pid_t pid; int status, err = 0; int fd[2]; size_t size = 0; if ((command == NULL) || (!error_string)) { errno = EINVAL; return FAIL; } *error_string = NULL; err = pipe(fd); if (err) goto out_clean; pid = fork(); if (pid < 0) { err = pid; goto out_clean; } if (pid) { /* parent */ close(fd[1]); err = _read_pipe(fd[0], error_string, &size); if (err) goto out_clean0; waitpid(pid, &status, 0); if (!WIFEXITED(status)) { err = -1; goto out_clean0; } if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { err = WEXITSTATUS(status); goto out_clean0; } goto out_clean0; } else { /* child */ close(0); close(1); close(2); close(fd[0]); dup2(fd[1], 1); dup2(fd[1], 2); close(fd[1]); execlp("/bin/sh", "/bin/sh", "-c", command, NULL); exit(FAIL); } out_clean: close(fd[1]); out_clean0: close(fd[0]); return err; } int is_memcheck(void) { char *val; val = getenv("KNETMEMCHECK"); if (val) { if (!strncmp(val, "yes", 3)) { return 1; } } return 0; } int is_helgrind(void) { char *val; val = getenv("KNETHELGRIND"); if (val) { if (!strncmp(val, "yes", 3)) { return 1; } } return 0; } void set_scheduler(int policy) { struct sched_param sched_param; int err; err = sched_get_priority_max(policy); if (err < 0) { printf("Could not get maximum scheduler priority\n"); exit(FAIL); } sched_param.sched_priority = err; err = sched_setscheduler(0, policy, &sched_param); if (err < 0) { printf("Could not set priority\n"); exit(FAIL); } return; } int setup_logpipes(int *logfds) { if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) { printf("Unable to setup logging pipe\n"); exit(FAIL); } return PASS; } void close_logpipes(int *logfds) { close(logfds[0]); logfds[0] = 0; close(logfds[1]); logfds[1] = 0; } void flush_logs(int logfd, FILE *std) { struct knet_log_msg msg; size_t bytes_read; int len; next: len = 0; bytes_read = 0; memset(&msg, 0, sizeof(struct knet_log_msg)); while (bytes_read < sizeof(struct knet_log_msg)) { len = read(logfd, &msg + bytes_read, sizeof(struct knet_log_msg) - bytes_read); if (len <= 0) { return; } bytes_read += len; } if (len > 0) { - fprintf(std, "knet logs: [%s] %s: %s\n", + fprintf(std, "[knet]: [%s] %s: %s\n", knet_log_get_loglevel_name(msg.msglevel), knet_log_get_subsystem_name(msg.subsystem), msg.msg); goto next; } } static void *_logthread(void *args) { fd_set rfds; ssize_t len; struct timeval tv; select_loop: tv.tv_sec = 60; tv.tv_usec = 0; FD_ZERO(&rfds); FD_SET(data.logfd, &rfds); len = select(FD_SETSIZE, &rfds, NULL, NULL, &tv); if (len < 0) { fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n"); return NULL; } if (!len) { - fprintf(data.std, "knet logs: No logs in the last 60 seconds\n"); + fprintf(data.std, "[knet]: No logs in the last 60 seconds\n"); } if (FD_ISSET(data.logfd, &rfds)) { flush_logs(data.logfd, data.std); } goto select_loop; return NULL; } int start_logthread(int logfd, FILE *std) { int savederrno = 0; savederrno = pthread_mutex_lock(&log_thread_mutex); if (savederrno) { printf("Unable to get log_thread mutex lock\n"); return -1; } if (!log_thread_init) { data.logfd = logfd; data.std = std; savederrno = pthread_create(&log_thread, 0, _logthread, NULL); if (savederrno) { printf("Unable to start logging thread: %s\n", strerror(savederrno)); pthread_mutex_unlock(&log_thread_mutex); return -1; } log_thread_init = 1; } pthread_mutex_unlock(&log_thread_mutex); return 0; } int stop_logthread(void) { int savederrno = 0; void *retval; savederrno = pthread_mutex_lock(&log_thread_mutex); if (savederrno) { printf("Unable to get log_thread mutex lock\n"); return -1; } if (log_thread_init) { pthread_cancel(log_thread); pthread_join(log_thread, &retval); log_thread_init = 0; } pthread_mutex_unlock(&log_thread_mutex); return 0; } static void stop_logging(void) { stop_logthread(); flush_logs(log_fds[0], stdout); close_logpipes(log_fds); } int start_logging(FILE *std) { int savederrno = 0; savederrno = pthread_mutex_lock(&log_mutex); if (savederrno) { printf("Unable to get log_mutex lock\n"); return -1; } if (!log_init) { setup_logpipes(log_fds); if (atexit(&stop_logging) != 0) { printf("Unable to register atexit handler to stop logging: %s\n", strerror(errno)); exit(FAIL); } if (start_logthread(log_fds[0], std) < 0) { exit(FAIL); } log_init = 1; } pthread_mutex_unlock(&log_mutex); return log_fds[1]; } knet_handle_t knet_handle_start(int logfds[2], uint8_t log_level) { knet_handle_t knet_h = knet_handle_new(1, logfds[1], log_level); if (knet_h) { return knet_h; } else { int exit_status; if (errno == ENAMETOOLONG) { exit_status = SKIP; } else { exit_status = FAIL; } printf("knet_handle_new failed: %s\n", strerror(errno)); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(exit_status); } } int knet_handle_stop(knet_handle_t knet_h) { int savederrno; size_t i, j; knet_node_id_t host_ids[KNET_MAX_HOST]; uint8_t link_ids[KNET_MAX_LINK]; size_t host_ids_entries = 0, link_ids_entries = 0; struct knet_link_status status; savederrno = pthread_mutex_lock(&shutdown_mutex); if (savederrno) { printf("Unable to get shutdown mutex lock\n"); return -1; } if (shutdown_in_progress) { pthread_mutex_unlock(&shutdown_mutex); errno = EINVAL; return -1; } shutdown_in_progress = 1; pthread_mutex_unlock(&shutdown_mutex); if (!knet_h) { errno = EINVAL; return -1; } if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) { printf("knet_host_get_host_list failed: %s\n", strerror(errno)); return -1; } for (i = 0; i < host_ids_entries; i++) { if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) { printf("knet_link_get_link_list failed: %s\n", strerror(errno)); return -1; } for (j = 0; j < link_ids_entries; j++) { if (knet_link_get_status(knet_h, host_ids[i], link_ids[j], &status, sizeof(struct knet_link_status))) { printf("knet_link_get_status failed: %s\n", strerror(errno)); return -1; } if (status.enabled) { if (knet_link_set_enable(knet_h, host_ids[i], j, 0)) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); return -1; } } knet_link_clear_config(knet_h, host_ids[i], j); } if (knet_host_remove(knet_h, host_ids[i]) < 0) { printf("knet_host_remove failed: %s\n", strerror(errno)); return -1; } } if (knet_handle_free(knet_h)) { printf("knet_handle_free failed: %s\n", strerror(errno)); return -1; } return 0; } int make_local_sockaddr(struct sockaddr_storage *lo, uint16_t offset) { uint32_t port; char portstr[32]; /* Use the pid if we can. but makes sure its in a sensible range */ port = (uint32_t)getpid() + offset; if (port < 1024) { port += 1024; } if (port > 65536) { port = port & 0xFFFF; } sprintf(portstr, "%u", port); memset(lo, 0, sizeof(struct sockaddr_storage)); printf("Using port %u\n", port); return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage)); } int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std) { int i = 0; if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n"); seconds = seconds * 16; } while (i < seconds) { flush_logs(logfd, std); if (knet_h->host_index[host_id]->status.reachable == 1) { return 0; } printf("waiting host %u to be reachable for %d more seconds\n", host_id, seconds - i); sleep(1); i++; } return -1; } int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd) { fd_set rfds; struct timeval tv; int err = 0; if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting wait_for_packet timeout\n"); seconds = seconds * 16; } FD_ZERO(&rfds); FD_SET(datafd, &rfds); tv.tv_sec = seconds; tv.tv_usec = 0; err = select(datafd+1, &rfds, NULL, NULL, &tv); if ((err > 0) && (FD_ISSET(datafd, &rfds))) { return 0; } return -1; }