diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6a5584b --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +INIT_DIR=/etc/init.d +RSC_DIR=/usr/lib/ocf/resource.d/pacemaker +BOOTH_SITE=./script/ocf/booth-site +BOOTH_ARBITRATOR=./script/lsb/booth-arbitrator + +INSTALL=$(shell which install) + +all: + ${MAKE} -C ./src + +install: + ${MAKE} -C ./src install + mkdir -p $(DESTDIR)/$(RSC_DIR) + mkdir -p $(DESTDIR)/$(INIT_DIR) + $(INSTALL) -c -m 755 $(BOOTH_SITE) $(DESTDIR)/$(RSC_DIR) + $(INSTALL) -c -m 755 $(BOOTH_ARBITRATOR) $(DESTDIR)/$(INIT_DIR) + +clean: + ${MAKE} -C ./src clean diff --git a/README b/README new file mode 100644 index 0000000..0614448 --- /dev/null +++ b/README @@ -0,0 +1,17 @@ +The Booth Cluster Ticket Manager + +Booth manages the ticket which authorizes one of the cluster sites located in +geographically dispersed locations to run certain resources. It is designed to +be an add-on to Pacemaker, extending it to support geographically +distributed clustering. + +Booth includes an implementation of the Paxos and Paxos Lease algorithms, which +guarantee distributed consensus among the different cluster sites. One +development goal of the Paxos implementation is to be flexible enough that +it can also be used by other projects which need Paxos. + +Booth is still under heavy development; reviews and comments are highly +appreciated ;) + +Regards, +Jiaju Zhang diff --git a/script/lsb/booth-arbitrator b/script/lsb/booth-arbitrator new file mode 100755 index 0000000..c5e2f82 --- /dev/null +++ b/script/lsb/booth-arbitrator @@ -0,0 +1,89 @@ +#!/bin/bash +# +# BOOTH daemon init script for LSB-compliant Linux distributions. 
+# +# booth-arbitrator BOOTH arbitrator daemon +# +# chkconfig: - 20 20 +# processname: booth +# pidfile: /var/run/booth.pid +# description: Cluster Ticket Registry +### BEGIN INIT INFO +# Provides: booth +# Required-Start: $network $syslog +# Required-Stop: $network $syslog +# Should-Start: +# Should-Stop: +# Default-Start: 3 5 +# Default-Stop: 0 6 +# Short-Description: start and stop BOOTH arbitrator daemon +### END INIT INFO + +prog="booth" +exec="/usr/sbin/$prog" +type="arbitrator" +lockfile="/var/run/$prog.pid" + +[ -f /etc/sysconfig/$prog ] && . /etc/sysconfig/$prog + +. /etc/rc.status + +internal_status() { + checkproc $exec > /dev/null 2>&1 + return $? +} + +status() { + if internal_status; then + echo "Running" + return 0 + else + echo "Stopped" + return 7 + fi +} + +start() { + [ -x $exec ] || exit 5 + echo -n $"Starting BOOTH arbitrator daemon: " + if ! internal_status; then + startproc $exec $type + fi + rc_status -v +} + +stop() { + echo -n $"Stopping BOOTH arbitrator daemon: " + killproc -p $lockfile $prog -TERM + rc_status -v +} + +wait_for_stop() { + while [ -e $lockfile ]; do + sleep .5 + done +} + +restart() { + stop + wait_for_stop + start +} + +case "$1" in + start|stop|restart) + $1 + ;; + reload|force-reload) + restart + ;; + condrestart|try-restart) + [ ! -f "$lockfile" ] || restart + ;; + status) + status $prog + ;; + *) + echo $"Usage: $0 {start|stop|restart|try-restart|condrestart|reload|force-reload|status}" + exit 2 +esac diff --git a/script/ocf/booth-site b/script/ocf/booth-site new file mode 100755 index 0000000..0072422 --- /dev/null +++ b/script/ocf/booth-site @@ -0,0 +1,180 @@ +#!/bin/sh +# +# Resource Agent for BOOTH site daemon. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. 
+# +# This program is distributed in the hope that it would be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# Further, this software is distributed without any warranty that it is +# free of the rightful claim of any third person regarding infringement +# or the like. Any license provided herein, whether implied or +# otherwise, applies only to this software file. Patent licenses, if +# any, provided herein do not apply to combinations of this program with +# other software, or any other product whatsoever. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston MA 02111-1307, USA. +# + +####################################################################### +# Initialization: + +. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs + +####################################################################### + +meta_data() { + cat < + + +1.0 + + +This Resource Agent can control the BOOTH site daemon. +It assumes that the binary booth is in your default PATH. +In most cases, it should be run as a primitive resource. + +BOOTH site daemon + + + + + +Any additional options to start the BOOTH daemon with + +BOOTH Options + + + + + +The type of BOOTH daemon which should be started + +BOOTH Type + + + + + +The daemon to start + +The daemon to start + + + + + + + + + + + + + +END +} + +####################################################################### + +booth_usage() { + cat <> cscope.files + find . 
-name '*.[ch]' >> cscope.files + cscope -b + diff --git a/src/booth.h b/src/booth.h new file mode 100644 index 0000000..b6dfddd --- /dev/null +++ b/src/booth.h @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef _BOOTH_H +#define _BOOTH_H + +#include +#include +#include + +#define BOOTH_LOG_DUMP_SIZE (1024*1024) + +#define BOOTH_RUN_DIR "/var/run" +#define BOOTH_LOG_DIR "/var/log" +#define BOOTH_LOGFILE_NAME "booth.log" +#define BOOTH_LOCKFILE_NAME "booth.pid" +#define BOOTH_DEFAULT_CONF "/etc/sysconfig/booth" + +#define DAEMON_NAME "booth" +#define BOOTH_NAME_LEN 63 + +#define BOOTHC_SOCK_PATH "boothc_lock" +#define BOOTH_PROTO_FAMILY AF_INET +#define BOOTH_CMD_PORT 22075 + +#define BOOTHC_MAGIC 0x5F1BA08C +#define BOOTHC_VERSION 0x00010000 + +#define BOOTHC_OPT_FORCE 0x00000001 + +struct boothc_header { + uint32_t magic; + uint32_t version; + uint32_t cmd; + uint32_t option; + uint32_t expiry; + uint32_t len; + uint32_t result; +}; + +typedef enum { + BOOTHC_CMD_LIST = 1, + BOOTHC_CMD_GRANT, + BOOTHC_CMD_REVOKE, +} cmd_request_t; + +typedef enum { + BOOTHC_RLT_ASYNC = 1, + BOOTHC_RLT_SYNC_SUCC, + BOOTHC_RLT_SYNC_FAIL, + BOOTHC_RLT_INVALID_ARG, + BOOTHC_RLT_REMOTE_OP, +} cmd_result_t; + +struct client { + int fd; + void *workfn; 
+ void *deadfn; +}; + +int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)); +int do_read(int fd, void *buf, size_t count); +int do_write(int fd, void *buf, size_t count); +void process_connection(int ci); + +#endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c new file mode 100644 index 0000000..725743f --- /dev/null +++ b/src/config.c @@ -0,0 +1,270 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include +#include +#include +#include "booth.h" +#include "config.h" +#include "log.h" +

/* Number of ticket_config slots currently allocated behind booth_conf. */
static int ticket_size = 0;

/*
 * The parsed configuration.  config.h only declares this pointer; it is
 * defined once here so that including the header from several source
 * files does not produce several definitions of the same object.
 */
struct booth_config *booth_conf;

/*
 * Grow the ticket array that trails booth_conf by TICKET_ALLOC zeroed
 * slots.
 *
 * Returns 0 on success or -ENOMEM.  On failure booth_conf and
 * ticket_size are left untouched, so the caller can still free the old
 * block.
 */
static int ticket_realloc(void)
{
	struct booth_config *grown;

	grown = realloc(booth_conf, sizeof(struct booth_config)
			+ (ticket_size + TICKET_ALLOC)
			* sizeof(struct ticket_config));
	if (!grown) {
		log_error("can't alloc more booth config");
		return -ENOMEM;
	}
	booth_conf = grown;

	/*
	 * Address the new slots through the ticket[] member.  Adding byte
	 * counts to a "struct booth_config *" is scaled by the size of the
	 * whole struct and points far outside the allocation.
	 */
	memset(&booth_conf->ticket[ticket_size], 0,
	       TICKET_ALLOC * sizeof(struct ticket_config));
	ticket_size += TICKET_ALLOC;

	return 0;
}

/* Return non-zero if c may appear anywhere in a configuration line. */
static int is_config_char(char c)
{
	return (c >= 'a' && c <= 'z')
		|| (c >= 'A' && c <= 'Z')
		|| (c >= '0' && c <= '9')
		|| (c != '\0' && strchr("_-./ +():,@=\"", c) != NULL);
}

/*
 * Append a node of the given type (SITE or ARBITRATOR) with address addr.
 * Returns 0 on success, -1 if MAX_NODES entries are already present.
 */
static int add_node(int type, const char *addr)
{
	int id = booth_conf->node_count;

	if (id == MAX_NODES) {
		log_error("too many nodes");
		return -1;
	}

	booth_conf->node[id].family = BOOTH_PROTO_FAMILY;
	booth_conf->node[id].type = type;
	booth_conf->node[id].nodeid = id;
	/*
	 * NOTE(review): the capacity of addr is declared in transport.h,
	 * which is not visible here.  addr is at most BOOTH_NAME_LEN
	 * characters long (checked by the caller) -- confirm the field
	 * holds BOOTH_NAME_LEN + 1 bytes.
	 */
	strcpy(booth_conf->node[id].addr, addr);
	booth_conf->node_count++;

	return 0;
}

/*
 * Parse one ticket value of the form
 *     name[;expiry[;weight,weight,...]]
 * into the next free ticket slot, growing the array when it is full.
 * v is modified in place.
 *
 * Returns 0 on success, -1 on allocation failure or over-long name.
 */
static int add_ticket(char *v)
{
	struct ticket_config *tk;
	char *expiry, *weights, *comma;
	int i = 0;

	if (booth_conf->ticket_count == ticket_size && ticket_realloc() < 0)
		return -1;
	/* Taken after the realloc above, which may move booth_conf. */
	tk = &booth_conf->ticket[booth_conf->ticket_count];

	expiry = strchr(v, ';');
	weights = strrchr(v, ';');
	if (weights == expiry)
		weights = NULL;		/* zero or one ';': no weight list */
	else
		*weights++ = '\0';
	if (expiry)
		*expiry++ = '\0';

	/* name[] needs room for the terminating NUL as well. */
	if (strlen(v) >= sizeof(tk->name)) {
		log_error("ticket name too long");
		return -1;
	}
	strcpy(tk->name, v);

	if (expiry) {
		while (*expiry == ' ')
			expiry++;
		tk->expiry = atoi(expiry);
	}

	if (weights) {
		while (*weights == ' ')
			weights++;
		/*
		 * Every non-empty field is stored, including the one after
		 * the last comma; a trailing comma adds nothing.
		 */
		while (*weights != '\0') {
			if (i == MAX_NODES) {
				log_error("too many weights");
				break;
			}
			comma = strchr(weights, ',');
			if (comma)
				*comma++ = '\0';
			tk->weight[i++] = atoi(weights);
			if (!comma)
				break;
			while (*comma == ' ')
				comma++;
			weights = comma;
		}
	}

	booth_conf->ticket_count++;
	return 0;
}

/*
 * Read the configuration file at path into a freshly allocated
 * booth_conf.
 *
 * Each non-blank, non-comment line has the form key=value or
 * key="value"; spaces and most punctuation are only accepted inside the
 * quotes.  Recognised keys are "transport", "port", "site", "arbitrator"
 * and "ticket"; other keys are accepted and ignored.
 *
 * Returns 0 on success, -ENOMEM if the initial allocation fails and -1
 * on any other error.  On failure booth_conf is NULL or freed and reset
 * to NULL.  The file is closed on every path.
 */
int read_config(const char *path)
{
	char line[1024];
	FILE *fp;
	char *s, *k, *v;
	int quo, equ, ischar;
	int lineno = 0;

	fp = fopen(path, "r");
	if (!fp) {
		log_error("failed to open %s: %s", path, strerror(errno));
		return -1;
	}

	booth_conf = calloc(1, sizeof(struct booth_config)
			    + TICKET_ALLOC * sizeof(struct ticket_config));
	if (!booth_conf) {
		log_error("failed to alloc memory for booth config");
		fclose(fp);
		return -ENOMEM;
	}
	ticket_size = TICKET_ALLOC;

	while (fgets(line, sizeof(line), fp)) {
		lineno++;

		/*
		 * A chunk without '\n' is acceptable only as the last line
		 * of the file; otherwise the line did not fit the buffer.
		 */
		if (!strchr(line, '\n') && !feof(fp)) {
			log_error("line too long in config file (lineno %d)",
				  lineno);
			goto out;
		}

		s = line;
		while (*s == ' ')
			s++;
		if (*s == '#' || *s == '\n' || *s == '\0')
			continue;
		if (strchr("-./+():,@=\"", *s)) {
			log_error("invalid key name in config file "
				  "('%c', lineno %d)", *s, lineno);
			goto out;
		}

		k = s;
		v = NULL;
		quo = 0;	/* inside "..." */
		equ = 0;	/* the '=' separating key and value was seen */
		ischar = 0;	/* value is quoted, already NUL-terminated */
		while (*s != '\n' && *s != '\0') {
			if (!is_config_char(*s)) {
				log_error("invalid character ('%c', lineno %d)"
					  " in config file", *s, lineno);
				goto out;
			}
			if (*s == '=' && !equ) {
				equ = 1;
				*s = '\0';
				v = s + 1;
			} else if (!quo && ((equ && strchr("=_-.", *s))
					    || strchr("/ +():,@", *s)
					    || (*s == '"' && !equ))) {
				/* punctuation that is only legal when quoted */
				log_error("invalid config file format "
					  "(lineno %d)", lineno);
				goto out;
			} else if (*s == '"') {
				if (!quo) {
					quo = 1;
					if (v) {
						v++;	/* skip opening quote */
						ischar = 1;
					}
				} else {
					quo = 0;
					*s = '\0';	/* closing quote ends value */
				}
			}
			s++;
		}
		if (!equ || quo) {
			log_error("invalid config file format (lineno %d)",
				  lineno);
			goto out;
		}
		if (!ischar)
			*s = '\0';	/* unquoted value ends at end of line */

		if (strlen(k) > BOOTH_NAME_LEN
		    || strlen(v) > BOOTH_NAME_LEN) {
			log_error("key/value too long");
			goto out;
		}

		if (!strcmp(k, "transport")) {
			if (!strcmp(v, "UDP"))
				booth_conf->proto = UDP;
			else if (!strcmp(v, "SCTP"))
				booth_conf->proto = SCTP;
			else {
				log_error("invalid transport protocol");
				goto out;
			}
		} else if (!strcmp(k, "port")) {
			booth_conf->port = atoi(v);
		} else if (!strcmp(k, "site")) {
			if (add_node(SITE, v) < 0)
				goto out;
		} else if (!strcmp(k, "arbitrator")) {
			if (add_node(ARBITRATOR, v) < 0)
				goto out;
		} else if (!strcmp(k, "ticket")) {
			if (add_ticket(v) < 0)
				goto out;
		}
	}

	fclose(fp);
	return 0;

out:
	free(booth_conf);
	booth_conf = NULL;	/* check_config() tests this pointer */
	fclose(fp);
	return -1;
}

/*
 * Check that a configuration has been loaded.
 *
 * type is accepted but not examined yet: the per-node check (a node
 * marked local whose type equals the requested one) was left disabled
 * by the original author.
 *
 * Returns 0 if booth_conf is set, -1 otherwise.
 */
int check_config(int type)
{
	(void)type;

	if (!booth_conf)
		return -1;

	return 0;
}

diff --git a/src/config.h b/src/config.h new file mode 100644 index 0000000..8df752f --- /dev/null +++ b/src/config.h @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef _CONFIG_H +#define _CONFIG_H + +#include +#include "booth.h" +#include "transport.h" +

#define MAX_NODES	16
#define TICKET_ALLOC	16

/* One "ticket" entry of the configuration file. */
struct ticket_config {
	int weight[MAX_NODES];		/* third ';' field, comma separated */
	int expiry;			/* second ';' field, read with atoi() */
	char name[BOOTH_NAME_LEN];	/* NUL-terminated ticket name */
};

/*
 * The whole configuration.  ticket[] is a trailing array: read_config()
 * allocates the struct together with room for the tickets and grows the
 * allocation in steps of TICKET_ALLOC entries.
 */
struct booth_config {
	int node_count;
	int ticket_count;
	int proto;
	uint16_t port;
	struct booth_node node[MAX_NODES];
	struct ticket_config ticket[0];
};

/* Defined in config.c; NULL until read_config() succeeds. */
extern struct booth_config *booth_conf;

int read_config(const char *path);

int check_config(int type);

#endif /* _CONFIG_H */
diff --git a/src/list.h b/src/list.h new file mode 100644 index 0000000..3a2b3d4 --- /dev/null +++ b/src/list.h @@ -0,0 +1,568 @@ +/* Copied from linux kernel */ + +#ifndef _LINUX_LIST_H +#define _LINUX_LIST_H + +/* + * Simple doubly linked list implementation. + * + * Some of the internal functions ("__xxx") are useful when + * manipulating whole lists rather than single entries, as + * sometimes we already know the next/prev entries and we can + * generate better code by using them directly rather than + * using the generic single-entry routines. + */ + +/** + * Get offset of a member + */ +#define offsetof(type, member) ((size_t) &((type *)0)->member) + +/** + * container_of - cast a member of a structure out to the containing structure + * + * @ptr: the pointer to the member. + * @type: the type of the container struct this is embedded in. + * @member: the name of the member within the struct. 
+ * + */ +#define container_of(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - offsetof(type,member) );}) + +#define LIST_POISON1 ((void *) 0x00100100) +#define LIST_POISON2 ((void *) 0x00200200) + +struct list_head { + struct list_head *next, *prev; +}; + +#define LIST_HEAD_INIT(name) { &(name), &(name) } + +#define LIST_HEAD(name) \ + struct list_head name = LIST_HEAD_INIT(name) + +static inline void INIT_LIST_HEAD(struct list_head *list) +{ + list->next = list; + list->prev = list; +} + +/* + * Insert a new entry between two known consecutive entries. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_add(struct list_head *new, + struct list_head *prev, + struct list_head *next) +{ + next->prev = new; + new->next = next; + new->prev = prev; + prev->next = new; +} + +/** + * list_add - add a new entry + * @new: new entry to be added + * @head: list head to add it after + * + * Insert a new entry after the specified head. + * This is good for implementing stacks. + */ +static inline void list_add(struct list_head *new, struct list_head *head) +{ + __list_add(new, head, head->next); +} + + +/** + * list_add_tail - add a new entry + * @new: new entry to be added + * @head: list head to add it before + * + * Insert a new entry before the specified head. + * This is useful for implementing queues. + */ +static inline void list_add_tail(struct list_head *new, struct list_head *head) +{ + __list_add(new, head->prev, head); +} + +/* + * Delete a list entry by making the prev/next entries + * point to each other. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_del(struct list_head * prev, struct list_head * next) +{ + next->prev = prev; + prev->next = next; +} + +/** + * list_del - deletes entry from list. 
+ * @entry: the element to delete from the list. + * Note: list_empty() on entry does not return true after this, the entry is + * in an undefined state. + */ +static inline void list_del(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + entry->next = LIST_POISON1; + entry->prev = LIST_POISON2; +} + +/** + * list_replace - replace old entry by new one + * @old : the element to be replaced + * @new : the new element to insert + * + * If @old was empty, it will be overwritten. + */ +static inline void list_replace(struct list_head *old, + struct list_head *new) +{ + new->next = old->next; + new->next->prev = new; + new->prev = old->prev; + new->prev->next = new; +} + +static inline void list_replace_init(struct list_head *old, + struct list_head *new) +{ + list_replace(old, new); + INIT_LIST_HEAD(old); +} + +/** + * list_del_init - deletes entry from list and reinitialize it. + * @entry: the element to delete from the list. + */ +static inline void list_del_init(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + INIT_LIST_HEAD(entry); +} + +/** + * list_move - delete from one list and add as another's head + * @list: the entry to move + * @head: the head that will precede our entry + */ +static inline void list_move(struct list_head *list, struct list_head *head) +{ + __list_del(list->prev, list->next); + list_add(list, head); +} + +/** + * list_move_tail - delete from one list and add as another's tail + * @list: the entry to move + * @head: the head that will follow our entry + */ +static inline void list_move_tail(struct list_head *list, + struct list_head *head) +{ + __list_del(list->prev, list->next); + list_add_tail(list, head); +} + +/** + * list_is_last - tests whether @list is the last entry in list @head + * @list: the entry to test + * @head: the head of the list + */ +static inline int list_is_last(const struct list_head *list, + const struct list_head *head) +{ + return list->next == head; +} + +/** + * list_empty - 
tests whether a list is empty + * @head: the list to test. + */ +static inline int list_empty(const struct list_head *head) +{ + return head->next == head; +} + +/** + * list_empty_careful - tests whether a list is empty and not being modified + * @head: the list to test + * + * Description: + * tests whether a list is empty _and_ checks that no other CPU might be + * in the process of modifying either member (next or prev) + * + * NOTE: using list_empty_careful() without synchronization + * can only be safe if the only activity that can happen + * to the list entry is list_del_init(). Eg. it cannot be used + * if another CPU could re-list_add() it. + */ +static inline int list_empty_careful(const struct list_head *head) +{ + struct list_head *next = head->next; + return (next == head) && (next == head->prev); +} + +/** + * list_rotate_left - rotate the list to the left + * @head: the head of the list + */ +static inline void list_rotate_left(struct list_head *head) +{ + struct list_head *first; + + if (!list_empty(head)) { + first = head->next; + list_move_tail(first, head); + } +} + +/** + * list_is_singular - tests whether a list has just one entry. + * @head: the list to test. + */ +static inline int list_is_singular(const struct list_head *head) +{ + return !list_empty(head) && (head->next == head->prev); +} + +static inline void __list_cut_position(struct list_head *list, + struct list_head *head, struct list_head *entry) +{ + struct list_head *new_first = entry->next; + list->next = head->next; + list->next->prev = list; + list->prev = entry; + entry->next = list; + head->next = new_first; + new_first->prev = head; +} + +/** + * list_cut_position - cut a list into two + * @list: a new list to add all removed entries + * @head: a list with entries + * @entry: an entry within head, could be the head itself + * and if so we won't cut the list + * + * This helper moves the initial part of @head, up to and + * including @entry, from @head to @list. 
You should + * pass on @entry an element you know is on @head. @list + * should be an empty list or a list you do not care about + * losing its data. + * + */ +static inline void list_cut_position(struct list_head *list, + struct list_head *head, struct list_head *entry) +{ + if (list_empty(head)) + return; + if (list_is_singular(head) && + (head->next != entry && head != entry)) + return; + if (entry == head) + INIT_LIST_HEAD(list); + else + __list_cut_position(list, head, entry); +} + +static inline void __list_splice(const struct list_head *list, + struct list_head *prev, + struct list_head *next) +{ + struct list_head *first = list->next; + struct list_head *last = list->prev; + + first->prev = prev; + prev->next = first; + + last->next = next; + next->prev = last; +} + +/** + * list_splice - join two lists, this is designed for stacks + * @list: the new list to add. + * @head: the place to add it in the first list. + */ +static inline void list_splice(const struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) + __list_splice(list, head, head->next); +} + +/** + * list_splice_tail - join two lists, each list being a queue + * @list: the new list to add. + * @head: the place to add it in the first list. + */ +static inline void list_splice_tail(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) + __list_splice(list, head->prev, head); +} + +/** + * list_splice_init - join two lists and reinitialise the emptied list. + * @list: the new list to add. + * @head: the place to add it in the first list. + * + * The list at @list is reinitialised + */ +static inline void list_splice_init(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) { + __list_splice(list, head, head->next); + INIT_LIST_HEAD(list); + } +} + +/** + * list_splice_tail_init - join two lists and reinitialise the emptied list + * @list: the new list to add. + * @head: the place to add it in the first list. 
+ * + * Each of the lists is a queue. + * The list at @list is reinitialised + */ +static inline void list_splice_tail_init(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) { + __list_splice(list, head->prev, head); + INIT_LIST_HEAD(list); + } +} + +/** + * list_entry - get the struct for this entry + * @ptr: the &struct list_head pointer. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + */ +#define list_entry(ptr, type, member) \ + container_of(ptr, type, member) + +/** + * list_first_entry - get the first element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +/** + * list_for_each - iterate over a list + * @pos: the &struct list_head to use as a loop cursor. + * @head: the head for your list. + */ +#define list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); pos = pos->next) + +/** + * __list_for_each - iterate over a list + * @pos: the &struct list_head to use as a loop cursor. + * @head: the head for your list. + * + * This variant differs from list_for_each() in that it's the + * simplest possible list iteration code, no prefetching is done. + * Use this for code that knows the list to be very short (empty + * or 1 entry) most of the time. + */ +#define __list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); pos = pos->next) + +/** + * list_for_each_prev - iterate over a list backwards + * @pos: the &struct list_head to use as a loop cursor. + * @head: the head for your list. 
+ */ +#define list_for_each_prev(pos, head) \ + for (pos = (head)->prev; pos != (head); pos = pos->prev) + +/** + * list_for_each_safe - iterate over a list safe against removal of list entry + * @pos: the &struct list_head to use as a loop cursor. + * @n: another &struct list_head to use as temporary storage + * @head: the head for your list. + */ +#define list_for_each_safe(pos, n, head) \ + for (pos = (head)->next, n = pos->next; pos != (head); \ + pos = n, n = pos->next) + +/** + * list_for_each_prev_safe - iterate over a list backwards safe against removal of list entry + * @pos: the &struct list_head to use as a loop cursor. + * @n: another &struct list_head to use as temporary storage + * @head: the head for your list. + */ +#define list_for_each_prev_safe(pos, n, head) \ + for (pos = (head)->prev, n = pos->prev; \ + pos != (head); \ + pos = n, n = pos->prev) + +/** + * list_for_each_entry - iterate over list of given type + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry(pos, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + +/** + * list_for_each_entry_reverse - iterate backwards over list of given type. + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_reverse(pos, head, member) \ + for (pos = list_entry((head)->prev, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.prev, typeof(*pos), member)) + +/** + * list_prepare_entry - prepare a pos entry for use in list_for_each_entry_continue() + * @pos: the type * to use as a start point + * @head: the head of the list + * @member: the name of the list_struct within the struct. 
+ * + * Prepares a pos entry for use as a start point in list_for_each_entry_continue(). + */ +#define list_prepare_entry(pos, head, member) \ + ((pos) ? : list_entry(head, typeof(*pos), member)) + +/** + * list_for_each_entry_continue - continue iteration over list of given type + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + * + * Continue to iterate over list of given type, continuing after + * the current position. + */ +#define list_for_each_entry_continue(pos, head, member) \ + for (pos = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + +/** + * list_for_each_entry_continue_reverse - iterate backwards from the given point + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + * + * Start to iterate over list of given type backwards, continuing after + * the current position. + */ +#define list_for_each_entry_continue_reverse(pos, head, member) \ + for (pos = list_entry(pos->member.prev, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.prev, typeof(*pos), member)) + +/** + * list_for_each_entry_from - iterate over list of given type from the current point + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + * + * Iterate over list of given type, continuing from current position. + */ +#define list_for_each_entry_from(pos, head, member) \ + for (; &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + +/** + * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry + * @pos: the type * to use as a loop cursor. + * @n: another type * to use as temporary storage + * @head: the head for your list. 
+ * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe(pos, n, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member), \ + n = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.next, typeof(*n), member)) + +/** + * list_for_each_entry_safe_continue - continue list iteration safe against removal + * @pos: the type * to use as a loop cursor. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + * + * Iterate over list of given type, continuing after current point, + * safe against removal of list entry. + */ +#define list_for_each_entry_safe_continue(pos, n, head, member) \ + for (pos = list_entry(pos->member.next, typeof(*pos), member), \ + n = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.next, typeof(*n), member)) + +/** + * list_for_each_entry_safe_from - iterate over list from current point safe against removal + * @pos: the type * to use as a loop cursor. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + * + * Iterate over list of given type from current point, safe against + * removal of list entry. + */ +#define list_for_each_entry_safe_from(pos, n, head, member) \ + for (n = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.next, typeof(*n), member)) + +/** + * list_for_each_entry_safe_reverse - iterate backwards over list safe against removal + * @pos: the type * to use as a loop cursor. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. 
+ * + * Iterate backwards over list of given type, safe against removal + * of list entry. + */ +#define list_for_each_entry_safe_reverse(pos, n, head, member) \ + for (pos = list_entry((head)->prev, typeof(*pos), member), \ + n = list_entry(pos->member.prev, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.prev, typeof(*n), member)) + +/** + * list_safe_reset_next - reset a stale list_for_each_entry_safe loop + * @pos: the loop cursor used in the list_for_each_entry_safe loop + * @n: temporary storage used in list_for_each_entry_safe + * @member: the name of the list_struct within the struct. + * + * list_safe_reset_next is not safe to use in general if the list may be + * modified concurrently (eg. the lock is dropped in the loop body). An + * exception to this is if the cursor element (pos) is pinned in the list, + * and list_safe_reset_next is called after re-taking the lock and before + * completing the current iteration of the loop body. + */ +#define list_safe_reset_next(pos, n, member) \ + n = list_entry(pos->member.next, typeof(*pos), member) + +#endif + diff --git a/src/log.c b/src/log.c new file mode 100644 index 0000000..2270912 --- /dev/null +++ b/src/log.c @@ -0,0 +1,245 @@ +/* + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "booth.h" +#include "log.h" + +#define LOG_STR_LEN 256 +static char log_str[LOG_STR_LEN]; + +static pthread_t thread_handle; + +static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t log_cond = PTHREAD_COND_INITIALIZER; + +static char log_dump[BOOTH_LOG_DUMP_SIZE]; +static unsigned int log_point; +static unsigned int log_wrap; + +static char logfile_path[PATH_MAX]; +static FILE *logfile_fp; + +struct entry { + int level; + char str[LOG_STR_LEN]; +}; + +#define BOOTH_LOG_DEFAULT_ENTRIES 4096 +static struct entry *log_ents; +static unsigned int log_num_ents = BOOTH_LOG_DEFAULT_ENTRIES; +static unsigned int log_head_ent; /* add at head */ +static unsigned int log_tail_ent; /* remove from tail */ +static unsigned int log_dropped; +static unsigned int log_pending_ents; +static unsigned int log_thread_done; + +extern int log_logfile_priority; +extern int log_syslog_priority; +extern int log_stderr_priority; + +static void _log_save_dump(int level __attribute__((unused)), int len) +{ + int i; + + for (i = 0; i < len; i++) { + log_dump[log_point++] = log_str[i]; + + if (log_point == BOOTH_LOG_DUMP_SIZE) { + log_point = 0; + log_wrap = 1; + } + } +} + +static void _log_save_ent(int level, int len) +{ + struct entry *e; + + if (!log_ents) + return; + + if (log_pending_ents == log_num_ents) { + log_dropped++; + return; + } + + e = &log_ents[log_head_ent++]; + log_head_ent = log_head_ent % log_num_ents; + log_pending_ents++; + + e->level = level; + memcpy(e->str, log_str, len); +} + +void log_level(int level, const char *fmt, ...) 
+{ + va_list ap; + char name[BOOTH_NAME_LEN + 1]; + int ret, pos = 0; + int len = LOG_STR_LEN - 2; /* leave room for \n\0 */ + + memset(name, 0, sizeof(name)); + + pthread_mutex_lock(&log_mutex); + + ret = snprintf(log_str + pos, len - pos, "%ld %s ", + time(NULL), name); + pos += ret; + + va_start(ap, fmt); + ret = vsnprintf(log_str + pos, len - pos, fmt, ap); + va_end(ap); + + if (ret >= len - pos) + pos = len - 1; + else + pos += ret; + + log_str[pos++] = '\n'; + log_str[pos++] = '\0'; + + /* + * save all messages in circular buffer "log_dump" that can be + * sent over unix socket + */ + + _log_save_dump(level, pos - 1); + + /* + * save some messages in circular array "log_ents" that a thread + * writes to logfile/syslog + */ + + if (level <= log_logfile_priority || level <= log_syslog_priority) + _log_save_ent(level, pos); + + if (level <= log_stderr_priority) + fprintf(stderr, "%s", log_str); + + pthread_cond_signal(&log_cond); + pthread_mutex_unlock(&log_mutex); +} + +static void write_entry(int level, char *str) +{ + if ((level <= log_logfile_priority) && logfile_fp) { + fprintf(logfile_fp, "%s", str); + fflush(logfile_fp); + } + if (level <= log_syslog_priority) + syslog(level, "%s", str); +} + +static void write_dropped(int level, int num) +{ + char str[LOG_STR_LEN]; + sprintf(str, "dropped %d entries", num); + write_entry(level, str); +} + +static void *log_thread_fn(void *arg __attribute__((unused))) +{ + char str[LOG_STR_LEN]; + struct entry *e; + int level, prev_dropped = 0; + + while (1) { + pthread_mutex_lock(&log_mutex); + while (log_head_ent == log_tail_ent) { + if (log_thread_done) { + pthread_mutex_unlock(&log_mutex); + goto out; + } + pthread_cond_wait(&log_cond, &log_mutex); + } + + e = &log_ents[log_tail_ent++]; + log_tail_ent = log_tail_ent % log_num_ents; + log_pending_ents--; + + memcpy(str, e->str, LOG_STR_LEN); + level = e->level; + + prev_dropped = log_dropped; + log_dropped = 0; + pthread_mutex_unlock(&log_mutex); + + if (prev_dropped) 
{ + write_dropped(level, prev_dropped); + prev_dropped = 0; + } + + write_entry(level, str); + } +out: + pthread_exit(NULL); +} + + +int setup_logging(void) +{ + int fd, rv; + + snprintf(logfile_path, PATH_MAX, "%s/%s", BOOTH_LOG_DIR, + BOOTH_LOGFILE_NAME); + + logfile_fp = fopen(logfile_path, "a+"); + if (logfile_fp) { + fd = fileno(logfile_fp); + fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC); + } + + log_ents = malloc(log_num_ents * sizeof(struct entry)); + if (!log_ents) { + fclose(logfile_fp); + logfile_fp = NULL; + return -1; + } + memset(log_ents, 0, log_num_ents * sizeof(struct entry)); + + openlog(DAEMON_NAME, LOG_CONS | LOG_PID, LOG_DAEMON); + + rv = pthread_create(&thread_handle, NULL, log_thread_fn, NULL); + if (rv) + return -1; + + return 0; +} + +void close_logging(void) +{ + pthread_mutex_lock(&log_mutex); + log_thread_done = 1; + pthread_cond_signal(&log_cond); + pthread_mutex_unlock(&log_mutex); + pthread_join(thread_handle, NULL); + + pthread_mutex_lock(&log_mutex); + closelog(); + if (logfile_fp) { + fclose(logfile_fp); + logfile_fp = NULL; + } + pthread_mutex_unlock(&log_mutex); +} diff --git a/src/log.h b/src/log.h new file mode 100644 index 0000000..5d36497 --- /dev/null +++ b/src/log.h @@ -0,0 +1,32 @@ +/* + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
/*
 * log_level - format a printf-style message and pass it to the logger.
 * @level: syslog priority (LOG_DEBUG, LOG_INFO, LOG_ERR, ...).
 * @fmt:   printf format string; the format attribute lets the compiler
 *         check the variadic arguments against it.
 */
void log_level(int level, const char *fmt, ...)
	__attribute__((format(printf, 2, 3)));

/* Open the log file and syslog and start the writer thread; 0 on success, -1 on failure. */
int setup_logging(void);
/* Stop the writer thread and close syslog and the log file. */
void close_logging(void);

/* Convenience wrappers that fix the priority passed to log_level(). */
#define log_debug(fmt, args...)		log_level(LOG_DEBUG, fmt, ##args)
#define log_info(fmt, args...)		log_level(LOG_INFO, fmt, ##args)
#define log_error(fmt, args...)		log_level(LOG_ERR, fmt, ##args)
/*
 * do_read - read exactly @count bytes from @fd into @buf.
 *
 * Retries short reads and reads interrupted by a signal (EINTR).
 *
 * Returns 0 once @count bytes have been stored (immediately when @count
 * is 0), or -1 on a read error or when end-of-file is reached before
 * @count bytes arrived.
 */
int do_read(int fd, void *buf, size_t count)
{
	/*
	 * The offset is a size_t and the read() result an ssize_t, so that
	 * neither the comparison against @count nor the byte count returned
	 * by read() is squeezed through an int.
	 */
	size_t off = 0;

	while (off < count) {
		ssize_t rv = read(fd, (char *)buf + off, count - off);

		if (rv == 0)
			return -1;	/* peer closed before everything arrived */
		if (rv < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		off += (size_t)rv;
	}
	return 0;
}
goto out; + + memset(&sun, 0, sizeof(sun)); + sun.sun_family = AF_UNIX; + strcpy(&sun.sun_path[1], sock_path); + addrlen = sizeof(sa_family_t) + strlen(sun.sun_path+1) + 1; + + rv = connect(fd, (struct sockaddr *) &sun, addrlen); + if (rv < 0) { + close(fd); + fd = rv; + } +out: + return fd; +} + +static void init_header(struct boothc_header *h, int cmd, int option, + int expiry, int result, int data_len) +{ + memset(h, 0, sizeof(struct boothc_header)); + + h->magic = BOOTHC_MAGIC; + h->version = BOOTHC_VERSION; + h->len = data_len; + h->cmd = cmd; + h->option = option; + h->expiry = expiry; + h->result = result; +} + +static void client_alloc(void) +{ + int i; + + if (!client) { + client = malloc(CLIENT_NALLOC * sizeof(struct client)); + pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); + } else { + client = realloc(client, (client_size + CLIENT_NALLOC) * + sizeof(struct client)); + pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * + sizeof(struct pollfd)); + if (!pollfd) + log_error("can't alloc for pollfd"); + } + if (!client || !pollfd) + log_error("can't alloc for client array"); + + for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { + client[i].workfn = NULL; + client[i].deadfn = NULL; + client[i].fd = -1; + pollfd[i].fd = -1; + pollfd[i].revents = 0; + } + client_size += CLIENT_NALLOC; +} + +static void client_dead(int ci) +{ + close(client[ci].fd); + client[ci].workfn = NULL; + client[ci].fd = -1; + pollfd[ci].fd = -1; +} + +int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) +{ + int i; + + if (!client) + client_alloc(); +again: + for (i = 0; i < client_size; i++) { + if (client[i].fd == -1) { + client[i].workfn = workfn; + if (deadfn) + client[i].deadfn = deadfn; + else + client[i].deadfn = client_dead; + client[i].fd = fd; + pollfd[i].fd = fd; + pollfd[i].events = POLLIN; + if (i > client_maxi) + client_maxi = i; + return i; + } + } + + client_alloc(); + goto again; +} + +static int setup_listener(const char 
*sock_path) +{ + struct sockaddr_un addr; + socklen_t addrlen; + int rv, s; + + s = socket(AF_LOCAL, SOCK_STREAM, 0); + if (s < 0) { + log_error("socket error %d %d", s, errno); + return s; + } + + memset(&addr, 0, sizeof(addr)); + + addr.sun_family = AF_LOCAL; + strcpy(&addr.sun_path[1], sock_path); + addrlen = sizeof(sa_family_t) + strlen(addr.sun_path+1) + 1; + + rv = bind(s, (struct sockaddr *) &addr, addrlen); + if (rv < 0) { + log_error("bind error %d %d", rv, errno); + close(s); + return rv; + } + + rv = listen(s, 5); + if (rv < 0) { + log_error("listen error %d %d", rv, errno); + close(s); + return rv; + } + return s; +} + +void process_connection(int ci) +{ + struct boothc_header h; + char *data = NULL; + char *site, *ticket; + int local, rv; + + rv = do_read(client[ci].fd, &h, sizeof(h)); + if (rv < 0) { + log_error("connection %d read error %d", ci, rv); + return; + } + if (h.magic != BOOTHC_MAGIC) { + log_error("connection %d magic error %x", ci, h.magic); + return; + } + if (h.version != BOOTHC_VERSION) { + log_error("connection %d version error %x", ci, h.version); + return; + } + + if (h.len) { + data = malloc(h.len); + if (!data) { + log_error("process_connection no mem %u", h.len); + return; + } + memset(data, 0, h.len); + + rv = do_read(client[ci].fd, data, h.len); + if (rv < 0) { + log_error("connection %d read data error %d", ci, rv); + goto out; + } + } + + switch (h.cmd) { + case BOOTHC_CMD_LIST: + break; + + case BOOTHC_CMD_GRANT: + site = data; + ticket = data + BOOTH_NAME_LEN; + if (!check_ticket(ticket)) { + h.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (!check_site(site, &local)) { + h.result = BOOTHC_RLT_INVALID_ARG; + goto reply; + } + if (local) + h.result = grant_ticket(ticket, h.option, h.expiry); + else + h.result = BOOTHC_RLT_REMOTE_OP; + break; + + case BOOTHC_CMD_REVOKE: + break; + + default: + log_error("connection %d cmd %x unknown", ci, h.cmd); + break; + } + +reply: + rv = do_write(client[ci].fd, &h, sizeof(h)); + 
if (rv < 0) + log_error("connection %d write error %d", ci, rv); +out: + free(data); +} + +static void process_listener(int ci) +{ + int fd, i; + + fd = accept(client[ci].fd, NULL, NULL); + if (fd < 0) { + log_error("process_listener: accept error %d %d", fd, errno); + return; + } + + i = client_add(fd, process_connection, NULL); + + log_debug("client connection %d fd %d", i, fd); +} + +static int setup_config(int type) +{ + int rv; + + rv = read_config(BOOTH_DEFAULT_CONF); + if (rv < 0) + goto out; + + rv = check_config(type); + if (rv < 0) + goto out; + +out: + return rv; +} + +static int setup_transport(void) +{ + int rv; + + rv = booth_transport[booth_conf->proto].init(ticket_recv); + if (rv < 0) + goto out; + + rv = booth_transport[TCP].init(NULL); + if (rv < 0) + goto out; + +out: + return rv; +} + +static int setup_timer(void) +{ + return timerlist_init(); +} + +static void loop(int type) +{ + void (*workfn) (int ci); + void (*deadfn) (int ci); + int rv, i; + + rv = setup_config(type); + if (rv < 0) + goto out; + + rv = setup_timer(); + if (rv < 0) + goto out; + + rv = setup_transport(); + if (rv < 0) + goto out; + + rv = setup_ticket(); + if (rv < 0) + goto out; + + rv = setup_listener(BOOTHC_SOCK_PATH); + if (rv < 0) + goto out; + client_add(rv, process_listener, NULL); + + while (1) { + rv = poll(pollfd, client_maxi + 1, poll_timeout); + if (rv == -1 && errno == EINTR) + continue; + if (rv < 0) { + log_error("poll errno %d", errno); + goto out; + } + + for (i = 0; i <= client_maxi; i++) { + if (client[i].fd < 0) + continue; + if (pollfd[i].revents & POLLIN) { + workfn = client[i].workfn; + if (workfn) + workfn(i); + } + if (pollfd[i].revents & + (POLLERR | POLLHUP | POLLNVAL)) { + deadfn = client[i].deadfn; + if (deadfn) + deadfn(i); + } + } + + process_timerlist(); + } + +out: + return; +} + +static int do_list(void) +{ + struct boothc_header h, *rh; + char *reply = NULL, *data; + int data_len; + int fd, rv; + + init_header(&h, BOOTHC_CMD_LIST, 0, 0, 0, 
0); + + fd = do_connect(BOOTHC_SOCK_PATH); + if (fd < 0) { + rv = fd; + goto out; + } + + rv = do_write(fd, &h, sizeof(h)); + if (rv < 0) + goto out_close; + + reply = malloc(sizeof(struct boothc_header)); + if (!reply) { + rv = -ENOMEM; + goto out_close; + } + + rv = do_read(fd, reply, sizeof(struct boothc_header)); + if (rv < 0) + goto out_free; + + rh = (struct boothc_header *)reply; + data_len = rh->len; + + reply = realloc(reply, sizeof(struct boothc_header) + data_len); + if (!reply) { + rv = -ENOMEM; + goto out_free; + } + data = reply + sizeof(struct boothc_header); + rv = do_read(fd, data, data_len); + if (rv < 0) + goto out_free; + + do_write(STDOUT_FILENO, data, data_len); + rv = 0; + +out_free: + free(reply); +out_close: + close(fd); +out: + return rv; +} + +static int do_grant(void) +{ + char *buf; + struct boothc_header *h, reply; + int buflen; + uint32_t force; + int fd, rv; + + buflen = sizeof(struct boothc_header) + + sizeof(cl.site) + sizeof(cl.ticket); + buf = malloc(buflen); + if (!buf) { + rv = -ENOMEM; + goto out; + } + h = (struct boothc_header *)buf; + if (cl.force) + force = BOOTHC_OPT_FORCE; + init_header(h, BOOTHC_CMD_GRANT, force, cl.expiry, 0, + sizeof(cl.site) + sizeof(cl.ticket)); + strcpy(buf + sizeof(struct boothc_header), cl.site); + strcpy(buf + sizeof(struct boothc_header) + sizeof(cl.site), cl.ticket); + + fd = do_connect(BOOTHC_SOCK_PATH); + if (fd < 0) { + rv = fd; + goto out_free; + } + + rv = do_write(fd, buf, buflen); + if (rv < 0) + goto out_close; + + rv = do_read(fd, &reply, sizeof(struct boothc_header)); + if (rv < 0) + goto out_close; + + if (reply.result == BOOTHC_RLT_INVALID_ARG) { + log_info("invalid argument!"); + rv = -1; + goto out_close; + } + + if (reply.result == BOOTHC_RLT_REMOTE_OP) { + struct booth_node to; + int s; + + memset(&to, 0, sizeof(struct booth_node)); + to.family = BOOTH_PROTO_FAMILY; + strcpy(to.addr, cl.site); + + s = booth_transport[TCP].open(&to); + if (s < 0) + goto out_close; + + rv = 
booth_transport[TCP].send(s, buf, buflen); + if (rv < 0) { + booth_transport[TCP].close(s); + goto out_close; + } + rv = booth_transport[TCP].recv(s, &reply, + sizeof(struct boothc_header)); + if (rv < 0) { + booth_transport[TCP].close(s); + goto out_close; + } + booth_transport[TCP].close(s); + } + + if (reply.result == BOOTHC_RLT_ASYNC) { + log_info("grant command sent, but result is async."); + rv = 0; + } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { + log_info("grant succeeded!"); + rv = 0; + } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { + log_info("grant failed!"); + rv = 0; + } else { + log_error("internal error!"); + rv = -1; + } + +out_close: + close(fd); +out_free: + free(buf); +out: + return rv; +} + +static int do_revoke(void) +{ + char *buf; + struct boothc_header *h, reply; + int buflen; + int fd, rv; + + buflen = sizeof(struct boothc_header) + sizeof(cl.ticket); + buf = malloc(buflen); + if (!buf) { + rv = -ENOMEM; + goto out; + } + h = (struct boothc_header *)buf; + init_header(h, BOOTHC_CMD_REVOKE, 0, 0, 0, sizeof(cl.ticket)); + strcpy(buf + sizeof(struct boothc_header), cl.ticket); + + fd = do_connect(BOOTHC_SOCK_PATH); + if (fd < 0) { + rv = fd; + goto out_free; + } + + rv = do_write(fd, buf, buflen); + if (rv < 0) + goto out_close; + + rv = do_read(fd, &reply, sizeof(struct boothc_header)); + if (rv < 0) + goto out_close; + if (reply.result == BOOTHC_RLT_ASYNC) { + log_info("revoke command sent, but result is async."); + rv = 0; + } else if (reply.result == BOOTHC_RLT_SYNC_SUCC) { + log_info("revoke succeeded!"); + rv = 0; + } else if (reply.result == BOOTHC_RLT_SYNC_FAIL) { + log_info("revoke failed!"); + rv = 0; + } else { + log_error("internal error!"); + rv = -1; + } + +out_close: + close(fd); +out_free: + free(buf); +out: + return rv; +} + +static int lockfile(void) +{ + char path[PATH_MAX]; + char buf[16]; + struct flock lock; + int fd, rv; + + snprintf(path, PATH_MAX, "%s/%s", BOOTH_RUN_DIR, BOOTH_LOCKFILE_NAME); + + fd = 
open(path, O_CREAT|O_WRONLY, 0666); + if (fd < 0) { + log_error("lockfile open error %s: %s", + path, strerror(errno)); + return -1; + } + + lock.l_type = F_WRLCK; + lock.l_start = 0; + lock.l_whence = SEEK_SET; + lock.l_len = 0; + + rv = fcntl(fd, F_SETLK, &lock); + if (rv < 0) { + log_error("lockfile setlk error %s: %s", + path, strerror(errno)); + goto fail; + } + + rv = ftruncate(fd, 0); + if (rv < 0) { + log_error("lockfile truncate error %s: %s", + path, strerror(errno)); + goto fail; + } + + memset(buf, 0, sizeof(buf)); + snprintf(buf, sizeof(buf), "%d\n", getpid()); + + rv = write(fd, buf, strlen(buf)); + if (rv <= 0) { + log_error("lockfile write error %s: %s", + path, strerror(errno)); + goto fail; + } + + return fd; + fail: + close(fd); + return -1; +} + +static void unlink_lockfile(int fd) +{ + char path[PATH_MAX]; + + snprintf(path, PATH_MAX, "%s/%s", BOOTH_RUN_DIR, BOOTH_LOCKFILE_NAME); + unlink(path); + close(fd); +} + +static void print_usage(void) +{ + printf("Usage:\n"); + printf("booth [options]\n"); + printf("\n"); + printf("Types:\n"); + printf(" arbitrator: daemon running on arbitrator\n"); + printf(" site: daemon running on cluster site\n"); + printf(" client: command running from client\n"); + printf("\n"); + printf("Operations:\n"); + printf("Please note that operations are valid iff type is client!\n"); + printf("list: List all the tickets\n"); + printf("grant: Grant ticket T(-t T) to site S(-s S)\n"); + printf("revoke: Revoke ticket T(-t T) from local site\n"); + printf("\n"); + printf("Options:\n"); + printf(" -D Enable debugging to stderr and don't fork\n"); + printf(" -t ticket name\n"); + printf(" -s site name\n"); + printf(" -f ticket attribute: force, only valid when " + "granting\n"); + printf(" -e ticket will failover after the expiry time if " + "not being renewed,\n\t\tset it while " + "granting. 
default: 600sec\n"); + printf(" -h Print this help, then exit\n"); +} + +#define OPTION_STRING "Dt:s:fe:h" + +static int read_arguments(int argc, char **argv) +{ + int optchar; + char *arg1 = argv[1]; + char *op; + + if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || + !strcmp(arg1, "-h")) { + print_usage(); + exit(EXIT_SUCCESS); + } + + if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || + !strcmp(arg1, "-V")) { + printf("%s %s (built %s %s)\n", + argv[0], RELEASE_VERSION, __DATE__, __TIME__); + exit(EXIT_SUCCESS); + } + + if (!strcmp(arg1, "arbitrator")) { + cl.type = ACT_ARBITRATOR; + optind = 2; + } else if (!strcmp(arg1, "site")) { + cl.type = ACT_SITE; + optind = 2; + } else if (!strcmp(arg1, "client")) { + cl.type = ACT_CLIENT; + if (argc < 3) { + print_usage(); + exit(EXIT_FAILURE); + } + op = argv[2]; + optind = 3; + } else { + cl.type = ACT_CLIENT; + op = argv[1]; + optind = 2; + } + + switch (cl.type) { + case ACT_ARBITRATOR: + break; + + case ACT_SITE: + break; + + case ACT_CLIENT: + if (!strcmp(op, "list")) + cl.op = OP_LIST; + else if (!strcmp(op, "grant")) + cl.op = OP_GRANT; + else if (!strcmp(op, "revoke")) + cl.op = OP_REVOKE; + else { + fprintf(stderr, "client operation \"%s\" is unknown\n", + op); + exit(EXIT_FAILURE); + } + break; + } + + while (optind < argc) { + optchar = getopt(argc, argv, OPTION_STRING); + + switch (optchar) { + case 'D': + cl.debug = 1; + log_logfile_priority = LOG_DEBUG; + log_syslog_priority = LOG_DEBUG; + break; + + case 't': + if (cl.op == OP_GRANT || cl.op == OP_REVOKE) + strcpy(cl.ticket, optarg); + else { + print_usage(); + exit(EXIT_FAILURE); + } + break; + + case 's': + if (cl.op == OP_GRANT || cl.op == OP_REVOKE) + strcpy(cl.site, optarg); + else { + print_usage(); + exit(EXIT_FAILURE); + } + break; + + case 'f': + if (cl.op == OP_GRANT) + cl.force = 1; + else { + print_usage(); + exit(EXIT_FAILURE); + } + break; + + case 'e': + if (cl.op == OP_GRANT) + cl.expiry = atoi(optarg); + else 
{ + print_usage(); + exit(EXIT_FAILURE); + } + break; + + case 'h': + print_usage(); + exit(EXIT_SUCCESS); + break; + + case ':': + case '?': + fprintf(stderr, "Please use '-h' for usage.\n"); + exit(EXIT_FAILURE); + break; + + default: + fprintf(stderr, "unknown option: %c\n", optchar); + exit(EXIT_FAILURE); + break; + }; + } + + return 0; +} + +static void set_scheduler(void) +{ + struct sched_param sched_param; + struct rlimit rlimit; + int rv; + + rlimit.rlim_cur = RLIM_INFINITY; + rlimit.rlim_max = RLIM_INFINITY; + setrlimit(RLIMIT_MEMLOCK, &rlimit); + rv = mlockall(MCL_CURRENT | MCL_FUTURE); + if (rv < 0) { + log_error("mlockall failed"); + } + + rv = sched_get_priority_max(SCHED_RR); + if (rv != -1) { + sched_param.sched_priority = rv; + rv = sched_setscheduler(0, SCHED_RR, &sched_param); + if (rv == -1) + log_error("could not set SCHED_RR priority %d err %d", + sched_param.sched_priority, errno); + } else { + log_error("could not get maximum scheduler priority err %d", + errno); + } +} + +static void set_oom_adj(int val) +{ + FILE *fp; + + fp = fopen("/proc/self/oom_adj", "w"); + if (!fp) + return; + + fprintf(fp, "%i", val); + fclose(fp); +} + +static int do_arbitrator(void) +{ + int fd; + + if (!cl.debug) { + if (daemon(0, 0) < 0) { + perror("daemon error"); + exit(EXIT_FAILURE); + } + } + + setup_logging(); + fd = lockfile(); + if (fd < 0) + return fd; + + log_info("BOOTH arbitrator daemon started"); + set_scheduler(); + set_oom_adj(-16); + + loop(ARBITRATOR); + + unlink_lockfile(fd); + close_logging(); + + return 0; +} + +static int do_site(void) +{ + int fd; + + if (!cl.debug) { + if (daemon(0, 0) < 0) { + perror("daemon error"); + exit(EXIT_FAILURE); + } + } + + setup_logging(); + fd = lockfile(); + if (fd < 0) + return fd; + + log_info("BOOTH cluster site daemon started"); + set_scheduler(); + set_oom_adj(-16); + + loop(SITE); + + unlink_lockfile(fd); + close_logging(); + + return 0; +} + +static int do_client(void) +{ + int rv = -1; + + 
setup_logging(); + + switch (cl.op) { + case OP_LIST: + rv = do_list(); + break; + + case OP_GRANT: + rv = do_grant(); + break; + + case OP_REVOKE: + rv = do_revoke(); + break; + } + + close_logging(); + + return rv; +} + +int main(int argc, char *argv[]) +{ + int rv; + + memset(&cl, 0, sizeof(cl)); + + rv = read_arguments(argc, argv); + if (rv < 0) + goto out; + + switch (cl.type) { + case ACT_ARBITRATOR: + rv = do_arbitrator(); + break; + + case ACT_SITE: + rv = do_site(); + break; + + case ACT_CLIENT: + rv = do_client(); + break; + } + +out: + return rv; +} diff --git a/src/pacemaker.c b/src/pacemaker.c new file mode 100644 index 0000000..9fd596a --- /dev/null +++ b/src/pacemaker.c @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include "log.h" +#include "pacemaker.h" + +#define COMMAND_MAX 256 + +static void pcmk_grant_ticket(const void *ticket) +{ + FILE *p; + char cmd[COMMAND_MAX]; + + snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -n true", (char *)ticket); + log_info("command: '%s' was executed", cmd); + p = popen(cmd, "r"); + if (p == NULL) { + log_error("popen error: %s", cmd); + return; + } + pclose(p); + + return; +} + +static void pcmk_revoke_ticket(const void *ticket) +{ + FILE *p; + char cmd[COMMAND_MAX]; + + snprintf(cmd, COMMAND_MAX, "crm_ticket -t %s -n false", (char *)ticket); + log_info("command: '%s' was executed", cmd); + p = popen(cmd, "r"); + if (p == NULL) { + log_error("popen error: %s", cmd); + return; + } + pclose(p); + + return; +} + +static void pcmk_store_ticket(const void *ticket, int owner, + unsigned long long expires) +{ + FILE *p; + char cmd[COMMAND_MAX]; + + snprintf(cmd, COMMAND_MAX, + "crm_attribute -t tickets -n owner-%s -v %d", + (char *)ticket, owner); + log_info("command: '%s' was executed", cmd); + p = popen(cmd, "r"); + if (p == NULL) { + log_error("popen error: %s", cmd); + return; + } + pclose(p); + + snprintf(cmd, COMMAND_MAX, + "crm_attribute -t tickets -n expires-%s -v %llu", + (char *)ticket, expires); + log_info("command: '%s' was executed", cmd); + p = popen(cmd, "r"); + if (p == NULL) { + log_error("popen error: %s", cmd); + return; + } + pclose(p); + + return; +} + +static void pcmk_load_ticket(const void *ticket, int *owner, + unsigned long long *expires) +{ + FILE *p; + char cmd[COMMAND_MAX]; + char line[256]; + int ow; + unsigned long long ex; + + snprintf(cmd, COMMAND_MAX, + "crm_attribute -t tickets -n owner-%s -G --quiet", + (char *)ticket); + log_info("command: '%s' was executed", cmd); + p = popen(cmd, 
"r"); + if (p == NULL) { + log_error("popen error: %s", cmd); + return; + } + if (fgets(line, sizeof(line) - 1, p) == NULL) { + pclose(p); + return; + } + if (sscanf(line, "%d", &ow) == 1) + *owner = ow; + pclose(p); + + snprintf(cmd, COMMAND_MAX, + "crm_attribute -t tickets -n expires-%s -G --quiet", + (char *)ticket); + log_info("command: '%s' was executed", cmd); + p = popen(cmd, "r"); + if (p == NULL) { + log_error("popen error: %s", cmd); + return; + } + if (fgets(line, sizeof(line) - 1, p) == NULL) { + pclose(p); + return; + } + if (sscanf(line, "%llu", &ex) == 1) + *expires = ex; + pclose(p); + + return; +} + +struct ticket_handler pcmk_handler = { + .grant_ticket = pcmk_grant_ticket, + .revoke_ticket = pcmk_revoke_ticket, + .store_ticket = pcmk_store_ticket, + .load_ticket = pcmk_load_ticket, +}; diff --git a/src/pacemaker.h b/src/pacemaker.h new file mode 100644 index 0000000..c3eb9d2 --- /dev/null +++ b/src/pacemaker.h @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
#ifndef _PACEMAKER_H
#define _PACEMAKER_H

/*
 * Callbacks through which ticket state is handed to the local cluster
 * manager.  The first argument of every callback is the ticket name,
 * passed as a NUL-terminated string behind a const void pointer.
 */
struct ticket_handler {
	void (*grant_ticket) (const void *);
	void (*revoke_ticket) (const void *);
	void (*store_ticket) (const void *, int, unsigned long long);
	void (*load_ticket) (const void *, int *, unsigned long long *);
};

/*
 * Declaration only: the object is defined once, in pacemaker.c.  Without
 * 'extern' every file including this header would carry its own tentative
 * definition, which fails to link when common symbols are disabled
 * (-fno-common).
 */
extern struct ticket_handler pcmk_handler;

#endif /* _PACEMAKER_H */
/*
 * A proposal: the ballot number followed directly by the proposed value.
 * The value is variable-length trailing data, declared as a C99 flexible
 * array member rather than the GNU zero-length array extension; the size
 * and layout of the struct are unchanged.  Allocate with
 * sizeof(struct proposal) plus the number of value bytes.
 */
struct proposal {
	int ballot_number;
	char value[];
};
*a_op; + const struct learner_operations *l_op; + struct list_head list; + struct list_head pi_head; +}; + +struct paxos_instance { + char name[PAXOS_NAME_LEN+1]; + int round; + int *prio; + struct proposer *proposer; + struct acceptor *acceptor; + struct learner *learner; + void (*end) (pi_handle_t pih, int round, int result); + struct list_head list; + struct paxos_space *ps; +}; + +static LIST_HEAD(ps_head); + +static int have_quorum(struct paxos_space *ps, int member) +{ + int i, sum = 0; + + for (i = 0; i < ps->number; i++) { + if (ps->role[i] & ACCEPTOR) + sum++; + } + + if (member * 2 > sum) + return 1; + else + return 0; +} + +static int next_ballot_number(struct paxos_instance *pi) +{ + int ballot; + int myid = pi->ps->p_op->get_myid(); + + if (pi->prio) + ballot = pi->prio[myid]; + else + ballot = myid; + + while (ballot <= pi->round) + ballot += pi->ps->number; + + return ballot; +} + +static void proposer_prepare(struct paxos_instance *pi, int *round) +{ + struct paxos_msghdr *hdr; + void *msg; + int msglen = sizeof(struct paxos_msghdr) + pi->ps->extralen; + int ballot; + + msg = malloc(msglen); + if (!msg) { + *round = 0; + return; + } + memset(msg, 0, msglen); + hdr = msg; + + ballot = next_ballot_number(pi); + + hdr->state = htonl(PREPARING); + hdr->from = htonl(pi->ps->p_op->get_myid()); + hdr->proposer_id = hdr->from; + strcpy(hdr->psname, pi->ps->name); + strcpy(hdr->piname, pi->name); + hdr->ballot_number = htonl(ballot); + hdr->extralen = htonl(pi->ps->extralen); + + if (pi->ps->p_op->broadcast) + pi->ps->p_op->broadcast(msg, msglen); + else { + int i; + for (i = 0; i < pi->ps->number; i++) { + if (pi->ps->role[i] & ACCEPTOR) + pi->ps->p_op->send(i, msg, msglen); + } + } + + free(msg); + *round = ballot; +} + +static void proposer_propose(struct paxos_space *ps, + struct paxos_instance *pi, + void *msg, int msglen) +{ + struct paxos_msghdr *hdr; + pi_handle_t pih = (pi_handle_t)pi; + void *extra, *value, *message; + int ballot; + + if (msglen != 
sizeof(struct paxos_msghdr) + ps->extralen)
+		return;
+	hdr = msg;
+	ballot = ntohl(hdr->ballot_number);
+
+	/* A rejecting acceptor ends this round; the caller may retry. */
+	if (ntohl(hdr->reject)) {
+		pi->round = ballot;
+		pi->end(pih, pi->round, -EAGAIN);
+		return;
+	}
+
+	/* Count the promise unless the optional prepare callback vetoes it. */
+	extra = (char *)msg + sizeof(struct paxos_msghdr);
+	if (ps->p_op->prepare) {
+		if (ps->p_op->prepare(pih, extra))
+			pi->proposer->open_number++;
+	} else
+		pi->proposer->open_number++;
+
+	if (!have_quorum(ps, pi->proposer->open_number))
+		return;
+
+	/*
+	 * Resolve the value before invoking the propose callback: the
+	 * callback receives it as an argument and must not see an
+	 * uninitialised pointer.
+	 */
+	value = pi->proposer->proposal->value;
+	if (ps->p_op->propose)
+		ps->p_op->propose(pih, extra, ballot, value);
+
+	hdr->valuelen = htonl(ps->valuelen);
+
+	/* Outgoing PROPOSING message: received header + extra, then value. */
+	message = malloc(msglen + ps->valuelen);
+	if (!message)
+		return;
+	memset(message, 0, msglen + ps->valuelen);
+	memcpy(message, msg, msglen);
+	memcpy((char *)message + msglen, value, ps->valuelen);
+	/* This is the proposer's state; the acceptor may not even exist. */
+	pi->proposer->state = PROPOSING;
+	hdr = message;
+	hdr->from = htonl(ps->p_op->get_myid());
+	hdr->state = htonl(PROPOSING);
+
+	if (ps->p_op->broadcast)
+		ps->p_op->broadcast(message, msglen + ps->valuelen);
+	else {
+		int i;
+		for (i = 0; i < ps->number; i++) {
+			if (ps->role[i] & ACCEPTOR)
+				ps->p_op->send(i, message,
+					       msglen + ps->valuelen);
+		}
+	}
+
+	/* Same ownership rule as proposer_prepare(): we free after sending. */
+	free(message);
+}
+
+/*
+ * Proposer handler for ACCEPTING replies: count acceptances and, once a
+ * quorum of acceptors has accepted, record the round, run the optional
+ * commit callback and report success through pi->end.  A reply carrying
+ * the reject flag ends the round with -EAGAIN instead of being counted.
+ */
+static void proposer_commit(struct paxos_space *ps,
+			    struct paxos_instance *pi,
+			    void *msg, int msglen)
+{
+	struct paxos_msghdr *hdr;
+	pi_handle_t pih = (pi_handle_t)pi;
+	void *extra;
+
+	if (msglen != sizeof(struct paxos_msghdr) + ps->extralen)
+		return;
+
+	extra = (char *)msg + sizeof(struct paxos_msghdr);
+	hdr = msg;
+
+	/* A rejection must never be counted towards the accept quorum. */
+	if (ntohl(hdr->reject)) {
+		pi->round = ntohl(hdr->ballot_number);
+		pi->end(pih, pi->round, -EAGAIN);
+		return;
+	}
+
+	pi->proposer->accepted_number++;
+
+	if (!have_quorum(ps, pi->proposer->accepted_number))
+		return;
+
+	pi->round = ntohl(hdr->ballot_number);
+	if (ps->p_op->commit)
+		ps->p_op->commit(pih, extra, pi->round);
+	pi->end(pih, pi->round, 0);
+}
+
+/* Acceptor handler for PREPARING messages (see paxos_recvmsg dispatch). */
+static void acceptor_promise(struct paxos_space *ps,
+			     struct paxos_instance *pi,
+			     void *msg, int msglen)
+{
+	struct paxos_msghdr *hdr;
+	unsigned long to;
+	pi_handle_t pih = (pi_handle_t)pi;
+	void *extra;
+
+	if 
(pi->acceptor->state == RECOVERY)
+		return;
+
+	if (msglen != sizeof(struct paxos_msghdr) + ps->extralen)
+		return;
+	hdr = msg;
+
+	if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) {
+		/* hdr->from travels in network byte order; convert before use. */
+		to = ntohl(hdr->from);
+		hdr->from = htonl(ps->p_op->get_myid());
+		hdr->state = htonl(PROMISING);
+		hdr->reject = htonl(1);
+		/*
+		 * Send header + extra: proposer_propose() discards any
+		 * PROMISING message whose length differs from that.
+		 */
+		ps->p_op->send(to, hdr,
+			       sizeof(struct paxos_msghdr) + ps->extralen);
+		return;
+	}
+	pi->acceptor->highest_promised = ntohl(hdr->ballot_number);
+
+	/* Let the upper layer fill in the extra area of the reply. */
+	extra = (char *)msg + sizeof(struct paxos_msghdr);
+	if (ps->p_op->promise)
+		ps->p_op->promise(pih, extra);
+
+	pi->acceptor->state = PROMISING;
+	to = ntohl(hdr->from);
+	hdr->from = htonl(ps->p_op->get_myid());
+	hdr->state = htonl(PROMISING);
+	ps->p_op->send(to, hdr, sizeof(struct paxos_msghdr) + ps->extralen);
+}
+
+/*
+ * Acceptor handler for PROPOSING messages: reject ballots lower than the
+ * highest promise, otherwise store the proposed value, run the optional
+ * accepted callback and announce ACCEPTING to the learners and to the
+ * proposer.
+ */
+static void acceptor_accepted(struct paxos_space *ps,
+			      struct paxos_instance *pi,
+			      void *msg, int msglen)
+{
+	struct paxos_msghdr *hdr;
+	unsigned long to;
+	pi_handle_t pih = (pi_handle_t)pi;
+	void *extra, *value;
+	int myid = ps->p_op->get_myid();
+	int ballot;
+
+	if (pi->acceptor->state == RECOVERY)
+		return;
+
+	if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + ps->valuelen)
+		return;
+	hdr = msg;
+	extra = (char *)msg + sizeof(struct paxos_msghdr);
+	ballot = ntohl(hdr->ballot_number);
+
+	if (ballot < pi->acceptor->highest_promised) {
+		to = ntohl(hdr->from);
+		hdr->from = htonl(myid);
+		hdr->state = htonl(ACCEPTING);
+		hdr->reject = htonl(1);
+		/*
+		 * NOTE(review): this reject goes out header-only, so the
+		 * receiver's length check drops it whenever extralen != 0.
+		 * Widen it only once the ACCEPTING handler on the proposer
+		 * is known to honour hdr->reject.
+		 */
+		ps->p_op->send(to, hdr, sizeof(struct paxos_msghdr));
+		return;
+	}
+
+	/* The value follows the header and the extra area in the message. */
+	value = pi->acceptor->accepted_proposal->value;
+	memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen,
+	       ps->valuelen);
+
+	if (ps->p_op->accepted)
+		ps->p_op->accepted(pih, extra, ballot, value);
+
+	pi->acceptor->state = ACCEPTING;
+	to = ntohl(hdr->from);
+	hdr->from = htonl(myid);
+	hdr->state = htonl(ACCEPTING);
+
+	if (ps->p_op->broadcast)
+		ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr)
+					+ ps->extralen);
+	else {
+		int i;
+		for (i = 0; i < ps->number; i++) {
+			if 
(ps->role[i] & LEARNER)
+				ps->p_op->send(i, msg,
+					       sizeof(struct paxos_msghdr)
+					       + ps->extralen);
+		}
+		/*
+		 * Also answer the proposer when it is not a learner.  Guard
+		 * the index: "to" comes straight from a received header.
+		 */
+		if (to < ps->number && !(ps->role[to] & LEARNER))
+			ps->p_op->send(to, msg, sizeof(struct paxos_msghdr)
+						+ ps->extralen);
+	}
+}
+
+/*
+ * Learner handler for ACCEPTING messages: tally how many times each
+ * ballot has been announced and, once the best tally reaches a quorum,
+ * invoke the optional learned callback with the ballot of this message.
+ */
+static void learner_response(struct paxos_space *ps,
+			     struct paxos_instance *pi,
+			     void *msg, int msglen)
+{
+	struct paxos_msghdr *hdr;
+	pi_handle_t pih = (pi_handle_t)pi;
+	void *extra;
+	int i, unused = -1, found = 0;
+	int ballot;
+
+	if (msglen != sizeof(struct paxos_msghdr) + ps->extralen)
+		return;
+	hdr = msg;
+	extra = (char *)msg + sizeof(struct paxos_msghdr);
+	ballot = ntohl(hdr->ballot_number);
+
+	/* Slots are filled front to back; a zero ballot marks the first free one. */
+	for (i = 0; i < ps->number; i++) {
+		if (!pi->learner->learned[i].ballot) {
+			unused = i;
+			break;
+		}
+		if (pi->learner->learned[i].ballot == ballot) {
+			pi->learner->learned[i].number++;
+			if (pi->learner->learned[i].number
+					> pi->learner->learned_max)
+				pi->learner->learned_max
+					= pi->learner->learned[i].number;
+			found = 1;
+			break;
+		}
+	}
+	if (!found) {
+		/* Table full and ballot unknown: nothing can be recorded. */
+		if (unused < 0)
+			return;
+		pi->learner->learned[unused].ballot = ballot;
+		pi->learner->learned[unused].number = 1;
+		/* The first vote counts towards the maximum as well. */
+		if (pi->learner->learned_max < 1)
+			pi->learner->learned_max = 1;
+	}
+
+	if (!have_quorum(ps, pi->learner->learned_max))
+		return;
+
+	if (ps->p_op->learned)
+		ps->p_op->learned(pih, extra, ballot);
+}
+
+/* Default role handlers installed into every space by paxos_space_init(). */
+const struct proposer_operations generic_proposer_operations = {
+	.prepare		= proposer_prepare,
+	.propose		= proposer_propose,
+	.commit			= proposer_commit,
+};
+
+const struct acceptor_operations generic_acceptor_operations = {
+	.promise		= acceptor_promise,
+	.accepted		= acceptor_accepted,
+};
+
+const struct learner_operations generic_learner_operations = {
+	.response		= learner_response,
+};
+
+ps_handle_t paxos_space_init(const void *name,
+			     unsigned int number,
+			     unsigned int extralen,
+			     unsigned int valuelen,
+			     const unsigned char *role,
+			     const struct paxos_operations *p_op)
+{
+	struct paxos_space *ps;
+
+	list_for_each_entry(ps, &ps_head, list) {
+		if (!strcmp(ps->name, name))
+			return -EEXIST;
+	}
+
+	if (!number || !valuelen || !p_op 
|| !p_op->get_myid || !p_op->send) + return -EINVAL; + + ps = malloc(sizeof(struct paxos_space)); + if (!ps) + return -ENOMEM; + memset(ps, 0, sizeof(struct paxos_space)); + + strncpy(ps->name, name, PAXOS_NAME_LEN + 1); + ps->number = number; + ps->extralen = extralen; + ps->valuelen = valuelen; + ps->role = role; + ps->p_op = p_op; + ps->r_op = &generic_proposer_operations; + ps->a_op = &generic_acceptor_operations; + ps->l_op = &generic_learner_operations; + + list_add_tail(&ps->list, &ps_head); + INIT_LIST_HEAD(&ps->pi_head); + + return (ps_handle_t)ps; +} + +pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) +{ + struct paxos_space *ps = (struct paxos_space *)handle; + struct paxos_instance *pi; + struct proposer *proposer; + struct acceptor *acceptor; + struct learner *learner; + int myid, valuelen, rv; + + list_for_each_entry(pi, &ps->pi_head, list) { + if (!strcmp(pi->name, name)) + return (pi_handle_t)pi; + } + + if (handle <= 0 || !ps->p_op || !ps->p_op->get_myid) { + rv = -EINVAL; + goto out; + } + myid = ps->p_op->get_myid(); + valuelen = ps->valuelen; + + pi = malloc(sizeof(struct paxos_instance)); + if (!pi) { + rv = -ENOMEM; + goto out; + } + memset(pi, 0, sizeof(struct paxos_instance)); + strncpy(pi->name, name, PAXOS_NAME_LEN + 1); + + if (prio) { + pi->prio = malloc(ps->number * sizeof(int)); + if (!pi->prio) { + rv = -ENOMEM; + goto out_pi; + } + memcpy(pi->prio, prio, ps->number * sizeof(int)); + } + + if (ps->role[myid] & PROPOSER) { + proposer = malloc(sizeof(struct proposer)); + if (!proposer) { + rv = -ENOMEM; + goto out_prio; + } + memset(proposer, 0, sizeof(struct proposer)); + proposer->state = INIT; + + proposer->proposal = malloc(sizeof(struct proposal) + valuelen); + if (!proposer->proposal) { + rv = -ENOMEM; + goto out_proposer; + } + memset(proposer->proposal, 0, + sizeof(struct proposal) + valuelen); + pi->proposer = proposer; + } + + if (ps->role[myid] & ACCEPTOR) { + acceptor = malloc(sizeof(struct 
acceptor)); + if (!acceptor) { + rv = -ENOMEM; + goto out_proposal; + } + memset(acceptor, 0, sizeof(struct acceptor)); + acceptor->state = INIT; + + acceptor->accepted_proposal = malloc(sizeof(struct proposal) + + valuelen); + if (!acceptor->accepted_proposal) { + rv = -ENOMEM; + goto out_acceptor; + } + memset(acceptor->accepted_proposal, 0, + sizeof(struct proposal) + valuelen); + pi->acceptor = acceptor; + + if (ps->p_op->catchup) { + pi->acceptor->state = RECOVERY; + ps->p_op->catchup(name); + pi->acceptor->state = INIT; + } + } + + if (ps->role[myid] & LEARNER) { + learner = malloc(sizeof(struct learner) + + ps->number * sizeof(struct learned)); + if (!learner) { + rv = -ENOMEM; + goto out_accepted_proposal; + } + memset(learner, 0, + sizeof(struct learner) + + ps->number * sizeof(struct learned)); + learner->state = INIT; + pi->learner = learner; + } + + pi->ps = ps; + list_add_tail(&pi->list, &ps->pi_head); + + return (pi_handle_t)pi; + +out_accepted_proposal: + if (ps->role[myid] & ACCEPTOR) + free(acceptor->accepted_proposal); +out_acceptor: + if (ps->role[myid] & ACCEPTOR) + free(acceptor); +out_proposal: + if (ps->role[myid] & PROPOSER) + free(proposer->proposal); +out_proposer: + if (ps->role[myid] & PROPOSER) + free(proposer); +out_prio: + if (pi->prio) + free(pi->prio); +out_pi: + free(pi); +out: + return rv; +} + +int paxos_round_request(pi_handle_t handle, + void *value, + void (*end_request) (pi_handle_t handle, + int round, + int result)) +{ + struct paxos_instance *pi = (struct paxos_instance *)handle; + int myid = pi->ps->p_op->get_myid(); + int round; + + if (!(pi->ps->role[myid] & PROPOSER)) + return -EOPNOTSUPP; + + pi->proposer->state = PREPARING; + memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen); + + pi->end = end_request; + pi->ps->r_op->prepare(pi, &round); + + return round; +} + +int paxos_recovery_status_get(pi_handle_t handle) +{ + struct paxos_instance *pi = (struct paxos_instance *)handle; + int myid = 
pi->ps->p_op->get_myid();
+
+	if (!(pi->ps->role[myid] & ACCEPTOR))
+		return -EOPNOTSUPP;
+
+	if (pi->acceptor->state == RECOVERY)
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * Put the local acceptor of an instance into (recovery != 0) or out of
+ * (recovery == 0) the RECOVERY state.  Returns 0, or -EOPNOTSUPP when
+ * the local node is not an acceptor.
+ */
+int paxos_recovery_status_set(pi_handle_t handle, int recovery)
+{
+	struct paxos_instance *pi = (struct paxos_instance *)handle;
+	int myid = pi->ps->p_op->get_myid();
+
+	if (!(pi->ps->role[myid] & ACCEPTOR))
+		return -EOPNOTSUPP;
+
+	if (recovery)
+		pi->acceptor->state = RECOVERY;
+	else
+		pi->acceptor->state = INIT;
+
+	return 0;
+}
+
+/*
+ * Store a new proposal value and round on the instance and hand it to
+ * the proposer's propose handler.  "value" must point to at least
+ * valuelen bytes (the length given to paxos_space_init()), the same
+ * contract paxos_round_request() relies on.  Always returns 0.
+ */
+int paxos_propose(pi_handle_t handle, void *value, int round)
+{
+	struct paxos_instance *pi = (struct paxos_instance *)handle;
+
+	/* The value is an opaque blob, not a C string: copy it whole. */
+	memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen);
+	pi->round = round;
+
+	pi->ps->r_op->propose(pi->ps, pi, NULL, 0);
+
+	return 0;
+}
+
+/*
+ * Entry point for a received paxos message: locate the space and the
+ * instance named in the header (creating the instance on first contact)
+ * and dispatch on the sender's state to the matching role handler.
+ * Returns 0 once dispatched, -EINVAL for a short message, an unknown
+ * space, or when the instance cannot be created.
+ */
+int paxos_recvmsg(void *msg, int msglen)
+{
+	struct paxos_msghdr *hdr = msg;
+	struct paxos_space *ps;
+	struct paxos_instance *pi;
+	pi_handle_t pih;
+	int found = 0;
+	int myid;
+
+	/* Never look at header fields the sender did not actually send. */
+	if (!msg || msglen < (int)sizeof(struct paxos_msghdr))
+		return -EINVAL;
+
+	/* Names come off the wire; make sure strcmp() stops inside them. */
+	hdr->psname[PAXOS_NAME_LEN] = '\0';
+	hdr->piname[PAXOS_NAME_LEN] = '\0';
+
+	list_for_each_entry(ps, &ps_head, list) {
+		if (!strcmp(ps->name, hdr->psname)) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found)
+		return -EINVAL;
+
+	/* Only now does ps point at a real space. */
+	myid = ps->p_op->get_myid();
+
+	found = 0;
+	list_for_each_entry(pi, &ps->pi_head, list) {
+		if (!strcmp(pi->name, hdr->piname)) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found) {
+		/* After a full walk pi is not a valid entry; use the new one. */
+		pih = paxos_instance_init((ps_handle_t)ps, hdr->piname, NULL);
+		if (pih <= 0)
+			return -EINVAL;
+		pi = (struct paxos_instance *)pih;
+	}
+
+	switch (ntohl(hdr->state)) {
+	case PREPARING:
+		if (ps->role[myid] & ACCEPTOR)
+			ps->a_op->promise(ps, pi, msg, msglen);
+		break;
+	case PROMISING:
+		/* Only a proposer owns the state this handler touches. */
+		if (ps->role[myid] & PROPOSER)
+			ps->r_op->propose(ps, pi, msg, msglen);
+		break;
+	case PROPOSING:
+		if (ps->role[myid] & ACCEPTOR)
+			ps->a_op->accepted(ps, pi, msg, msglen);
+		break;
+	case ACCEPTING:
+		if (ntohl(hdr->proposer_id) == myid)
+			ps->r_op->commit(ps, pi, msg, msglen);
+		else if (ps->role[myid] & LEARNER)
+			ps->l_op->response(ps, pi, msg, msglen);
+		break;
+	default:
+		break;
+	}
+
+	return 0;
+}
diff --git a/src/paxos.h b/src/paxos.h
new file mode 100644
index 0000000..0079759
--- /dev/null
+++ b/src/paxos.h
@@ -0,0 +1,79 @@
+/*
+ * 
Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef _PAXOS_H +#define _PAXOS_H + +#define PAXOS_NAME_LEN 63 + +#define PROPOSER 0x4 +#define ACCEPTOR 0x2 +#define LEARNER 0x1 + +typedef int ps_handle_t; +typedef int pi_handle_t; + +struct paxos_operations { + int (*get_myid) (void); + int (*send) (unsigned long id, void *value, int len); + int (*broadcast) (void *value, int len); + int (*catchup) (const void *name); + int (*prepare) (pi_handle_t handle, void *extra); + int (*promise) (pi_handle_t handle, void *extra); + int (*propose) (pi_handle_t handle, void *extra, + int round, void *value); + int (*accepted) (pi_handle_t handle, void *extra, + int round, void *value); + int (*commit) (pi_handle_t handle, void *extra, int round); + int (*learned) (pi_handle_t handle, void *extra, int round); +}; + +int paxos_recvmsg(void *msg, int msglen); + +ps_handle_t paxos_space_init(const void *name, + unsigned int number, + unsigned int extralen, + unsigned int valuelen, + const unsigned char *role, + const struct paxos_operations *p_op); + +pi_handle_t paxos_instance_init(ps_handle_t handle, + const void *name, + int *prio); + +int paxos_round_request(pi_handle_t handle, + void *value, + void (*end_request) (pi_handle_t handle, + int round, + int result)); + 
+int paxos_round_discard(pi_handle_t handle, int round); + +int paxos_leader_get(pi_handle_t handle, int *round); + +int paxos_recovery_status_get(pi_handle_t handle); + +int paxos_recovery_status_set(pi_handle_t handle, int recovery); + +int paxos_propose(pi_handle_t handle, void *value, int round); + +int paxos_instance_exit(pi_handle_t handle); + +int paxos_space_exit(ps_handle_t handle); + +#endif /* _PAXOS_H */ diff --git a/src/paxos_lease.c b/src/paxos_lease.c new file mode 100644 index 0000000..a8af90f --- /dev/null +++ b/src/paxos_lease.c @@ -0,0 +1,441 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include "paxos.h"
+#include "paxos_lease.h"
+#include "transport.h"
+#include "config.h"
+#include "timer.h"
+#include "list.h"
+
+#define PAXOS_LEASE_SPACE		"paxoslease"
+/*
+ * Size of the value exchanged through paxos.  It must equal the size of
+ * the object callers pass in (struct paxos_lease_value), because the
+ * paxos layer copies exactly this many bytes out of the caller's buffer.
+ */
+#define PLEASE_VALUE_LEN		(sizeof(struct paxos_lease_value))
+
+/* Extra area appended to the paxos header: is the lease currently held? */
+struct paxos_lease_msghdr {
+	int leased;
+};
+
+/* The value agreed on by paxos: who owns which lease, and for how long. */
+struct paxos_lease_value {
+	char name[PAXOS_NAME_LEN+1];
+	int owner;
+	int expiry;
+};
+
+/* Per-role bookkeeping for one lease. */
+struct lease_state {
+	int round;
+	struct paxos_lease_value *plv;
+	unsigned long long expires;
+	struct timerlist *timer;
+};
+
+struct paxos_lease {
+	char name[PAXOS_NAME_LEN+1];
+	pi_handle_t pih;
+	struct lease_state proposer;
+	struct lease_state acceptor;
+	int owner;
+	int expiry;
+	int relet;
+	int failover;
+	unsigned long long expires;
+	void (*end_lease) (pi_handle_t, int);
+	struct timerlist *timer;
+	struct list_head list;
+};
+
+static LIST_HEAD(lease_head);
+
+static int myid = -1;
+static struct paxos_operations *px_op = NULL;
+const struct paxos_lease_operations *p_l_op;
+ps_handle_t ps_handle = 0;
+
+/*
+ * Completion callback given to paxos_round_request(): map the paxos
+ * instance back to its lease and forward the result to the lease's
+ * end_lease callback, if one was registered and the round is current.
+ */
+static void end_paxos_request(pi_handle_t handle, int round, int result)
+{
+	struct paxos_lease *pl;
+	int found = 0;
+
+	list_for_each_entry(pl, &lease_head, list) {
+		if (pl->pih == handle) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found)
+		return;
+
+	if (round != pl->proposer.round)
+		return;
+
+	/* Failover re-acquisition registers no callback (end_acquire NULL). */
+	if (pl->end_lease)
+		pl->end_lease((pl_handle_t)pl, result);
+
+	return;
+}
+
+static void lease_expires(unsigned long data)
+{
+	struct paxos_lease *pl = (struct paxos_lease *)data;
+	pl_handle_t plh = (pl_handle_t)pl;
+	struct paxos_lease_result plr;
+
+	if (pl->owner != myid) {
+		pl->owner = -1;
+
+		strcpy(plr.name, pl->name);
+		plr.owner = -1;
+		plr.expires = 0;
+		p_l_op->notify(plh, &plr);
+
+		if (pl->proposer.timer)
+			del_timer(pl->proposer.timer);
+		if (pl->acceptor.timer)
+			
del_timer(pl->acceptor.timer); + + if (pl->failover) + paxos_lease_acquire(plh, 1, NULL); + } else if (pl->owner == myid && pl->relet) { + struct paxos_lease_value value; + strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); + value.owner = myid; + value.expiry = pl->expiry; + paxos_propose(pl->pih, &value, pl->proposer.round); + } else { + pl->owner = -1; + + strcpy(plr.name, pl->name); + plr.owner = -1; + plr.expires = 0; + p_l_op->notify(plh, &plr); + + if (pl->proposer.timer) + del_timer(pl->proposer.timer); + if (pl->acceptor.timer) + del_timer(pl->acceptor.timer); + } +} + +int paxos_lease_acquire(pl_handle_t handle, + int relet, + void (*end_acquire) (pl_handle_t handle, int result)) +{ + struct paxos_lease *pl = (struct paxos_lease *)handle; + struct paxos_lease_value value; + int round; + + strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); + value.owner = myid; + value.expiry = pl->expiry; + pl->relet = relet; + pl->end_lease = end_acquire; + + round = paxos_round_request(pl->pih, &value, end_paxos_request); + if (round <= 0) + return -1; + + pl->proposer.round = round; + return 0; +} + +static int lease_catchup(const void *name) +{ + struct paxos_lease *pl; + int found = 0; + + list_for_each_entry(pl, &lease_head, list) { + if (!strcmp(pl->name, name)) { + found = 1; + break; + } + } + if (!found) + return -1; + + p_l_op->catchup(name, &pl->owner, &pl->expires); + + return 0; +} + +static int lease_prepared(pi_handle_t handle __attribute__((unused)), + void *header) +{ + struct paxos_lease_msghdr *hdr = header; + + if (hdr->leased) + return 0; + else + return 1; +} + +static int handle_lease_request(pi_handle_t handle, void *header) +{ + struct paxos_lease_msghdr *hdr; + struct paxos_lease *pl; + int found = 0; + + hdr = header; + + list_for_each_entry(pl, &lease_head, list) { + if (pl->pih == handle) { + found = 1; + break; + } + } + if (!found) + return -1; + + if (pl->owner == -1) + hdr->leased = 0; + else + hdr->leased = 1; + + return 0; +} + +static 
int lease_propose(pi_handle_t handle,
+			 void *extra __attribute__((unused)),
+			 int round, void *value)
+{
+	struct paxos_lease *pl;
+	int found = 0;
+
+	/* Map the paxos instance handle back to its lease. */
+	list_for_each_entry(pl, &lease_head, list) {
+		if (pl->pih == handle) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found)
+		return -1;
+
+	if (round != pl->proposer.round)
+		return -1;
+
+	/* Keep a private copy of the value being proposed. */
+	if (!pl->proposer.plv) {
+		pl->proposer.plv = malloc(sizeof(struct paxos_lease_value));
+		if (!pl->proposer.plv)
+			return -ENOMEM;
+	}
+	memcpy(pl->proposer.plv, value, sizeof(struct paxos_lease_value));
+
+	/* With relet the timer fires at 4/5 of the expiry, otherwise at expiry. */
+	if (pl->relet) {
+		pl->proposer.timer = add_timer(4 * pl->expiry / 5,
+					       (unsigned long)pl,
+					       lease_expires);
+		pl->proposer.expires = current_time() + 4 * pl->expiry / 5;
+	} else {
+		pl->proposer.timer = add_timer(pl->expiry, (unsigned long)pl,
+					       lease_expires);
+		pl->proposer.expires = current_time() + pl->expiry;
+	}
+
+	return 0;
+}
+
+/*
+ * paxos "accepted" callback: remember the accepted round and value on
+ * the acceptor side of the lease and arm its expiry timer.
+ */
+static int lease_accpeted(pi_handle_t handle,
+			  void *extra __attribute__((unused)),
+			  int round, void *value)
+{
+	struct paxos_lease *pl;
+	int found = 0;
+
+	list_for_each_entry(pl, &lease_head, list) {
+		if (pl->pih == handle) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found)
+		return -1;
+
+	pl->acceptor.round = round;
+	if (!pl->acceptor.plv) {
+		pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value));
+		if (!pl->acceptor.plv)
+			return -ENOMEM;
+	}
+	memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value));
+
+	pl->acceptor.timer = add_timer(pl->expiry, (unsigned long)pl,
+				       lease_expires);
+	pl->acceptor.expires = current_time() + pl->expiry;
+
+	return 0;
+}
+
+/*
+ * paxos "commit" callback: the proposer's value reached a quorum, so
+ * adopt it as the lease state and notify the upper layer.
+ */
+static int lease_commit(pi_handle_t handle,
+			void *extra __attribute__((unused)),
+			int round)
+{
+	struct paxos_lease *pl;
+	pl_handle_t plh;
+	struct paxos_lease_result plr;
+	int found = 0;
+
+	list_for_each_entry(pl, &lease_head, list) {
+		if (pl->pih == handle) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found)
+		return -1;
+
+	/* Derive the handle only once pl points at the matching lease. */
+	plh = (pl_handle_t)pl;
+
+	if (round != pl->proposer.round)
+		return -1;
+
+	pl->owner = 
pl->proposer.plv->owner;
+	pl->expiry = pl->proposer.plv->expiry;
+
+	strcpy(plr.name, pl->proposer.plv->name);
+	plr.owner = pl->proposer.plv->owner;
+	plr.expires = current_time() + pl->proposer.plv->expiry;
+
+	p_l_op->notify(plh, &plr);
+
+	return 0;
+}
+
+/*
+ * paxos "learned" callback: a quorum of acceptors announced the round
+ * this node accepted, so adopt the accepted value as the lease state
+ * and notify the upper layer.  Returns 0, or -1 when the handle is
+ * unknown or the round is not the one accepted locally.
+ */
+static int lease_learned(pi_handle_t handle,
+			 void *extra __attribute__((unused)),
+			 int round)
+{
+	struct paxos_lease *pl;
+	pl_handle_t plh;
+	struct paxos_lease_result plr;
+	int found = 0;
+
+	list_for_each_entry(pl, &lease_head, list) {
+		if (pl->pih == handle) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found)
+		return -1;
+
+	/* Derive the handle only once pl points at the matching lease. */
+	plh = (pl_handle_t)pl;
+
+	if (round != pl->acceptor.round)
+		return -1;
+
+	pl->owner = pl->acceptor.plv->owner;
+	pl->expiry = pl->acceptor.plv->expiry;
+
+	strcpy(plr.name, pl->acceptor.plv->name);
+	plr.owner = pl->acceptor.plv->owner;
+	plr.expires = current_time() + pl->acceptor.plv->expiry;
+
+	p_l_op->notify(plh, &plr);
+
+	return 0;
+}
+
+/*
+ * Create a lease.  The first call also builds the shared paxos callback
+ * table and the paxos space that all leases live in.
+ */
+pl_handle_t paxos_lease_init(const void *name,
+			     unsigned int namelen,
+			     int expiry,
+			     int number,
+			     int failover,
+			     unsigned char *role,
+			     int *prio,
+			     const struct paxos_lease_operations *pl_op)
+{
+	ps_handle_t psh;
+	pi_handle_t pih;
+	struct paxos_lease *lease;
+
+	if (namelen > PAXOS_NAME_LEN)
+		return -EINVAL;
+
+	if (myid == -1)
+		myid = pl_op->get_myid();
+
+	if (!ps_handle) {
+		px_op = malloc(sizeof(struct paxos_operations));
+		if (!px_op)
+			return -ENOMEM;
+		memset(px_op, 0, sizeof(struct paxos_operations));
+		px_op->get_myid = pl_op->get_myid;
+		px_op->send = pl_op->send;
+		px_op->broadcast = pl_op->broadcast;
+		px_op->catchup = lease_catchup;
+		px_op->prepare = lease_prepared;
+		px_op->promise = handle_lease_request;
+		px_op->propose = lease_propose;
+		px_op->accepted = lease_accpeted;
+		px_op->commit = lease_commit;
+		px_op->learned = lease_learned;
+		p_l_op = pl_op;
+
+		psh = paxos_space_init(PAXOS_LEASE_SPACE,
+				       number,
+				       sizeof(struct paxos_lease_msghdr),
+				       PLEASE_VALUE_LEN,
+				       role,
+				       px_op);
+		if (psh <= 0) {
+			free(px_op);
+			px_op 
= NULL;
+			return psh;
+		}
+		ps_handle = psh;
+	}
+
+	lease = malloc(sizeof(struct paxos_lease));
+	if (!lease)
+		return -ENOMEM;
+	memset(lease, 0, sizeof(struct paxos_lease));
+	/* Copy at most PAXOS_NAME_LEN bytes so the zeroed last byte terminates. */
+	strncpy(lease->name, name, PAXOS_NAME_LEN);
+	lease->owner = -1;
+	lease->expiry = expiry;
+	lease->failover = failover;
+	/* Linked before instance init: the catchup callback looks it up by name. */
+	list_add_tail(&lease->list, &lease_head);
+
+	pih = paxos_instance_init(ps_handle, name, prio);
+	if (pih <= 0) {
+		/* Unlink first, or lease_head keeps a pointer to freed memory. */
+		list_del(&lease->list);
+		free(lease);
+		return pih;
+	}
+	lease->pih = pih;
+
+	return (pl_handle_t)lease;
+}
+
+/* Feed a received message into the paxos layer. */
+int paxos_lease_on_receive(void *msg, int msglen)
+{
+	return paxos_recvmsg(msg, msglen);
+}
+
+/*
+ * Release the resources held by one lease: its stored proposer/acceptor
+ * values and timers, plus the shared paxos callback table.  Always
+ * returns 0.
+ */
+int paxos_lease_exit(pl_handle_t handle)
+{
+	struct paxos_lease *pl = (struct paxos_lease *)handle;
+
+	/*
+	 * px_op is shared by every lease; clear it after freeing so that
+	 * exiting a second lease does not free it again.
+	 * NOTE(review): the paxos space created in paxos_lease_init() still
+	 * holds this pointer - confirm no message is dispatched after the
+	 * first lease has exited.
+	 */
+	free(px_op);
+	px_op = NULL;
+
+	/* Reset each pointer so the lease never refers to released state. */
+	free(pl->proposer.plv);
+	pl->proposer.plv = NULL;
+	if (pl->proposer.timer) {
+		del_timer(pl->proposer.timer);
+		pl->proposer.timer = NULL;
+	}
+	free(pl->acceptor.plv);
+	pl->acceptor.plv = NULL;
+	if (pl->acceptor.timer) {
+		del_timer(pl->acceptor.timer);
+		pl->acceptor.timer = NULL;
+	}
+
+	return 0;
+}
diff --git a/src/paxos_lease.h b/src/paxos_lease.h
new file mode 100644
index 0000000..498070c
--- /dev/null
+++ b/src/paxos_lease.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2011 Jiaju Zhang <jjzhang@suse.de>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef _PAXOS_LEASE_H +#define _PAXOS_LEASE_H + +#define PLEASE_NAME_LEN 63 + +typedef int pl_handle_t; + +struct paxos_lease_result { + char name[PLEASE_NAME_LEN+1]; + int owner; + unsigned long long expires; +}; + +struct paxos_lease_operations { + int (*get_myid) (void); + int (*send) (unsigned long id, void *value, int len); + int (*broadcast) (void *value, int len); + int (*catchup) (const void *name, int *owner, + unsigned long long *expires); + int (*notify) (pl_handle_t handle, struct paxos_lease_result *result); +}; + +pl_handle_t paxos_lease_init(const void *name, + unsigned int namelen, + int expiry, + int number, + int failover, + unsigned char *role, + int *prio, + const struct paxos_lease_operations *pl_op); + +int paxos_lease_on_receive(void *msg, int msglen); + +int paxos_lease_acquire(pl_handle_t handle, + int relet, + void (*end_acquire) (pl_handle_t handle, int result)); +/* +int paxos_lease_owner_get(const void *name); + +int paxos_lease_epoch_get(const void *name); + +int paxos_lease_timeout(const void *name); +*/ +int paxos_lease_release(pl_handle_t handle); + +int paxos_lease_exit(pl_handle_t handle); + +#endif /* _PAXOS_LEASE_H */ diff --git a/src/ticket.c b/src/ticket.c new file mode 100644 index 0000000..ff82f73 --- /dev/null +++ b/src/ticket.c @@ -0,0 +1,340 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. 
+ * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include +#include +#include +#include +#include "ticket.h" +#include "config.h" +#include "pacemaker.h" +#include "list.h" +#include "log.h" +#include "paxos_lease.h" +#include "paxos.h" + +#define PAXOS_MAGIC 0xDB12 + +struct booth_msghdr { + uint16_t magic; + uint16_t checksum; + uint32_t len; +} __attribute__((packed)); + +struct ticket { + char id[BOOTH_NAME_LEN+1]; + pl_handle_t handle; + int owner; + int expiry; + unsigned long long expires; + struct list_head list; +}; + +static LIST_HEAD(ticket_list); + +static unsigned char *role; + +int check_ticket(char *ticket) +{ + int i; + + if (!booth_conf) + return 0; + + for (i = 0; i < booth_conf->ticket_count; i++) { + if (!strcmp(booth_conf->ticket[i].name, ticket)) + return 1; + } + + return 0; +} + +int check_site(char *site, int *local) +{ + int i; + + if (!booth_conf) + return 0; + + for (i = 0; i < booth_conf->node_count; i++) { + if (booth_conf->node[i].type == SITE + && !strcmp(booth_conf->node[i].addr, site)) { + *local = booth_conf->node[i].local; + return 1; + } + } + + return 0; +} + +static int * ticket_priority(int i) +{ + int j; + + /* TODO: need more precise check */ + for (j = 0; j < booth_conf->node_count; j++) { + if (booth_conf->ticket[i].weight[j] == 0) + return NULL; + } + return booth_conf->ticket[i].weight; +} + +static int ticket_get_myid(void) +{ + return booth_transport[booth_conf->proto].get_myid(); +} + +static void end_acquire(pl_handle_t handle, int result) +{ + struct ticket *tk; + int found = 0; + + if (result == 
0) {
+		list_for_each_entry(tk, &ticket_list, list) {
+			if (tk->handle == handle) {
+				tk->owner = ticket_get_myid();
+				found = 1;
+				break;
+			}
+		}
+		if (!found) {
+			log_error("BUG: ticket handle %d does not exist",
+				  handle);
+			/* tk is not a valid ticket here; do not touch it. */
+			return;
+		}
+		log_info("ticket %s acquired", tk->id);
+		log_info("ticket %s granted to local (id %d)", tk->id,
+			 ticket_get_myid());
+	}
+}
+
+/*
+ * Lease-layer "send" callback: wrap the payload in a booth header and
+ * pass it to the configured transport for the node whose nodeid is id.
+ * Returns the transport's result, -1 when no node has that id, or
+ * -ENOMEM.
+ */
+static int ticket_send(unsigned long id, void *value, int len)
+{
+	int i, rv = -1;
+	struct booth_node *to = NULL;
+	struct booth_msghdr *hdr;
+	void *buf;
+	int total = sizeof(struct booth_msghdr) + len;
+
+	for (i = 0; i < booth_conf->node_count; i++) {
+		if (booth_conf->node[i].nodeid == id)
+			to = &booth_conf->node[i];
+	}
+	if (!to)
+		return rv;
+
+	buf = malloc(total);
+	if (!buf)
+		return -ENOMEM;
+	memset(buf, 0, total);
+	hdr = buf;
+	hdr->magic = htons(PAXOS_MAGIC);
+	hdr->len = htonl(total);
+	memcpy((char *)buf + sizeof(struct booth_msghdr), value, len);
+
+	/*
+	 * hdr->len holds the wire (network-order) encoding; the transport
+	 * needs the real byte count.
+	 */
+	rv = booth_transport[booth_conf->proto].send(
+		(unsigned long)to, buf, total);
+
+	free(buf);
+	return rv;
+}
+
+/*
+ * Lease-layer "broadcast" callback: wrap the payload in a booth header
+ * and hand it to the configured transport's broadcast.  Returns the
+ * transport's result or -ENOMEM.
+ */
+static int ticket_broadcast(void *value, int len)
+{
+	void *buf;
+	struct booth_msghdr *hdr;
+	int rv;
+	int total = sizeof(struct booth_msghdr) + len;
+
+	buf = malloc(total);
+	if (!buf)
+		return -ENOMEM;
+	memset(buf, 0, total);
+	hdr = buf;
+	hdr->magic = htons(PAXOS_MAGIC);
+	hdr->len = htonl(total);
+	memcpy((char *)buf + sizeof(struct booth_msghdr), value, len);
+
+	/* Pass the host-order byte count, not the encoded header field. */
+	rv = booth_transport[booth_conf->proto].broadcast(buf, total);
+
+	free(buf);
+	return rv;
+}
+
+/*
+ * Lease-layer "catchup" callback: load the stored owner and expiry of
+ * the named ticket and report them to the caller.
+ */
+static int ticket_read(const void *name, int *owner,
+		       unsigned long long *expires)
+{
+	struct ticket *tk;
+	int found = 0;
+
+	list_for_each_entry(tk, &ticket_list, list) {
+		if (!strcmp(tk->id, name)) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found) {
+		log_error("BUG: ticket_read failed (ticket %s does not exist)",
+			  (char *)name);
+		return -1;
+	}
+
+	pcmk_handler.load_ticket(tk->id, &tk->owner, 
&tk->expires); + *owner = tk->owner; + *expires = tk->expires; + + return 0; +} + +static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result) +{ + struct ticket *tk; + int found = 0; + + list_for_each_entry(tk, &ticket_list, list) { + if (tk->handle == handle) { + found = 1; + break; + } + } + if (!found) { + log_error("BUG: ticket_write failed " + "(ticket handle %d does not exist)", handle); + return -1; + } + + tk->owner = result->owner; + tk->expires = result->expires; + + if (tk->owner != ticket_get_myid()) + pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires); + else { + pcmk_handler.store_ticket(tk->id, tk->owner, tk->expires); + pcmk_handler.grant_ticket(tk->id); + } + + return 0; +} + +int ticket_recv(void *msg, int msglen) +{ + struct booth_msghdr *hdr; + char *data; + + hdr = msg; + if (ntohs(hdr->magic) != PAXOS_MAGIC || + ntohl(hdr->len) != msglen) { + log_error("message received error"); + return -1; + } + data = (char *)msg + sizeof(struct booth_msghdr); + + return paxos_lease_on_receive(data, + msglen - sizeof(struct booth_msghdr)); +} + +int grant_ticket(char *ticket, int force, int expiry) +{ + struct ticket *tk; + int found = 0; + + if (force) { + pcmk_handler.store_ticket(ticket, ticket_get_myid(), -1); + pcmk_handler.grant_ticket(ticket); + return BOOTHC_RLT_SYNC_SUCC; + } + if (!expiry) + expiry = DEFAULT_TICKET_EXPIRY; + + list_for_each_entry(tk, &ticket_list, list) { + if (!strcmp(tk->id, ticket)) { + found = 1; + break; + } + } + if (!found) { + log_error("ticket %s does not exist", ticket); + return BOOTHC_RLT_SYNC_FAIL; + } + + if (tk->owner == ticket_get_myid()) + return BOOTHC_RLT_SYNC_SUCC; + else { + paxos_lease_acquire(tk->handle, 1, end_acquire); + return BOOTHC_RLT_ASYNC; + } +} + +const struct paxos_lease_operations ticket_operations = { + .get_myid = ticket_get_myid, + .send = ticket_send, + .broadcast = ticket_broadcast, + .catchup = ticket_read, + .notify = ticket_write, +}; + +int setup_ticket(void) 
+{ + struct ticket *tk, *tmp; + int i, rv; + pl_handle_t plh; + + role = malloc(booth_conf->node_count * sizeof(unsigned char)); + if (!role) + return -ENOMEM; + memset(role, 0, booth_conf->node_count * sizeof(unsigned char)); + for (i = 0; i < booth_conf->node_count; i++) { + if (booth_conf->node[i].type == SITE) + role[i] = PROPOSER | ACCEPTOR | LEARNER; + else if (booth_conf->node[i].type == ARBITRATOR) + role[i] = ACCEPTOR; + } + + for (i = 0; i < booth_conf->ticket_count; i++) { + tk = malloc(sizeof(struct ticket)); + if (!tk) { + rv = -ENOMEM; + goto out; + } + memset(tk, 0, sizeof(struct ticket)); + strcpy(tk->id, booth_conf->ticket[i].name); + tk->expiry = booth_conf->ticket[i].expiry; + list_add_tail(&tk->list, &ticket_list); + + plh = paxos_lease_init(tk->id, + BOOTH_NAME_LEN, + tk->expiry, + booth_conf->node_count, + 1, + role, + ticket_priority(i), + &ticket_operations); + if (plh <= 0) { + log_error("paxos lease initialization failed"); + rv = plh; + goto out; + } + tk->handle = plh; + } + + return 0; + +out: + list_for_each_entry_safe(tk, tmp, &ticket_list, list) { + list_del(&tk->list); + } + free(role); + + return rv; +} diff --git a/src/ticket.h b/src/ticket.h new file mode 100644 index 0000000..12d1042 --- /dev/null +++ b/src/ticket.h @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2011 Jiaju Zhang + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef _TICKET_H
+#define _TICKET_H
+
+#define DEFAULT_TICKET_EXPIRY	600	/* grant_ticket() substitutes this for expiry == 0; presumably seconds - TODO confirm */
+
+int check_ticket(char *ticket);	/* 1 if the name matches a configured ticket, else 0 */
+int check_site(char *site, int *local);	/* 1 if site is a configured SITE address (*local = its local flag), else 0 */
+int grant_ticket(char *ticket, int force, int expiry);	/* returns BOOTHC_RLT_SYNC_SUCC, BOOTHC_RLT_SYNC_FAIL or BOOTHC_RLT_ASYNC */
+int ticket_recv(void *msg, int msglen);	/* checks the booth header, then passes the payload to paxos_lease_on_receive(); -1 on a bad header */
+int setup_ticket(void);	/* builds the ticket list from booth_conf; 0 on success */
+
+#endif /* _TICKET_H */
diff --git a/src/timer.c b/src/timer.c
new file mode 100644
index 0000000..0bc82c7
--- /dev/null
+++ b/src/timer.c
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2011 Jiaju Zhang
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include "log.h"
+#include "timer.h"
+
+#define USEC_IN_SEC	1000000
+#define MSEC_IN_SEC	1000
+
+extern int poll_timeout;
+static LIST_HEAD(timer_head);
+
+/* Wall-clock time from gettimeofday(), expressed in microseconds. */
+unsigned long long current_time(void)
+{
+	struct timeval tv;
+
+	gettimeofday(&tv, NULL);
+
+	/* widen first: tv_sec * 1000000 overflows a 32-bit time_t */
+	return (unsigned long long)tv.tv_sec * USEC_IN_SEC + tv.tv_usec;
+}
+
+/*
+ * Allocate a timer that expires @expires seconds from now and queue it.
+ * When it expires process_timerlist() calls @function(@data).
+ * Returns the timer, or NULL if the allocation failed.
+ */
+struct timerlist * add_timer(unsigned long expires,
+			     unsigned long data,
+			     void (*function) (unsigned long data))
+{
+	struct timerlist *timer;
+
+	timer = malloc(sizeof(struct timerlist));
+	if (!timer) {
+		log_error("failed to alloc mem for timer");
+		return NULL;
+	}
+	memset(timer, 0, sizeof(struct timerlist));
+
+	timer->expires = current_time()
+			 + (unsigned long long)expires * USEC_IN_SEC;
+	timer->data = data;
+	timer->function = function;
+	list_add_tail(&timer->entry, &timer_head);
+
+	return timer;
+}
+
+/* Re-arm @timer to expire @expires seconds from now.  Always returns 0. */
+int mod_timer(struct timerlist *timer, unsigned long expires)
+{
+	timer->expires = current_time()
+			 + (unsigned long long)expires * USEC_IN_SEC;
+
+	return 0;
+}
+
+/* Unlink @timer from the queue and free it.  Always returns 0. */
+int del_timer(struct timerlist *timer)
+{
+	list_del(&timer->entry);
+	free(timer);
+
+	return 0;
+}
+
+/*
+ * Run the callback of every queued timer whose deadline has passed.
+ * A fired timer stays queued but is disarmed (deadline set to the largest
+ * value) unless its callback re-arms it with mod_timer() or removes it
+ * with del_timer().
+ */
+void process_timerlist(void)
+{
+	struct timerlist *timer, *safe;
+
+	if (list_empty(&timer_head))
+		return;
+
+	/* _safe: the callback is allowed to del_timer() its own timer */
+	list_for_each_entry_safe(timer, safe, &timer_head, entry) {
+		if (current_time() >= timer->expires) {
+			/*
+			 * Disarm before the call so that a mod_timer() done
+			 * by the callback is not overwritten, and so that
+			 * the timer is not touched after it may be freed.
+			 */
+			timer->expires = -1;
+			timer->function(timer->data);
+		}
+	}
+}
+
+/* Set the global poll timeout to MSEC_IN_SEC.  Always returns 0. */
+int timerlist_init(void)
+{
+	poll_timeout = MSEC_IN_SEC;
+	return 0;
+}
+
+/* Unlink and free every queued timer. */
+void timerlist_exit(void)
+{
+	struct timerlist *timer, *safe;
+
+	list_for_each_entry_safe(timer, safe, &timer_head, entry) {
+		list_del(&timer->entry);
+		free(timer);
+	}
+}
diff --git a/src/timer.h b/src/timer.h
new file mode 100644
index 0000000..31cb009
--- /dev/null
+++ b/src/timer.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2011 Jiaju
Zhang
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef _TIMER_H
+#define _TIMER_H
+
+#include "list.h"
+
+struct timerlist {
+	struct list_head entry;	/* link in timer.c's queue of timers */
+	unsigned long long expires;	/* absolute deadline: current_time() + seconds * 1000000 */
+	void (*function) (unsigned long);	/* called by process_timerlist() once the deadline has passed */
+	unsigned long data;	/* argument passed to function */
+};
+
+int timerlist_init(void);	/* sets poll_timeout; returns 0 */
+struct timerlist * add_timer(unsigned long expires,
+			     unsigned long data,
+			     void (*function) (unsigned long data));	/* expires is relative, in seconds; NULL if allocation fails */
+int mod_timer(struct timerlist *timer, unsigned long expires);	/* new deadline, expires seconds from now; returns 0 */
+int del_timer(struct timerlist *timer);	/* unlinks and frees the timer; returns 0 */
+void timerlist_exit(void);	/* unlinks and frees every queued timer */
+void process_timerlist(void);	/* runs the callbacks of all timers whose deadline has passed */
+unsigned long long current_time(void);	/* gettimeofday() as tv_sec * 1000000 + tv_usec */
+
+#endif /* _TIMER_H */
diff --git a/src/transport.c b/src/transport.c
new file mode 100644
index 0000000..43f3101
--- /dev/null
+++ b/src/transport.c
@@ -0,0 +1,592 @@
+/*
+ * Copyright (C) 2011 Jiaju Zhang
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+#include "list.h"
+#include "booth.h"
+#include "log.h"
+#include "config.h"
+#include "paxos_lease.h"
+#include "transport.h"
+
+#define BOOTH_IPADDR_LEN	(sizeof(struct in6_addr))
+
+#define NETLINK_BUFSIZE		16384
+#define SOCKET_BUFFER_SIZE	160000
+#define FRAME_SIZE_MAX		10000
+
+extern struct client *client;
+extern struct pollfd *pollfd;
+
+/* Copy of the configuration entry that matched a local interface. */
+static struct booth_node local;
+
+/* One TCP connection, accepted by the listener or opened by us. */
+struct tcp_conn {
+	int s;
+	struct sockaddr to;
+	struct list_head list;
+};
+
+static LIST_HEAD(tcp);
+
+/* UDP socket plus the single receive buffer used by process_recv(). */
+struct udp_context {
+	int s;
+	struct iovec iov_recv;
+	char iov_buffer[FRAME_SIZE_MAX];
+} udp;
+
+/* Upper-layer receive callback registered through booth_udp_init(). */
+static int (*deliver_fn) (void *msg, int msglen);
+
+/*
+ * Fill @saddr with the address of @ipaddr and @port and store the length
+ * of the socket address in *@addrlen.
+ *
+ * Returns 0 on success.  Returns -1 if the family is neither AF_INET nor
+ * AF_INET6 (@saddr is zeroed, *@addrlen untouched) or if the address
+ * text cannot be parsed (family, port and *@addrlen are still set and
+ * the address part is all zeroes).
+ */
+static int ipaddr_to_sockaddr(struct booth_node *ipaddr,
+			      uint16_t port,
+			      struct sockaddr_storage *saddr,
+			      int *addrlen)
+{
+	int rv = -1;
+
+	/* zero the whole storage so callers may memcmp() the result */
+	memset(saddr, 0, sizeof(*saddr));
+
+	if (ipaddr->family == AF_INET) {
+		struct sockaddr_in *sin = (struct sockaddr_in *)saddr;
+
+		sin->sin_family = ipaddr->family;
+		sin->sin_port = htons(port);
+		*addrlen = sizeof(struct sockaddr_in);
+		/* inet_pton() returns 1 only for a valid address */
+		if (inet_pton(AF_INET, ipaddr->addr, &sin->sin_addr) == 1)
+			rv = 0;
+	} else if (ipaddr->family == AF_INET6) {
+		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)saddr;
+
+		sin6->sin6_family = ipaddr->family;
+		sin6->sin6_port = htons(port);
+		/* NOTE(review): hard-coded scope id kept from the original */
+		sin6->sin6_scope_id = 2;
+		*addrlen = sizeof(struct sockaddr_in6);
+		if (inet_pton(AF_INET6, ipaddr->addr, &sin6->sin6_addr) == 1)
+			rv = 0;
+	}
+
+	return rv;
+}
+
+/*
+ * Index the attributes of one netlink message: for every attribute in
+ * the @len bytes starting at @rta whose type is <= @max, store a pointer
+ * to it in @tb[type].  Entries of @tb that are not seen are left alone,
+ * so the caller clears @tb first.
+ */
+static void parse_rtattr(struct rtattr *tb[],
+			 int max, struct rtattr *rta, int len)
+{
+	while (RTA_OK(rta, len)) {
+		if (rta->rta_type <= max)
+			tb[rta->rta_type] = rta;
+		rta = RTA_NEXT(rta,len);
+	}
+}
+
+/*
+ * Return non-zero if @rta is present, carries at least @addrlen bytes
+ * and those bytes equal @addr.
+ */
+static int rta_addr_matches(struct rtattr *rta,
+			    const unsigned char *addr, int addrlen)
+{
+	return rta
+	       && (int)RTA_PAYLOAD(rta) >= addrlen
+	       && !memcmp(RTA_DATA(rta), addr, addrlen);
+}
+
+/*
+ * Ask the kernel (rtnetlink RTM_GETADDR dump) for the addresses of the
+ * node's address family and return 1 if one of them equals the address
+ * of @node, 0 if not or if anything goes wrong.
+ */
+static int find_myself(struct booth_node *node)
+{
+	int fd, addrlen, found = 0;
+	int rcvbuf_size = NETLINK_BUFSIZE;
+	struct sockaddr_nl nladdr;
+	unsigned char ndaddr[BOOTH_IPADDR_LEN];
+	static char rcvbuf[NETLINK_BUFSIZE];
+	struct {
+		struct nlmsghdr nlh;
+		struct rtgenmsg g;
+	} req;
+
+	if (node->family == AF_INET) {
+		addrlen = sizeof(struct in_addr);
+	} else if (node->family == AF_INET6) {
+		addrlen = sizeof(struct in6_addr);
+	} else {
+		log_error("invalid INET family");
+		return 0;
+	}
+
+	memset(ndaddr, 0, BOOTH_IPADDR_LEN);
+	if (inet_pton(node->family, node->addr, ndaddr) != 1) {
+		log_error("invalid node address %s", node->addr);
+		return 0;
+	}
+
+	fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
+	if (fd < 0) {
+		log_error("failed to create netlink socket");
+		return 0;
+	}
+
+	/* SO_RCVBUF takes an int size, not the buffer; failure is harmless */
+	setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
+		   &rcvbuf_size, sizeof(rcvbuf_size));
+
+	memset(&nladdr, 0, sizeof(nladdr));
+	nladdr.nl_family = AF_NETLINK;
+
+	memset(&req, 0, sizeof(req));
+	req.nlh.nlmsg_len = sizeof(req);
+	req.nlh.nlmsg_type = RTM_GETADDR;
+	req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
+	req.nlh.nlmsg_pid = 0;
+	req.nlh.nlmsg_seq = 1;
+	/* dump the family we are looking for, so IPv6 nodes can match */
+	req.g.rtgen_family = node->family;
+
+	if (sendto(fd, (void *)&req, sizeof(req), 0,
+		   (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) {
+		close(fd);
+		log_error("failed to send data to netlink socket");
+		return 0;
+	}
+
+	while (1) {
+		int status;
+		struct nlmsghdr *h;
+		struct iovec iov = { rcvbuf, sizeof(rcvbuf) };
+		/* designated: the member order of msghdr is not portable */
+		struct msghdr msg = {
+			.msg_name	= &nladdr,
+			.msg_namelen	= sizeof(nladdr),
+			.msg_iov	= &iov,
+			.msg_iovlen	= 1,
+		};
+
+		status = recvmsg(fd, &msg, 0);
+		if (status < 0 && errno == EINTR)
+			continue;
+		if (status <= 0) {
+			close(fd);
+			log_error("failed to recvmsg from netlink socket");
+			return 0;
+		}
+
+		for (h = (struct nlmsghdr *)rcvbuf; NLMSG_OK(h, status);
+		     h = NLMSG_NEXT(h, status)) {
+			struct ifaddrmsg *ifa;
+			struct rtattr *tb[IFA_MAX+1];
+			int len;
+
+			if (h->nlmsg_type == NLMSG_DONE)
+				goto out;
+
+			if (h->nlmsg_type == NLMSG_ERROR) {
+				close(fd);
+				log_error("netlink socket recvmsg error");
+				return 0;
+			}
+
+			if (h->nlmsg_type != RTM_NEWADDR)
+				continue;
+
+			ifa = NLMSG_DATA(h);
+			if (ifa->ifa_family != node->family)
+				continue;
+
+			len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa));
+			memset(tb, 0, sizeof(tb));
+			parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);
+
+			/*
+			 * Either attribute may be missing, and an IPv4
+			 * attribute holds only 4 bytes, so compare in
+			 * place instead of copying BOOTH_IPADDR_LEN bytes.
+			 */
+			if (rta_addr_matches(tb[IFA_LOCAL], ndaddr, addrlen)
+			    || rta_addr_matches(tb[IFA_ADDRESS], ndaddr,
+						addrlen)) {
+				found = 1;
+				goto out;
+			}
+		}
+	}
+
+out:
+	close(fd);
+	return found;
+}
+
+/*
+ * Find the first configured node whose address belongs to this host,
+ * flag it as local, remember a copy in "local" (only if none is stored
+ * yet) and return its nodeid.  Returns -1 if no node matches.
+ */
+static int load_myid(void)
+{
+	int i;
+
+	for (i = 0; i < booth_conf->node_count; i++) {
+		if (find_myself(&booth_conf->node[i])) {
+			booth_conf->node[i].local = 1;
+			if (!local.family)
+				memcpy(&local, &booth_conf->node[i],
+				       sizeof(struct booth_node));
+			return booth_conf->node[i].nodeid;
+		}
+	}
+
+	return -1;
+}
+
+/* nodeid of the local node, or -1 while load_myid() has not found one. */
+static int booth_get_myid(void)
+{
+	if (local.local)
+		return local.nodeid;
+	else
+		return -1;
+}
+
+/*
+ * Dead-connection callback for client slot @ci: drop the matching entry
+ * of the tcp list, close the descriptor and clear the slot.
+ */
+static void process_dead(int ci)
+{
+	struct tcp_conn *conn, *safe;
+
+	list_for_each_entry_safe(conn, safe, &tcp, list) {
+		if (conn->s == client[ci].fd) {
+			list_del(&conn->list);
+			free(conn);
+			break;
+		}
+	}
+	close(client[ci].fd);
+	client[ci].workfn = NULL;
+	client[ci].fd = -1;
+	pollfd[ci].fd = -1;
+}
+
+/*
+ * Work function of the listening socket in client slot @ci: accept one
+ * connection, record it in the tcp list and register it with
+ * client_add().
+ */
+static void process_tcp_listener(int ci)
+{
+	int fd, i;
+	struct sockaddr addr;
+	/* accept() reads this as the size of addr; it must be initialised */
+	socklen_t addrlen = sizeof(addr);
+	struct tcp_conn *conn;
+
+	fd = accept(client[ci].fd, &addr, &addrlen);
+	if (fd < 0) {
+		log_error("process_tcp_listener: accept error %d %d",
+			  fd, errno);
+		return;
+	}
+
+	conn = malloc(sizeof(struct tcp_conn));
+	if (!conn) {
+		log_error("failed to alloc mem");
+		/* do not leak the accepted descriptor */
+		close(fd);
+		return;
+	}
+	memset(conn, 0, sizeof(struct tcp_conn));
+	conn->s = fd;
+	memcpy(&conn->to, &addr, sizeof(struct sockaddr));
+	list_add_tail(&conn->list, &tcp);
+
+	i = client_add(fd, process_connection, process_dead);
+
+	log_debug("client connection %d fd %d", i, fd);
+}
+
+/*
+ * Create a TCP socket bound to the local node's address on
+ * BOOTH_CMD_PORT and put it into listening state.
+ * Returns the descriptor, or -1 on failure (nothing is left open).
+ */
+static int setup_tcp_listener(void)
+{
+	struct sockaddr_storage sockaddr;
+	int s, addrlen, rv;
+
+	s = socket(local.family, SOCK_STREAM, 0);
+	if (s == -1) {
+		log_error("failed to create tcp socket %s", strerror(errno));
+		return s;
+	}
+
+	if (ipaddr_to_sockaddr(&local, BOOTH_CMD_PORT,
+			       &sockaddr, &addrlen) < 0) {
+		log_error("invalid local address");
+		close(s);
+		return -1;
+	}
+
+	rv = bind(s, (struct sockaddr *)&sockaddr, addrlen);
+	if (rv == -1) {
+		log_error("failed to bind socket %s", strerror(errno));
+		close(s);
+		return rv;
+	}
+
+	rv = listen(s, 5);
+	if (rv == -1) {
+		log_error("failed to listen on socket %s", strerror(errno));
+		close(s);
+		return rv;
+	}
+
+	return s;
+}
+
+/*
+ * Transport "init" hook for TCP: start the listener and register it with
+ * client_add().  Fails with -1 while no local node is known.
+ */
+static int booth_tcp_init(void * unused __attribute__((unused)))
+{
+	int rv;
+
+	if (!local.local)
+		return -1;
+
+	rv = setup_tcp_listener();
+	if (rv < 0)
+		return rv;
+
+	client_add(rv, process_tcp_listener, NULL);
+
+	return 0;
+}
+
+/*
+ * Transport "open" hook for TCP: return the descriptor of a connection
+ * to @to, reusing a recorded connection with the same address or
+ * connecting anew.  Returns -1 (or -ENOMEM) on failure.
+ */
+static int booth_tcp_open(struct booth_node *to)
+{
+	struct sockaddr_storage sockaddr;
+	struct tcp_conn *conn;
+	int addrlen, rv, s;
+
+	if (ipaddr_to_sockaddr(to, BOOTH_CMD_PORT, &sockaddr, &addrlen) < 0)
+		return -1;
+
+	/*
+	 * conn->to is only a struct sockaddr; comparing more bytes than
+	 * that would read past the member.
+	 */
+	list_for_each_entry(conn, &tcp, list) {
+		if (!memcmp(&conn->to, &sockaddr, sizeof(conn->to)))
+			return conn->s;
+	}
+
+	/* the socket family has to match the address we connect to */
+	s = socket(sockaddr.ss_family, SOCK_STREAM, 0);
+	if (s == -1)
+		return -1;
+
+	rv = connect(s, (struct sockaddr *)&sockaddr, addrlen);
+	if (rv == -1) {
+		close(s);
+		return rv;
+	}
+
+	conn = malloc(sizeof(struct tcp_conn));
+	if (!conn) {
+		log_error("failed to alloc mem");
+		close(s);
+		return -ENOMEM;
+	}
+	memset(conn, 0, sizeof(struct tcp_conn));
+	conn->s = s;
+	memcpy(&conn->to, &sockaddr, sizeof(conn->to));
+	list_add_tail(&conn->list, &tcp);
+
+	return conn->s;
+}
+
+/* Transport "send" hook for TCP: @to is handed to do_write() unchanged. */
+static int booth_tcp_send(unsigned long to, void *buf, int len)
+{
+	return do_write(to, buf, len);
+}
+
+/* Transport "recv" hook for TCP: @from is handed to do_read() unchanged. */
+static int booth_tcp_recv(unsigned long from, void *buf, int len)
+{
+	return do_read(from, buf, len);
+}
+
+/*
+ * Transport "close" hook for TCP: close descriptor @s and forget the
+ * connection if it is recorded in the tcp list.  Always returns 0.
+ */
+static int booth_tcp_close(unsigned long s)
+{
+	struct tcp_conn *conn;
+
+	list_for_each_entry(conn, &tcp, list) {
+		if ((unsigned long)conn->s == s) {
+			list_del(&conn->list);
+			close(s);
+			free(conn);
+			/* conn is gone: leave the loop right away */
+			break;
+		}
+	}
+
+	return 0;
+}
+
+/* Transport "exit" hook for TCP: nothing to do. */
+static int booth_tcp_exit(void)
+{
+	return 0;
+}
+
+/*
+ * Create the non-blocking UDP socket, bind it to the local node's
+ * address on the configured port and enlarge its receive buffer.
+ * Returns the descriptor (also stored in udp.s) or -1 on failure.
+ */
+static int setup_udp_server(void)
+{
+	struct sockaddr_storage sockaddr;
+	int addrlen, rv;
+	unsigned int recvbuf_size;
+
+	udp.s = socket(local.family, SOCK_DGRAM, 0);
+	if (udp.s == -1) {
+		log_error("failed to create udp socket %s", strerror(errno));
+		return -1;
+	}
+
+	rv = fcntl(udp.s, F_SETFL, O_NONBLOCK);
+	if (rv == -1) {
+		log_error("failed to set non-blocking operation "
+			  "on udp socket: %s", strerror(errno));
+		close(udp.s);
+		return -1;
+	}
+
+	if (ipaddr_to_sockaddr(&local, booth_conf->port,
+			       &sockaddr, &addrlen) < 0) {
+		log_error("invalid local address");
+		close(udp.s);
+		return -1;
+	}
+
+	rv = bind(udp.s, (struct sockaddr *)&sockaddr, addrlen);
+	if (rv == -1) {
+		log_error("failed to bind socket %s", strerror(errno));
+		close(udp.s);
+		return -1;
+	}
+
+	recvbuf_size = SOCKET_BUFFER_SIZE;
+	rv = setsockopt(udp.s, SOL_SOCKET, SO_RCVBUF,
+			&recvbuf_size, sizeof(recvbuf_size));
+	if (rv == -1) {
+		log_error("failed to set recvbuf size");
+		close(udp.s);
+		return -1;
+	}
+
+	return udp.s;
+}
+
+/*
+ * Work function of the UDP socket in client slot @ci: read one datagram
+ * into udp.iov_buffer and pass it to the registered deliver_fn.
+ */
+static void process_recv(int ci)
+{
+	struct msghdr msg_recv;
+	struct sockaddr_storage system_from;
+	int received;
+	unsigned char *msg_offset;
+
+	/* clear members this code does not name explicitly */
+	memset(&msg_recv, 0, sizeof(msg_recv));
+	msg_recv.msg_name = &system_from;
+	msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
+	msg_recv.msg_iov = &udp.iov_recv;
+	msg_recv.msg_iovlen = 1;
+	msg_recv.msg_control = 0;
+	msg_recv.msg_controllen = 0;
+	msg_recv.msg_flags = 0;
+
+	received = recvmsg(client[ci].fd, &msg_recv,
+			   MSG_NOSIGNAL | MSG_DONTWAIT);
+	/* an empty datagram would only expose stale buffer contents */
+	if (received <= 0)
+		return;
+
+	msg_offset = udp.iov_recv.iov_base;
+
+	deliver_fn(msg_offset, received);
+}
+
+/*
+ * Transport "init" hook for UDP: determine the local node, set up the
+ * UDP socket and register it with client_add().  @f is stored as the
+ * callback that receives every datagram.  Returns 0, or -1 on failure.
+ */
+static int booth_udp_init(void *f)
+{
+	int myid = -1;
+
+	memset(&local, 0, sizeof(struct booth_node));
+
+	myid = load_myid();
+	if (myid < 0) {
+		log_error("can't find myself in config file");
+		return -1;
+	}
+
+	memset(&udp, 0, sizeof(struct udp_context));
+	udp.iov_recv.iov_base = udp.iov_buffer;
+	udp.iov_recv.iov_len = FRAME_SIZE_MAX;
+
+	udp.s = setup_udp_server();
+	if (udp.s == -1)
+		return -1;
+
+	deliver_fn = f;
+
+	client_add(udp.s, process_recv, NULL);
+
+	return 0;
+}
+
+/*
+ * Transport "send" hook for UDP: @to is a struct booth_node pointer cast
+ * to unsigned long.  Sends @len bytes of @buf to that node on the
+ * configured port.  Returns 0, or a negative value on failure.
+ */
+static int booth_udp_send(unsigned long to, void *buf, int len)
+{
+	struct msghdr msg;
+	struct sockaddr_storage sockaddr;
+	struct iovec iovec;
+	int addrlen, rv;
+
+	iovec.iov_base = (void *)buf;
+	iovec.iov_len = len;
+
+	if (ipaddr_to_sockaddr((struct booth_node *)to, booth_conf->port,
+			       &sockaddr, &addrlen) < 0)
+		return -1;
+
+	/* clear members this code does not name explicitly */
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_name = &sockaddr;
+	msg.msg_namelen = addrlen;
+	msg.msg_iov = &iovec;
+	msg.msg_iovlen = 1;
+	msg.msg_control = 0;
+	msg.msg_controllen = 0;
+	msg.msg_flags = 0;
+
+	rv = sendmsg(udp.s, &msg, MSG_NOSIGNAL);
+	if (rv < 0)
+		return rv;
+
+	return 0;
+}
+
+/*
+ * Transport "broadcast" hook for UDP: send the message to every
+ * configured node in turn.  Individual send failures are ignored;
+ * returns -1 only when there is no configuration or no node.
+ */
+static int booth_udp_broadcast(void *buf, int len)
+{
+	int i;
+
+	if (!booth_conf || !booth_conf->node_count)
+		return -1;
+
+	for (i = 0; i < booth_conf->node_count; i++)
+		booth_udp_send((unsigned long)&booth_conf->node[i], buf, len);
+
+	return 0;
+}
+
+/* Transport "exit" hook for UDP: nothing to do. */
+static int booth_udp_exit(void)
+{
+	return 0;
+}
+
+/* SCTP transport layer has not been developed yet */
+static int booth_sctp_init(void *f __attribute__((unused)))
+{
+	return 0;
+}
+
+static int booth_sctp_send(unsigned long to __attribute__((unused)),
+			   void *buf __attribute__((unused)),
+			   int len __attribute__((unused)))
+{
+	return 0;
+}
+
+static int booth_sctp_broadcast(void *buf __attribute__((unused)),
+				int len __attribute__((unused)))
+{
+	return 0;
+}
+
+static int booth_sctp_exit(void)
+{
+	return 0;
+}
+
+/*
+ * Transport table, indexed by transport_layer_t.  Hooks that an entry
+ * does not list are NULL (TCP has no .broadcast; UDP and SCTP have no
+ * .open/.recv/.close).
+ */
+struct booth_transport booth_transport[] = {
+	{
+		.name = "TCP",
+		.init = booth_tcp_init,
+		.get_myid = booth_get_myid,
+		.open = booth_tcp_open,
+		.send = booth_tcp_send,
+		.recv = booth_tcp_recv,
+		.close = booth_tcp_close,
+		.exit = booth_tcp_exit
+	},
+	{
+		.name = "UDP",
+		.init = booth_udp_init,
+		.get_myid = booth_get_myid,
+		.send = booth_udp_send,
+		.broadcast = booth_udp_broadcast,
+		.exit = booth_udp_exit
+	},
+	{
+		.name = "SCTP",
+		.init = booth_sctp_init,
+		.get_myid = booth_get_myid,
+		.send = booth_sctp_send,
+		.broadcast = booth_sctp_broadcast,
+		.exit = booth_sctp_exit
+	}
+};
+
diff --git a/src/transport.h b/src/transport.h
new file mode 100644
index 0000000..1b4767b
--- /dev/null
+++ b/src/transport.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2011 Jiaju Zhang
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef _TRANSPORT_H
+#define _TRANSPORT_H
+
+#include "booth.h"
+
+/* One node of the booth configuration. */
+struct booth_node {
+	int nodeid;
+	int type;			/* node_type_t */
+	int local;			/* set once the address is found on this host */
+	unsigned short family;		/* AF_INET or AF_INET6 */
+	char addr[BOOTH_NAME_LEN];	/* address in text form, parsed with inet_pton() */
+} __attribute__((packed));
+
+/* Index into booth_transport[]. */
+typedef enum {
+	TCP = 0,
+	UDP = 1,
+	SCTP = 2,
+	TRANSPORT_ENTRIES = 3,
+} transport_layer_t;
+
+typedef enum {
+	ARBITRATOR = 1,
+	SITE,
+} node_type_t;
+
+/* Hooks of one transport; a hook a transport does not provide is NULL. */
+struct booth_transport {
+	const char *name;
+	int (*init) (void *);
+	int (*get_myid) (void);
+	int (*open) (struct booth_node *);
+	int (*send) (unsigned long, void *, int);
+	int (*recv) (unsigned long, void *, int);
+	int (*broadcast) (void *, int);
+	int (*close) (unsigned long);
+	int (*exit) (void);
+};
+
+/* Defined in transport.c; declared extern so the header defines nothing. */
+extern struct booth_transport booth_transport[TRANSPORT_ENTRIES];
+
+#endif /* _TRANSPORT_H */