Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3151598
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
22 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/libknet/tests/test-common.c b/libknet/tests/test-common.c
index 2729f551..86b76b0c 100644
--- a/libknet/tests/test-common.c
+++ b/libknet/tests/test-common.c
@@ -1,889 +1,952 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>
#include <dirent.h>
#include <sys/select.h>
+#include <sys/poll.h>
#include "libknet.h"
#include "test-common.h"
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
static int log_init = 0;
static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t log_thread;
static int log_thread_init = 0;
static int log_fds[2];
struct log_thread_data {
int logfd;
FILE *std;
};
static struct log_thread_data data;
static char plugin_path[PATH_MAX];
/*
 * Read from fd until EOF into a heap buffer.
 *
 * On success returns 0. If no data was read, *file is NULL and *length is 0.
 * Otherwise *file points to a NUL-terminated buffer owned by the caller
 * (release with free()) and *length is the number of bytes stored
 * INCLUDING the terminating NUL.
 *
 * On failure (read error other than EINTR, or allocation failure) returns
 * a negative value; any partial buffer is released and *file is reset to
 * NULL so the caller never sees a dangling pointer.
 */
static int _read_pipe(int fd, char **file, size_t *length)
{
	char buf[4096];
	char *grown;
	ssize_t n;
	int done = 0;

	*file = NULL;
	*length = 0;

	while (!done) {
		n = read(fd, buf, sizeof(buf));
		if (n < 0) {
			if (errno == EINTR)
				continue;
			free(*file);
			*file = NULL;
			*length = 0;
			return -1;
		}

		if (n == 0) {
			/* EOF with nothing collected: report "no output" */
			if (*length == 0)
				return 0;
			/* EOF after data: grow by one byte for the terminator */
			done = 1;
		}

		/*
		 * realloc(NULL, size) behaves like malloc(size). Keep the old
		 * pointer until we know the call succeeded so it can be freed.
		 */
		grown = realloc(*file, (*length) + (size_t)n + (size_t)done);
		if (!grown) {
			free(*file);
			*file = NULL;
			*length = 0;
			return -1;
		}
		*file = grown;

		memcpy(grown + (*length), buf, (size_t)n);
		*length += (size_t)n + (size_t)done;
	}

	/* Null terminator */
	(*file)[(*length) - 1] = 0;

	return 0;
}
/*
 * Run `command` via "/bin/sh -c" and capture its stdout and stderr.
 *
 * command:      shell command line to run (must not be NULL).
 * error_string: receives a heap buffer with the combined output, or NULL
 *               if the command printed nothing. The caller owns the buffer
 *               and must free() it.
 *
 * Returns 0 when the command exited with status 0, the command's exit
 * status when it is non-zero, FAIL (errno = EINVAL) on invalid arguments,
 * and a negative value on pipe/fork/read/wait failures or when the child
 * did not terminate normally.
 */
int execute_shell(const char *command, char **error_string)
{
	pid_t pid;
	int status = 0, err = 0, savederrno;
	int fd[2];
	size_t size = 0;

	if ((command == NULL) || (!error_string)) {
		errno = EINVAL;
		return FAIL;
	}

	*error_string = NULL;

	/* If pipe() fails, fd[] is uninitialized: there is nothing to close */
	err = pipe(fd);
	if (err)
		return err;

	pid = fork();
	if (pid < 0) {
		savederrno = errno;
		close(fd[1]);
		close(fd[0]);
		errno = savederrno;
		return pid;
	}

	if (pid == 0) { /* child */
		close(0);
		close(1);
		close(2);

		close(fd[0]);
		dup2(fd[1], 1);
		dup2(fd[1], 2);
		close(fd[1]);

		execlp("/bin/sh", "/bin/sh", "-c", command, (char *)NULL);
		/*
		 * Only reached if exec failed. Use _exit() so the atexit
		 * handlers and stdio buffers inherited from the parent are
		 * not run/flushed a second time in the child.
		 */
		_exit(FAIL);
	}

	/* parent */
	close(fd[1]);
	err = _read_pipe(fd[0], error_string, &size);
	close(fd[0]);

	/* Always reap the child, even when reading its output failed */
	while (waitpid(pid, &status, 0) < 0) {
		if (errno != EINTR)
			return -1;
	}

	if (err)
		return err;

	if (!WIFEXITED(status))
		return -1;

	return WEXITSTATUS(status);
}
/*
 * Returns 1 when the KNETMEMCHECK environment variable is set to a value
 * whose first three characters are "yes", 0 otherwise.
 */
int is_memcheck(void)
{
	const char *setting = getenv("KNETMEMCHECK");

	if (setting == NULL) {
		return 0;
	}

	return (strncmp(setting, "yes", 3) == 0) ? 1 : 0;
}
/*
 * Returns 1 when the KNETHELGRIND environment variable is set to a value
 * whose first three characters are "yes", 0 otherwise.
 */
int is_helgrind(void)
{
	const char *setting = getenv("KNETHELGRIND");

	if (setting == NULL) {
		return 0;
	}

	return (strncmp(setting, "yes", 3) == 0) ? 1 : 0;
}
/*
 * Switch the calling process (pid 0) to scheduling `policy` at that
 * policy's maximum priority. Prints a message and exits with FAIL if the
 * priority cannot be queried or applied.
 */
void set_scheduler(int policy)
{
	struct sched_param param;
	int max_prio = sched_get_priority_max(policy);

	if (max_prio < 0) {
		printf("Could not get maximum scheduler priority\n");
		exit(FAIL);
	}

	param.sched_priority = max_prio;

	if (sched_setscheduler(0, policy, &param) < 0) {
		printf("Could not set priority\n");
		exit(FAIL);
	}
}
/*
 * Create the logging pipe in logfds[0] (read end) / logfds[1] (write end),
 * with both ends close-on-exec and non-blocking. Returns PASS on success;
 * prints a message and exits with FAIL if the pipe cannot be created.
 */
int setup_logpipes(int *logfds)
{
	int rc = pipe2(logfds, O_CLOEXEC | O_NONBLOCK);

	if (rc >= 0) {
		return PASS;
	}

	printf("Unable to setup logging pipe\n");
	exit(FAIL);
}
/*
 * Close both ends of a logging pipe and reset each slot to 0.
 */
void close_logpipes(int *logfds)
{
	int end;

	for (end = 0; end < 2; end++) {
		close(logfds[end]);
		logfds[end] = 0;
	}
}
/*
 * Drain complete struct knet_log_msg records from logfd and print each one
 * to `std`. Stops at the first read that does not return exactly one full
 * record (no data, error, or short read) and clears errno before returning
 * so the read failure is not propagated to the caller.
 */
void flush_logs(int logfd, FILE *std)
{
	struct knet_log_msg msg;
	int nread;

	for (;;) {
		nread = read(logfd, &msg, sizeof(msg));
		if (nread != sizeof(msg)) {
			break;
		}

		/* force termination before printing the message text */
		msg.msg[sizeof(msg.msg) - 1] = 0;

		fprintf(std, "[knet]: [%s] %s: %.*s\n",
			knet_log_get_loglevel_name(msg.msglevel),
			knet_log_get_subsystem_name(msg.subsystem),
			KNET_MAX_LOG_MSG_SIZE, msg.msg);
	}

	/*
	 * clear errno to avoid incorrect propagation
	 */
	errno = 0;
}
/*
 * Logging thread body: waits on data.logfd with a 60 second select()
 * timeout and flushes pending log records to data.std whenever the
 * descriptor becomes readable. Loops forever; returns NULL only when
 * select() fails.
 */
static void *_logthread(void *args)
{
	for (;;) {
		fd_set readable;
		struct timeval timeout;
		int ready;

		timeout.tv_sec = 60;
		timeout.tv_usec = 0;

		FD_ZERO(&readable);
		FD_SET(data.logfd, &readable);

		ready = select(FD_SETSIZE, &readable, NULL, NULL, &timeout);
		if (ready < 0) {
			fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n");
			return NULL;
		}

		if (ready == 0) {
			fprintf(data.std, "[knet]: No logs in the last 60 seconds\n");
			continue;
		}

		if (FD_ISSET(data.logfd, &readable)) {
			flush_logs(data.logfd, data.std);
		}
	}
}
/*
 * Start the logging thread if it is not already running.
 *
 * logfd/std are stored in the file-level `data` and used by the thread as
 * the descriptor to read from and the stream to print to. Calling this
 * while the thread is already running is a no-op that returns 0.
 *
 * Returns 0 on success, -1 if the mutex cannot be taken or the thread
 * cannot be created.
 */
int start_logthread(int logfd, FILE *std)
{
	int rc = 0;
	int lock_err = pthread_mutex_lock(&log_thread_mutex);

	if (lock_err) {
		printf("Unable to get log_thread mutex lock\n");
		return -1;
	}

	if (!log_thread_init) {
		int create_err;

		data.logfd = logfd;
		data.std = std;

		create_err = pthread_create(&log_thread, 0, _logthread, NULL);
		if (create_err) {
			printf("Unable to start logging thread: %s\n", strerror(create_err));
			rc = -1;
		} else {
			log_thread_init = 1;
		}
	}

	pthread_mutex_unlock(&log_thread_mutex);
	return rc;
}
/*
 * Cancel and join the logging thread if it is running.
 * Returns 0 on success (including when no thread was running), -1 if the
 * mutex cannot be taken.
 */
int stop_logthread(void)
{
	void *thread_ret;

	if (pthread_mutex_lock(&log_thread_mutex) != 0) {
		printf("Unable to get log_thread mutex lock\n");
		return -1;
	}

	if (log_thread_init != 0) {
		pthread_cancel(log_thread);
		pthread_join(log_thread, &thread_ret);
		log_thread_init = 0;
	}

	pthread_mutex_unlock(&log_thread_mutex);

	return 0;
}
/*
 * atexit() handler registered by start_logging(): stops the logging
 * thread, prints whatever is still queued on the log pipe to stdout and
 * closes the pipe.
 */
static void stop_logging(void)
{
	const int read_end = log_fds[0];

	stop_logthread();
	flush_logs(read_end, stdout);
	close_logpipes(log_fds);
}
/*
 * One-time logging setup: creates the log pipe, registers stop_logging()
 * as an atexit handler and starts the logging thread printing to `std`.
 * Later calls skip the setup.
 *
 * Returns the write end of the log pipe, or -1 if the mutex cannot be
 * taken. Exits with FAIL if the atexit handler or the thread cannot be
 * set up.
 */
int start_logging(FILE *std)
{
	int lock_err = pthread_mutex_lock(&log_mutex);

	if (lock_err) {
		printf("Unable to get log_mutex lock\n");
		return -1;
	}

	if (log_init == 0) {
		setup_logpipes(log_fds);

		if (atexit(&stop_logging) != 0) {
			printf("Unable to register atexit handler to stop logging: %s\n",
			       strerror(errno));
			exit(FAIL);
		}

		if (start_logthread(log_fds[0], std) < 0) {
			exit(FAIL);
		}

		log_init = 1;
	}

	pthread_mutex_unlock(&log_mutex);

	return log_fds[1];
}
/*
 * scandir() filter: accept only entries whose name ends in ".so" and
 * starts with "crypto" or "compress".
 *
 * Returns 1 to keep the entry, 0 to drop it. Names shorter than the ".so"
 * suffix (such as "." and "..") are rejected up front so the suffix
 * comparison never reads before the start of d_name.
 */
static int dir_filter(const struct dirent *dname)
{
	const char *name = dname->d_name;
	size_t len = strlen(name);

	if (len < 3) {
		return 0;
	}

	if (strcmp(name + len - 3, ".so") != 0) {
		return 0;
	}

	if ((strncmp(name, "crypto", 6) == 0) ||
	    (strncmp(name, "compress", 8) == 0)) {
		return 1;
	}

	return 0;
}
/*
 * Sanity check for a candidate plugin directory: make sure it holds at
 * least one crypto and one compress plugin known to libknet (a category
 * for which libknet reports no models is treated as satisfied).
 *
 * Returns 1 if the directory qualifies, 0 otherwise (including when the
 * model lists or the directory cannot be read).
 */
static int contains_plugins(char *path)
{
	struct knet_compress_info compress_list[256];
	struct knet_crypto_info crypto_list[256];
	size_t num_compress, num_crypto;
	size_t compress_found = 0;
	size_t crypto_found = 0;
	struct dirent **namelist;
	int entries, idx;
	size_t k;

	if (knet_get_compress_list(compress_list, &num_compress) == -1) {
		return 0;
	}
	if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) {
		return 0;
	}

	entries = scandir(path, &namelist, dir_filter, alphasort);
	if (entries == -1) {
		return 0;
	}

	/* Look for plugins in the list */
	for (idx = 0; idx < entries; idx++) {
		const char *fname = namelist[idx]->d_name;
		size_t fname_len = strlen(fname);

		/* compare the model name against the text after the first 7 chars (presumably "crypto_") */
		for (k = 0; k < num_crypto; k++) {
			if (fname_len >= 7 &&
			    strncmp(crypto_list[k].name, fname + 7,
				    strlen(crypto_list[k].name)) == 0) {
				crypto_found++;
			}
		}

		/* same for compress, skipping the first 9 chars (presumably "compress_") */
		for (k = 0; k < num_compress; k++) {
			if (fname_len >= 9 &&
			    strncmp(compress_list[k].name, fname + 9,
				    strlen(compress_list[k].name)) == 0) {
				compress_found++;
			}
		}

		free(namelist[idx]);
	}
	free(namelist);

	/* If at least one plugin was found (or none were built) */
	return ((crypto_found || num_crypto == 0) &&
		(compress_found || num_compress == 0)) ? 1 : 0;
}
/*
 * libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree.
 *
 * Walk the ':'-separated entries of LD_LIBRARY_PATH and return the first
 * one that contains_plugins() accepts. The chosen directory is copied into
 * the file-level plugin_path buffer (truncated to fit, always
 * NUL-terminated) and a pointer to that buffer is returned.
 *
 * Returns NULL when LD_LIBRARY_PATH is unset, when no entry qualifies, or
 * when the working copy of the variable cannot be allocated.
 */
static char *find_plugins_path(void)
{
	char *ld_libs_env = getenv("LD_LIBRARY_PATH");
	char *ld_libs;
	char *str;
	char *found = NULL;

	if (!ld_libs_env) {
		return NULL;
	}

	/* strtok() modifies its input, so work on a private copy */
	ld_libs = strdup(ld_libs_env);
	if (!ld_libs) {
		return NULL;
	}

	for (str = strtok(ld_libs, ":"); str; str = strtok(NULL, ":")) {
		if (contains_plugins(str)) {
			snprintf(plugin_path, sizeof(plugin_path), "%s", str);
			printf("Using plugins from %s\n", plugin_path);
			found = plugin_path;
			break;
		}
	}

	free(ld_libs);
	return found;
}
/*
 * Create a knet handle for host id 1 that logs to logfds[1] at log_level.
 *
 * If a plugin directory is found via find_plugins_path(), the handle is
 * pointed at it. On failure the pending logs are flushed to stdout, the
 * log pipes are closed and the process exits with FAIL, so this function
 * only ever returns a valid handle.
 */
knet_handle_t knet_handle_start(int logfds[2], uint8_t log_level)
{
	char *plugins_path;
	knet_handle_t knet_h = knet_handle_new_ex(1, logfds[1], log_level, 0);

	if (!knet_h) {
		printf("knet_handle_new failed: %s\n", strerror(errno));
		flush_logs(logfds[0], stdout);
		close_logpipes(logfds);
		exit(FAIL);
	}

	printf("knet_handle_new at %p\n", knet_h);

	/* Use plugins from the build tree */
	plugins_path = find_plugins_path();
	if (plugins_path) {
		knet_h->plugin_path = plugins_path;
	}

	return knet_h;
}
/*
 * Tear down a knet handle: disable forwarding, then for every configured
 * host disable and clear each of its links, remove the host, and finally
 * free the handle.
 *
 * Links are addressed by the ids reported by knet_link_get_link_list()
 * (link_ids[j]), not by their position in that list.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL if knet_h is NULL,
 * or -1 after printing the failing call when any libknet call (other than
 * knet_link_clear_config, whose result is not checked) fails.
 */
int knet_handle_stop(knet_handle_t knet_h)
{
	size_t i, j;
	knet_node_id_t host_ids[KNET_MAX_HOST];
	uint8_t link_ids[KNET_MAX_LINK];
	size_t host_ids_entries = 0, link_ids_entries = 0;
	unsigned int enabled;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (knet_handle_setfwd(knet_h, 0) < 0) {
		printf("knet_handle_setfwd failed: %s\n", strerror(errno));
		return -1;
	}

	if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
		printf("knet_host_get_host_list failed: %s\n", strerror(errno));
		return -1;
	}

	for (i = 0; i < host_ids_entries; i++) {
		if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
			printf("knet_link_get_link_list failed: %s\n", strerror(errno));
			return -1;
		}

		for (j = 0; j < link_ids_entries; j++) {
			/* the list holds link ids; j is only the list position */
			uint8_t link_id = link_ids[j];

			if (knet_link_get_enable(knet_h, host_ids[i], link_id, &enabled)) {
				printf("knet_link_get_enable failed: %s\n", strerror(errno));
				return -1;
			}
			if (enabled) {
				if (knet_link_set_enable(knet_h, host_ids[i], link_id, 0)) {
					printf("knet_link_set_enable failed: %s\n", strerror(errno));
					return -1;
				}
			}
			printf("clearing config for: %p host: %u link: %u\n", (void *)knet_h, host_ids[i], link_id);
			knet_link_clear_config(knet_h, host_ids[i], link_id);
		}

		if (knet_host_remove(knet_h, host_ids[i]) < 0) {
			printf("knet_host_remove failed: %s\n", strerror(errno));
			return -1;
		}
	}

	if (knet_handle_free(knet_h)) {
		printf("knet_handle_free failed: %s\n", strerror(errno));
		return -1;
	}

	return 0;
}
/*
 * Fill `lo` with a loopback address ("::1" for AF_INET6, "127.0.0.1"
 * otherwise) and a port derived from the process id.
 *
 * offset < 0 selects port 0; otherwise the port is
 * (getpid() + offset) % (65536-1024) + 1024.
 *
 * Returns whatever knet_strtoaddr() returns.
 */
static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family)
{
	const char *loopback = (family == AF_INET6) ? "::1" : "127.0.0.1";
	char portstr[32];
	/*
	 * api_knet_link_set_config needs to access the API directly, but
	 * it does not send any traffic, so it's safe to ask the kernel
	 * for a random port (port 0) when offset is negative.
	 */
	in_port_t port = 0;

	if (offset >= 0) {
		/* Use the pid if we can, but make sure it's in a sensible range */
		port = (getpid() + offset) % (65536-1024) + 1024;
	}

	sprintf(portstr, "%u", port);
	memset(lo, 0, sizeof(struct sockaddr_storage));
	printf("Using port %u\n", port);

	return knet_strtoaddr(loopback, portstr, lo, sizeof(struct sockaddr_storage));
}
/*
 * IPv4 convenience wrapper: fill `lo` with a 127.0.0.1 address whose port
 * is chosen from `offset` by _make_local_sockaddr().
 */
int make_local_sockaddr(struct sockaddr_storage *lo, int offset)
{
	const int family = AF_INET;

	return _make_local_sockaddr(lo, offset, family);
}
/*
 * IPv6 convenience wrapper: fill `lo` with a ::1 address whose port is
 * chosen from `offset` by _make_local_sockaddr().
 */
int make_local_sockaddr6(struct sockaddr_storage *lo, int offset)
{
	const int family = AF_INET6;

	return _make_local_sockaddr(lo, offset, family);
}
/*
 * Configure a loopback link, probing ports 1025..65535 until one is free.
 *
 * For each candidate port `lo` is rebuilt as a loopback address ("::1" for
 * AF_INET6, "127.0.0.1" otherwise) and passed to knet_link_set_config() as
 * the source; the destination is NULL when `dynamic` is non-zero and `lo`
 * otherwise. A failure with errno EADDRINUSE moves on to the next port;
 * any other failure stops the search.
 *
 * Returns the result of the last call made (0 on success) and sets errno
 * to the value saved after the last knet_link_set_config() call.
 */
int _knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
			  uint8_t transport, uint64_t flags, int family, int dynamic,
			  struct sockaddr_storage *lo)
{
	const char *loopback = (family == AF_INET6) ? "::1" : "127.0.0.1";
	struct sockaddr_storage *dst = dynamic ? NULL : lo;
	char portstr[32];
	uint32_t port;
	int err = 0;
	int savederrno = 0;

	for (port = 1025; port < 65536; port++) {
		sprintf(portstr, "%u", port);
		memset(lo, 0, sizeof(struct sockaddr_storage));

		err = knet_strtoaddr(loopback, portstr, lo, sizeof(struct sockaddr_storage));
		if (err < 0) {
			printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
			goto out;
		}

		errno = 0;
		err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, dst, flags);
		savederrno = errno;

		if (!err) {
			printf("Using port %u\n", port);
			goto out;
		}

		if ((err < 0) && (savederrno != EADDRINUSE)) {
			printf("Unable to configure link: %s\n", strerror(savederrno));
			goto out;
		}
	}

	/* only reached when every port in the range was tried */
	if (err) {
		printf("No more ports available\n");
	}

out:
	errno = savederrno;
	return err;
}
/*
 * Sleep for `seconds`, stretched by a factor of 16 when the test suite
 * runs under valgrind (memcheck or helgrind). knet_h is not used.
 */
void test_sleep(knet_handle_t knet_h, int seconds)
{
	const int under_valgrind = is_memcheck() || is_helgrind();

	if (under_valgrind) {
		printf("Test suite is running under valgrind, adjusting sleep timers\n");
		seconds *= 16;
	}

	sleep(seconds);
}
/*
 * Wait until datafd becomes readable.
 *
 * select() is called with a 1 second timeout; each time it returns 0 the
 * pending logs are flushed to `std` and the wait is retried, up to
 * `seconds` retries (multiplied by 16 under valgrind). knet_h is not used.
 *
 * Returns 0 when datafd is readable, otherwise -1 with errno = ETIMEDOUT
 * (this includes select() failures).
 */
int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd, FILE *std)
{
	fd_set readable;
	struct timeval timeout;
	int ready = 0;
	int retries = 0;

	if (is_memcheck() || is_helgrind()) {
		printf("Test suite is running under valgrind, adjusting wait_for_packet timeout\n");
		seconds = seconds * 16;
	}

	for (;;) {
		FD_ZERO(&readable);
		FD_SET(datafd, &readable);

		timeout.tv_sec = 1;
		timeout.tv_usec = 0;

		ready = select(datafd+1, &readable, NULL, NULL, &timeout);

		/*
		 * on slow arches the first call to select can return 0,
		 * so keep retrying on timeout until the retry budget is spent.
		 */
		if (ready != 0 || retries >= seconds) {
			break;
		}

		flush_logs(logfd, std);
		retries++;
	}

	if ((ready > 0) && (FD_ISSET(datafd, &readable))) {
		return 0;
	}

	errno = ETIMEDOUT;
	return -1;
}
/*
* functional tests helpers
*/
/*
 * Create handles knet_h[1] .. knet_h[numnodes], all logging to logfds[1]
 * at log_level. The array must have room for index numnodes (slot 0 is
 * not touched).
 *
 * If a plugin directory is found via find_plugins_path(), every handle is
 * pointed at it. If any handle cannot be created, including the last one,
 * the handles created so far are stopped and the process exits with FAIL.
 */
void knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfds[2], uint8_t log_level)
{
	uint8_t i;
	char *plugins_path = find_plugins_path();

	for (i = 1; i <= numnodes; i++) {
		knet_h[i] = knet_handle_new_ex(i, logfds[1], log_level, 0);
		if (!knet_h[i]) {
			printf("failed to create handle: %s\n", strerror(errno));
			/*
			 * Handle the failure here rather than after the loop:
			 * a post-loop "i < numnodes" test misses a failure on
			 * the last node. knet_handle_stop_nodes() skips the
			 * NULL slot at index i.
			 */
			knet_handle_stop_nodes(knet_h, i);
			exit(FAIL);
		}

		printf("knet_h[%u] at %p\n", i, knet_h[i]);

		/* Use plugins from the build tree */
		if (plugins_path) {
			knet_h[i]->plugin_path = plugins_path;
		}
	}

	return;
}
/*
 * Stop every non-NULL handle in knet_h[1] .. knet_h[numnodes] via
 * knet_handle_stop(). The result of each stop is not checked.
 */
void knet_handle_stop_nodes(knet_handle_t knet_h[], uint8_t numnodes)
{
	uint8_t node;

	for (node = 1; node <= numnodes; node++) {
		knet_handle_t handle = knet_h[node];

		if (!handle) {
			continue;
		}

		printf("stopping handle %u at %p\n", node, handle);
		knet_handle_stop(handle);
	}
}
/*
 * Fully mesh the handles knet_h[1] .. knet_h[numnodes].
 *
 * For every ordered pair (i, j) with i != j, host j is added to handle i
 * and links 0 .. numlinks-1 are configured between loopback addresses of
 * the given family and then enabled. Afterwards each handle waits (up to
 * 600 seconds) for the other nodes to become reachable.
 *
 * Any failure to add a host, build an address or enable a link stops all
 * nodes and exits the process with FAIL.
 */
void knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport)
{
uint8_t i, x, j;
struct sockaddr_storage src, dst;
int offset = 0;
int res;
for (i = 1; i <= numnodes; i++) {
for (j = 1; j <= numnodes; j++) {
/*
* don't connect to itself
*/
if (j == i) {
continue;
}
printf("host %u adding host: %u\n", i, j);
if (knet_host_add(knet_h[i], j) < 0) {
printf("Unable to add host: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
for (x = 0; x < numlinks; x++) {
res = -1;
offset = 0;
/*
* Retry the link configuration with a growing port offset until it
* succeeds or the bound is reached. offset is incremented inside the
* condition, so the first attempt already uses offset 1.
*/
while (i + x + offset++ < 65535 && res != 0) {
if (_make_local_sockaddr(&src, i + x + offset, family) < 0) {
printf("Unable to convert src to sockaddr: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
if (_make_local_sockaddr(&dst, j + x + offset, family) < 0) {
printf("Unable to convert dst to sockaddr: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
res = knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0);
}
/*
* NOTE(review): the offsets printed below are i+x / j+x and do not
* include the retry offset actually used. If the loop above gave up
* with res != 0, that is not checked here; the enable call below is
* the only error check.
*/
printf("joining node %u with node %u via link %u src offset: %u dst offset: %u\n", i, j, x, i+x, j+x);
if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) {
printf("unable to enable link: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
}
}
}
/* every wait flushes logs from the log fd of handle 1 to stdout */
for (i = 1; i <= numnodes; i++) {
wait_for_nodes_state(knet_h[i], numnodes, 1, 600, knet_h[1]->logfd, stdout);
}
return;
}
static int target=0;
-static pthread_mutex_t wait_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t wait_cond = PTHREAD_COND_INITIALIZER;
+
+static int state_wait_pipe[2] = {0,0};
+static int host_wait_pipe[2] = {0,0};
/*
 * Count the entries of knet_h->host_index[] (slots 0 .. KNET_MAX_HOST-1)
 * that are set and whose status.reachable is 1.
 */
static int count_nodes(knet_handle_t knet_h)
{
	int reachable = 0;
	int idx = 0;

	while (idx < KNET_MAX_HOST) {
		if (knet_h->host_index[idx] &&
		    knet_h->host_index[idx]->status.reachable == 1) {
			reachable++;
		}
		idx++;
	}

	return reachable;
}
static void nodes_notify_callback(void *private_data,
knet_node_id_t host_id,
uint8_t reachable, uint8_t remote, uint8_t external)
{
knet_handle_t knet_h = (knet_handle_t) private_data;
int nodes;
+ int res;
nodes = count_nodes(knet_h);
if (nodes == target) {
- pthread_cond_signal(&wait_cond);
+ res = write(state_wait_pipe[1], ".", 1);
+ if (res != 1) {
+ printf("***FAILed to signal wait_for_nodes_state: %s\n", strerror(errno));
+ }
+ }
+}
+
+/* Called atexit() */
+static void finish_state_pipes()
+{
+ if (state_wait_pipe[0] != 0) {
+ close(state_wait_pipe[0]);
+ close(state_wait_pipe[1]);
+ state_wait_pipe[0] = 0;
+ }
+ if (host_wait_pipe[0] != 0) {
+ close(host_wait_pipe[0]);
+ close(host_wait_pipe[1]);
+ host_wait_pipe[0] = 0;
}
}
static void host_notify_callback(void *private_data,
knet_node_id_t host_id,
uint8_t reachable, uint8_t remote, uint8_t external)
{
knet_handle_t knet_h = (knet_handle_t) private_data;
+ int res;
if (knet_h->host_index[host_id]->status.reachable == 1) {
- pthread_cond_signal(&wait_cond);
+ res = write(host_wait_pipe[1], ".", 1);
+ if (res != 1) {
+ printf("***FAILed to signal wait_for_host: %s\n", strerror(errno));
+ }
+ }
+}
+
+static int wait_for_reply(int seconds, int pipefd)
+{
+ int res;
+ struct pollfd pfds;
+ char tmpbuf[32];
+
+ pfds.fd = pipefd;
+ pfds.events = POLLIN | POLLERR | POLLHUP;
+ pfds.revents = 0;
+
+ res = poll(&pfds, 1, seconds*1000);
+ if (res == 1) {
+ if (pfds.revents & POLLIN) {
+ res = read(pipefd, tmpbuf, sizeof(tmpbuf));
+ if (res > 0) {
+ return 0;
+ }
+ } else {
+ printf("Error on pipe poll revent = 0x%x\n", pfds.revents);
+ errno = EIO;
+ }
}
+ if (res == 0) {
+ errno = ETIMEDOUT;
+ return -1;
+ }
+
+ return -1;
}
/* Wait for a cluster of 'numnodes' to come up/go down */
int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
uint8_t state, uint32_t timeout,
int logfd, FILE *std)
{
- struct timespec ts;
int res, savederrno = 0;
+ if (state_wait_pipe[0] == 0) {
+ res = pipe(state_wait_pipe);
+ if (res == -1) {
+ savederrno = errno;
+ printf("Error creating host reply pipe: %s\n", strerror(errno));
+ errno = savederrno;
+ return -1;
+ }
+ if (atexit(finish_state_pipes)) {
+ printf("Unable to register atexit handler to close pipes: %s\n",
+ strerror(errno));
+ exit(FAIL);
+ }
+
+ }
+
if (state) {
target = numnodes-1; /* exclude us */
} else {
target = 0; /* Wait for all to go down */
}
/* Set this before checking existing status or there's a race condition */
knet_host_enable_status_change_notify(knet_h,
(void *)(long)knet_h,
nodes_notify_callback);
/* Check we haven't already got all the nodes in the correct state */
if (count_nodes(knet_h) == target) {
fprintf(stderr, "target already reached\n");
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
return 0;
}
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += timeout;
- if (pthread_mutex_lock(&wait_mutex)) {
- fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno));
- return -1;
- }
-
- res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts);
- if (res != 0 && res != ETIMEDOUT) {
- fprintf(stderr, "pthread_cond_timedwait fatal error\n");
- errno = res;
- return -1;
- }
- if (res == ETIMEDOUT) {
- fprintf(stderr, "Timed-out\n");
- savederrno = ETIMEDOUT;
- res = -1;
+ res = wait_for_reply(timeout, state_wait_pipe[0]);
+ if (res == -1) {
+ savederrno = errno;
+ printf("Error waiting for nodes status reply: %s\n", strerror(errno));
}
- pthread_mutex_unlock(&wait_mutex);
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
errno = savederrno;
return res;
}
/* Wait for a single node to come up */
int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std)
{
- int res;
- struct timespec ts;
+ int res = 0;
+ int savederrno = 0;
if (is_memcheck() || is_helgrind()) {
printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n");
seconds = seconds * 16;
}
+ if (host_wait_pipe[0] == 0) {
+ res = pipe(host_wait_pipe);
+ if (res == -1) {
+ savederrno = errno;
+ printf("Error creating host reply pipe: %s\n", strerror(errno));
+ errno = savederrno;
+ return -1;
+ }
+ if (atexit(finish_state_pipes)) {
+ printf("Unable to register atexit handler to close pipes: %s\n",
+ strerror(errno));
+ exit(FAIL);
+ }
+
+ }
+
/* Set this before checking existing status or there's a race condition */
knet_host_enable_status_change_notify(knet_h,
(void *)(long)knet_h,
host_notify_callback);
/* Check it's not already reachable */
if (knet_h->host_index[host_id]->status.reachable == 1) {
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
return 0;
}
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += seconds;
- if (pthread_mutex_lock(&wait_mutex)) {
- fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno));
- return -1;
- }
- res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts);
- if (res == -1 && errno == ETIMEDOUT) {
- fprintf(stderr, "Timed-out\n");
- knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
- pthread_mutex_unlock(&wait_mutex);
- flush_logs(logfd, std);
- return -1;
+ res = wait_for_reply(seconds, host_wait_pipe[0]);
+ if (res == -1) {
+ savederrno = errno;
+ printf("Error waiting for host status reply: %s\n", strerror(errno));
}
- pthread_mutex_unlock(&wait_mutex);
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
/* Still wait for it to settle */
flush_logs(logfd, std);
test_sleep(knet_h, 1);
- return 0;
+ errno = savederrno;
+ return res;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Feb 24, 7:18 AM (1 d, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464028
Default Alt Text
(22 KB)
Attached To
Mode
rK kronosnet
Attached
Detach File
Event Timeline
Log In to Comment