Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/example.conf b/example.conf
index 09cdf6f2..0d6d6327 100644
--- a/example.conf
+++ b/example.conf
@@ -1,40 +1,66 @@
global {
statistics: on
rerouting: on
}
logging {
- debug: on
- to_logfile: yes
- to_syslog: yes
+ debug: off
+ to_logfile: no
+ to_syslog: no
# syslog_facility:
# syslog_priority:
# logfile:
# logfile_priority:
}
node {
nodename: rhel6-node1
nodeid: 1
cnet_ips: 192.168.66.1/24 3ffe::1/64
cnet_mtu: 1500
nodeips: 192.168.2.225
inet:
preup:
up:
down:
postdown:
}
node {
nodename: rhel6-node2
nodeid: 2
cnet_ips: 192.168.66.2/24 3ffe::2/64
cnet_mtu: 1500
nodeips: 192.168.2.226
inet:
preup:
up:
down:
postdown:
}
+
+node {
+ nodename: rhel6-node3
+ nodeid: 3
+ cnet_ips: 192.168.66.2/24 3ffe::2/64
+ cnet_mtu: 1500
+ nodeips: 192.168.2.227
+ inet:
+ preup:
+ up:
+ down:
+ postdown:
+}
+
+node {
+ nodename: rhel6-node2
+ nodeid: 4
+ cnet_ips: 192.168.66.2/24 3ffe::2/64
+ cnet_mtu: 1500
+ nodeips: 192.168.2.228
+ inet:
+ preup:
+ up:
+ down:
+ postdown:
+}
diff --git a/main.c b/main.c
index e227e146..0c9bb216 100644
--- a/main.c
+++ b/main.c
@@ -1,445 +1,547 @@
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sched.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/time.h>
#include <pthread.h>
#include "conf.h"
#include "logging.h"
#include "nodes.h"
#include "controlt.h"
#include "netsocket.h"
#include "utils.h"
#include "cnet.h"
#define LOCKFILE_NAME RUNDIR PACKAGE ".pid"
#define OPTION_STRING "hdfVc:"
int daemonize = 1;
int debug = 0;
int daemon_quit = 0;
char *conffile = NULL;
int statistics = 0;
int rerouting = 0;
int net_sock;
int eth_fd;
char localnet[16]; /* match IFNAMSIZ from linux/if.h */
static pthread_t eth_thread;
+static pthread_t cnet_thread;
struct node *mainconf;
+#define MAX_FDS 1024
+
+static int fd_array[MAX_FDS];
+static int fd_array_count = 0;
+static int fd_highest = 0;
+
+static pthread_mutex_t cnet_mutex;
+
static void print_usage(void)
{
printf("Usage:\n\n");
printf(PACKAGE " [options]\n\n");
printf("Options:\n\n");
printf(" -c <file> Use config file (default "CONFFILE")\n");
printf(" -f Do not fork in background\n");
printf(" -d Enable debugging output\n");
printf(" -h This help\n");
printf(" -V Print program version information\n");
return;
}
static void read_arguments(int argc, char **argv)
{
int cont = 1;
int optchar;
while (cont) {
optchar = getopt(argc, argv, OPTION_STRING);
switch (optchar) {
case 'c':
conffile = strdup(optarg);
break;
case 'd':
debug = 1;
break;
case 'f':
daemonize = 0;
break;
case 'h':
print_usage();
exit(EXIT_SUCCESS);
break;
case 'V':
printf(PACKAGE " " PACKAGE_VERSION " (built " __DATE__
" " __TIME__ ")\n");
exit(EXIT_SUCCESS);
break;
case EOF:
cont = 0;
break;
default:
fprintf(stderr, "unknown option: %c\n", optchar);
print_usage();
exit(EXIT_FAILURE);
break;
}
}
}
static void set_oom_adj(int val)
{
FILE *fp;
fp = fopen("/proc/self/oom_adj", "w");
if (!fp)
return;
fprintf(fp, "%i", val);
fclose(fp);
}
static void set_scheduler(void)
{
struct sched_param sched_param;
int err;
err = sched_get_priority_max(SCHED_RR);
if (err != -1) {
sched_param.sched_priority = err;
err = sched_setscheduler(0, SCHED_RR, &sched_param);
if (err == -1)
logt_print(LOG_WARNING,
"could not set SCHED_RR priority %d err %d",
sched_param.sched_priority, errno);
} else {
logt_print(LOG_WARNING,
"could not get maximum scheduler priority err %d",
errno);
}
}
static void remove_lockfile(void)
{
unlink(LOCKFILE_NAME);
}
static int create_lockfile(const char *lockfile)
{
int fd, value;
size_t bufferlen;
ssize_t write_out;
struct flock lock;
char buffer[50];
if ((fd = open(lockfile, O_CREAT | O_WRONLY,
(S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH))) < 0) {
fprintf(stderr, "Cannot open lockfile [%s], error was [%s]\n",
lockfile, strerror(errno));
return -1;
}
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
retry_fcntl:
if (fcntl(fd, F_SETLK, &lock) < 0) {
switch (errno) {
case EINTR:
goto retry_fcntl;
break;
case EACCES:
case EAGAIN:
fprintf(stderr, "Cannot lock lockfile [%s], error was [%s]\n",
lockfile, strerror(errno));
break;
default:
fprintf(stderr, "process is already running\n");
}
goto fail_close;
}
if (ftruncate(fd, 0) < 0) {
fprintf(stderr, "Cannot truncate pidfile [%s], error was [%s]\n",
lockfile, strerror(errno));
goto fail_close_unlink;
}
memset(buffer, 0, sizeof(buffer));
snprintf(buffer, sizeof(buffer)-1, "%u\n", getpid());
bufferlen = strlen(buffer);
write_out = write(fd, buffer, bufferlen);
if ((write_out < 0) || (write_out == 0 && errno)) {
fprintf(stderr, "Cannot write pid to pidfile [%s], error was [%s]\n",
lockfile, strerror(errno));
goto fail_close_unlink;
}
if ((write_out == 0) || (write_out < bufferlen)) {
fprintf(stderr, "Cannot write pid to pidfile [%s], shortwrite of"
"[%zu] bytes, expected [%zu]\n",
lockfile, write_out, bufferlen);
goto fail_close_unlink;
}
if ((value = fcntl(fd, F_GETFD, 0)) < 0) {
fprintf(stderr, "Cannot get close-on-exec flag from pidfile [%s], "
"error was [%s]\n", lockfile, strerror(errno));
goto fail_close_unlink;
}
value |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, value) < 0) {
fprintf(stderr, "Cannot set close-on-exec flag from pidfile [%s], "
"error was [%s]\n", lockfile, strerror(errno));
goto fail_close_unlink;
}
return 0;
fail_close_unlink:
if (unlink(lockfile))
fprintf(stderr, "Unable to unlink %s\n", lockfile);
fail_close:
if (close(fd))
fprintf(stderr, "Unable to close %s file descriptor\n", lockfile);
return -1;
}
static void sigterm_handler(int sig)
{
daemon_quit = 1;
}
static void sigpipe_handler(int sig)
{
return;
}
static void *eth_to_cnet_thread(void *arg)
{
fd_set rfds;
int se_result;
char read_buf[131072];
ssize_t read_len = 0;
struct timeval tv;
do {
FD_ZERO (&rfds);
FD_SET (eth_fd, &rfds);
tv.tv_sec = 1;
tv.tv_usec = 0;
se_result = select((eth_fd + 1), &rfds, 0, 0, &tv);
if (se_result == -1) {
logt_print(LOG_CRIT, "Unable to select in eth thread: %s\n", strerror(errno));
daemon_quit = 1;
}
if (se_result == 0)
continue;
if (FD_ISSET(eth_fd, &rfds)) {
read_len = read(eth_fd, read_buf, sizeof(read_buf));
if (read_len > 0) {
logt_print(LOG_DEBUG, "Read %zu\n", read_len);
dispatch_buf(mainconf, read_buf, read_len);
} else if (read_len < 0) {
logt_print(LOG_INFO, "Error reading from localnet error: %s\n", strerror(errno));
} else
logt_print(LOG_DEBUG, "Read 0?\n");
}
} while (se_result >= 0 && !daemon_quit);
return NULL;
}
-//static void *cnet_to_eth_thread(void *arg)
-//{
+static void *cnet_to_eth_thread(void *arg)
+{
+ fd_set rfds;
+ int se_result;
+ char read_buf[131072];
+ ssize_t read_len = 0;
+ struct timeval tv;
+ int count;
+ int rv;
+
+ do {
+ FD_ZERO (&rfds);
+
+ pthread_mutex_lock(&cnet_mutex);
+ count = 0;
+ while (count <= fd_array_count) {
+ FD_SET(fd_array[count], &rfds);
+ count++;
+ }
+ pthread_mutex_unlock(&cnet_mutex);
- /* strip and process our internal header here */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
- /* and write starting from read_buf+sizeof(our header) */
-// rv = do_write(eth_fd, read_buf, read_len);
-// close(net_fd);
+ se_result = select((fd_highest + 1), &rfds, 0, 0, &tv);
+ if (se_result == -1) {
+ logt_print(LOG_CRIT, "Unable to select in cnet thread: %s\n", strerror(errno));
+ daemon_quit = 1;
+ }
-//}
+ if (se_result == 0)
+ continue;
+
+ count = 0;
+ while (count <= fd_array_count) {
+ if (FD_ISSET(fd_array[count], &rfds)) {
+ read_len = read(fd_array[count], read_buf, sizeof(read_buf));
+ if (read_len > 0) {
+ logt_print(LOG_DEBUG, "CNET: Read %zu\n", read_len);
+ rv = do_write(eth_fd, read_buf, read_len);
+ } else if (read_len < 0) {
+ logt_print(LOG_INFO, "Error reading from CNET error %d: %s\n", fd_array[count], strerror(errno));
+ } else
+ logt_print(LOG_DEBUG, "Read 0?\n");
+ }
+ count++;
+ }
+ } while (se_result >= 0 && !daemon_quit);
+
+ return NULL;
+}
+
+static void refresh_incoming_fds(struct node *next)
+{
+ int new_fd_array[MAX_FDS];
+ int new_fd_array_count = 0;
+ int new_fd_highest = 0;
+
+ memset(new_fd_array, 0, sizeof(int) * MAX_FDS);
+
+ while (next) {
+ struct conn *conn;
+ conn = next->conn;
+ while (conn) {
+ if (conn->fdin) {
+ logt_print(LOG_DEBUG, "Adding %s fd %d\n", next->nodename, conn->fdin);
+ new_fd_array[new_fd_array_count] = conn->fdin;
+ if (conn->fdin > new_fd_highest)
+ new_fd_highest = conn->fdin;
+
+ new_fd_array_count++;
+ }
+ conn = conn->next;
+ }
+ next = next->next;
+ }
+
+ pthread_mutex_lock(&cnet_mutex);
+ memcpy(fd_array, new_fd_array, sizeof(int) * MAX_FDS);
+ fd_array_count = new_fd_array_count;
+ fd_highest = new_fd_highest;
+ pthread_mutex_unlock(&cnet_mutex);
+
+ return;
+}
static void loop(void) {
int net_sock_new, se_result;
fd_set rfds;
struct timeval tv;
do {
connect_to_nodes(mainconf);
FD_ZERO (&rfds);
FD_SET (net_sock, &rfds);
tv.tv_sec = 1;
tv.tv_usec = 0;
se_result = select((net_sock + 1), &rfds, 0, 0, &tv);
if (daemon_quit)
goto out;
if (se_result == -1) {
logt_print(LOG_CRIT, "Unable to select: %s\n", strerror(errno));
goto out;
}
if (se_result == 0)
continue;
if (FD_ISSET(net_sock, &rfds)) {
struct sockaddr addr;
socklen_t addrlen;
addrlen = sizeof(struct sockaddr);
net_sock_new = accept(net_sock, &addr, &addrlen);
if (net_sock_new < 0) {
logt_print(LOG_INFO, "Error accepting connections on netsocket error: %s\n", strerror(errno));
continue;
}
add_incoming_connection_to_nodes(mainconf, net_sock_new, &addr, addrlen);
- /* create a thread that we can signal to reload the fd entries for read */
+ refresh_incoming_fds(mainconf);
}
out:
if (se_result <0 || daemon_quit)
logt_print(LOG_DEBUG, "End of mail loop\n");
} while (se_result >= 0 && !daemon_quit);
}
int main(int argc, char **argv)
{
confdb_handle_t confdb_handle = 0;
- int rv, eth_thread_started = 1;
+ int rv;
+ int cnet_thread_started = 1, eth_thread_started = 1;
if (create_lockfile(LOCKFILE_NAME) < 0)
exit(EXIT_FAILURE);
atexit(remove_lockfile);
read_arguments(argc, argv);
if (!conffile)
conffile = strdup(CONFFILE);
confdb_handle = readconf(conffile);
if (confdb_handle == 0)
exit(EXIT_FAILURE);
if (configure_logging(confdb_handle, 0) < 0) {
fprintf(stderr, "Unable to initialize logging subsystem\n");
exit(EXIT_FAILURE);
}
logt_print(LOG_INFO, PACKAGE " version " VERSION "\n");
logt_exit();
if (daemonize) {
if (daemon(0, 0) < 0) {
perror("Unable to daemonize");
exit(EXIT_FAILURE);
}
}
logt_reinit();
signal(SIGTERM, sigterm_handler);
signal(SIGPIPE, sigpipe_handler);
parse_global_config(confdb_handle);
mainconf = parse_nodes_config(confdb_handle);
if (statistics)
logt_print(LOG_DEBUG, "statistics collector enabled\n");
if (rerouting)
logt_print(LOG_DEBUG, "rerouting engine enabled\n");
logt_print(LOG_DEBUG, "Adjust OOM to -16\n");
set_oom_adj(-16);
logt_print(LOG_DEBUG, "Set RR scheduler\n");
set_scheduler();
/* do stuff here, should we */
logt_print(LOG_DEBUG, "Starting daemon control thread\n");
if (start_control_thread() < 0)
goto out;
logt_print(LOG_DEBUG, "Initializing local ethernet\n");
strncpy(localnet, "clusternet", 16);
eth_fd = cnet_open(localnet, 16);
if (eth_fd < 0) {
logt_print(LOG_INFO, "Unable to inizialize local tap device: %s\n",
strerror(errno));
goto out;
}
logt_print(LOG_DEBUG, "Initializing local ethernet delivery thread\n");
rv = pthread_create(&eth_thread, NULL, eth_to_cnet_thread, NULL);
if (rv < 0) {
eth_thread_started = 0;
logt_print(LOG_INFO, "Unable to inizialize local RX thread. error: %s\n",
strerror(errno));
goto out;
}
+ if (pthread_mutex_init(&cnet_mutex, NULL) < 0) {
+ logt_print(LOG_INFO, "Unable to initialize cnet mutex: %s\n", strerror(errno));
+ goto out;
+ }
+
+ logt_print(LOG_DEBUG, "Initializing remote delivery thread\n");
+ rv = pthread_create(&cnet_thread, NULL, cnet_to_eth_thread, NULL);
+ if (rv < 0) {
+ cnet_thread_started = 0;
+ logt_print(LOG_INFO, "Unable to inizialize local TX thread. error: %s\n",
+ strerror(errno));
+ goto out;
+ }
+
logt_print(LOG_DEBUG, "Starting network socket listener\n");
net_sock = setup_net_listener();
if (net_sock < 0)
goto out;
logt_print(LOG_DEBUG, "Entering main loop\n");
loop();
out:
- disconnect_from_nodes(mainconf);
+ if (cnet_thread_started > 0)
+ pthread_cancel(cnet_thread);
if (eth_thread_started > 0)
pthread_cancel(eth_thread);
+ disconnect_from_nodes(mainconf);
+
if (eth_fd >= 0)
close(eth_fd);
if (net_sock >= 0)
close(net_sock);
stop_control_thread();
free_nodes_config(mainconf);
free(conffile);
freeconf(confdb_handle);
close_logging();
return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Wed, Jun 25, 5:53 AM (1 d, 43 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952368
Default Alt Text
(13 KB)

Event Timeline