diff --git a/src/request.c b/src/request.c index 2354490..9c9ee8e 100644 --- a/src/request.c +++ b/src/request.c @@ -1,84 +1,85 @@ /* * Copyright (C) 2015 Dejan Muhamedagic * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include "booth.h" #include "ticket.h" #include "request.h" #include "log.h" static GList *req_l = NULL; static int req_id_cnt; /* add request to the queue; it is up to the caller to manage * memory for the three parameters */ void *add_req( struct ticket_config *tk, struct client *req_client, struct boothc_ticket_msg *msg) { struct request *rp; rp = g_new(struct request, 1); if (!rp) return NULL; rp->id = req_id_cnt++; rp->tk = tk; rp->client_fd = req_client->fd; rp->msg = msg; req_l = g_list_append(req_l, rp); return rp; } +/* XXX UNUSED */ int get_req_id(const void *rp) { if (!rp) return -1; return ((struct request *)rp)->id; } static void del_req(GList *lp) { if (!lp) return; req_l = g_list_delete_link(req_l, lp); } void foreach_tkt_req(struct booth_config *conf_ptr, struct ticket_config *tk, req_fp f) { GList *lp, *next; struct request *rp; lp = g_list_first(req_l); while (lp) { next = g_list_next(lp); rp = (struct request *)lp->data; if (rp->tk == tk && (f)(conf_ptr, rp->tk, rp->client_fd, rp->msg) == 0) { log_debug("remove request for client %d", rp->client_fd); del_req(lp); /* don't need this request anymore */ } lp = next; } } diff --git a/src/request.h b/src/request.h index aee8985..bfddec2 100644 --- a/src/request.h +++ b/src/request.h @@ -1,67 +1,69 @@ /* * Copyright (C) 2015 Dejan Muhamedagic * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef _REQUEST_H #define _REQUEST_H #include "booth.h" #include "config.h" /* Requests are coming from clients and get queued in a * round-robin queue (fixed size) * * This is one way to make the server more responsive and less * dependent on misbehaving clients. The requests are queued and * later served from the server loop. */ struct request { /** Request ID */ int id; /** The ticket. */ struct ticket_config *tk; /** The client which sent the request */ int client_fd; /** The message containing the request */ void *msg; }; typedef int (*req_fp)(struct booth_config *conf_ptr, struct ticket_config *, int, struct boothc_ticket_msg *); void *add_req(struct ticket_config *tk, struct client *req_client, struct boothc_ticket_msg *msg); /** * @internal * Handle all pendign requests for given ticket using function @p f * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * @param[in] f handling function * * @return 1 on success, 0 when not done with the message, yet */ void foreach_tkt_req(struct booth_config *conf_ptr, struct ticket_config *tk, req_fp f); + +/* XXX UNUSED */ int get_req_id(const void *rp); #endif /* _REQUEST_H */ diff --git a/src/ticket.c b/src/ticket.c index cfc2826..ed2e808 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,1517 +1,1518 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include #include #include #include "b_config.h" #ifndef RANGE2RANDOM_GLIB #include #else #include "alt/range2random_glib.h" #endif #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #include "request.h" #include "manual.h" #define TK_LINE 256 extern int TIME_RES; /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iname, ticket, sizeof(tk->name))) { if (found) *found = tk; return 1; } } return 0; } int check_ticket(struct booth_config *conf_ptr, const char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (conf_ptr == NULL) return 0; if (!check_max_len_valid(ticket, sizeof(conf_ptr->ticket[0].name))) return 0; return find_ticket_by_name(conf_ptr, ticket, found); } /* XXX UNUSED */ int check_site(struct booth_config *conf_ptr, const char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(conf_ptr, site, &node, 0)) { *is_local = node->local; return 1; } return 0; } /* is it safe to commit the grant? * if we didn't hear from all sites on the initial grant, we may * need to delay the commit * * TODO: investigate possibility to devise from history whether a * missing site could be holding a ticket or not */ static int ticket_dangerous(struct booth_config *conf_ptr, struct ticket_config *tk) { int tdiff; /* we may be invoked often, don't spam the log unnecessarily */ static int no_log_delay_msg; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (!is_time_set(&tk->delay_commit)) return 0; if (is_past(&tk->delay_commit) || all_sites_replied(conf_ptr, tk)) { if (tk->leader == conf_ptr->local) { tk_log_info("%s, committing to CIB", is_past(&tk->delay_commit) ? "ticket delay expired" : "all sites replied"); } time_reset(&tk->delay_commit); no_log_delay_msg = 0; return 0; } tdiff = time_left(&tk->delay_commit); tk_log_debug("delay ticket commit for another " intfmt(tdiff)); if (!no_log_delay_msg) { tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff)); tk_log_info("(or all sites are reached)"); no_log_delay_msg = 1; } return 1; } int ticket_write(struct booth_config *conf_ptr, struct ticket_config *tk) { assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (conf_ptr->local->type != SITE) return -EINVAL; if (ticket_dangerous(conf_ptr, tk)) return 1; if (tk->leader == conf_ptr->local) { if (tk->state != ST_LEADER) { tk_log_info("ticket state not yet consistent, " "delaying ticket grant to CIB"); return 1; } conf_ptr->ticket_handler->grant_ticket(tk); } else { conf_ptr->ticket_handler->revoke_ticket(tk); } tk->update_cib = 0; return 0; } void save_committed_tkt(struct ticket_config *tk) { if (!tk->last_valid_tk) { tk->last_valid_tk = malloc(sizeof(struct ticket_config)); if (!tk->last_valid_tk) { log_error("out of memory"); return; } } memcpy(tk->last_valid_tk, tk, sizeof(struct ticket_config)); } static void ext_prog_failed(struct booth_config *conf_ptr, struct ticket_config *tk, int start_election) { assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (!is_manual(tk)) { /* Give it to somebody else. * Just send a VOTE_FOR message, so the * others can start elections. */ if (leader_and_valid(tk, conf_ptr->local)) { save_committed_tkt(tk); reset_ticket(tk); ticket_write(conf_ptr, tk); if (start_election) { ticket_broadcast(conf_ptr, tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL); } } } else { /* There is not much we can do now because * the manual ticket cannot be relocated. * Just warn the user. */ if (tk->leader == conf_ptr->local) { save_committed_tkt(tk); reset_ticket(tk); ticket_write(conf_ptr, tk); log_error("external test failed on the specified machine, cannot acquire a manual ticket"); } } } #define attr_found(geo_ap, ap) \ ((geo_ap) && !strcmp((geo_ap)->val, (ap)->attr_val)) int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type) { GList *el; struct attr_prereq *ap; struct geo_attr *geo_ap; for (el = g_list_first(tk->attr_prereqs); el; el = g_list_next(el)) { ap = (struct attr_prereq *)el->data; if (ap->grant_type != grant_type) continue; geo_ap = (struct geo_attr *)g_hash_table_lookup(tk->attr, ap->attr_name); switch(ap->op) { case ATTR_OP_EQ: if (!attr_found(geo_ap, ap)) goto fail; break; case ATTR_OP_NE: if (attr_found(geo_ap, ap)) goto fail; break; default: break; } } return 0; fail: tk_log_warn("'%s' attr-prereq failed", ap->attr_name); return 1; } /* do we need to run the external program? * or we already done that and waiting for the outcome * or program exited and we can collect the status * return codes * 0: no program defined * RUNCMD_MORE: program forked, results later * != 0: executing program failed (or some other failure) */ static int do_ext_prog(struct booth_config *conf_ptr, struct ticket_config *tk, int start_election) { int rv = 0; if (!tk_test.path) return 0; switch(tk_test.progstate) { case EXTPROG_IDLE: rv = run_handler(conf_ptr, tk); if (rv == RUNCMD_ERR) { tk_log_warn("couldn't run external test, not allowed to acquire ticket"); ext_prog_failed(conf_ptr, tk, start_election); } break; case EXTPROG_RUNNING: /* should never get here, but just in case */ rv = RUNCMD_MORE; break; case EXTPROG_EXITED: rv = tk_test_exit_status(tk); if (rv) { ext_prog_failed(conf_ptr, tk, start_election); } break; case EXTPROG_IGNORE: /* nothing to do here */ break; } return rv; } /* Try to acquire a ticket * Could be manual grant or after start (if the ticket is granted * and still valid in the CIB) * If the external program needs to run, this is run twice, once * to start the program, and then to get the result and start * elections. */ static int acquire_ticket(struct booth_config *conf_ptr, struct ticket_config *tk, cmd_reason_t reason) { int rv; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (reason == OR_ADMIN && check_attr_prereq(tk, GRANT_MANUAL)) return RLT_ATTR_PREREQ; switch(do_ext_prog(conf_ptr, tk, 0)) { case 0: /* everything fine */ break; case RUNCMD_MORE: /* need to wait for the outcome before starting elections */ return 0; default: return RLT_EXT_FAILED; } if (is_manual(tk)) { rv = manual_selection(conf_ptr, tk, conf_ptr->local, 1, reason); } else { rv = new_election(conf_ptr, tk, conf_ptr->local, 1, reason); } return rv ? RLT_SYNC_FAIL : 0; } /** Try to get the ticket for the local site. * */ static int do_grant_ticket(struct booth_config *conf_ptr, struct ticket_config *tk, int options) { int rv; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); tk_log_info("granting ticket"); if (tk->leader == conf_ptr->local) return RLT_SUCCESS; if (is_owned(tk)) { if (is_manual(tk) && (options & OPT_IMMEDIATE)) { /* -F flag has been used while granting a manual ticket. * The ticket will be granted and may end up being granted * on multiple sites */ tk_log_warn("manual ticket forced to be granted! be aware that " "you may end up having two sites holding the same manual " "ticket! revoke the ticket from the unnecessary site!"); } else { return RLT_OVERGRANT; } } set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after); if (options & OPT_IMMEDIATE) { tk_log_warn("granting ticket immediately! If there are " "unreachable sites, _hope_ you are sure that they don't " "have the ticket!"); time_reset(&tk->delay_commit); } rv = acquire_ticket(conf_ptr, tk, OR_ADMIN); if (rv) { time_reset(&tk->delay_commit); return rv; } else { return RLT_MORE; } } static void start_revoke_ticket(struct booth_config *conf_ptr, struct ticket_config *tk) { tk_log_info("revoking ticket"); save_committed_tkt(tk); reset_ticket_and_set_no_leader(tk); ticket_write(conf_ptr, tk); ticket_broadcast(conf_ptr, tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN); } /** Ticket revoke. * Only to be started from the leader. */ static int do_revoke_ticket(struct booth_config *conf_ptr, struct ticket_config *tk) { if (tk->acks_expected) { tk_log_info("delay ticket revoke until the current operation finishes"); set_next_state(tk, ST_INIT); return RLT_MORE; } else { start_revoke_ticket(conf_ptr, tk); return RLT_SUCCESS; } } static int number_sites_marked_as_granted(struct booth_config *conf_ptr, struct ticket_config *tk) { struct booth_site *ignored __attribute__((unused)); int i, result = 0; assert(conf_ptr != NULL); FOREACH_NODE(conf_ptr, i, ignored) { result += tk->sites_where_granted[i]; } return result; } static int list_ticket(struct booth_config *conf_ptr, char **pdata, unsigned int *len) { struct ticket_config *tk; struct booth_site *site; char timeout_str[64]; char pending_str[64]; char *data, *cp; int i, alloc, site_index; time_t ts; int multiple_grant_warning_length = 0; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); *pdata = NULL; *len = 0; alloc = conf_ptr->ticket_count * (BOOTH_NAME_LEN * 2 + 128 + 16); FOREACH_TICKET(conf_ptr, i, tk) { multiple_grant_warning_length = \ number_sites_marked_as_granted(conf_ptr, tk); if (multiple_grant_warning_length > 1) { // 164: 55 + 45 + 2*number_of_multiple_sites + some margin alloc += 164 + BOOTH_NAME_LEN * (1+multiple_grant_warning_length); } } data = malloc(alloc); if (!data) return -ENOMEM; cp = data; FOREACH_TICKET(conf_ptr, i, tk) { if ((!is_manual(tk)) && is_time_set(&tk->term_expires)) { /* Manual tickets doesn't have term_expires defined */ ts = wall_ts(&tk->term_expires); strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&ts)); } else strcpy(timeout_str, "INF"); if (tk->leader == conf_ptr->local && is_time_set(&tk->delay_commit) && !is_past(&tk->delay_commit)) { ts = wall_ts(&tk->delay_commit); strcpy(pending_str, " (commit pending until "); strftime(pending_str + strlen(" (commit pending until "), sizeof(pending_str) - strlen(" (commit pending until ") - 1, "%F %T", localtime(&ts)); strcat(pending_str, ")"); } else *pending_str = '\0'; cp += snprintf(cp, alloc - (cp - data), "ticket: %s, leader: %s", tk->name, ticket_leader_string(tk)); if (is_owned(tk)) { cp += snprintf(cp, alloc - (cp - data), ", expires: %s%s", timeout_str, pending_str); } if (is_manual(tk)) { cp += snprintf(cp, alloc - (cp - data), " [manual mode]"); } cp += snprintf(cp, alloc - (cp - data), "\n"); if (alloc - (cp - data) <= 0) { free(data); return -ENOMEM; } } FOREACH_TICKET(conf_ptr, i, tk) { multiple_grant_warning_length = \ number_sites_marked_as_granted(conf_ptr, tk); if (multiple_grant_warning_length > 1) { cp += snprintf(cp, alloc - (cp - data), "\nWARNING: The ticket %s is granted to multiple sites: ", // ~55 characters tk->name); FOREACH_NODE(conf_ptr, site_index, site) { if (tk->sites_where_granted[site_index] > 0) { cp += snprintf(cp, alloc - (cp - data), "%s", site_string(site)); if (--multiple_grant_warning_length > 0) { cp += snprintf(cp, alloc - (cp - data), ", "); } } } cp += snprintf(cp, alloc - (cp - data), ". Revoke the ticket from the faulty sites.\n"); // ~45 characters } } *pdata = data; *len = cp - data; return 0; } void disown_ticket(struct ticket_config *tk) { set_leader(tk, NULL); tk->is_granted = 0; get_time(&tk->term_expires); } +/* XXX UNUSED */ int disown_if_expired(struct ticket_config *tk) { if (is_past(&tk->term_expires) || !tk->leader) { disown_ticket(tk); return 1; } return 0; } void reset_ticket(struct ticket_config *tk) { ignore_ext_test(tk); disown_ticket(tk); no_resends(tk); set_state(tk, ST_INIT); set_next_state(tk, 0); tk->voted_for = NULL; } void reset_ticket_and_set_no_leader(struct ticket_config *tk) { mark_ticket_as_revoked_from_leader(tk); reset_ticket(tk); tk->leader = no_leader; tk_log_debug("ticket leader set to no_leader"); } static void log_reacquire_reason(struct booth_config *conf_ptr, struct ticket_config *tk) { int valid; const char *where_granted = "\0"; char buff[75]; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); valid = is_time_set(&tk->term_expires) && !is_past(&tk->term_expires); if (tk->leader == conf_ptr->local) { where_granted = "granted here"; } else { snprintf(buff, sizeof(buff), "granted to %s", site_string(tk->leader)); where_granted = buff; } if (!valid) { tk_log_warn("%s, but not valid " "anymore (will try to reacquire)", where_granted); } if (tk->is_granted && tk->leader != conf_ptr->local) { if (tk->leader && tk->leader != no_leader) { tk_log_error("granted here, but also %s, " "that's really too bad (will try to reacquire)", where_granted); } else { tk_log_warn("granted here, but we're " "not recorded as the grantee (will try to reacquire)"); } } } void update_ticket_state(struct booth_config *conf_ptr, struct ticket_config *tk, struct booth_site *sender) { assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (tk->state == ST_CANDIDATE) { tk_log_info("learned from %s about " "newer ticket, stopping elections", site_string(sender)); /* there could be rejects coming from others; don't log * warnings unnecessarily */ tk->expect_more_rejects = 1; } if (tk->leader == conf_ptr->local || tk->is_granted) { /* message from a live leader with valid ticket? */ if (sender == tk->leader && term_time_left(tk)) { if (tk->is_granted) { tk_log_warn("ticket was granted here, " "but it's live at %s (revoking here)", site_string(sender)); } else { tk_log_info("ticket live at %s", site_string(sender)); } disown_ticket(tk); ticket_write(conf_ptr, tk); set_state(tk, ST_FOLLOWER); set_next_state(tk, ST_FOLLOWER); } else { if (tk->state == ST_CANDIDATE) { set_state(tk, ST_FOLLOWER); } set_next_state(tk, ST_LEADER); } } else { if (!tk->leader || tk->leader == no_leader) { if (sender) tk_log_info("ticket is not granted"); else tk_log_info("ticket is not granted (from CIB)"); set_state(tk, ST_INIT); } else { if (sender) tk_log_info("ticket granted to %s (says %s)", site_string(tk->leader), tk->leader == sender ? "they" : site_string(sender)); else tk_log_info("ticket granted to %s (from CIB)", site_string(tk->leader)); set_state(tk, ST_FOLLOWER); /* just make sure that we check the ticket soon */ set_next_state(tk, ST_FOLLOWER); } } } int setup_ticket(struct booth_config *conf_ptr) { struct ticket_config *tk; int i; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); FOREACH_TICKET(conf_ptr, i, tk) { reset_ticket(tk); if (conf_ptr->local->type == SITE) { if (!conf_ptr->ticket_handler->load_ticket(conf_ptr, tk)) { update_ticket_state(conf_ptr, tk, NULL); } tk->update_cib = 1; } tk_log_info("broadcasting state query"); /* wait until all send their status (or the first * timeout) */ tk->start_postpone = 1; ticket_broadcast(conf_ptr, tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0); } return 0; } int ticket_answer_list(struct booth_config *conf_ptr, int fd) { char *data; int rv; unsigned int olen; struct boothc_hdr_msg hdr; rv = list_ticket(conf_ptr, &data, &olen); if (rv < 0) goto out; init_header(conf_ptr, &hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen); rv = send_header_plus(conf_ptr, fd, &hdr, data, olen); out: if (data) free(data); return rv; } int process_client_request(struct booth_config *conf_ptr, struct client *req_client, void *buf) { int rv, rc = 1; struct ticket_config *tk; int cmd; struct boothc_ticket_msg omsg; struct boothc_ticket_msg *msg; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); msg = (struct boothc_ticket_msg *)buf; cmd = ntohl(msg->header.cmd); if (!check_ticket(conf_ptr, msg->ticket.id, &tk)) { log_warn("client referenced unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply_now; } /* Perform the initial check before granting * an already granted non-manual ticket */ if ((!is_manual(tk) && (cmd == CMD_GRANT) && is_owned(tk))) { log_warn("client wants to grant an (already granted!) ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply_now; } if ((cmd == CMD_REVOKE) && !is_owned(tk)) { log_info("client wants to revoke a free ticket %s", msg->ticket.id); rv = RLT_TICKET_IDLE; goto reply_now; } if ((cmd == CMD_REVOKE) && tk->leader != conf_ptr->local) { tk_log_info("not granted here, redirect to %s", ticket_leader_string(tk)); rv = RLT_REDIRECT; goto reply_now; } if (cmd == CMD_REVOKE) rv = do_revoke_ticket(conf_ptr, tk); else rv = do_grant_ticket(conf_ptr, tk, ntohl(msg->header.options)); if (rv == RLT_MORE) { /* client may receive further notifications, save the * request for further processing */ add_req(tk, req_client, msg); tk_log_debug("queue request %s for client %d", state_to_string(cmd), req_client->fd); rc = 0; /* we're not yet done with the message */ } reply_now: init_ticket_msg(conf_ptr, &omsg, CL_RESULT, 0, rv, 0, tk); send_client_msg(conf_ptr, req_client->fd, &omsg); return rc; } int notify_client(struct booth_config *conf_ptr, struct ticket_config *tk, int client_fd, struct boothc_ticket_msg *msg) { struct boothc_ticket_msg omsg; void (*deadfn) (int ci); int rv, rc, ci; int cmd, options; struct client *req_client; cmd = ntohl(msg->header.cmd); options = ntohl(msg->header.options); rv = tk->outcome; ci = find_client_by_fd(client_fd); if (ci < 0) { tk_log_info("client %d (request %s) left before being notified", client_fd, state_to_string(cmd)); return 0; } tk_log_debug("notifying client %d (request %s)", client_fd, state_to_string(cmd)); init_ticket_msg(conf_ptr, &omsg, CL_RESULT, 0, rv, 0, tk); rc = send_client_msg(conf_ptr, client_fd, &omsg); if (rc == 0 && ((rv == RLT_MORE) || (rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) { /* more to do here, keep the request */ return 1; } else { /* we sent a definite answer or there was a write error, drop * the client */ if (rc) { tk_log_debug("failed to notify client %d (request %s)", client_fd, state_to_string(cmd)); } else { tk_log_debug("client %d (request %s) got final notification", client_fd, state_to_string(cmd)); } req_client = clients + ci; deadfn = req_client->deadfn; if(deadfn) { deadfn(ci); } return 0; /* we're done with this request */ } } int ticket_broadcast(struct booth_config *conf_ptr, struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason) { struct boothc_ticket_msg msg; assert(conf_ptr != NULL); init_ticket_msg(conf_ptr, &msg, cmd, 0, res, reason, tk); tk_log_debug("broadcasting '%s' (term=%d, valid=%d)", state_to_string(cmd), ntohl(msg.ticket.term), msg_term_time(&msg)); tk->last_request = cmd; if (expected_reply) { expect_replies(tk, expected_reply, conf_ptr->local); } ticket_activate_timeout(tk); return transport(conf_ptr)->broadcast_auth(conf_ptr, &msg, sendmsglen(&msg)); } /* update the ticket on the leader, write it to the CIB, and send out the update message to others with the new expiry time */ int leader_update_ticket(struct booth_config *conf_ptr, struct ticket_config *tk) { int rv = 0, rv2; timetype now; if (tk->ticket_updated >= 2) return 0; /* for manual tickets, we don't set time expiration */ if (!is_manual(tk)) { if (tk->ticket_updated < 1) { tk->ticket_updated = 1; get_time(&now); copy_time(&now, &tk->last_renewal); set_future_time(&tk->term_expires, tk->term_duration); rv = ticket_broadcast(conf_ptr, tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0); } } if (tk->ticket_updated < 2) { rv2 = ticket_write(conf_ptr, tk); switch(rv2) { case 0: tk->ticket_updated = 2; tk->outcome = RLT_SUCCESS; foreach_tkt_req(conf_ptr, tk, notify_client); break; case 1: if (tk->outcome != RLT_CIB_PENDING) { tk->outcome = RLT_CIB_PENDING; foreach_tkt_req(conf_ptr, tk, notify_client); } break; default: break; } } return rv; } static void log_lost_servers(struct booth_config *conf_ptr, struct ticket_config *tk) { struct booth_site *n; int i; assert(conf_ptr != NULL); if (tk->retry_number > 1) /* log those that we couldn't reach, but do * that only on the first retry */ return; FOREACH_NODE(conf_ptr, i, n) { if (!(tk->acks_received & n->bitmask)) { tk_log_warn("%s %s didn't acknowledge our %s, " "will retry %d times", (n->type == ARBITRATOR ? "arbitrator" : "site"), site_string(n), state_to_string(tk->last_request), tk->retries); } } } static void resend_msg(struct booth_config *conf_ptr, struct ticket_config *tk) { struct booth_site *n; int i; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (!(tk->acks_received ^ conf_ptr->local->bitmask)) { ticket_broadcast(conf_ptr, tk, tk->last_request, 0, RLT_SUCCESS, 0); } else { FOREACH_NODE(conf_ptr, i, n) { if (!(tk->acks_received & n->bitmask)) { n->resend_cnt++; tk_log_debug("resending %s to %s", state_to_string(tk->last_request), site_string(n) ); send_msg(conf_ptr, tk->last_request, tk, n, NULL); } } ticket_activate_timeout(tk); } } static void handle_resends(struct booth_config *conf_ptr, struct ticket_config *tk) { int ack_cnt; if (++tk->retry_number > tk->retries) { tk_log_info("giving up on sending retries"); no_resends(tk); set_ticket_wakeup(conf_ptr, tk); return; } /* try to reach some sites again if we just stepped down */ if (tk->last_request == OP_VOTE_FOR) { tk_log_warn("no answers to our VtFr request to step down (try #%d), " "we are alone", tk->retry_number); goto just_resend; } if (!majority_of_bits(conf_ptr, tk, tk->acks_received)) { ack_cnt = count_bits(tk->acks_received) - 1; if (!ack_cnt) { tk_log_warn("no answers to our request (try #%d), " "we are alone", tk->retry_number); } else { tk_log_warn("not enough answers to our request (try #%d): " "only got %d answers", tk->retry_number, ack_cnt); } } else { log_lost_servers(conf_ptr, tk); } just_resend: resend_msg(conf_ptr, tk); } -int postpone_ticket_processing(struct ticket_config *tk) +static int postpone_ticket_processing(struct ticket_config *tk) { extern timetype start_time; return tk->start_postpone && (-time_left(&start_time) < tk->timeout); } #define has_extprog_exited(tk) ((tk)->clu_test.progstate == EXTPROG_EXITED) static void process_next_state(struct booth_config *conf_ptr, struct ticket_config *tk) { int rv; switch(tk->next_state) { case ST_LEADER: if (has_extprog_exited(tk)) { if (tk->state != ST_LEADER) { rv = acquire_ticket(conf_ptr, tk, OR_ADMIN); if (rv != 0) { /* external program failed */ tk->outcome = rv; foreach_tkt_req(conf_ptr, tk, notify_client); } } } else { log_reacquire_reason(conf_ptr, tk); acquire_ticket(conf_ptr, tk, OR_REACQUIRE); } break; case ST_INIT: no_resends(tk); start_revoke_ticket(conf_ptr, tk); tk->outcome = RLT_SUCCESS; foreach_tkt_req(conf_ptr, tk, notify_client); break; /* wanting to be follower is not much of an ambition; no * processing, just return; don't reset start_postpone until * we got some replies to status */ case ST_FOLLOWER: return; default: break; } tk->start_postpone = 0; } static void ticket_lost(struct booth_config *conf_ptr, struct ticket_config *tk) { int reason = OR_TKT_LOST; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (tk->leader != conf_ptr->local) { tk_log_warn("lost at %s", site_string(tk->leader)); } else { if (is_ext_prog_running(tk)) { ext_prog_timeout(tk); reason = OR_LOCAL_FAIL; } else { tk_log_warn("lost majority (revoking locally)"); reason = tk->election_reason ? tk->election_reason : OR_REACQUIRE; } } tk->lost_leader = tk->leader; save_committed_tkt(tk); mark_ticket_as_revoked_from_leader(tk); reset_ticket(tk); set_state(tk, ST_FOLLOWER); if (conf_ptr->local->type == SITE) { ticket_write(conf_ptr, tk); schedule_election(conf_ptr, tk, reason); } } static void next_action(struct booth_config *conf_ptr, struct ticket_config *tk) { int rv; switch(tk->state) { case ST_INIT: /* init state, handle resends for ticket revoke */ /* and rebroadcast if stepping down */ /* try to acquire ticket on grant */ if (has_extprog_exited(tk)) { rv = acquire_ticket(conf_ptr, tk, OR_ADMIN); if (rv != 0) { /* external program failed */ tk->outcome = rv; foreach_tkt_req(conf_ptr, tk, notify_client); } } else { if (tk->acks_expected) { handle_resends(conf_ptr, tk); } } break; case ST_FOLLOWER: if (!is_manual(tk)) { /* leader/ticket lost? and we didn't vote yet */ tk_log_debug("leader: %s, voted_for: %s", site_string(tk->leader), site_string(tk->voted_for)); if (!tk->leader) { if (!tk->voted_for || !tk->in_election) { disown_ticket(tk); if (!new_election(conf_ptr, tk, NULL, 1, OR_AGAIN)) { ticket_activate_timeout(tk); } } else { /* we should restart elections in case nothing * happens in the meantime */ tk->in_election = 0; ticket_activate_timeout(tk); } } } else { /* for manual tickets, also try to acquire ticket on grant * in the Follower state (because we may end up having * two Leaders) */ if (has_extprog_exited(tk)) { rv = acquire_ticket(conf_ptr, tk, OR_ADMIN); if (rv != 0) { /* external program failed */ tk->outcome = rv; foreach_tkt_req(conf_ptr, tk, notify_client); } } else { /* Otherwise, just send ACKs if needed */ if (tk->acks_expected) { handle_resends(conf_ptr, tk); } } } break; case ST_CANDIDATE: /* elections timed out? */ elections_end(conf_ptr, tk); break; case ST_LEADER: /* timeout or ticket renewal? */ if (tk->acks_expected) { handle_resends(conf_ptr, tk); if (majority_of_bits(conf_ptr, tk, tk->acks_received)) { leader_update_ticket(conf_ptr, tk); } } else { /* this is ticket renewal, run local test */ if (!do_ext_prog(conf_ptr, tk, 1)) { ticket_broadcast(conf_ptr, tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0); tk->ticket_updated = 0; } } break; default: break; } } static void ticket_cron(struct booth_config *conf_ptr, struct ticket_config *tk) { /* don't process the tickets too early after start */ if (postpone_ticket_processing(tk)) { tk_log_debug("ticket processing postponed (start_postpone=%d)", tk->start_postpone); /* but run again soon */ ticket_activate_timeout(tk); return; } /* no need for status resends, we hope we got at least one * my_index back */ if (tk->acks_expected == OP_MY_INDEX) { no_resends(tk); } /* after startup, we need to decide what to do based on the * current ticket state; tk->next_state has a hint * also used for revokes which had to be delayed */ if (tk->next_state) { process_next_state(conf_ptr, tk); goto out; } /* Has an owner, has an expiry date, and expiry date in the past? * For automatic tickets, losing the ticket must happen * in _every_ state. */ if ((!is_manual(tk)) && is_owned(tk) && is_time_set(&tk->term_expires) && is_past(&tk->term_expires)) { ticket_lost(conf_ptr, tk); goto out; } next_action(conf_ptr, tk); out: tk->next_state = 0; if (!tk->in_election && tk->update_cib) ticket_write(conf_ptr, tk); } void process_tickets(struct booth_config *conf_ptr) { struct ticket_config *tk; int i; timetype last_cron; assert(conf_ptr != NULL); FOREACH_TICKET(conf_ptr, i, tk) { if (!has_extprog_exited(tk) && is_time_set(&tk->next_cron) && !is_past(&tk->next_cron)) continue; tk_log_debug("ticket cron"); copy_time(&tk->next_cron, &last_cron); ticket_cron(conf_ptr, tk); if (time_cmp(&last_cron, &tk->next_cron, ==)) { tk_log_debug("nobody set ticket wakeup"); set_ticket_wakeup(conf_ptr, tk); } } } void tickets_log_info(struct booth_config *conf_ptr) { struct ticket_config *tk; int i; time_t ts; assert(conf_ptr != NULL); FOREACH_TICKET(conf_ptr, i, tk) { ts = wall_ts(&tk->term_expires); tk_log_info("state '%s' " "term %d " "leader %s " "expires %-24.24s", state_to_string(tk->state), tk->current_term, ticket_leader_string(tk), ctime(&ts)); } } static void update_acks(struct booth_config *conf_ptr, struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t cmd; uint32_t req; cmd = ntohl(msg->header.cmd); req = ntohl(msg->header.request); if (req != tk->last_request || (tk->acks_expected != cmd && tk->acks_expected != OP_REJECTED)) return; /* got an ack! */ tk->acks_received |= sender->bitmask; if (all_replied(conf_ptr, tk) || /* we just stepped down, need only one site to start * elections */ (cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) { no_resends(tk); tk->start_postpone = 0; set_ticket_wakeup(conf_ptr, tk); } } /* read ticket message */ int ticket_recv(struct booth_config *conf_ptr, void *buf, struct booth_site *source) { struct boothc_ticket_msg *msg; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; msg = (struct boothc_ticket_msg *)buf; if (!check_ticket(conf_ptr, msg->ticket.id, &tk)) { log_warn("got invalid ticket name %s from %s", msg->ticket.id, site_string(source)); source->invalid_cnt++; return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(conf_ptr, leader_u, &leader)) { tk_log_error("message with unknown leader %u received", leader_u); source->invalid_cnt++; return -EINVAL; } update_acks(conf_ptr, tk, source, leader, msg); return raft_answer(conf_ptr, tk, source, leader, msg); } static void log_next_wakeup(struct ticket_config *tk) { int left; left = time_left(&tk->next_cron); tk_log_debug("set ticket wakeup in " intfmt(left)); } /* New vote round; ยง5.2 */ /* delay the next election start for some random time * (up to 1 second) */ void add_random_delay(struct ticket_config *tk) { timetype tv; interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &tv); ticket_next_cron_at(tk, &tv); if (ANYDEBUG) { log_next_wakeup(tk); } } void set_ticket_wakeup(struct booth_config *conf_ptr, struct ticket_config *tk) { timetype near_future, tv, next_vote; set_future_time(&near_future, 10); if (!is_manual(tk)) { /* At least every hour, perhaps sooner (default) */ tk_log_debug("ticket will be woken up after up to one hour"); ticket_next_cron_in(tk, 3600*TIME_RES); switch (tk->state) { case ST_LEADER: assert(tk->leader == conf_ptr->local); get_next_election_time(tk, &next_vote, conf_ptr->local); /* If timestamp is in the past, wakeup in * near future */ if (!is_time_set(&next_vote)) { tk_log_debug("next ts unset, wakeup soon"); ticket_next_cron_at(tk, &near_future); } else if (is_past(&next_vote)) { int tdiff = time_left(&next_vote); tk_log_debug("next ts in the past " intfmt(tdiff)); ticket_next_cron_at(tk, &near_future); } else { ticket_next_cron_at(tk, &next_vote); } break; case ST_CANDIDATE: assert(is_time_set(&tk->election_end)); ticket_next_cron_at(tk, &tk->election_end); break; case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on it later on. * If no one is interested - don't care. */ if (is_owned(tk)) { interval_add(&tk->term_expires, tk->acquire_after, &tv); ticket_next_cron_at(tk, &tv); } break; default: tk_log_error("unknown ticket state: %d", tk->state); } if (tk->next_state) { /* we need to do something soon here */ if (!tk->acks_expected) { ticket_next_cron_at(tk, &near_future); } else { ticket_activate_timeout(tk); } } } else { /* At least six minutes, to make sure that multi-leader situations * will be solved promptly. */ tk_log_debug("manual ticket will be woken up after up to six minutes"); ticket_next_cron_in(tk, 60*TIME_RES); /* For manual tickets, no earlier timeout could be set in a similar * way as it is done in a switch above for automatic tickets. * The reason is that term's timeout is INF and no Raft-based elections * are performed. */ } if (ANYDEBUG) { log_next_wakeup(tk); } } void schedule_election(struct booth_config *conf_ptr, struct ticket_config *tk, cmd_reason_t reason) { assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (conf_ptr->local->type != SITE) return; tk->election_reason = reason; get_time(&tk->next_cron); /* introduce a short delay before starting election */ add_random_delay(tk); } int is_manual(struct ticket_config *tk) { return (tk->mode == TICKET_MODE_MANUAL) ? 1 : 0; } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_config *conf_ptr, struct booth_site *dest, struct ticket_config *tk, cmd_result_t code, struct boothc_ticket_msg *in_msg) { int req = ntohl(in_msg->header.cmd); struct boothc_ticket_msg msg; tk_log_debug("sending reject to %s", site_string(dest)); init_ticket_msg(conf_ptr, &msg, OP_REJECTED, req, code, 0, tk); return booth_udp_send_auth(conf_ptr, dest, &msg, sendmsglen(&msg)); } int send_msg(struct booth_config *conf_ptr, int cmd, struct ticket_config *tk, struct booth_site *dest, struct boothc_ticket_msg *in_msg) { int req = 0; struct ticket_config *valid_tk = tk; struct boothc_ticket_msg msg; /* if we want to send the last valid ticket, then if we're in * the ST_CANDIDATE state, the last valid ticket is in * tk->last_valid_tk */ if (cmd == OP_MY_INDEX) { if (tk->state == ST_CANDIDATE && tk->last_valid_tk) { valid_tk = tk->last_valid_tk; } tk_log_info("sending status to %s", site_string(dest)); } if (in_msg) req = ntohl(in_msg->header.cmd); init_ticket_msg(conf_ptr, &msg, cmd, req, RLT_SUCCESS, 0, valid_tk); return booth_udp_send_auth(conf_ptr, dest, &msg, sendmsglen(&msg)); } diff --git a/src/ticket.h b/src/ticket.h index 7258888..496f4b3 100644 --- a/src/ticket.h +++ b/src/ticket.h @@ -1,354 +1,351 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef _TICKET_H #define _TICKET_H #include #include #include #include "timer.h" #include "config.h" #include "log.h" extern int TIME_RES; #define DEFAULT_TICKET_EXPIRY (600*TIME_RES) #define DEFAULT_TICKET_TIMEOUT (5*TIME_RES) #define DEFAULT_RETRIES 10 #define FOREACH_TICKET(b_, i_, t_) \ for (i_ = 0; \ (t_ = (b_)->ticket + i_, i_ < (b_)->ticket_count); \ i_++) #define FOREACH_NODE(b_, i_, n_) \ for (i_ = 0; \ (n_ = (b_)->site + i_, i_ < (b_)->site_count); \ i_++) #define set_leader(tk, who) do { \ if (who == NULL) { \ mark_ticket_as_revoked_from_leader(tk); \ } \ \ tk->leader = who; \ tk_log_debug("ticket leader set to %s", ticket_leader_string(tk)); \ \ if (tk->leader) { \ mark_ticket_as_granted(tk, tk->leader); \ } \ } while(0) #define mark_ticket_as_granted(tk, who) do { \ if (is_manual(tk) && (who->index > -1)) { \ tk->sites_where_granted[who->index] = 1; \ tk_log_debug("manual ticket marked as granted to %s", ticket_leader_string(tk)); \ } \ } while(0) #define mark_ticket_as_revoked(tk, who) do { \ if (is_manual(tk) && who && (who->index > -1)) { \ tk->sites_where_granted[who->index] = 0; \ tk_log_debug("manual ticket marked as revoked from %s", site_string(who)); \ } \ } while(0) #define mark_ticket_as_revoked_from_leader(tk) do { \ if (tk->leader) { \ mark_ticket_as_revoked(tk, tk->leader); \ } \ } while(0) #define set_state(tk, newst) do { \ tk_log_debug("state transition: %s -> %s", \ state_to_string(tk->state), state_to_string(newst)); \ tk->state = newst; \ } while(0) #define set_next_state(tk, newst) do { \ if (!(newst)) tk_log_debug("next state reset"); \ else tk_log_debug("next state set to %s", state_to_string(newst)); \ tk->next_state = newst; \ } while(0) #define is_term_invalid(tk, term) \ ((tk)->last_valid_tk && (tk)->last_valid_tk->current_term > (term)) void save_committed_tkt(struct ticket_config *tk); void disown_ticket(struct ticket_config *tk); + +/* XXX UNUSED */ int disown_if_expired(struct ticket_config *tk); /** * @internal * Pick a ticket structure based on given name, with some apriori sanity checks * * @param[inout] conf_ptr config object to refer to * @param[in] ticket name of the ticket to search for * @param[out] found place the reference here when found * * @return 0 on failure, see @find_ticket_by_name otherwise */ int check_ticket(struct booth_config *conf_ptr, const char *ticket, struct ticket_config **tc); /** * @internal * Check whether given site is valid * * @param[inout] conf_ptr config object to refer to * @param[in] site which member to look for * @param[out] is_local store whether the member is local on success * * @note XXX UNUSED * * @return 1 on success (found and valid), 0 otherwise */ int check_site(struct booth_config *conf_ptr, const char *site, int *local); -int grant_ticket(struct ticket_config *ticket); -int revoke_ticket(struct ticket_config *ticket); - /** * @internal * Second stage of incoming datagram handling (after authentication) * * @param[inout] conf_ptr config object to refer to * @param[in] buf raw message to act upon * @param[in] source member originating this message * * @return 0 on success or negative value (-1 or -errno) on error */ int ticket_recv(struct booth_config *conf_ptr, void *buf, struct booth_site *source); void reset_ticket(struct ticket_config *tk); void reset_ticket_and_set_no_leader(struct ticket_config *tk); /** * @internal * Based on the current state and circumstances, make a state transition * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * @param[in] sender site structure of the sender */ void update_ticket_state(struct booth_config *conf_ptr, struct ticket_config *tk, struct booth_site *sender); /** * @internal * Initial "consult local pacemaker and booth peers" inquiries * * @param[inout] conf_ptr config object to use as a starting point * * @return 0 (for the time being) */ int setup_ticket(struct booth_config *conf_ptr); int check_max_len_valid(const char *s, int max); /** * @internal * Pick a ticket structure based on given name * * @param[inout] conf_ptr config object to refer to * @param[in] ticket name of the ticket to search for * @param[out] found place the reference here when found * * @return see @list_ticket and @send_header_plus */ int find_ticket_by_name(struct booth_config *conf_ptr, const char *ticket, struct ticket_config **found); /** * @internal * Apply the next step with the ticket if possible. * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand */ void set_ticket_wakeup(struct booth_config *conf_ptr, struct ticket_config *tk); -int postpone_ticket_processing(struct ticket_config *tk); - /** * @internal * Implementation of the ticket listing * * @param[inout] conf_ptr config object to refer to * @param[in] file descriptor of the socket to respond to * * @return see @list_ticket and @send_header_plus */ int ticket_answer_list(struct booth_config *conf_ptr, int fd); /** * @internal * Process request from the client (as opposed to peer daemon) * * @param[inout] conf_ptr config object to refer to * @param[in] req_client client structure of the sender * @param[in] buf message itself * * @return 1 on success, 0 when not done with the message, yet */ int process_client_request(struct booth_config *conf_ptr, struct client *req_client, void *buf); /** * @internal * Cause the ticket storage backend to persist the ticket * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * * @return 0 on success, 1 when not carried out for being dangerous */ int ticket_write(struct booth_config *conf_ptr, struct ticket_config *tk); /** * @internal * Mainloop of booth ticket handling * * @param[inout] conf_ptr config object to refer to */ void process_tickets(struct booth_config *conf_ptr); /** * @internal * For each ticket, log some notable properties * * @param[inout] conf_ptr config object to refer to */ void tickets_log_info(struct booth_config *conf_ptr); char *state_to_string(uint32_t state_ho); /** * @internal * For a given ticket and recipient site, send a rejection * * @param[inout] conf_ptr config object to refer to * @param[in] dest site structure of the recipient * @param[in] tk ticket at hand * @param[in] code further detail for the rejection * @param[in] in_msg message this is going to be a response to */ int send_reject(struct booth_config *conf_ptr, struct booth_site *dest, struct ticket_config *tk, cmd_result_t code, struct boothc_ticket_msg *in_msg); /** * @internal * For a given ticket, recipient site and possibly its message, send a response * * @param[inout] conf_ptr config object to refer to * @param[in] cmd what type of message is to be sent * @param[in] dest site structure of the recipient * @param[in] in_msg message this is going to be a response to */ int send_msg(struct booth_config *conf_ptr, int cmd, struct ticket_config *tk, struct booth_site *dest, struct boothc_ticket_msg *in_msg); /** * @internal * Notify client at particular socket, regarding particular ticket * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * @param[in] fd file descriptor of the socket to respond to * @param[in] msg input message being responded to */ int notify_client(struct booth_config *conf_ptr, struct ticket_config *tk, int client_fd, struct boothc_ticket_msg *msg); /** * @internal * Broadcast the current state of the ticket as seen from local perspective * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * @param[in] cmd what type of message is to be sent * @param[in] expected_reply what to expect in response * @param[in] res may carry further detail with cmd == OP_REJECTED * @param[in] reason trigger of this broadcast */ int ticket_broadcast(struct booth_config *conf_ptr, struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason); /** * @internal * Update the ticket (+broadcast to that effect) and/or write it to the backend * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * * @return 0 or see #ticket_broadcast */ int leader_update_ticket(struct booth_config *conf_ptr, struct ticket_config *tk); void add_random_delay(struct ticket_config *tk); /** * @internal * Make it so the nearest ticket swipe will start election * * @param[inout] conf_ptr config object to refer to * @param[in] tk ticket at hand * @param[in] reason explains why new election is conducted */ void schedule_election(struct booth_config *conf_ptr, struct ticket_config *tk, cmd_reason_t reason); int is_manual(struct ticket_config *tk); int check_attr_prereq(struct ticket_config *tk, grant_type_e grant_type); static inline void ticket_next_cron_at(struct ticket_config *tk, timetype *when) { copy_time(when, &tk->next_cron); } static inline void ticket_next_cron_in(struct ticket_config *tk, int interval) { timetype tv; set_future_time(&tv, interval); ticket_next_cron_at(tk, &tv); } static inline void ticket_activate_timeout(struct ticket_config *tk) { /* TODO: increase timeout when no answers */ tk_log_debug("activate ticket timeout in %d", tk->timeout); ticket_next_cron_in(tk, tk->timeout); } #endif /* _TICKET_H */ diff --git a/src/transport.c b/src/transport.c index f7685b8..158ec51 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,1153 +1,1151 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include #include /* getnameinfo */ #include #include #include #include #include #include #include #include /* getnameinfo */ #include "b_config.h" #include "config.h" #include "transport.h" #include "attr.h" #include "auth.h" #include "booth.h" #include "inline-fn.h" #include "log.h" #include "ticket.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 /* function to be called when handling booth-group-internal messages; * it's expected to return 0 to indicate success, negative integer * to indicate silent (or possibly already complained about) error, * or positive integer to indicate sender's ID that will then be * emitted in the error log message together with the real source * address if this is available */ static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } enum match_type { NO_MATCH = 0, FUZZY_MATCH, EXACT_MATCH, }; static int find_address(struct booth_config *conf_ptr, unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me, int *address_bits_matched) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; int matched; enum match_type did_match = NO_MATCH; assert(conf_ptr != NULL); bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); FOREACH_NODE(conf_ptr, i, node) { if (family != node->family) continue; n_a = node_to_addr_pointer(node); for(matched = 0; matched < node->addrlen; matched++) if (ipaddr[matched] != n_a[matched]) break; if (matched == node->addrlen) { *address_bits_matched = matched * 8; *me = node; did_match = EXACT_MATCH; break; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (matched < bytes) continue; if (matched * 8 < *address_bits_matched) continue; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) { /* _At_least_ prefixlen bits matched. */ if (did_match < EXACT_MATCH) { *address_bits_matched = prefixlen; *me = node; did_match = FUZZY_MATCH; } } } return did_match; } static int _find_myself(struct booth_config *conf_ptr, int family, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me = NULL; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; int address_bits_matched; assert(conf_ptr != NULL); if (conf_ptr->local != NULL) goto found; address_bits_matched = 0; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); /* prefer IFA_LOCAL if it exists, for p-t-p * interfaces, otherwise use IFA_ADDRESS */ if (tb[IFA_LOCAL]) { memcpy(ipaddr, RTA_DATA(tb[IFA_LOCAL]), BOOTH_IPADDR_LEN); } else { memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); } /* Try to find the exact address or the address with subnet matching. * The function find_address will be called for each address received * from NLMSG_DATA above. * The exact match will be preferred. If no exact match is found, * the function find_address will try to return another, most similar * address (with the longest possible number of same bytes). */ if (ifa->ifa_prefixlen > address_bits_matched) { find_address(conf_ptr, ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me, &address_bits_matched); if (me != NULL) { log_debug("found myself at %s (%d bits matched)", site_string(me), address_bits_matched); } } /* If the previous NLMSG_DATA calls have already allowed us * to find an address with address_bits_matched matching bits, * then no other better non-exact address can be found. * But we can still try to find an exact match, so let us * call the function find_address with disabled searching of * similar addresses (fuzzy_allowed == 0) */ else if (ifa->ifa_prefixlen == address_bits_matched) { find_address(conf_ptr, ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, 0 /* fuzzy_allowed */, &me, &address_bits_matched); if (me != NULL) { log_debug("found myself at %s (exact match)", site_string(me)); } } } h = NLMSG_NEXT(h, status); } } close(fd); if (me == NULL) return 0; me->local = 1; conf_ptr->local = me; found: return 1; } int find_myself(struct booth_config *conf_ptr, int fuzzy_allowed) { return _find_myself(conf_ptr, AF_INET6, fuzzy_allowed) || _find_myself(conf_ptr, AF_INET, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", len_incl_data, l); return -EINVAL; } return len_incl_data; } static int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1 && errno == EWOULDBLOCK) break; if (rv == -1) return -1; off += rv; } return off; } static int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = send(fd, (char *)buf + off, count, MSG_NOSIGNAL); if (rv == -1 && errno == EINTR) goto retry; /* If we cannot write _any_ data, we'd be in an (potential) loop. */ if (rv <= 0) { log_error("send failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } - /* Only used for client requests (tcp) */ -int read_client(struct client *req_cl) +static int read_client(struct client *req_cl) { char *msg; struct boothc_header *header; int rv, fd; int len = MAX_MSG_LEN; if (!req_cl->msg) { msg = malloc(MAX_MSG_LEN); if (!msg) { log_error("out of memory for client messages"); return -1; } req_cl->msg = (void *)msg; } else { msg = (char *)req_cl->msg; } header = (struct boothc_header *)msg; /* update len if we read enough */ if (req_cl->offset >= sizeof(*header)) { len = min(ntohl(header->length), MAX_MSG_LEN); } fd = req_cl->fd; rv = do_read(fd, msg+req_cl->offset, len-req_cl->offset); if (rv < 0) { if (errno == ECONNRESET) log_debug("client connection reset for fd %d", fd); return -1; } req_cl->offset += rv; /* update len if we read enough */ if (req_cl->offset >= sizeof(*header)) { len = min(ntohl(header->length), MAX_MSG_LEN); } if (req_cl->offset < len) { /* client promised to send more */ return 1; } if (check_boothc_header(header, len) < 0) { return -1; } return 0; } - /* Only used for client requests (tcp) */ static void process_connection(struct booth_config *conf_ptr, int ci) { struct client *req_cl; void *msg = NULL; struct boothc_header *header; struct boothc_hdr_msg err_reply; cmd_result_t errc; void (*deadfn) (int ci); req_cl = clients + ci; switch (read_client(req_cl)) { case -1: /* error */ goto kill; case 1: /* more to read */ return; case 0: /* we can process the request now */ msg = req_cl->msg; } header = (struct boothc_header *)msg; if (check_auth(conf_ptr, NULL, msg, ntohl(header->length))) { errc = RLT_AUTH; goto send_err; } /* For CMD_GRANT and CMD_REVOKE: * Don't close connection immediately, but send * result a second later? */ switch (ntohl(header->cmd)) { case CMD_LIST: ticket_answer_list(conf_ptr, req_cl->fd); goto kill; case CMD_PEERS: list_peers(conf_ptr, req_cl->fd); goto kill; case CMD_GRANT: case CMD_REVOKE: if (process_client_request(conf_ptr, req_cl, msg) == 1) goto kill; /* request processed definitely, close connection */ else return; case ATTR_LIST: case ATTR_GET: case ATTR_SET: case ATTR_DEL: if (process_attr_request(conf_ptr, req_cl, msg) == 1) goto kill; /* request processed definitely, close connection */ else return; default: log_error("connection %d cmd %x unknown", ci, ntohl(header->cmd)); errc = RLT_INVALID_ARG; goto send_err; } assert(0); return; send_err: init_header(conf_ptr, &err_reply.header, CL_RESULT, 0, 0, errc, 0, sizeof(err_reply)); send_client_msg(conf_ptr, req_cl->fd, &err_reply); kill: deadfn = req_cl->deadfn; if(deadfn) { deadfn(ci); } return; } static void process_tcp_listener(struct booth_config *conf_ptr, int ci) { int fd, i, flags, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } int setup_tcp_listener(struct booth_site *local, int test_only) { int s, rv; int one = 1; assert(local != NULL); s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { close(s); log_error("failed to set the SO_REUSEADDR option"); return rv; } rv = bind(s, &local->sa6, local->saddrlen); if (test_only) { rv = (rv == -1) ? errno : 0; close(s); return rv; } if (rv == -1) { close(s); log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { close(s); log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(struct booth_config *conf_ptr, void * unused __attribute__((unused))) { int rv; assert(conf_ptr != NULL && conf_ptr->transport != NULL); if (get_local_id(conf_ptr) < 0) return -1; rv = setup_tcp_listener(conf_ptr->local, 0); if (rv < 0) return rv; client_add(rv, *conf_ptr->transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } static int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connect to %s got a timeout", site_string(to)); else log_error("connect to %s got an error: %s", site_string(to), strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } /* data + (datalen-sizeof(struct hmac)) points to struct hmac * i.e. struct hmac is always tacked on the payload */ static int add_hmac(struct booth_config *conf_ptr, void *data, int len) { int rv = 0; #if HAVE_LIBGCRYPT || HAVE_LIBMHASH int payload_len; struct hmac *hp; assert(conf_ptr != NULL); if (!is_auth_req(conf_ptr)) return 0; payload_len = len - sizeof(struct hmac); hp = (struct hmac *)((unsigned char *)data + payload_len); hp->hid = htonl(BOOTH_HASH); memset(hp->hash, 0, BOOTH_MAC_SIZE); rv = calc_hmac(data, payload_len, BOOTH_HASH, hp->hash, conf_ptr->authkey, conf_ptr->authkey_len); if (rv < 0) { log_error("internal error: cannot calculate mac"); } #endif return rv; } static int booth_tcp_send(struct booth_config *conf_ptr, struct booth_site *to, void *buf, int len) { int rv; rv = add_hmac(conf_ptr, buf, len); if (!rv) rv = do_write(to->tcp_fd, buf, len); return rv; } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) { log_error("read failed (%d): %s", errno, strerror(errno)); return got; } return got; } static int booth_tcp_recv_auth(struct booth_config *conf_ptr, struct booth_site *from, void *buf, int len) { int got, total; int payload_len; /* Needs timeouts! */ payload_len = len - sizeof(struct hmac); got = booth_tcp_recv(from, buf, payload_len); if (got < 0) { return got; } total = got; if (is_auth_req(conf_ptr)) { got = booth_tcp_recv(from, (unsigned char *)buf+payload_len, sizeof(struct hmac)); if (got != sizeof(struct hmac) || check_auth(conf_ptr, from, buf, len)) { return -1; } total += got; } return total; } static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } static int setup_udp_server(struct booth_site *local) { int rv, fd; int one = 1; unsigned int recvbuf_size; assert(local != NULL); fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { log_error("failed to set the SO_REUSEADDR option"); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", site_string(local), site_port(local), strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(struct booth_config *conf_ptr, int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; /* beware, the buffer needs to be large enough to accept * a packet */ char buffer[MAX_MSG_LEN]; /* Used for unit tests */ struct boothc_ticket_msg *msg; sa_len = sizeof(sa); msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; rv = deliver_fn((void*)msg, rv); if (rv > 0) { if (getnameinfo((struct sockaddr *)&sa, sa_len, buffer, sizeof(buffer), NULL, 0, NI_NUMERICHOST) == 0) log_error("unknown sender: %08x (real: %s)", rv, buffer); else log_error("unknown sender: %08x", rv); } } static int booth_udp_init(struct booth_config *conf_ptr, void *f) { int rv; assert(conf_ptr != NULL && conf_ptr->transport != NULL); assert(conf_ptr->local != NULL); rv = setup_udp_server(conf_ptr->local); if (rv < 0) return rv; deliver_fn = f; client_add(conf_ptr->local->udp_fd, *conf_ptr->transport + UDP, process_recv, NULL); return 0; } static int booth_udp_send(struct booth_config *conf_ptr, struct booth_site *to, void *buf, int len) { int rv; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); to->sent_cnt++; rv = sendto(conf_ptr->local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); if (rv == len) { rv = 0; } else if (rv < 0) { to->sent_err_cnt++; log_error("Cannot send to %s: %d %s", site_string(to), errno, strerror(errno)); } else { rv = -1; to->sent_err_cnt++; log_error("Packet sent to %s got truncated", site_string(to)); } return rv; } int booth_udp_send_auth(struct booth_config *conf_ptr, struct booth_site *to, void *buf, int len) { int rv; rv = add_hmac(conf_ptr, buf, len); if (rv < 0) return rv; return booth_udp_send(conf_ptr, to, buf, len); } static int booth_udp_broadcast_auth(struct booth_config *conf_ptr, void *buf, int len) { int i, rv, rvs; struct booth_site *site; assert(conf_ptr != NULL); assert(conf_ptr->local != NULL); if (conf_ptr == NULL || !conf_ptr->site_count) return -1; rv = add_hmac(conf_ptr, buf, len); if (rv < 0) return rv; rvs = 0; FOREACH_NODE(conf_ptr, i, site) { if (site != conf_ptr->local) { rv = booth_udp_send(conf_ptr, site, buf, len); if (!rvs) rvs = rv; } } return rvs; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(struct booth_config *conf_ptr __attribute__((unused)), void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_config *conf_ptr __attribute__((unused)), struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } /* semi-hidden, only main.c to have a knowledge about this */ const booth_transport_table_t booth__transport = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .recv_auth = booth_tcp_recv_auth, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, .send_auth = booth_udp_send_auth, .close = return_0_booth_site, .broadcast_auth = booth_udp_broadcast_auth, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; #if HAVE_LIBGCRYPT || HAVE_LIBMHASH /* TODO: we need some client identification for logging */ #define peer_string(p) (p ? site_string(p) : "client") /* verify the validity of timestamp from the header * the timestamp needs to be either greater than the one already * recorded for the site or, and this is checked for clients, * not to be older than conf_ptr->maxtimeskew * update the timestamp for the site, if this packet is from a * site */ static int verify_ts(struct booth_config *conf_ptr, struct booth_site *from, void *buf, int len) { struct boothc_header *h; struct timeval tv, curr_tv, now; assert(conf_ptr != NULL); if (len < sizeof(*h)) { log_error("%s: packet too short", peer_string(from)); return -1; } h = (struct boothc_header *)buf; tv.tv_sec = ntohl(h->secs); tv.tv_usec = ntohl(h->usecs); if (from) { curr_tv.tv_sec = from->last_secs; curr_tv.tv_usec = from->last_usecs; if (timercmp(&tv, &curr_tv, >)) goto accept; log_warn("%s: packet timestamp older than previous one", site_string(from)); } gettimeofday(&now, NULL); now.tv_sec -= conf_ptr->maxtimeskew; if (timercmp(&tv, &now, >)) goto accept; log_error("%s: packet timestamp older than %d seconds", peer_string(from), conf_ptr->maxtimeskew); return -1; accept: if (from) { from->last_secs = tv.tv_sec; from->last_usecs = tv.tv_usec; } return 0; } #endif int check_auth(struct booth_config *conf_ptr, struct booth_site *from, void *buf, int len) { int rv = 0; #if HAVE_LIBGCRYPT || HAVE_LIBMHASH int payload_len; struct hmac *hp; assert(conf_ptr != NULL); if (!is_auth_req(conf_ptr)) return 0; payload_len = len - sizeof(struct hmac); if (payload_len < 0) { log_error("%s: failed to authenticate, packet too short (size:%d)", peer_string(from), len); return -1; } hp = (struct hmac *)((unsigned char *)buf + payload_len); rv = verify_hmac(buf, payload_len, ntohl(hp->hid), hp->hash, conf_ptr->authkey, conf_ptr->authkey_len); if (!rv) { rv = verify_ts(conf_ptr, from, buf, len); } if (rv != 0) { log_error("%s: failed to authenticate", peer_string(from)); } #endif return rv; } int send_data(struct booth_config *conf_ptr, int fd, void *data, int datalen) { int rv = 0; rv = add_hmac(conf_ptr, data, datalen); if (!rv) rv = do_write(fd, data, datalen); return rv; } int send_header_plus(struct booth_config *conf_ptr, int fd, struct boothc_hdr_msg *msg, void *data, int len) { int rv; rv = send_data(conf_ptr, fd, msg, sendmsglen(msg)-len); if (rv >= 0 && len) rv = do_write(fd, data, len); return rv; } /* UDP message receiver (see also deliver_fn declaration's comment) */ int message_recv(struct booth_config *conf_ptr, void *msg, int msglen) { uint32_t from; struct boothc_header *header; struct booth_site *source; header = (struct boothc_header *)msg; from = ntohl(header->from); if (!find_site_by_id(conf_ptr, from, &source)) { /* caller knows the actual source address, pass the (assuredly) positive number and let it report */ from = from ? from : ~from; /* avoid 0 (success) */ return from & (~0U >> 1); /* avoid negative (error code} */ } time(&source->last_recv); source->recv_cnt++; if (check_boothc_header(header, msglen) < 0) { log_error("message from %s receive error", site_string(source)); source->recv_err_cnt++; return -1; } if (check_auth(conf_ptr, source, msg, msglen)) { log_error("%s failed to authenticate", site_string(source)); source->sec_cnt++; return -1; } if (ntohl(header->opts) & BOOTH_OPT_ATTR) { /* not used, clients send/retrieve attributes directly * from sites */ return attr_recv(conf_ptr, msg, source); } else { return ticket_recv(conf_ptr, msg, source); } } diff --git a/src/transport.h b/src/transport.h index 7df8d6c..4350736 100644 --- a/src/transport.h +++ b/src/transport.h @@ -1,165 +1,164 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef _TRANSPORT_H #define _TRANSPORT_H #include "b_config.h" #include "booth.h" typedef enum { TCP = 1, UDP, SCTP, TRANSPORT_ENTRIES, } transport_layer_t; typedef enum { ARBITRATOR = 0x50, SITE, CLIENT, DAEMON, STATUS, GEOSTORE, } action_t; /* when allocating space for messages */ #define MAX_MSG_LEN 1024 struct booth_transport { const char *name; int (*init) (struct booth_config *, void *); int (*open) (struct booth_site *); int (*send) (struct booth_config *, struct booth_site *, void *, int); int (*send_auth) (struct booth_config *, struct booth_site *, void *, int); int (*recv) (struct booth_site *, void *, int); int (*recv_auth) (struct booth_config *, struct booth_site *, void *, int); int (*broadcast) (void *, int); int (*broadcast_auth) (struct booth_config *, void *, int); int (*close) (struct booth_site *); int (*exit) (void); }; typedef struct booth_transport booth_transport_table_t[TRANSPORT_ENTRIES]; /** * @internal * Attempts to pick identity of self from config-tracked enumeration of sites * * @param[inout] conf_ptr config object to refer to * @param[in] fuzzy_allowed whether it's OK to approximate the match * * @return 0 on success or negative value (-1 or -errno) on error */ int find_myself(struct booth_config *conf_ptr, int fuzzy_allowed); -int read_client(struct client *req_cl); int check_boothc_header(struct boothc_header *data, int len_incl_data); /** * @internal * Setup the TCP listener/server * * @param[in] local thix verysite * @param[in] test_only whether to just check if binding is clear * * @return 0 on success or -1 or errno on error */ int setup_tcp_listener(struct booth_site *local, int test_only); /** * @internal * Send data, with authentication added * * @param[inout] conf_ptr config object to refer to * @param[in] to site structure of the recipient * @param[in] buf message itself * @param[in] len lenght of #buf * * @return see @add_hmac and @booth_udp_send */ int booth_udp_send_auth(struct booth_config *conf_ptr, struct booth_site *to, void *buf, int len); /** * @internal * First stage of incoming datagram handling (authentication) * * @param[inout] conf_ptr config object to refer to * @param[in] msg raw message to act upon * @param[in] msglen lenght of #msg * * @return 0 on success or negative value (-1 or -errno) on error */ int message_recv(struct booth_config *conf_ptr, void *msg, int msglen); inline static void * node_to_addr_pointer(struct booth_site *node) { switch (node->family) { case AF_INET: return &node->sa4.sin_addr; case AF_INET6: return &node->sa6.sin6_addr; } return NULL; } /** * @internal * Send data, with authentication added * * @param[inout] conf_ptr config object to refer to * @param[in] fd descriptor of the socket to respond to * @param[in] data message itself * @param[in] datalen lenght of #data * * @return 0 on success or negative value (-1 or -errno) on error */ int send_data(struct booth_config *conf_ptr, int fd, void *data, int datalen); /** * @internal * First stage of incoming datagram handling (authentication) * * @param[inout] conf_ptr config object to refer to * @param[in] fd descriptor of the socket to respond to * @param[in] hdr message header * @param[in] data message itself * @param[in] len lengh of @data * * @return see #send_data and #do_write */ int send_header_plus(struct booth_config *conf_ptr, int fd, struct boothc_hdr_msg *hdr, void *data, int len); #define send_client_msg(bc, fd, msg) send_data(bc, fd, msg, sendmsglen(msg)) /** * @internal * First stage of incoming datagram handling (authentication) * * @param[inout] conf_ptr config object to refer to * @param[in] from site structure of the sender * @param[in] buf message to check * @param[in] len lengh of @buf * * @return see #send_data and #do_write */ int check_auth(struct booth_config *conf_ptr, struct booth_site *from, void *buf, int len); #endif /* _TRANSPORT_H */