diff --git a/src/attr.c b/src/attr.c index 43dae91..d9e5c91 100644 --- a/src/attr.c +++ b/src/attr.c @@ -1,471 +1,471 @@ /* * Copyright (C) 2015 Dejan Muhamedagic * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include "attr.h" #include "booth.h" #include "ticket.h" #include "pacemaker.h" void print_geostore_usage(void) { printf( "Usage:\n" " geostore {list|set|get|delete} [-t ticket] [options] attr [value]\n" "\n" " list: List all attributes\n" " set: Set attribute to a value\n" " get: Get attribute's value\n" " delete: Delete attribute\n" "\n" " -t Ticket where attribute resides\n" " (required, if more than one ticket is configured)\n" "\n" "Options:\n" " -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n" " Can be a path or just a name without \".conf\" suffix\n" " -s Connect to a different site\n" " -h Print this help\n" "\n" "Examples:\n" "\n" " # geostore list -t ticket-A -s 10.121.8.183\n" " # geostore set -s 10.121.8.183 sr_status ACTIVE\n" " # geostore get -t ticket-A -s 10.121.8.183 sr_status\n" " # geostore delete -s 10.121.8.183 sr_status\n" "\n" "See the geostore(8) man page for more details.\n" ); } /* * the client side */ /* cl has all the input parameters: * ticket, attr name, attr value */ int test_attr_reply(cmd_result_t reply_code, cmd_request_t cmd) { int rv = 0; const char *op_str = ""; switch (cmd) { case ATTR_SET: op_str = "set"; break; case ATTR_GET: op_str = "get"; break; case ATTR_LIST: op_str = "list"; break; case ATTR_DEL: op_str = "delete"; break; default: log_error("internal error reading reply result!"); return -1; } switch (reply_code) { case RLT_ASYNC: log_info("%s command sent, result will be returned " "asynchronously.", op_str); rv = 0; break; case RLT_SYNC_SUCC: case RLT_SUCCESS: if (cmd == ATTR_SET) log_info("%s succeeded!", op_str); rv = 0; break; case RLT_SYNC_FAIL: log_info("%s failed!", op_str); rv = -1; break; case RLT_INVALID_ARG: log_error("ticket \"%s\" does not exist", cl.attr_msg.attr.tkt_id); rv = 1; break; case RLT_NO_SUCH_ATTR: log_error("attribute \"%s\" not set", cl.attr_msg.attr.name); rv = 1; break; case RLT_AUTH: log_error("authentication error"); rv = -1; break; default: log_error("got an error code: %x", rv); rv = -1; } return rv; } /* read the server's reply * need to first get the header which contains the length of the * reply * return codes: * -2: header not received * -1: header received, but message too short * >=0: success */ static int read_server_reply( struct booth_transport const *tpt, struct booth_site *site, char *msg) { struct boothc_header *header; int rv; int len; header = (struct boothc_header *)msg; rv = tpt->recv(site, header, sizeof(*header)); if (rv < 0) { return -2; } len = ntohl(header->length); rv = tpt->recv(site, msg+len, len-sizeof(*header)); if (rv < 0) { return -1; } return rv; } int do_attr_command(cmd_request_t cmd) { struct booth_site *site = NULL; struct boothc_header *header; struct booth_transport const *tpt; int len, rv = -1; char *msg = NULL; if (!*cl.site) site = local; else { if (!find_site_by_name(cl.site, &site, 1)) { log_error("Site \"%s\" not configured.", cl.site); goto out_close; } } if (site->type == ARBITRATOR) { if (site == local) { log_error("We're just an arbitrator, no attributes here."); } else { log_error("%s is just an arbitrator, no attributes there.", cl.site); } goto out_close; } tpt = booth_transport + TCP; init_header(&cl.attr_msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.attr_msg)); rv = tpt->open(site); if (rv < 0) goto out_close; rv = tpt->send(site, &cl.attr_msg, sendmsglen(&cl.attr_msg)); if (rv < 0) goto out_close; msg = malloc(MAX_MSG_LEN); if (!msg) { log_error("out of memory"); rv = -1; goto out_close; } rv = read_server_reply(tpt, site, msg); header = (struct boothc_header *)msg; if (rv < 0) { if (rv == -1) (void)test_attr_reply(ntohl(header->result), cmd); goto out_close; } len = ntohl(header->length); if (check_boothc_header(header, len) < 0) { log_error("message from %s receive error", site_string(site)); rv = -1; goto out_close; } if (check_auth(site, msg, len)) { log_error("%s failed to authenticate", site_string(site)); rv = -1; goto out_close; } rv = test_attr_reply(ntohl(header->result), cmd); out_close: if (site) tpt->close(site); if (msg) free(msg); return rv; } /* * the server side */ /* need to invert gboolean, our success is 0 */ #define gbool2rlt(i) (i ? RLT_SUCCESS : RLT_SYNC_FAIL) static void free_geo_attr(gpointer data) { struct geo_attr *a = (struct geo_attr *)data; if (!a) return; g_free(a->val); g_free(a); } int store_geo_attr(struct ticket_config *tk, const char *name, const char *val, int notime) { struct geo_attr *a; GDestroyNotify free_geo_attr_notify = free_geo_attr; if (!tk) return -1; /* * allocate new, if attr doesn't already exist * copy the attribute value * send status */ if (!tk->attr) tk->attr = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, free_geo_attr_notify); if (!tk->attr) { log_error("out of memory"); return -1; } if (strnlen(name, BOOTH_NAME_LEN) == BOOTH_NAME_LEN) tk_log_warn("name of the attribute too long (%d+ bytes), skipped", BOOTH_NAME_LEN); else if (strnlen(val, BOOTH_ATTRVAL_LEN) == BOOTH_ATTRVAL_LEN) tk_log_warn("value of the attribute too long (%d+ bytes), skipped", BOOTH_ATTRVAL_LEN); else { a = (struct geo_attr *)calloc(1, sizeof(struct geo_attr)); if (!a) { log_error("out of memory"); return -1; } a->val = g_strdup(val); if (!notime) get_time(&a->update_ts); g_hash_table_insert(tk->attr, g_strdup(name), a); } return 0; } static cmd_result_t attr_set(struct ticket_config *tk, struct boothc_attr_msg *msg) { int rc; rc = store_geo_attr(tk, msg->attr.name, msg->attr.val, 0); if (rc) { return RLT_SYNC_FAIL; } (void)pcmk_handler.set_attr(tk, msg->attr.name, msg->attr.val); return RLT_SUCCESS; } static cmd_result_t attr_del(struct ticket_config *tk, struct boothc_attr_msg *msg) { gboolean rv; gpointer orig_key, value; /* * lookup attr * deallocate, if found * send status */ if (!tk->attr) return RLT_NO_SUCH_ATTR; rv = g_hash_table_lookup_extended(tk->attr, msg->attr.name, &orig_key, &value); if (!rv) return RLT_NO_SUCH_ATTR; rv = g_hash_table_remove(tk->attr, msg->attr.name); (void)pcmk_handler.del_attr(tk, msg->attr.name); return gbool2rlt(rv); } static void append_attr(gpointer key, gpointer value, gpointer user_data) { char *attr_name = (char *)key; struct geo_attr *a = (struct geo_attr *)value; GString *data = (GString *)user_data; char time_str[64]; time_t ts; if (is_time_set(&a->update_ts)) { ts = wall_ts(&a->update_ts); strftime(time_str, sizeof(time_str), "%F %T", localtime(&ts)); } else { time_str[0] = '\0'; } g_string_append_printf(data, "%s %s %s\n", attr_name, a->val, time_str); } static cmd_result_t attr_get(struct ticket_config *tk, int fd, struct boothc_attr_msg *msg) { cmd_result_t rv = RLT_SUCCESS; struct boothc_hdr_msg hdr; struct geo_attr *a; GString *attr_val; /* * lookup attr * send value */ a = (struct geo_attr *)g_hash_table_lookup(tk->attr, msg->attr.name); if (!a) return RLT_NO_SUCH_ATTR; attr_val = g_string_new(NULL); if (!attr_val) { log_error("out of memory"); return RLT_SYNC_FAIL; } g_string_printf(attr_val, "%s\n", a->val); init_header(&hdr.header, ATTR_GET, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + attr_val->len); if (send_header_plus(fd, &hdr, attr_val->str, attr_val->len)) rv = RLT_SYNC_FAIL; if (attr_val) g_string_free(attr_val, FALSE); return rv; } static cmd_result_t attr_list(struct ticket_config *tk, int fd, struct boothc_attr_msg *msg) { GString *data; cmd_result_t rv; struct boothc_hdr_msg hdr; /* * list all attributes for the ticket * send the list */ data = g_string_sized_new(512); if (!data) { log_error("out of memory"); return RLT_SYNC_FAIL; } g_hash_table_foreach(tk->attr, append_attr, data); init_header(&hdr.header, ATTR_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + data->len); rv = send_header_plus(fd, &hdr, data->str, data->len); if (data) g_string_free(data, FALSE); return rv; } int process_attr_request(struct client *req_client, void *buf) { cmd_result_t rv = RLT_SYNC_FAIL; struct ticket_config *tk; int cmd; struct boothc_attr_msg *msg; struct boothc_hdr_msg hdr; msg = (struct boothc_attr_msg *)buf; cmd = ntohl(msg->header.cmd); if (!check_ticket(msg->attr.tkt_id, &tk)) { log_warn("client referenced unknown ticket %s", msg->attr.tkt_id); rv = RLT_INVALID_ARG; goto reply_now; } switch (cmd) { case ATTR_LIST: rv = attr_list(tk, req_client->fd, msg); if (rv) goto reply_now; return 1; case ATTR_GET: rv = attr_get(tk, req_client->fd, msg); if (rv) goto reply_now; return 1; case ATTR_SET: rv = attr_set(tk, msg); break; case ATTR_DEL: rv = attr_del(tk, msg); break; } reply_now: init_header(&hdr.header, CL_RESULT, 0, 0, rv, 0, sizeof(hdr)); send_header_plus(req_client->fd, &hdr, NULL, 0); return 1; } /* read attr message from another site */ /* this is a NOOP and it should never be invoked * only clients retrieve/manage attributes and they connect * directly to the target site */ int attr_recv(void *buf, struct booth_site *source) { struct boothc_attr_msg *msg; struct ticket_config *tk; msg = (struct boothc_attr_msg *)buf; log_warn("unexpected attribute message from %s", site_string(source)); if (!check_ticket(msg->attr.tkt_id, &tk)) { log_warn("got invalid ticket name %s from %s", msg->attr.tkt_id, site_string(source)); source->invalid_cnt++; - return 1; + return -1; } return 0; } diff --git a/src/raft.c b/src/raft.c index bc87329..914de3a 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,1000 +1,1000 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include "booth.h" #include "timer.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "request.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; tk_log_debug("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } inline static void record_vote(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { tk_log_debug("site %s votes for %s", site_string(who), site_string(vote)); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) tk_log_warn("%s voted previously " "for %s and now wants to vote for %s (ignored)", site_string(who), site_string(tk->votes_for[who->index]), site_string(vote)); } } static void update_term_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { uint32_t i; i = ntohl(msg->ticket.term); /* if we failed to start the election, then accept the term * from the leader * */ if (tk->state == ST_CANDIDATE) { tk->current_term = i; } else { tk->current_term = max(i, tk->current_term); } } static void set_ticket_expiry(struct ticket_config *tk, int duration) { set_future_time(&tk->term_expires, duration); } static void update_ticket_from_msg(struct ticket_config *tk, struct booth_site *sender, struct boothc_ticket_msg *msg) { int duration; tk_log_info("updating from %s (%d/%d)", site_string(sender), ntohl(msg->ticket.term), msg_term_time(msg)); duration = min(tk->term_duration, msg_term_time(msg)); set_ticket_expiry(tk, duration); update_term_from_msg(tk, msg); } static void copy_ticket_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { set_ticket_expiry(tk, msg_term_time(msg)); tk->current_term = ntohl(msg->ticket.term); } static void become_follower(struct ticket_config *tk, struct boothc_ticket_msg *msg) { copy_ticket_from_msg(tk, msg); set_state(tk, ST_FOLLOWER); time_reset(&tk->delay_commit); tk->in_election = 0; /* if we're following and the ticket was granted here * then commit to CIB right away (we're probably restarting) */ if (tk->is_granted) { disown_ticket(tk); ticket_write(tk); } } static void won_elections(struct ticket_config *tk) { set_leader(tk, local); set_state(tk, ST_LEADER); set_ticket_expiry(tk, tk->term_duration); time_reset(&tk->election_end); tk->voted_for = NULL; if (is_time_set(&tk->delay_commit) && all_sites_replied(tk)) { time_reset(&tk->delay_commit); tk_log_debug("reset delay commit as all sites replied"); } save_committed_tkt(tk); ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0); tk->ticket_updated = 0; } /* if more than one member got the same (and maximum within that * election) number of votes, then that is a tie */ static int is_tie(struct ticket_config *tk) { int i; struct booth_site *v; int count[MAX_NODES] = { 0, }; int max_votes = 0, max_cnt = 0; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; count[v->index]++; max_votes = max(max_votes, count[v->index]); } for(i=0; isite_count; i++) { if (count[i] == max_votes) max_cnt++; } return max_cnt > 1; } static struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v || v == no_leader) continue; n = v->index; count[n]++; tk_log_debug("Majority: %d %s wants %d %s => %d", i, site_string(&booth_conf->site[i]), n, site_string(v), count[n]); if (count[n]*2 <= booth_conf->site_count) continue; tk_log_debug("Majority reached: %d of %d for %s", count[n], booth_conf->site_count, site_string(v)); return v; } return NULL; } void elections_end(struct ticket_config *tk) { struct booth_site *new_leader; if (is_past(&tk->election_end)) { /* This is previous election timed out */ tk_log_info("elections finished"); } tk->in_election = 0; new_leader = majority_votes(tk); if (new_leader == local) { won_elections(tk); tk_log_info("granted successfully here"); } else if (new_leader) { tk_log_info("ticket granted at %s", site_string(new_leader)); } else { tk_log_info("nobody won elections, new elections"); tk->outcome = RLT_MORE; foreach_tkt_req(tk, notify_client); if (!new_election(tk, NULL, is_tie(tk) ? 2 : 0, OR_AGAIN)) { ticket_activate_timeout(tk); } } } static int newer_term(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg, int in_election) { uint32_t term; /* it may happen that we hear about our newer term */ if (leader == local) return 0; term = ntohl(msg->ticket.term); /* §5.1 */ if (term > tk->current_term) { set_state(tk, ST_FOLLOWER); if (!in_election) { set_leader(tk, leader); tk_log_info("from %s: higher term %d vs. %d, following %s", site_string(sender), term, tk->current_term, ticket_leader_string(tk)); } else { tk_log_debug("from %s: higher term %d vs. %d (election)", site_string(sender), term, tk->current_term); } tk->current_term = term; return 1; } return 0; } static int msg_term_invalid(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (is_term_invalid(tk, term)) { tk_log_info("got invalid term from %s " "(%d), ignoring", site_string(sender), term); return 1; } return 0; } static int term_too_low(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term < tk->current_term) { tk_log_info("sending reject to %s, its term too low " "(%d vs. %d)", site_string(sender), term, tk->current_term ); send_reject(sender, tk, RLT_TERM_OUTDATED, msg); return 1; } return 0; } /* For follower. */ static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; term = ntohl(msg->ticket.term); tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); if (term < tk->current_term) { if (sender == tk->leader) { tk_log_info("trusting leader %s with a lower term (%d vs %d)", site_string(leader), term, tk->current_term); } else if (is_owned(tk)) { tk_log_warn("different leader %s with a lower term " "(%d vs %d), sending reject", site_string(leader), term, tk->current_term); return send_reject(sender, tk, RLT_TERM_OUTDATED, msg); } } /* got heartbeat, no rejects expected anymore */ tk->expect_more_rejects = 0; /* Needed? */ newer_term(tk, sender, leader, msg, 0); become_follower(tk, msg); /* Racy??? */ assert(sender == leader || !leader); set_leader(tk, leader); /* Ack the heartbeat (we comply). */ return send_msg(OP_ACK, tk, sender, msg); } static int process_UPDATE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (is_owned(tk) && sender != tk->leader) { tk_log_warn("different leader %s wants to update " "our ticket, sending reject", site_string(leader)); return send_reject(sender, tk, RLT_TERM_OUTDATED, msg); } tk_log_debug("leader %s wants to update our ticket", site_string(leader)); become_follower(tk, msg); set_leader(tk, leader); ticket_write(tk); /* run ticket_cron if the ticket expires */ set_ticket_wakeup(tk); return send_msg(OP_ACK, tk, sender, msg); } static int process_REVOKE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int rv; if (tk->state == ST_INIT && tk->leader == no_leader) { /* assume that our ack got lost */ rv = send_msg(OP_ACK, tk, sender, msg); } else if (tk->leader != sender) { tk_log_error("%s wants to revoke ticket, " "but it is not granted there (ignoring)", site_string(sender)); - return 1; + return -1; } else if (tk->state != ST_FOLLOWER) { tk_log_error("unexpected ticket revoke from %s " "(in state %s) (ignoring)", site_string(sender), state_to_string(tk->state)); - return 1; + return -1; } else { tk_log_info("%s revokes ticket", site_string(tk->leader)); save_committed_tkt(tk); reset_ticket(tk); set_leader(tk, no_leader); ticket_write(tk); rv = send_msg(OP_ACK, tk, sender, msg); } return rv; } /* For leader. */ static int process_ACK( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; int req; term = ntohl(msg->ticket.term); if (newer_term(tk, sender, leader, msg, 0)) { /* unexpected higher term */ tk_log_warn("got higher term from %s (%d vs. %d)", site_string(sender), term, tk->current_term); return 0; } /* Don't send a reject. */ if (term < tk->current_term) { /* Doesn't know what he's talking about - perhaps * doesn't receive our packets? */ tk_log_warn("unexpected term " "from %s (%d vs. %d) (ignoring)", site_string(sender), term, tk->current_term); return 0; } /* if the ticket is to be revoked, further processing is not * interesting (and dangerous) */ if (tk->next_state == ST_INIT || tk->state == ST_INIT) return 0; req = ntohl(msg->header.request); if ((req == OP_UPDATE || req == OP_HEARTBEAT) && term == tk->current_term && leader == tk->leader) { if (majority_of_bits(tk, tk->acks_received)) { /* OK, at least half of the nodes are reachable; * Update the ticket and send update messages out */ return leader_update_ticket(tk); } } return 0; } static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (leader == no_leader) { /* leader wants to step down? */ if (sender == tk->leader && (tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) { tk_log_info("%s wants to give the ticket away (ticket release)", site_string(tk->leader)); save_committed_tkt(tk); reset_ticket(tk); set_state(tk, ST_FOLLOWER); if (local->type == SITE) { ticket_write(tk); schedule_election(tk, OR_STEPDOWN); } } else { tk_log_info("%s votes for none, ignoring (duplicate ticket release?)", site_string(sender)); } return 0; } if (tk->state != ST_CANDIDATE) { /* lost candidate status, somebody rejected our proposal */ tk_log_info("candidate status lost, ignoring VtFr from %s", site_string(sender)); return 0; } if (term_too_low(tk, sender, leader, msg)) return 0; if (newer_term(tk, sender, leader, msg, 0)) { clear_election(tk); } record_vote(tk, sender, leader); /* only if all voted can we take the ticket now, otherwise * wait for timeout in ticket_cron */ if (!tk->acks_expected) { /* §5.2 */ elections_end(tk); } return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t rv; rv = ntohl(msg->header.result); if (tk->state == ST_CANDIDATE && leader == local) { /* the sender has us as the leader (!) * the elections will time out, then we can try again */ tk_log_warn("ticket was granted to us " "(and we didn't know)"); tk->expect_more_rejects = 1; return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_OUTDATED) { tk_log_warn("ticket outdated (term %d), granted to %s", ntohl(msg->ticket.term), site_string(leader) ); set_leader(tk, leader); tk->expect_more_rejects = 1; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_STILL_VALID) { if (tk->lost_leader == leader) { if (tk->election_reason == OR_TKT_LOST) { tk_log_warn("%s still has the ticket valid, " "we'll backup a bit", site_string(sender)); } else { tk_log_warn("%s unexpectedly rejects elections", site_string(sender)); } } else { tk_log_warn("ticket was granted to %s " "(and we didn't know)", site_string(leader)); } set_leader(tk, leader); become_follower(tk, msg); tk->expect_more_rejects = 1; return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_YOU_OUTDATED) { set_leader(tk, leader); tk->expect_more_rejects = 1; if (leader && leader != no_leader) { tk_log_warn("our ticket is outdated, granted to %s", site_string(leader)); become_follower(tk, msg); } else { tk_log_warn("our ticket is outdated and revoked"); update_ticket_from_msg(tk, sender, msg); set_state(tk, ST_INIT); } return 0; } if (!tk->expect_more_rejects) { tk_log_warn("from %s: in state %s, got %s (unexpected reject)", site_string(sender), state_to_string(tk->state), state_to_string(rv)); } return 0; } static int ticket_seems_ok(struct ticket_config *tk) { int left; left = term_time_left(tk); if (!left) return 0; /* quite sure */ if (tk->state == ST_CANDIDATE) return 0; /* in state of flux */ if (tk->state == ST_LEADER) return 1; /* quite sure */ if (tk->state == ST_FOLLOWER && left >= tk->term_duration/3) return 1; /* almost quite sure */ return 0; } static int test_reason( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int reason; reason = ntohl(msg->header.reason); if (reason == OR_TKT_LOST) { if (tk->state == ST_INIT && tk->leader == no_leader) { tk_log_warn("%s claims that the ticket is lost, " "but it's in %s state (reject sent)", site_string(sender), state_to_string(tk->state) ); return RLT_YOU_OUTDATED; } if (ticket_seems_ok(tk)) { tk_log_warn("%s claims that the ticket is lost, " "but it is ok here (reject sent)", site_string(sender)); return RLT_TERM_STILL_VALID; } } return 0; } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int valid; struct boothc_ticket_msg omsg; cmd_result_t inappr_reason; int reason; inappr_reason = test_reason(tk, sender, leader, msg); if (inappr_reason) return send_reject(sender, tk, inappr_reason, msg); valid = term_time_left(tk); reason = ntohl(msg->header.reason); /* valid tickets are not allowed only if the sender thinks * the ticket got lost */ if (sender != tk->leader && valid && reason != OR_STEPDOWN) { tk_log_warn("election from %s with reason %s rejected " "(we have %s as ticket owner), ticket still valid for %ds", site_string(sender), state_to_string(reason), site_string(tk->leader), valid); return send_reject(sender, tk, RLT_TERM_STILL_VALID, msg); } if (term_too_low(tk, sender, leader, msg)) return 0; /* set this, so that we know not to send status for the * ticket */ tk->in_election = 1; /* reset ticket's leader on not valid tickets */ if (!valid) set_leader(tk, NULL); /* if it's a newer term or ... */ if (newer_term(tk, sender, leader, msg, 1)) { clear_election(tk); goto vote_for_sender; } /* ... we didn't vote yet, then vote for the sender */ /* §5.2, §5.4 */ if (!tk->voted_for) { vote_for_sender: tk->voted_for = sender; record_vote(tk, sender, leader); } init_ticket_msg(&omsg, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, 0, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); return booth_udp_send_auth(sender, &omsg, sendmsglen(&omsg)); } #define is_reason(r, tk) \ (reason == (r) || (reason == OR_AGAIN && (tk)->election_reason == (r))) int new_election(struct ticket_config *tk, struct booth_site *preference, int update_term, cmd_reason_t reason) { struct booth_site *new_leader; if (local->type != SITE) return 0; if ((is_reason(OR_TKT_LOST, tk) || is_reason(OR_STEPDOWN, tk)) && check_attr_prereq(tk, GRANT_AUTO)) { tk_log_info("attribute prerequisite not met, " "not starting elections"); return 0; } /* elections were already started, but not yet finished/timed out */ if (is_time_set(&tk->election_end) && !is_past(&tk->election_end)) return 1; if (ANYDEBUG) { int tdiff; if (is_time_set(&tk->election_end)) { tdiff = -time_left(&tk->election_end); tk_log_debug("starting elections, previous finished since " intfmt(tdiff)); } else { tk_log_debug("starting elections"); } tk_log_debug("elections caused by %s %s", state_to_string(reason), reason == OR_AGAIN ? state_to_string(tk->election_reason) : "" ); } /* §5.2 */ /* If there was _no_ answer, don't keep incrementing the term number * indefinitely. If there was no peer, there'll probably be no one * listening now either. However, we don't know if we were * invoked due to a timeout (caller does). */ /* increment the term only if either the current term was * valid or if there was a tie (in that case update_term > 1) */ if ((update_term > 1) || (update_term && tk->last_valid_tk && tk->last_valid_tk->current_term >= tk->current_term)) { /* save the previous term, we may need to send out the * MY_INDEX message */ if (tk->state != ST_CANDIDATE) { save_committed_tkt(tk); } tk->current_term++; } set_future_time(&tk->election_end, tk->timeout); tk->in_election = 1; tk_log_info("starting new election (term=%d)", tk->current_term); clear_election(tk); new_leader = preference ? preference : local; record_vote(tk, local, new_leader); tk->voted_for = new_leader; set_state(tk, ST_CANDIDATE); /* some callers may want just to repeat on timeout */ if (reason == OR_AGAIN) { reason = tk->election_reason; } else { tk->election_reason = reason; } ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason); add_random_delay(tk); return 0; } /* we were a leader and somebody says that they have a more up * to date ticket * there was probably connectivity loss * tricky */ static int leader_handle_newer_ticket( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { update_term_from_msg(tk, msg); if (leader != no_leader && leader && leader != local) { /* eek, two leaders, split brain */ /* normally shouldn't happen; run election */ tk_log_error("from %s: ticket granted to %s! (revoking locally)", site_string(sender), site_string(leader) ); } else if (term_time_left(tk)) { /* eek, two leaders, split brain */ /* normally shouldn't happen; run election */ tk_log_error("from %s: ticket granted to %s! (revoking locally)", site_string(sender), site_string(leader) ); } set_next_state(tk, ST_LEADER); return 0; } /* reply to STATUS */ static int process_MY_INDEX ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int i; int expired; expired = !msg_term_time(msg); /* test against the last valid(!) ticket we have */ i = my_last_term(tk) - ntohl(msg->ticket.term); if (i > 0) { /* let them know about our newer ticket */ send_msg(OP_MY_INDEX, tk, sender, msg); if (tk->state == ST_LEADER) { tk_log_info("sending ticket update to %s", site_string(sender)); return send_msg(OP_UPDATE, tk, sender, msg); } } /* we have a newer or equal ticket and theirs is expired, * nothing more to do here */ if (i >= 0 && expired) { return 0; } if (tk->state == ST_LEADER) { /* we're the leader, thread carefully */ if (expired) { /* if their ticket is expired, * nothing more to do */ return 0; } if (i < 0) { /* they have a newer ticket, trouble if we're already leader * for it */ tk_log_warn("from %s: more up to date ticket at %s", site_string(sender), site_string(leader) ); return leader_handle_newer_ticket(tk, sender, leader, msg); } else { /* we have the ticket and we don't care */ return 0; } } else if (tk->state == ST_CANDIDATE) { if (leader == local) { /* a belated MY_INDEX, we're already trying to get the * ticket */ return 0; } } /* their ticket is either newer or not expired, don't * ignore it */ update_ticket_from_msg(tk, sender, msg); set_leader(tk, leader); update_ticket_state(tk, sender); save_committed_tkt(tk); set_ticket_wakeup(tk); return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd, req; int rv; rv = 0; cmd = ntohl(msg->header.cmd); req = ntohl(msg->header.request); if (req) tk_log_debug("got %s (req %s) from %s", state_to_string(cmd), state_to_string(req), site_string(sender)); else tk_log_debug("got %s from %s", state_to_string(cmd), site_string(sender)); /* don't process tickets with invalid term */ if (cmd != OP_STATUS && msg_term_invalid(tk, sender, leader, msg)) return 0; switch (cmd) { case OP_REQ_VOTE: rv = answer_REQ_VOTE(tk, sender, leader, msg); break; case OP_VOTE_FOR: rv = process_VOTE_FOR(tk, sender, leader, msg); break; case OP_ACK: if (tk->leader == local && tk->state == ST_LEADER) rv = process_ACK(tk, sender, leader, msg); break; case OP_HEARTBEAT: if ((tk->leader != local || !term_time_left(tk)) && (tk->state == ST_INIT || tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) rv = answer_HEARTBEAT(tk, sender, leader, msg); else { tk_log_warn("unexpected message %s, from %s", state_to_string(cmd), site_string(sender)); if (ticket_seems_ok(tk)) send_reject(sender, tk, RLT_TERM_STILL_VALID, msg); rv = -EINVAL; } break; case OP_UPDATE: if (((tk->leader != local && tk->leader == leader) || !is_owned(tk)) && (tk->state == ST_INIT || tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) { rv = process_UPDATE(tk, sender, leader, msg); } else { tk_log_warn("unexpected message %s, from %s", state_to_string(cmd), site_string(sender)); if (ticket_seems_ok(tk)) send_reject(sender, tk, RLT_TERM_STILL_VALID, msg); rv = -EINVAL; } break; case OP_REJECTED: rv = process_REJECTED(tk, sender, leader, msg); break; case OP_REVOKE: rv = process_REVOKE(tk, sender, leader, msg); break; case OP_MY_INDEX: rv = process_MY_INDEX(tk, sender, leader, msg); break; case OP_STATUS: if (!tk->in_election) rv = send_msg(OP_MY_INDEX, tk, sender, msg); break; default: tk_log_error("unknown message %s, from %s", state_to_string(cmd), site_string(sender)); rv = -EINVAL; } return rv; } diff --git a/src/transport.c b/src/transport.c index 866c132..b3e4432 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,1092 +1,1109 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include +#include /* getnameinfo */ #include #include #include #include #include #include #include +#include /* getnameinfo */ #include "b_config.h" #include "attr.h" #include "auth.h" #include "booth.h" #include "config.h" #include "inline-fn.h" #include "log.h" #include "ticket.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 struct booth_site *local = NULL; - +/* function to be called when handling booth-group-internal messages; + * it's expected to return 0 to indicate success, negative integer + * to indicate silent (or possibly already complained about) error, + * or positive integer to indicate sender's ID that will then be + * emitted in the error log message together with the real source + * address if this is available */ static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } enum match_type { NO_MATCH = 0, FUZZY_MATCH, EXACT_MATCH, }; static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me, int *address_bits_matched) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; int matched; enum match_type did_match = NO_MATCH; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); for (i = 0; i < booth_conf->site_count; i++) { node = booth_conf->site + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); for(matched = 0; matched < node->addrlen; matched++) if (ipaddr[matched] != n_a[matched]) break; if (matched == node->addrlen) { *address_bits_matched = matched * 8; *me = node; did_match = EXACT_MATCH; break; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (matched < bytes) continue; if (matched * 8 < *address_bits_matched) continue; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) { /* _At_least_ prefixlen bits matched. */ if (did_match < EXACT_MATCH) { *address_bits_matched = prefixlen; *me = node; did_match = FUZZY_MATCH; } } } return did_match; } int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; int address_bits_matched; if (local) goto found; me = NULL; address_bits_matched = 0; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); /* prefer IFA_LOCAL if it exists, for p-t-p * interfaces, otherwise use IFA_ADDRESS */ if (tb[IFA_LOCAL]) { memcpy(ipaddr, RTA_DATA(tb[IFA_LOCAL]), BOOTH_IPADDR_LEN); } else { memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); } /* First try with exact addresses, then optionally with subnet matching. */ if (ifa->ifa_prefixlen > address_bits_matched) { find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me, &address_bits_matched); if (me) { log_debug("found myself at %s (%d bits matched)", site_string(me), address_bits_matched); } } } h = NLMSG_NEXT(h, status); } } close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_site **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", len_incl_data, l); return -EINVAL; } return len_incl_data; } static int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1 && errno == EWOULDBLOCK) break; if (rv == -1) return -1; off += rv; } return off; } static int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = send(fd, (char *)buf + off, count, MSG_NOSIGNAL); if (rv == -1 && errno == EINTR) goto retry; /* If we cannot write _any_ data, we'd be in an (potential) loop. */ if (rv <= 0) { log_error("send failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } /* Only used for client requests (tcp) */ int read_client(struct client *req_cl) { char *msg; struct boothc_header *header; int rv, fd; int len = MAX_MSG_LEN; if (!req_cl->msg) { msg = malloc(MAX_MSG_LEN); if (!msg) { log_error("out of memory for client messages"); return -1; } req_cl->msg = (void *)msg; } else { msg = (char *)req_cl->msg; } header = (struct boothc_header *)msg; /* update len if we read enough */ if (req_cl->offset >= sizeof(*header)) { len = min(ntohl(header->length), MAX_MSG_LEN); } fd = req_cl->fd; rv = do_read(fd, msg+req_cl->offset, len-req_cl->offset); if (rv < 0) { if (errno == ECONNRESET) log_debug("client connection reset for fd %d", fd); return -1; } req_cl->offset += rv; /* update len if we read enough */ if (req_cl->offset >= sizeof(*header)) { len = min(ntohl(header->length), MAX_MSG_LEN); } if (req_cl->offset < len) { /* client promised to send more */ return 1; } if (check_boothc_header(header, len) < 0) { return -1; } return 0; } /* Only used for client requests (tcp) */ static void process_connection(int ci) { struct client *req_cl; void *msg = NULL; struct boothc_header *header; struct boothc_hdr_msg err_reply; cmd_result_t errc; void (*deadfn) (int ci); req_cl = clients + ci; switch (read_client(req_cl)) { case -1: /* error */ goto kill; case 1: /* more to read */ return; case 0: /* we can process the request now */ msg = req_cl->msg; } header = (struct boothc_header *)msg; if (check_auth(NULL, msg, ntohl(header->length))) { errc = RLT_AUTH; goto send_err; } /* For CMD_GRANT and CMD_REVOKE: * Don't close connection immediately, but send * result a second later? */ switch (ntohl(header->cmd)) { case CMD_LIST: ticket_answer_list(req_cl->fd); goto kill; case CMD_PEERS: list_peers(req_cl->fd); goto kill; case CMD_GRANT: case CMD_REVOKE: if (process_client_request(req_cl, msg) == 1) goto kill; /* request processed definitely, close connection */ else return; case ATTR_LIST: case ATTR_GET: case ATTR_SET: case ATTR_DEL: if (process_attr_request(req_cl, msg) == 1) goto kill; /* request processed definitely, close connection */ else return; default: log_error("connection %d cmd %x unknown", ci, ntohl(header->cmd)); errc = RLT_INVALID_ARG; goto send_err; } assert(0); return; send_err: init_header(&err_reply.header, CL_RESULT, 0, 0, errc, 0, sizeof(err_reply)); send_client_msg(req_cl->fd, &err_reply); kill: deadfn = req_cl->deadfn; if(deadfn) { deadfn(ci); } return; } static void process_tcp_listener(int ci) { int fd, i, flags, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } int setup_tcp_listener(int test_only) { int s, rv; int one = 1; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { close(s); log_error("failed to set the SO_REUSEADDR option"); return rv; } rv = bind(s, &local->sa6, local->saddrlen); if (test_only) { rv = (rv == -1) ? errno : 0; close(s); return rv; } if (rv == -1) { close(s); log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { close(s); log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (get_local_id() < 0) return -1; rv = setup_tcp_listener(0); if (rv < 0) return rv; client_add(rv, booth_transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connect to %s got a timeout", site_string(to)); else log_error("connect to %s got an error: %s", site_string(to), strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_site *to, void *buf, int len) { int rv; rv = add_hmac(buf, len); if (!rv) rv = do_write(to->tcp_fd, buf, len); return rv; } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) { log_error("read failed (%d): %s", errno, strerror(errno)); return got; } return got; } static int booth_tcp_recv_auth(struct booth_site *from, void *buf, int len) { int got, total; int payload_len; /* Needs timeouts! */ payload_len = len - sizeof(struct hmac); got = booth_tcp_recv(from, buf, payload_len); if (got < 0) { return got; } total = got; if (is_auth_req()) { got = booth_tcp_recv(from, (unsigned char *)buf+payload_len, sizeof(struct hmac)); if (got != sizeof(struct hmac) || check_auth(from, buf, len)) { return -1; } total += got; } return total; } static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } static int setup_udp_server(void) { int rv, fd; int one = 1; unsigned int recvbuf_size; fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { log_error("failed to set the SO_REUSEADDR option"); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", site_string(local), booth_conf->port, strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; /* beware, the buffer needs to be large enough to accept * a packet */ char buffer[MAX_MSG_LEN]; /* Used for unit tests */ struct boothc_ticket_msg *msg; sa_len = sizeof(sa); msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; - deliver_fn((void*)msg, rv); + rv = deliver_fn((void*)msg, rv); + if (rv > 0) { + if (getnameinfo((struct sockaddr *)&sa, sa_len, + buffer, sizeof(buffer), NULL, 0, + NI_NUMERICHOST) == 0) + log_error("unknown sender: %08x (real: %s)", rv, buffer); + else + log_error("unknown sender: %08x", rv); + } } static int booth_udp_init(void *f) { int rv; rv = setup_udp_server(); if (rv < 0) return rv; deliver_fn = f; client_add(local->udp_fd, booth_transport + UDP, process_recv, NULL); return 0; } int booth_udp_send(struct booth_site *to, void *buf, int len) { int rv; to->sent_cnt++; rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); if (rv == len) { rv = 0; } else if (rv < 0) { to->sent_err_cnt++; log_error("Cannot send to %s: %d %s", site_string(to), errno, strerror(errno)); } else { rv = -1; to->sent_err_cnt++; log_error("Packet sent to %s got truncated", site_string(to)); } return rv; } int booth_udp_send_auth(struct booth_site *to, void *buf, int len) { int rv; rv = add_hmac(buf, len); if (rv < 0) return rv; return booth_udp_send(to, buf, len); } static int booth_udp_broadcast_auth(void *buf, int len) { int i, rv, rvs; struct booth_site *site; if (!booth_conf || !booth_conf->site_count) return -1; rv = add_hmac(buf, len); if (rv < 0) return rv; rvs = 0; foreach_node(i, site) { if (site != local) { rv = booth_udp_send(site, buf, len); if (!rvs) rvs = rv; } } return rvs; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .recv_auth = booth_tcp_recv_auth, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, .send_auth = booth_udp_send_auth, .close = return_0_booth_site, .broadcast_auth = booth_udp_broadcast_auth, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; /* data + (datalen-sizeof(struct hmac)) points to struct hmac * i.e. struct hmac is always tacked on the payload */ int add_hmac(void *data, int len) { int rv = 0; #if HAVE_LIBGCRYPT || HAVE_LIBMHASH int payload_len; struct hmac *hp; if (!is_auth_req()) return 0; payload_len = len - sizeof(struct hmac); hp = (struct hmac *)((unsigned char *)data + payload_len); hp->hid = htonl(BOOTH_HASH); memset(hp->hash, 0, BOOTH_MAC_SIZE); rv = calc_hmac(data, payload_len, BOOTH_HASH, hp->hash, booth_conf->authkey, booth_conf->authkey_len); if (rv < 0) { log_error("internal error: cannot calculate mac"); } #endif return rv; } #if HAVE_LIBGCRYPT || HAVE_LIBMHASH /* TODO: we need some client identification for logging */ #define peer_string(p) (p ? site_string(p) : "client") /* verify the validity of timestamp from the header * the timestamp needs to be either greater than the one already * recorded for the site or, and this is checked for clients, * not to be older than booth_conf->maxtimeskew * update the timestamp for the site, if this packet is from a * site */ static int verify_ts(struct booth_site *from, void *buf, int len) { struct boothc_header *h; struct timeval tv, curr_tv, now; if (len < sizeof(*h)) { log_error("%s: packet too short", peer_string(from)); return -1; } h = (struct boothc_header *)buf; tv.tv_sec = ntohl(h->secs); tv.tv_usec = ntohl(h->usecs); if (from) { curr_tv.tv_sec = from->last_secs; curr_tv.tv_usec = from->last_usecs; if (timercmp(&tv, &curr_tv, >)) goto accept; log_warn("%s: packet timestamp older than previous one", site_string(from)); } gettimeofday(&now, NULL); now.tv_sec -= booth_conf->maxtimeskew; if (timercmp(&tv, &now, >)) goto accept; log_error("%s: packet timestamp older than %d seconds", peer_string(from), booth_conf->maxtimeskew); return -1; accept: if (from) { from->last_secs = tv.tv_sec; from->last_usecs = tv.tv_usec; } return 0; } #endif int check_auth(struct booth_site *from, void *buf, int len) { int rv = 0; #if HAVE_LIBGCRYPT || HAVE_LIBMHASH int payload_len; struct hmac *hp; if (!is_auth_req()) return 0; payload_len = len - sizeof(struct hmac); if (payload_len < 0) { log_error("%s: failed to authenticate, packet too short (size:%d)", peer_string(from), len); return -1; } hp = (struct hmac *)((unsigned char *)buf + payload_len); rv = verify_hmac(buf, payload_len, ntohl(hp->hid), hp->hash, booth_conf->authkey, booth_conf->authkey_len); if (!rv) { rv = verify_ts(from, buf, len); } if (rv != 0) { log_error("%s: failed to authenticate", peer_string(from)); } #endif return rv; } int send_data(int fd, void *data, int datalen) { int rv = 0; rv = add_hmac(data, datalen); if (!rv) rv = do_write(fd, data, datalen); return rv; } int send_header_plus(int fd, struct boothc_hdr_msg *msg, void *data, int len) { int rv; rv = send_data(fd, msg, sendmsglen(msg)-len); if (rv >= 0 && len) rv = do_write(fd, data, len); return rv; } -/* UDP message receiver. */ +/* UDP message receiver (see also deliver_fn declaration's comment) */ int message_recv(void *msg, int msglen) { uint32_t from; struct boothc_header *header; struct booth_site *source; header = (struct boothc_header *)msg; from = ntohl(header->from); if (!find_site_by_id(from, &source)) { - log_error("unknown sender: %08x", from); - return -1; + /* caller knows the actual source address, pass + the (assuredly) positive number and let it report */ + from = from ? from : ~from; /* avoid 0 (success) */ + return from & (~0U >> 1); /* avoid negative (error code} */ } time(&source->last_recv); source->recv_cnt++; if (check_boothc_header(header, msglen) < 0) { log_error("message from %s receive error", site_string(source)); source->recv_err_cnt++; return -1; } if (check_auth(source, msg, msglen)) { log_error("%s failed to authenticate", site_string(source)); source->sec_cnt++; return -1; } if (ntohl(header->opts) & BOOTH_OPT_ATTR) { /* not used, clients send/retrieve attributes directly * from sites */ return attr_recv(msg, source); } else { return ticket_recv(msg, source); } }