diff --git a/doc/Pacemaker_Explained/en-US/Ch-Alerts.txt b/doc/Pacemaker_Explained/en-US/Ch-Alerts.txt
index 648bff70b6..f5abddf871 100644
--- a/doc/Pacemaker_Explained/en-US/Ch-Alerts.txt
+++ b/doc/Pacemaker_Explained/en-US/Ch-Alerts.txt
@@ -1,335 +1,339 @@
= Alerts =
////
We prefer [[ch-alerts]], but older versions of asciidoc don't deal well
with that construct for chapter headings
////
anchor:ch-alerts[Chapter 7, Alerts]
indexterm:[Resource,Alerts]
'Alerts' may be configured to take some external action when a cluster event
occurs (node failure, resource starting or stopping, etc.).
== Alert Agents ==
As with resource agents, the cluster calls an external program (an
'alert agent') to handle alerts. The cluster passes information about the event
to the agent via environment variables. Agents can do anything
desired with this information (send an e-mail, log to a file,
update a monitoring system, etc.).
.Simple alert configuration
=====
[source,XML]
-----
-----
=====
In the example above, the cluster will call +my-script.sh+ for each event.
Multiple alert agents may be configured; the cluster will call all of them for
each event.
Alert agents will be called only on cluster nodes. They will be called for
events involving Pacemaker Remote nodes, but they will never be called _on_
those nodes.
== Alert Recipients ==
Usually, alerts are directed toward a recipient, so each alert may additionally be configured with one or more recipients.
The cluster will call the agent separately for each recipient.
.Alert configuration with recipient
=====
[source,XML]
-----
-----
=====
In the above example, the cluster will call +my-script.sh+ for each event,
passing the recipient +some-address+ as an environment variable.
The recipient may be anything the alert agent can recognize --
an IP address, an e-mail address, a file name, whatever the particular
agent supports.
== Alert Meta-Attributes ==
As with resource agents, meta-attributes can be configured for alert agents
to affect how Pacemaker calls them.
.Meta-Attributes of an Alert
[width="95%",cols="m,1,2
-----
=====
In the above example, +my-script.sh+ will be called twice for each event,
with each call using a 15-second timeout. One call will be passed the recipient
+someuser@example.com+ and a timestamp in the format +%D %H:%M+, while the
other call will be passed the recipient +otheruser@example.com+ and a timestamp
in the format +%c+.
== Alert Instance Attributes ==
As with resource agents, agent-specific configuration values may be configured
as instance attributes. These will be passed to the agent as additional
environment variables. The number, names and allowed values of these
instance attributes are completely up to the particular agent.
.Alert configuration with instance attributes
=====
[source,XML]
-----
-----
=====
== Using the Sample Alert Agents ==
Pacemaker provides several sample alert agents, installed in
+/usr/share/pacemaker/alerts+ by default.
While these sample scripts may be copied and used as-is, they are provided
mainly as templates to be edited to suit your purposes.
See their source code for the full set of instance attributes they support.
.Sending cluster events as SNMP traps
=====
[source,XML]
-----
-----
=====
.Sending cluster events as e-mails
=====
[source,XML]
-----
-----
=====
== Writing an Alert Agent ==
.Environment variables passed to alert agents
[width="95%",cols="m,2
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "plugin.h"
#include "utils.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Plugin-wide state shared by the handlers in this file. */

/* Corosync API handle; assigned from the init argument in pcmk_startup() */
struct corosync_api_v1 *pcmk_api = NULL;
/* Vote tallies compared by plugin_has_quorum() */
uint32_t plugin_has_votes = 0;
uint32_t plugin_expected_votes = 2;
int use_mgmtd = 0;
/* Set to LOG_DEBUG or LOG_INFO by process_ais_conf() from the "debug" option */
int plugin_log_level = LOG_DEBUG;
/* Local host name (from uname()) and its length; filled in by pcmk_startup() */
char *local_uname = NULL;
int local_uname_len = 0;
/* Value of the "clustername" service option (default "pcmk") and its length */
char *local_cname = NULL;
int local_cname_len = 0;
/* Refreshed from totem_nodeid_get() by pcmk_update_nodeid() */
uint32_t local_nodeid = 0;
char *ipc_channel_name = NULL;
static uint64_t local_born_on = 0;
/* Ring sequence number of the latest membership event (see pcmk_peer_update()) */
uint64_t membership_seq = 0;
/* Thread running pcmk_wait_dispatch(); created by pcmk_startup() when not in MCP mode */
pthread_t pcmk_wait_thread;
/* TRUE when the service section is configured with "ver" set to "1" */
gboolean use_mcp = FALSE;
/* Loop condition of pcmk_wait_dispatch(); cleared by pcmk_shutdown() */
gboolean wait_active = TRUE;
/* Set by pcmk_peer_update() once a stable membership has more than one member */
gboolean have_reliable_membership_id = FALSE;
/* Hash tables created in pcmk_startup() */
GHashTable *ipc_client_list = NULL;
GHashTable *membership_list = NULL;
GHashTable *membership_notify_list = NULL;
/* Once a child's respawn_count exceeds this, pcmk_wait_dispatch() stops respawning it */
#define MAX_RESPAWN 100
/* 0x0100007F, i.e. the bytes 127.0.0.1 read as a little-endian 32-bit value */
#define LOOPBACK_ID 16777343
/*
 * Bits tested against a child's 'flags' field; pcmk_ipc() sends membership
 * data to a newly connected child that has crm_flag_members set.
 */
#define crm_flag_none 0x00000000
#define crm_flag_members 0x00000001
/*
 * Node identification message.
 *
 * Consumed by pcmk_cluster_id_callback() and byte-swapped field by field in
 * pcmk_cluster_id_swab(); the layout is packed, so field order and sizes must
 * not change.
 */
struct crm_identify_msg_s {
cs_ipc_header_request_t header __attribute__ ((aligned(8)));
uint32_t id;         /* sender's node id; checked against the transport-level nodeid */
uint32_t pid;
int32_t votes;
uint32_t processes;
char uname[256];     /* node name */
char version[256];   /* version string */
uint64_t born_on;
} __attribute__ ((packed));
/*
 * Table of child daemons managed by the plugin.
 *
 * pcmk_ipc() indexes this table by the sender's message type, and a note in
 * that function says the order must match the crm_ais_msg_types enum, so do
 * not reorder entries.
 *
 * NOTE(review): crm_child_t is declared elsewhere. Judging by how the fields
 * are used in this file, the fourth column looks like start_seq (0 = skipped
 * by the start-up loop) and the sixth like the respawn flag - confirm against
 * the type definition.
 */
/* *INDENT-OFF* */
static crm_child_t pcmk_children[] = {
{ 0, crm_proc_none, crm_flag_none, 0, 0, FALSE, "none", NULL, NULL, NULL, NULL },
{ 0, crm_proc_plugin, crm_flag_none, 0, 0, FALSE, "ais", NULL, NULL, NULL, NULL },
{ 0, crm_proc_lrmd, crm_flag_none, 3, 0, TRUE, "lrmd", NULL, CRM_DAEMON_DIR"/lrmd", NULL, NULL },
{ 0, crm_proc_cib, crm_flag_members, 1, 0, TRUE, "cib", CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib", NULL, NULL },
{ 0, crm_proc_crmd, crm_flag_members, 6, 0, TRUE, "crmd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd", NULL, NULL },
{ 0, crm_proc_attrd, crm_flag_none, 4, 0, TRUE, "attrd", CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd", NULL, NULL },
{ 0, crm_proc_stonithd, crm_flag_none, 0, 0, TRUE, "stonithd", NULL, "/bin/false", NULL, NULL },
{ 0, crm_proc_pe, crm_flag_none, 5, 0, TRUE, "pengine", CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine", NULL, NULL },
{ 0, crm_proc_mgmtd, crm_flag_none, 7, 0, TRUE, "mgmtd", NULL, HB_DAEMON_DIR"/mgmtd", NULL, NULL },
{ 0, crm_proc_stonith_ng, crm_flag_members, 2, 0, TRUE, "stonith-ng", NULL, CRM_DAEMON_DIR"/stonithd", NULL, NULL },
};
/* *INDENT-ON* */
/* Helpers used in this file before their definitions appear */
void send_cluster_id(void);
int send_plugin_msg_raw(const AIS_Message * ais_msg);
char *pcmk_generate_membership_data(void);
gboolean check_message_sanity(const AIS_Message * msg, const char *data);
/* Shorthand for the read-only message pointers handed to the handlers */
typedef const void ais_void_ptr;
/* Service-engine entry points referenced from pcmk_service_handler */
int pcmk_shutdown(void);
void pcmk_peer_update(enum totem_configuration_type configuration_type,
const unsigned int *member_list, size_t member_list_entries,
const unsigned int *left_list, size_t left_list_entries,
const unsigned int *joined_list, size_t joined_list_entries,
const struct memb_ring_id *ring_id);
int pcmk_startup(struct corosync_api_v1 *corosync_api);
int pcmk_config_init(struct corosync_api_v1 *corosync_api);
int pcmk_ipc_exit(void *conn);
int pcmk_ipc_connect(void *conn);
/* Handlers referenced from pcmk_lib_service and pcmk_exec_service */
void pcmk_ipc(void *conn, ais_void_ptr * msg);
void pcmk_exec_dump(void);
void pcmk_cluster_swab(void *msg);
void pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid);
void pcmk_nodeid(void *conn, ais_void_ptr * msg);
void pcmk_nodes(void *conn, ais_void_ptr * msg);
void pcmk_notify(void *conn, ais_void_ptr * msg);
void pcmk_remove_member(void *conn, ais_void_ptr * msg);
void pcmk_quorum(void *conn, ais_void_ptr * msg);
void pcmk_cluster_id_swab(void *msg);
void pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid);
/* Peer removal, by node id string or by node name */
void ais_remove_peer(char *node_id);
void ais_remove_peer_by_name(const char *node_name);
/*
 * Build the bitmask of processes currently running on this node.
 *
 * Returns 0 in MCP mode; otherwise the plugin's own bit OR-ed with the flag
 * of every child that currently has a pid recorded.
 */
static uint32_t
get_process_list(void)
{
    uint32_t mask = crm_proc_plugin;
    int idx;

    if (use_mcp) {
        return 0;
    }

    for (idx = 0; idx < SIZEOF(pcmk_children); idx++) {
        if (pcmk_children[idx].pid == 0) {
            continue;           /* not running */
        }
        mask |= pcmk_children[idx].flag;
    }

    return mask;
}
/*
 * Handlers for requests from local IPC clients.
 * The position of each entry is the message class noted beside it, so the
 * order must be preserved.
 */
static struct corosync_lib_handler pcmk_lib_service[] = {
{ /* 0 - crm_class_cluster */
.lib_handler_fn = pcmk_ipc,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
},
{ /* 1 - crm_class_members */
.lib_handler_fn = pcmk_nodes,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
},
{ /* 2 - crm_class_notify */
.lib_handler_fn = pcmk_notify,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
},
{ /* 3 - crm_class_nodeid */
.lib_handler_fn = pcmk_nodeid,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
},
{ /* 4 - crm_class_rmpeer */
.lib_handler_fn = pcmk_remove_member,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
},
{ /* 5 - crm_class_quorum */
.lib_handler_fn = pcmk_quorum,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
},
};
/*
 * Handlers for messages delivered with a source nodeid; each entry pairs the
 * handler with the routine that byte-swaps that message type.
 */
static struct corosync_exec_handler pcmk_exec_service[] = {
{ /* 0 */
.exec_handler_fn = pcmk_cluster_callback,
.exec_endian_convert_fn = pcmk_cluster_swab},
{ /* 1 */
.exec_handler_fn = pcmk_cluster_id_callback,
.exec_endian_convert_fn = pcmk_cluster_id_swab}
};
/*
 * Exports the interface for the service
 *
 * Ties together the entry points defined in this file: start-up, shutdown,
 * configuration, IPC connect/exit, membership changes and the two handler
 * tables above.
 */
/* *INDENT-OFF* */
struct corosync_service_engine pcmk_service_handler = {
.name = (char *)"Pacemaker Cluster Manager "PACEMAKER_VERSION,
.id = PCMK_SERVICE_ID,
.private_data_size = 0,
.flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
.allow_inquorate = CS_LIB_ALLOW_INQUORATE,
.lib_init_fn = pcmk_ipc_connect,
.lib_exit_fn = pcmk_ipc_exit,
.exec_init_fn = pcmk_startup,
.exec_exit_fn = pcmk_shutdown,
.config_init_fn = pcmk_config_init,
.priority = 50,
.lib_engine = pcmk_lib_service,
/* Entry counts are derived from the tables so they cannot drift */
.lib_engine_count = sizeof (pcmk_lib_service) / sizeof (struct corosync_lib_handler),
.exec_engine = pcmk_exec_service,
.exec_engine_count = sizeof (pcmk_exec_service) / sizeof (struct corosync_exec_handler),
.confchg_fn = pcmk_peer_update,
.exec_dump_fn = pcmk_exec_dump,
/* void (*sync_init) (void); */
/* int (*sync_process) (void); */
/* void (*sync_activate) (void); */
/* void (*sync_abort) (void); */
};
/*
 * Dynamic Loader definition
 *
 * The interface object exposes pcmk_get_handler_ver0() as the accessor for
 * the service engine.
 */
struct corosync_service_engine *pcmk_get_handler_ver0 (void);
struct corosync_service_engine_iface_ver0 pcmk_service_handler_iface = {
.corosync_get_service_engine_ver0 = pcmk_get_handler_ver0
};
/*
 * The "pacemaker" interface registered under two versions (0 and 1).
 * The .interfaces members are filled in at load time by
 * register_this_component().
 */
static struct lcr_iface openais_pcmk_ver0[2] = {
{
.name = "pacemaker",
.version = 0,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
.interfaces = NULL
},
{
.name = "pacemaker",
.version = 1,
.versions_replace = 0,
.versions_replace_count = 0,
.dependencies = 0,
.dependency_count = 0,
.constructor = NULL,
.destructor = NULL,
.interfaces = NULL
}
};
/* Component descriptor handed to lcr_component_register(); iface_count must
 * match the number of entries in openais_pcmk_ver0[] */
static struct lcr_comp pcmk_comp_ver0 = {
.iface_count = 2,
.ifaces = openais_pcmk_ver0
};
/* *INDENT-ON* */
/* Accessor published through pcmk_service_handler_iface: hands back the
 * address of this plugin's service engine description. */
struct corosync_service_engine *
pcmk_get_handler_ver0(void)
{
    struct corosync_service_engine *engine = &pcmk_service_handler;

    return engine;
}
/*
 * Load-time constructor: attach the service-engine accessor to both
 * versions of the "pacemaker" interface, then register the component.
 */
__attribute__ ((constructor))
static void
register_this_component(void)
{
    int version;

    for (version = 0; version < 2; version++) {
        lcr_interfaces_set(&openais_pcmk_ver0[version], &pcmk_service_handler_iface);
    }
    lcr_component_register(&pcmk_comp_ver0);
}
/*
 * Report whether the votes currently held exceed half (rounded down) of the
 * expected votes.
 *
 * Returns 1 when quorate, 0 otherwise.
 */
static int
plugin_has_quorum(void)
{
    const uint32_t half = plugin_expected_votes >> 1;

    return (plugin_has_votes > half) ? 1 : 0;
}
/*
 * Change the number of expected quorum votes.
 *
 * value: the requested number of expected votes.
 *
 * The request is refused (and logged) when it is negative or lower than the
 * number of votes currently held.  A negative value must be rejected
 * explicitly: compared directly against the unsigned vote count it would be
 * converted to a huge unsigned number and slip through.
 */
static void
update_expected_votes(int value)
{
    if (value < 0 || (uint32_t) value < plugin_has_votes) {
        /* Never drop below the number of connected nodes */
        ais_info("Cannot update expected quorum votes %u -> %d:"
                 " value cannot be less than the current number of votes",
                 plugin_expected_votes, value);

    } else if (plugin_expected_votes != (uint32_t) value) {
        ais_info("Expected quorum votes %u -> %d", plugin_expected_votes, value);
        plugin_expected_votes = (uint32_t) value;
    }
}
/*
 * Read the "logging", "quorum" and Pacemaker "service" sections of the
 * Corosync configuration and record the results in pcmk_env and in the
 * plugin globals (plugin_log_level, use_mcp, local_cname, ...).
 *
 * When file logging is requested, the log file is opened once in append mode
 * so that its ownership and permissions can be set; the outcome is written
 * into the file itself.
 */
static void
process_ais_conf(void)
{
    char *value = NULL;
    gboolean any_log = FALSE;
    hdb_handle_t top_handle = 0;
    hdb_handle_t local_handle = 0;

    ais_info("Reading configure");
    top_handle = config_find_init(pcmk_api, "logging");
    local_handle = config_find_next(pcmk_api, "logging", top_handle);

    get_config_opt(pcmk_api, local_handle, "debug", &value, "on");
    if (ais_get_boolean(value)) {
        plugin_log_level = LOG_DEBUG;
        pcmk_env.debug = "1";

    } else {
        plugin_log_level = LOG_INFO;
        pcmk_env.debug = "0";
    }

    get_config_opt(pcmk_api, local_handle, "to_logfile", &value, "off");
    if (ais_get_boolean(value)) {
        get_config_opt(pcmk_api, local_handle, "logfile", &value, NULL);

        if (value == NULL) {
            ais_err("Logging to a file requested but no log file specified");

        } else {
            uid_t pcmk_uid = geteuid();
            gid_t pcmk_gid = getegid();

            FILE *logfile = fopen(value, "a");

            if (logfile) {
                int chown_rc = 0;
                int chmod_rc = 0;
                int logfd = fileno(logfile);

                pcmk_env.logfile = value;

                /* Ensure the file has the correct permissions.
                 * Both calls are always attempted and both results are kept,
                 * so that a failed fchown() is not masked by a later
                 * successful fchmod().
                 */
                chown_rc = fchown(logfd, pcmk_uid, pcmk_gid);
                chmod_rc = fchmod(logfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);

                if (chown_rc < 0 || chmod_rc < 0) {
                    fprintf(logfile, "Could not set r/w permissions for uid=%d, gid=%d on %s\n",
                            (int) pcmk_uid, (int) pcmk_gid, value);

                } else {
                    fprintf(logfile, "Set r/w permissions for uid=%d, gid=%d on %s\n",
                            (int) pcmk_uid, (int) pcmk_gid, value);
                }
                fflush(logfile);
                fsync(logfd);
                fclose(logfile);
                any_log = TRUE;

            } else {
                ais_err("Couldn't create logfile: %s", value);
            }
        }
    }

    get_config_opt(pcmk_api, local_handle, "to_syslog", &value, "on");
    if (any_log && ais_get_boolean(value) == FALSE) {
        ais_info("User configured file based logging and explicitly disabled syslog.");
        value = "none";

    } else {
        if (ais_get_boolean(value) == FALSE) {
            /* The option read above is 'to_logfile', so name it correctly */
            ais_err
                ("Please enable some sort of logging, either 'to_logfile: on' or 'to_syslog: on'.");
            ais_err("If you use file logging, be sure to also define a value for 'logfile'");
        }
        get_config_opt(pcmk_api, local_handle, "syslog_facility", &value, "daemon");
    }
    pcmk_env.syslog = value;

    config_find_done(pcmk_api, local_handle);

    top_handle = config_find_init(pcmk_api, "quorum");
    local_handle = config_find_next(pcmk_api, "quorum", top_handle);
    get_config_opt(pcmk_api, local_handle, "provider", &value, NULL);
    if (value && ais_str_eq("quorum_cman", value)) {
        pcmk_env.quorum = "cman";
    } else {
        pcmk_env.quorum = "pcmk";
    }
    /* NOTE(review): no config_find_done() is issued for the "quorum" search
     * before the handles are reused below - confirm whether that leaks a
     * find context. */

    /* Locate the service section whose name is "pacemaker" */
    top_handle = config_find_init(pcmk_api, "service");
    local_handle = config_find_next(pcmk_api, "service", top_handle);
    while (local_handle) {
        value = NULL;
        pcmk_api->object_key_get(local_handle, "name", strlen("name"), (void **)&value, NULL);
        if (ais_str_eq("pacemaker", value)) {
            break;
        }
        local_handle = config_find_next(pcmk_api, "service", top_handle);
    }

    get_config_opt(pcmk_api, local_handle, "ver", &value, "0");
    if (ais_str_eq(value, "1")) {
        ais_info("Enabling MCP mode: Use the Pacemaker init script to complete Pacemaker startup");
        use_mcp = TRUE;
    }

    get_config_opt(pcmk_api, local_handle, "clustername", &local_cname, "pcmk");
    local_cname_len = strlen(local_cname);

    get_config_opt(pcmk_api, local_handle, "use_logd", &value, "no");
    pcmk_env.use_logd = value;

    get_config_opt(pcmk_api, local_handle, "use_mgmtd", &value, "no");
    if (ais_get_boolean(value) == FALSE) {
        int lpc = 0;

        for (; lpc < SIZEOF(pcmk_children); lpc++) {
            if (crm_proc_mgmtd & pcmk_children[lpc].flag) {
                /* Disable mgmtd startup */
                pcmk_children[lpc].start_seq = 0;
                break;
            }
        }
    }

    config_find_done(pcmk_api, local_handle);
}
/* Configuration hook required by the service-engine table; there is nothing
 * to do at this stage, so it always reports success (0). */
int
pcmk_config_init(struct corosync_api_v1 *unused)
{
    const int success = 0;

    (void) unused;
    return success;
}
/*
 * Thread body that reaps exited child daemons and respawns them when allowed.
 *
 * For as long as wait_active is set, every entry of pcmk_children[] with a
 * recorded pid is polled with a non-blocking wait4(); the loop then sleeps
 * for one second.  'arg' is not used.  Always returns 0.
 */
static void *
pcmk_wait_dispatch(void *arg)
{
struct timespec waitsleep = {
.tv_sec = 1,
.tv_nsec = 0
};
while (wait_active) {
int lpc = 0;
for (; lpc < SIZEOF(pcmk_children); lpc++) {
if (pcmk_children[lpc].pid > 0) {
int status;
/* WNOHANG: returns 0 right away if the child is still running */
pid_t pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
if (pid == 0) {
continue;
} else if (pid < 0) {
ais_perror("Call to wait4(%s) failed", pcmk_children[lpc].name);
continue;
}
/* cleanup */
pcmk_children[lpc].pid = 0;
pcmk_children[lpc].conn = NULL;
pcmk_children[lpc].async_conn = NULL;
if (WIFSIGNALED(status)) {
int sig = WTERMSIG(status);
ais_err("Child process %s terminated with signal %d"
" (pid=%d, core=%s)",
pcmk_children[lpc].name, sig, pid,
WCOREDUMP(status) ? "true" : "false");
} else if (WIFEXITED(status)) {
int rc = WEXITSTATUS(status);
do_ais_log(rc == 0 ? LOG_NOTICE : LOG_ERR,
"Child process %s exited (pid=%d, rc=%d)", pcmk_children[lpc].name,
pid, rc);
/* Exit status 100 is the child's way of opting out of respawning */
if (rc == 100) {
ais_notice("Child process %s no longer wishes"
" to be respawned", pcmk_children[lpc].name);
pcmk_children[lpc].respawn = FALSE;
}
}
/* Broadcast the fact that one of our processes died
*
* Try to get some logging of the cause out first though
* because we're probably about to get fenced
*
* Potentially do this only if respawn_count > N
* to allow for local recovery
*/
send_cluster_id();
pcmk_children[lpc].respawn_count += 1;
if (pcmk_children[lpc].respawn_count > MAX_RESPAWN) {
ais_err("Child respawn count exceeded by %s", pcmk_children[lpc].name);
pcmk_children[lpc].respawn = FALSE;
}
if (pcmk_children[lpc].respawn) {
ais_notice("Respawning failed child process: %s", pcmk_children[lpc].name);
spawn_child(&(pcmk_children[lpc]));
}
/* Sent a second time, after the respawn decision has been acted on */
send_cluster_id();
}
}
sched_yield();
nanosleep(&waitsleep, 0);
}
return 0;
}
/*
 * Refresh local_nodeid from the totem layer.
 *
 * When the id differs from the previous one, the membership entry of the
 * previous id (if there was one) is removed and an entry for the new id is
 * recorded as a member.
 *
 * Returns the current local node id.
 */
static uint32_t
pcmk_update_nodeid(void)
{
    /* Keep the old id in its native unsigned type so that ids above INT_MAX
     * are compared and printed correctly */
    const uint32_t last = local_nodeid;

    local_nodeid = pcmk_api->totem_nodeid_get();

    if (last == local_nodeid) {
        return local_nodeid;
    }

    if (last == 0) {
        ais_info("Local node id: %u", local_nodeid);

    } else {
        /* A uint32_t needs at most 10 digits; a small stack buffer suffices */
        char last_s[32];

        ais_warn("Detected local node id change: %u -> %u", last, local_nodeid);
        snprintf(last_s, sizeof(last_s), "%u", last);
        ais_remove_peer(last_s);
    }

    update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL);
    return local_nodeid;
}
/*
 * Create a directory and any missing parent directories.
 *
 * path_c: path of the directory to create.
 * mode:   permission bits passed to every mkdir() call.
 *
 * Components that already exist are accepted.  The first component that
 * cannot be created is logged and the function gives up at that point:
 * retrying the same truncated path would only fail again and log the same
 * error a second time.
 */
static void
build_path(const char *path_c, mode_t mode)
{
    size_t offset = 1;          /* skip a leading '/' */
    size_t len = 0;
    char *path = ais_strdup(path_c);

    AIS_CHECK(path != NULL, return);

    for (len = strlen(path); offset < len; offset++) {
        if (path[offset] != '/') {
            continue;
        }

        /* Temporarily terminate the string at this separator */
        path[offset] = 0;
        if (mkdir(path, mode) < 0 && errno != EEXIST) {
            ais_perror("Could not create directory '%s'", path);
            ais_free(path);
            return;
        }
        path[offset] = '/';
    }

    /* Finally the full path itself */
    if (mkdir(path, mode) < 0 && errno != EEXIST) {
        ais_perror("Could not create directory '%s'", path);
    }
    ais_free(path);
}
/*
 * Service start-up entry point.
 *
 * init_with: the Corosync API handle, stored in pcmk_api.
 *
 * Verifies that Corosync runs as root, reads the configuration, creates the
 * membership and client tables, raises the core-file limit, prepares the
 * state directories, records the local host name and node id and, unless MCP
 * mode is enabled, starts the reaper thread and spawns the child daemons in
 * start_seq order.
 *
 * Returns 0 on success and -1 when start-up has to be aborted.
 */
int
pcmk_startup(struct corosync_api_v1 *init_with)
{
    int rc = 0;
    int lpc = 0;
    int start_seq = 1;
    struct utsname us;
    struct rlimit cores;
    static int max = SIZEOF(pcmk_children);

    uid_t pcmk_uid = 0;
    gid_t pcmk_gid = 0;

    uid_t root_uid = -1;
    uid_t cs_uid = geteuid();

    pcmk_user_lookup("root", &root_uid, NULL);

    pcmk_api = init_with;

    pcmk_env.debug = "0";
    pcmk_env.logfile = NULL;
    pcmk_env.use_logd = "false";
    pcmk_env.syslog = "daemon";

    if (cs_uid != root_uid) {
        ais_err("Corosync must be configured to start as 'root',"
                " otherwise Pacemaker cannot manage services."
                "  Expected %d got %d", (int) root_uid, (int) cs_uid);
        return -1;
    }

    process_ais_conf();

    membership_list = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, destroy_ais_node);
    membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal);
    ipc_client_list = g_hash_table_new(g_direct_hash, g_direct_equal);

    ais_info("CRM: Initialized");
    log_printf(LOG_INFO, "Logging: Initialized %s\n", __FUNCTION__);

    rc = getrlimit(RLIMIT_CORE, &cores);
    if (rc < 0) {
        ais_perror("Cannot determine current maximum core size.");
    } else {
        if (cores.rlim_max == 0 && geteuid() == 0) {
            cores.rlim_max = RLIM_INFINITY;
        } else {
            /* rlim_t has no printf conversion of its own */
            ais_info("Maximum core file size is: %lu", (unsigned long) cores.rlim_max);
        }
        cores.rlim_cur = cores.rlim_max;

        rc = setrlimit(RLIMIT_CORE, &cores);
        if (rc < 0) {
            ais_perror("Core file generation will remain disabled."
                       " Core files are an important diagnostic tool,"
                       " please consider enabling them by default.");
        }
#if 0
        /* system() is not thread-safe, can't call from here
         * Actually, its a pretty hacky way to try and achieve this anyway
         */
        if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) {
            ais_perror("Could not enable /proc/sys/kernel/core_uses_pid");
        }
#endif
    }

    if (pcmk_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) {
        ais_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER);
        /* Same failure code as the root check above */
        return -1;
    }

    /* Still best-effort, but failures are no longer silent */
    if (mkdir(CRM_STATE_DIR, 0750) < 0 && errno != EEXIST) {
        ais_perror("Could not create directory '%s'", CRM_STATE_DIR);
    }
    if (chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid) < 0) {
        ais_perror("Could not change ownership of '%s'", CRM_STATE_DIR);
    }

    /* Used by stonithd */
    build_path(HA_STATE_DIR "/heartbeat", 0755);

    /* Used by RAs - Leave owned by root */
    build_path(CRM_RSCTMP_DIR, 0755);

    rc = uname(&us);
    AIS_ASSERT(rc == 0);
    local_uname = ais_strdup(us.nodename);
    local_uname_len = strlen(local_uname);

    ais_info("Service: %d", PCMK_SERVICE_ID);
    ais_info("Local hostname: %s", local_uname);
    pcmk_update_nodeid();

    if (use_mcp == FALSE) {
        /* pthread_create() returns an error number rather than setting errno */
        rc = pthread_create(&pcmk_wait_thread, NULL, pcmk_wait_dispatch, NULL);
        if (rc != 0) {
            ais_err("Could not start the child monitoring thread (rc=%d):"
                    " exited children will not be respawned", rc);
        }

        for (start_seq = 1; start_seq < max; start_seq++) {
            /* dont start anything with start_seq < 1 */
            for (lpc = 0; lpc < max; lpc++) {
                if (start_seq == pcmk_children[lpc].start_seq) {
                    spawn_child(&(pcmk_children[lpc]));
                }
            }
        }
    }
    return 0;
}
/*
static void ais_print_node(const char *prefix, struct totem_ip_address *host)
{
int len = 0;
char *buffer = NULL;
ais_malloc0(buffer, INET6_ADDRSTRLEN+1);
inet_ntop(host->family, host->addr, buffer, INET6_ADDRSTRLEN);
len = strlen(buffer);
ais_info("%s: %.*s", prefix, len, buffer);
ais_free(buffer);
}
*/
#if 0
/* copied here for reference from exec/totempg.c */
char *
totempg_ifaces_print(unsigned int nodeid)
{
static char iface_string[256 * INTERFACE_MAX];
char one_iface[64];
struct totem_ip_address interfaces[INTERFACE_MAX];
char **status;
unsigned int iface_count;
unsigned int i;
int res;
iface_string[0] = '\0';
res = totempg_ifaces_get(nodeid, interfaces, &status, &iface_count);
if (res == -1) {
return ("no interface found for nodeid");
}
for (i = 0; i < iface_count; i++) {
sprintf(one_iface, "r(%d) ip(%s), ", i, totemip_print(&interfaces[i]));
strcat(iface_string, one_iface);
}
return (iface_string);
}
#endif
/*
 * Hash-table iteration callback: mark a peer as lost when it was not part of
 * the membership event identified by membership_seq.
 *
 * key:       unused.
 * value:     the crm_node_t being examined.
 * user_data: int counter, incremented by whatever update_member() returns.
 */
static void
ais_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data)
{
    crm_node_t *peer = value;
    int *counter = user_data;

    if (peer->last_seen == membership_seq) {
        return;                 /* seen in the current membership */
    }
    if (ais_str_eq(CRM_NODE_LOST, peer->state)) {
        return;                 /* already recorded as lost */
    }

    ais_info("Node %s was not seen in the previous transition", peer->uname);
    *counter += update_member(peer->id, 0, membership_seq, peer->votes,
                              peer->processes, peer->uname, CRM_NODE_LOST, NULL);
}
/*
 * Membership (configuration change) callback.
 *
 * configuration_type: regular (stable) or transitional membership.
 * member_list / left_list / joined_list: node ids and their entry counts.
 * ring_id: identifies the ring; its sequence number becomes membership_seq.
 *
 * Transitional events are only logged.  For stable events the membership
 * table is updated for joined, current and departed nodes, peers that were
 * not seen are marked lost, interested clients are notified if anything
 * changed, and the local node's identity is broadcast.
 */
void
pcmk_peer_update(enum totem_configuration_type configuration_type,
                 const unsigned int *member_list, size_t member_list_entries,
                 const unsigned int *left_list, size_t left_list_entries,
                 const unsigned int *joined_list, size_t joined_list_entries,
                 const struct memb_ring_id *ring_id)
{
    size_t lpc = 0;             /* same type as the entry counts it is compared with */
    int changed = 0;
    int do_update = 0;

    AIS_ASSERT(ring_id != NULL);
    switch (configuration_type) {
        case TOTEM_CONFIGURATION_REGULAR:
            do_update = 1;
            break;
        case TOTEM_CONFIGURATION_TRANSITIONAL:
        default:
            break;
    }

    membership_seq = ring_id->seq;
    ais_notice("%s membership event on ring %lld: memb=%ld, new=%ld, lost=%ld",
               do_update ? "Stable" : "Transitional", (long long) ring_id->seq,
               (long)member_list_entries, (long)joined_list_entries, (long)left_list_entries);

    if (do_update == 0) {
        /* Transitional membership: report only, change nothing */
        for (lpc = 0; lpc < joined_list_entries; lpc++) {
            const char *prefix = "new: ";
            uint32_t nodeid = joined_list[lpc];

            ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
        }
        for (lpc = 0; lpc < member_list_entries; lpc++) {
            const char *prefix = "memb:";
            uint32_t nodeid = member_list[lpc];

            ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
        }
        for (lpc = 0; lpc < left_list_entries; lpc++) {
            const char *prefix = "lost:";
            uint32_t nodeid = left_list[lpc];

            ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
        }
        return;
    }

    for (lpc = 0; lpc < joined_list_entries; lpc++) {
        const char *prefix = "NEW: ";
        uint32_t nodeid = joined_list[lpc];
        crm_node_t *node = NULL;

        changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);

        ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);

        node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid));
        if (node == NULL) {
            /* The lookup can return NULL; do not dereference it blindly */
            ais_err("Node %u is missing from the membership list", nodeid);
            continue;
        }
        if (node->addr == NULL) {
            const char *addr = totempg_ifaces_print(nodeid);

            node->addr = ais_strdup(addr);
            ais_debug("Node %u has address %s", nodeid, node->addr);
        }
    }

    for (lpc = 0; lpc < member_list_entries; lpc++) {
        const char *prefix = "MEMB:";
        uint32_t nodeid = member_list[lpc];

        changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);

        ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
    }

    for (lpc = 0; lpc < left_list_entries; lpc++) {
        const char *prefix = "LOST:";
        uint32_t nodeid = left_list[lpc];

        changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL);
        ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
    }

    if (changed && joined_list_entries == 0 && left_list_entries == 0) {
        ais_err("Something strange happened: %d", changed);
        changed = 0;
    }

    ais_trace("Reaping unseen nodes...");
    g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed);

    if (member_list_entries > 1) {
        /* Used to set born-on in send_cluster_id())
         * We need to wait until we have at least one peer since first
         * membership id is based on the one before we stopped and isn't reliable
         */
        have_reliable_membership_id = TRUE;
    }

    if (changed) {
        ais_debug("%d nodes changed", changed);
        pcmk_update_nodeid();
        send_member_notification();
    }

    send_cluster_id();
}
/*
 * IPC disconnect hook.
 *
 * conn: the connection that is going away.
 *
 * Clears the connection from the matching child entry (if any), drops it from
 * the notification and client tables and logs the departure.
 * Always returns 0.
 */
int
pcmk_ipc_exit(void *conn)
{
    void *async_conn = conn;
    const char *client = NULL;
    int idx = 0;

    while (idx < SIZEOF(pcmk_children) && pcmk_children[idx].conn != conn) {
        idx++;
    }

    if (idx < SIZEOF(pcmk_children)) {
        if (wait_active == FALSE) {
            /* Make sure the shutdown loop exits */
            pcmk_children[idx].pid = 0;
        }
        pcmk_children[idx].conn = NULL;
        pcmk_children[idx].async_conn = NULL;
        client = pcmk_children[idx].name;
    }

    g_hash_table_remove(membership_notify_list, async_conn);
    g_hash_table_remove(ipc_client_list, async_conn);

    /* Known children are reported at LOG_INFO, anything else one level
     * below LOG_DEBUG */
    do_ais_log(client ? LOG_INFO : (LOG_DEBUG + 1),
               "Client %s (conn=%p, async-conn=%p) left",
               client ? client : "unknown-transient", conn, async_conn);

    return 0;
}
/*
 * IPC connect hook.
 *
 * Deliberately does nothing: Corosync hasn't finished setting up the
 * connection at this point, and sending messages now messes up the protocol!
 * Always returns 0.
 */
int
pcmk_ipc_connect(void *conn)
{
    (void) conn;
    return 0;
}
/*
* Executive message handlers
*/
void
pcmk_cluster_swab(void *msg)
{
AIS_Message *ais_msg = msg;
ais_trace("Performing endian conversion...");
ais_msg->id = swab32(ais_msg->id);
ais_msg->size = swab32(ais_msg->size);
ais_msg->is_compressed = swab32(ais_msg->is_compressed);
ais_msg->compressed_size = swab32(ais_msg->compressed_size);
ais_msg->host.id = swab32(ais_msg->host.id);
ais_msg->host.pid = swab32(ais_msg->host.pid);
ais_msg->host.type = swab32(ais_msg->host.type);
ais_msg->host.size = swab32(ais_msg->host.size);
ais_msg->host.local = swab32(ais_msg->host.local);
ais_msg->sender.id = swab32(ais_msg->sender.id);
ais_msg->sender.pid = swab32(ais_msg->sender.pid);
ais_msg->sender.type = swab32(ais_msg->sender.type);
ais_msg->sender.size = swab32(ais_msg->sender.size);
ais_msg->sender.local = swab32(ais_msg->sender.local);
ais_msg->header.size = swab32(ais_msg->header.size);
ais_msg->header.id = swab32(ais_msg->header.id);
ais_msg->header.error = swab32(ais_msg->header.error);
}
/*
 * Handler for AIS messages arriving from the cluster.
 *
 * message: the received AIS_Message.
 * nodeid:  id of the node that sent it.
 *
 * Messages without a destination host, or addressed to this host, are routed
 * locally; everything else is discarded.
 */
void
pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid)
{
    const AIS_Message *incoming = message;

    ais_trace("Message from node %u (%s)", nodeid, nodeid == local_nodeid ? "local" : "remote");
    /*  Shouldn't be required...
       update_member(
       ais_msg->sender.id, membership_seq, -1, 0, ais_msg->sender.uname, NULL);
     */

    if (incoming->host.size != 0 && !ais_str_eq(incoming->host.uname, local_uname)) {
        ais_trace("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)",
                  incoming->id, ais_dest(&(incoming->host)),
                  msg_type2text(incoming->host.type),
                  ais_dest(&(incoming->sender)), msg_type2text(incoming->sender.type));
        return;
    }

    route_ais_message(incoming, FALSE);
}
/*
 * Endian conversion for a node identification message: byte-swaps the
 * transport header fields and the numeric payload fields in place.
 */
void
pcmk_cluster_id_swab(void *msg)
{
    struct crm_identify_msg_s *ident = msg;

    ais_trace("Performing endian conversion...");

    /* transport header */
    ident->header.size = swab32(ident->header.size);
    ident->header.id = swab32(ident->header.id);

    /* payload */
    ident->id = swab32(ident->id);
    ident->pid = swab32(ident->pid);
    ident->votes = swab32(ident->votes);
    ident->processes = swab32(ident->processes);
    ident->born_on = swab64(ident->born_on);
}
/*
 * Handler for node identification messages.
 *
 * message: a struct crm_identify_msg_s received from the cluster.
 * nodeid:  id of the node the message actually came from.
 *
 * A message whose embedded id disagrees with the transport-level node id is
 * rejected.  Otherwise the membership entry for that node is updated and,
 * if that changed anything, clients are notified.
 */
void
pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid)
{
    int changed = 0;
    const struct crm_identify_msg_s *msg = message;

    /* The name and version arrive from another node in fixed-size fields
     * with no guarantee of a terminator, so work on bounded local copies
     * before treating them as C strings. */
    char uname[sizeof(msg->uname)];
    char version[sizeof(msg->version)];

    if (nodeid != msg->id) {
        ais_err("Invalid message: Node %u claimed to be node %u", nodeid, msg->id);
        return;
    }

    memcpy(uname, msg->uname, sizeof(uname) - 1);
    uname[sizeof(uname) - 1] = '\0';
    memcpy(version, msg->version, sizeof(version) - 1);
    version[sizeof(version) - 1] = '\0';

    ais_debug("Node update: %s (%s)", uname, version);
    changed =
        update_member(nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, uname,
                      NULL, version);

    if (changed) {
        send_member_notification();
    }
}
/* Response buffer layout: an IPC response header followed by 4096 bytes */
struct res_overlay {
cs_ipc_header_response_t header __attribute((aligned(8)));
char buf[4096];
};
/* Single shared instance, allocated on first use by send_ipc_ack() */
struct res_overlay *res_overlay = NULL;
static void
send_ipc_ack(void *conn)
{
if (res_overlay == NULL) {
ais_malloc0(res_overlay, sizeof(struct res_overlay));
}
res_overlay->header.id = CRM_MESSAGE_IPC_ACK;
res_overlay->header.size = sizeof(cs_ipc_header_response_t);
res_overlay->header.error = CS_OK;
pcmk_api->ipc_response_send(conn, res_overlay, res_overlay->header.size);
}
/* local callbacks */
/*
 * Handler for messages sent by local IPC clients.
 *
 * conn: the client's connection.
 * msg:  the AIS_Message received; only valid until the ACK is sent.
 *
 * Corrupted messages are acknowledged and dropped.  Otherwise the message is
 * copied, the connection is recorded (for a known child daemon on its first
 * message to the plugin, or in the transient client table), the sender fields
 * are overwritten with the local node's identity, and the copy is routed
 * before the ACK is sent.
 */
void
pcmk_ipc(void *conn, ais_void_ptr * msg)
{
    AIS_Message *mutable;
    int type = 0;
    gboolean transient = TRUE;
    const AIS_Message *ais_msg = (const AIS_Message *)msg;
    void *async_conn = conn;

    ais_trace("Message from client %p", conn);

    if (check_message_sanity(ais_msg, ais_msg->data) == FALSE) {
        /* The message is corrupted - ignore */
        send_ipc_ack(conn);
        return;
    }

    /* Make a copy of the message here and ACK it
     * The message is only valid until a response is sent
     * but the response must also be sent _before_ we send anything else
     */

    mutable = ais_msg_copy(ais_msg);
    AIS_ASSERT(check_message_sanity(mutable, mutable->data));

    type = mutable->sender.type;

    /* Classify the sender BEFORE touching pcmk_children[type]: transient
     * clients use their pid as the type, which is far outside the table. */
    if (type > crm_msg_none && type < SIZEOF(pcmk_children)) {
        /* known child process */
        transient = FALSE;
    }

    ais_trace
        ("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d",
         type, mutable->host.local, transient ? NULL : pcmk_children[type].conn,
         mutable->host.type, crm_msg_ais,
         mutable->sender.pid, transient ? 0 : pcmk_children[type].pid,
         ((int)SIZEOF(pcmk_children)));

#if 0
    /* If this check fails, the order of pcmk_children probably
     *   doesn't match that of the crm_ais_msg_types enum
     */
    AIS_CHECK(transient || mutable->sender.pid == pcmk_children[type].pid,
              ais_err("Sender: %d, child[%d]: %d", mutable->sender.pid, type,
                      pcmk_children[type].pid);
              ais_free(mutable);
              return);
#endif

    if (transient == FALSE
        && type > crm_msg_none
        && mutable->host.local
        && pcmk_children[type].conn == NULL && mutable->host.type == crm_msg_ais) {
        AIS_CHECK(mutable->sender.type != mutable->sender.pid,
                  ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));

        ais_info("Recorded connection %p for %s/%d",
                 conn, pcmk_children[type].name, pcmk_children[type].pid);
        pcmk_children[type].conn = conn;
        pcmk_children[type].async_conn = async_conn;

        /* Make sure they have the latest membership */
        if (pcmk_children[type].flags & crm_flag_members) {
            /* NOTE(review): 'update' is not released here - confirm whether
             * send_client_msg() takes ownership of it or this leaks. */
            char *update = pcmk_generate_membership_data();

            g_hash_table_replace(membership_notify_list, async_conn, async_conn);
            ais_info("Sending membership update " U64T " to %s",
                     membership_seq, pcmk_children[type].name);
            send_client_msg(async_conn, crm_class_members, crm_msg_none, update);
        }

    } else if (transient) {
        AIS_CHECK(mutable->sender.type == mutable->sender.pid,
                  ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
        g_hash_table_replace(ipc_client_list, async_conn, GUINT_TO_POINTER(mutable->sender.pid));
    }

    /* Stamp the copy with the local node's identity before routing it */
    mutable->sender.id = local_nodeid;
    mutable->sender.size = local_uname_len;
    memset(mutable->sender.uname, 0, MAX_NAME);
    memcpy(mutable->sender.uname, local_uname, mutable->sender.size);

    route_ais_message(mutable, TRUE);
    send_ipc_ack(conn);
    ais_free(mutable);
}
/*
 * Service shutdown entry point.
 *
 * Returns -1 to hand control back to Corosync while work remains (the static
 * variables keep the progress between calls) and 0 once shutdown is complete.
 *
 * In MCP mode the plugin only refuses to unload while crmd or stonith-ng
 * still holds a connection.  Otherwise the children are stopped in
 * descending start_seq order, one phase at a time.
 */
int
pcmk_shutdown(void)
{
int lpc = 0;
/* State preserved across calls */
static int phase = 0;
static int max_wait = 0;
static time_t next_log = 0;
static int max = SIZEOF(pcmk_children);
if (use_mcp) {
if (pcmk_children[crm_msg_crmd].conn || pcmk_children[crm_msg_stonith_ng].conn) {
time_t now = time(NULL);
/* Rate-limit the notice to once every 300 seconds */
if (now > next_log) {
next_log = now + 300;
ais_notice
("Preventing Corosync shutdown. Please ensure Pacemaker is stopped first.");
}
return -1;
}
ais_notice("Unloading Pacemaker plugin");
return 0;
}
/* First call: start from the highest possible phase */
if (phase == 0) {
ais_notice("Shutting down Pacemaker");
phase = max;
}
wait_active = FALSE; /* stop the wait loop */
for (; phase > 0; phase--) {
/* dont stop anything with start_seq < 1 */
for (lpc = max - 1; lpc >= 0; lpc--) {
if (phase != pcmk_children[lpc].start_seq) {
continue;
}
if (pcmk_children[lpc].pid) {
pid_t pid = 0;
int status = 0;
time_t now = time(NULL);
/* The respawn flag doubles as "SIGTERM not sent yet" */
if (pcmk_children[lpc].respawn) {
max_wait = 5; /* 5 * 30s = 2.5 minutes... plenty once the crmd is gone */
next_log = now + 30;
pcmk_children[lpc].respawn = FALSE;
stop_child(&(pcmk_children[lpc]), SIGTERM);
}
pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
if (pid < 0) {
ais_perror("Call to wait4(%s/%d) failed - treating it as stopped",
pcmk_children[lpc].name, pcmk_children[lpc].pid);
} else if (pid == 0) {
/* Child still running */
if (now >= next_log) {
max_wait--;
next_log = now + 30;
ais_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...",
pcmk_children[lpc].name, pcmk_children[lpc].pid,
pcmk_children[lpc].start_seq);
/* SIGKILL is only used for phases below crmd's start_seq */
if (max_wait <= 0 && phase < pcmk_children[crm_msg_crmd].start_seq) {
ais_err("Child %s taking too long to terminate, sending SIGKILL",
pcmk_children[lpc].name);
stop_child(&(pcmk_children[lpc]), SIGKILL);
}
}
/* Return control to corosync */
return -1;
}
}
/* cleanup */
ais_notice("%s confirmed stopped", pcmk_children[lpc].name);
pcmk_children[lpc].async_conn = NULL;
pcmk_children[lpc].conn = NULL;
pcmk_children[lpc].pid = 0;
}
}
send_cluster_id();
ais_notice("Shutdown complete");
/* TODO: Add back the logsys flush call once its written */
return 0;
}
/* Accumulator handed to member_loop_fn() as user_data while the membership string is built */
struct member_loop_data {
/* String built so far; member_loop_fn() replaces it with the result of append_member() */
char *string;
};
/*
 * g_hash_table_foreach() callback: add the votes of a node whose state is
 * CRM_NODE_MEMBER to plugin_has_votes. key and user_data are unused.
 */
static void
member_vote_count_fn(gpointer key, gpointer value, gpointer user_data)
{
    const crm_node_t *member = value;

    if (!ais_str_eq(CRM_NODE_MEMBER, member->state)) {
        return;
    }
    plugin_has_votes += member->votes;
}
/*
 * g_hash_table_foreach() callback: append one node to the string held in the
 * struct member_loop_data passed as user_data. key is unused.
 */
void
member_loop_fn(gpointer key, gpointer value, gpointer user_data)
{
    struct member_loop_data *acc = user_data;
    crm_node_t *member = value;

    ais_trace("Dumping node %u", member->id);
    acc->string = append_member(acc->string, member);
}
char *
pcmk_generate_membership_data(void)
{
int size = 0;
struct member_loop_data data;
size = 256;
ais_malloc0(data.string, size);
/* Ensure the list of active processes is up-to-date */
update_member(local_nodeid, 0, 0, -1, get_process_list(), local_uname, CRM_NODE_MEMBER, NULL);
plugin_has_votes = 0;
g_hash_table_foreach(membership_list, member_vote_count_fn, NULL);
if (plugin_has_votes > plugin_expected_votes) {
update_expected_votes(plugin_has_votes);
}
snprintf(data.string, size,
"",
membership_seq, plugin_has_quorum()? "true" : "false",
plugin_expected_votes, plugin_has_votes);
g_hash_table_foreach(membership_list, member_loop_fn, &data);
size = strlen(data.string);
data.string = realloc_safe(data.string, size + 9); /* 9 = + nul */
sprintf(data.string + size, "");
return data.string;
}
/*
 * IPC handler: acknowledge the request, then send the current membership
 * data to the requesting connection (when there is one).
 */
void
pcmk_nodes(void *conn, ais_void_ptr * msg)
{
    char *membership = pcmk_generate_membership_data();

    /* send the ACK before we send any other messages
     * - but after we no longer need to access the message
     */
    send_ipc_ack(conn);
    msg = NULL;

    if (conn != NULL) {
        send_client_msg(conn, crm_class_members, crm_msg_none, membership);
    }
    ais_free(membership);
}
/*
 * IPC handler: acknowledge the request and, when it carries a payload, send
 * "remove-peer:<payload>" as a plugin message addressed to crm_msg_ais.
 */
void
pcmk_remove_member(void *conn, ais_void_ptr * msg)
{
    const AIS_Message *request = msg;
    char *peer = get_ais_data(request);

    send_ipc_ack(conn);
    msg = NULL;

    if (peer != NULL) {
        char *command = ais_concat("remove-peer", peer, ':');

        send_plugin_msg(crm_msg_ais, NULL, command);
        ais_info("Sent: %s", command);
        ais_free(command);
    }

    ais_free(peer);
}
/*
 * Format the current quorum figures (membership sequence, quorum flag,
 * expected and actual votes) and send them to conn as a crm_class_quorum
 * message.
 */
static void
send_quorum_details(void *conn)
{
    int capacity = 256;
    char *report = NULL;

    ais_malloc0(report, capacity);

    snprintf(report, capacity, "",
             membership_seq, plugin_has_quorum()? "true" : "false",
             plugin_expected_votes, plugin_has_votes);

    send_client_msg(conn, crm_class_quorum, crm_msg_none, report);
    ais_free(report);
}
/*
 * IPC handler for quorum requests.
 *
 * A non-empty payload is parsed as the new expected-votes value; in every
 * case the current quorum details are sent back on conn.
 */
void
pcmk_quorum(void *conn, ais_void_ptr * msg)
{
    const AIS_Message *request = msg;
    char *votes_text = get_ais_data(request);
    char *scratch = NULL;

    send_ipc_ack(conn);
    msg = NULL;

    /* Make sure the current number of votes is accurate:
     * generating the membership data recounts them, the string is not needed
     */
    scratch = pcmk_generate_membership_data();
    ais_free(scratch);

    /* Calls without data just want the current quorum details */
    if (votes_text != NULL && votes_text[0] != '\0') {
        int requested = ais_get_int(votes_text, NULL);

        update_expected_votes(requested);
    }

    send_quorum_details(conn);
    ais_free(votes_text);
}
/*
 * IPC handler: subscribe the connection to membership notifications when the
 * payload is "true", unsubscribe it otherwise.
 */
void
pcmk_notify(void *conn, ais_void_ptr * msg)
{
    const AIS_Message *request = msg;
    char *flag = get_ais_data(request);
    int sender_pid = request->sender.pid;   /* read before the message is acknowledged */
    int subscribe = 0;

    send_ipc_ack(conn);
    msg = NULL;

    subscribe = ais_str_eq("true", flag) ? 1 : 0;

    ais_info("%s node notifications for child %d (%p)",
             subscribe ? "Enabling" : "Disabling", sender_pid, conn);

    if (subscribe) {
        /* The connection serves as both key and value */
        g_hash_table_replace(membership_notify_list, conn, conn);
    } else {
        g_hash_table_remove(membership_notify_list, conn);
    }

    ais_free(flag);
}
/*
 * IPC handler: reply on conn with the local node id, the local uname and
 * cname, and a counter that increases with every reply sent.
 */
void
pcmk_nodeid(void *conn, ais_void_ptr * msg)
{
    static int counter = 0;
    struct crm_ais_nodeid_resp_s reply;

    ais_trace("Sending local nodeid: %d to %p[%d]", local_nodeid, conn, counter);

    /* Response header */
    reply.header.id = crm_class_nodeid;
    reply.header.size = sizeof(reply);
    reply.header.error = CS_OK;

    /* Identity */
    reply.id = local_nodeid;
    reply.counter = counter;
    counter++;

    /* Names are copied into zeroed buffers */
    memset(reply.uname, 0, MAX_NAME);
    memcpy(reply.uname, local_uname, local_uname_len);
    memset(reply.cname, 0, MAX_NAME);
    memcpy(reply.cname, local_cname, local_cname_len);

    pcmk_api->ipc_response_send(conn, &reply, reply.header.size);
}
/*
 * g_hash_table_foreach_remove() callback: send the membership update in data
 * to the connection stored as value. Returns TRUE (remove the entry) when
 * sending failed, FALSE otherwise.
 */
static gboolean
ghash_send_update(gpointer key, gpointer value, gpointer data)
{
    int rc = send_client_msg(value, crm_class_members, crm_msg_none, data);

    return (rc != 0) ? TRUE : FALSE;
}
/*
 * Send freshly generated membership data to every connection in
 * membership_notify_list; connections for which sending fails are dropped
 * from the list.
 */
void
send_member_notification(void)
{
    char *membership = pcmk_generate_membership_data();

    ais_info("Sending membership update " U64T " to %d children",
             membership_seq, g_hash_table_size(membership_notify_list));

    g_hash_table_foreach_remove(membership_notify_list, ghash_send_update, membership);

    ais_free(membership);
}
/*
 * Validate the header and payload sizes of an AIS message.
 *
 * msg  - message to inspect
 * data - uncompressed payload text, or NULL to skip the string-length check
 *
 * Returns TRUE when the message passed every check that ran, FALSE otherwise.
 * The outcome is also logged (error, or trace on success).
 */
gboolean
check_message_sanity(const AIS_Message * msg, const char *data)
{
gboolean sane = TRUE;
/* Never set to TRUE in this function, so the "Repaired message" branch below is not taken */
gboolean repaired = FALSE;
int dest = msg->host.type;
/* Payload size implied by the header.
 * NOTE(review): computed before header.size has been validated below.
 */
int tmp_size = msg->header.size - sizeof(AIS_Message);
if (sane && msg->header.size == 0) {
ais_err("Message with no size");
sane = FALSE;
}
if (sane && msg->header.error != CS_OK) {
ais_err("Message header contains an error: %d", msg->header.error);
sane = FALSE;
}
/* The header must claim more bytes than the fixed part of the message */
AIS_CHECK(msg->header.size > sizeof(AIS_Message),
- ais_err("Message %d size too small: %d < %zu",
- msg->header.id, msg->header.size, sizeof(AIS_Message));
+ ais_err("Message %d size too small: %d < %llu",
+ msg->header.id, msg->header.size,
+ (unsigned long long) sizeof(AIS_Message));
return FALSE);
/* A mismatch between the declared and the implied payload size is only warned about:
 * sane is re-assigned TRUE, so the message is not rejected here
 */
if (sane && ais_data_len(msg) != tmp_size) {
ais_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg),
tmp_size);
sane = TRUE;
}
if (sane && ais_data_len(msg) == 0) {
ais_err("Message with no payload");
sane = FALSE;
}
/* For uncompressed text the declared length must equal strlen(data) plus the NUL */
if (sane && data && msg->is_compressed == FALSE) {
int str_size = strlen(data) + 1;
if (ais_data_len(msg) != str_size) {
int lpc = 0;
ais_err("Message payload is corrupted: expected %d bytes, got %d",
ais_data_len(msg), str_size);
sane = FALSE;
/* Trace the bytes from 10 before the end of the string up to msg->size.
 * NOTE(review): the upper bound is msg->size, not the length of data - confirm
 * that data is always at least msg->size bytes long.
 */
for (lpc = (str_size - 10); lpc < msg->size; lpc++) {
if (lpc < 0) {
lpc = 0;
}
ais_trace("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
}
}
}
/* Log the verdict together with the routing details of the message */
if (sane == FALSE) {
AIS_CHECK(sane,
ais_err
("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
ais_data_len(msg), msg->header.size));
} else if (repaired) {
ais_err
("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
ais_data_len(msg), msg->header.size);
} else {
ais_trace
("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
ais_data_len(msg), msg->header.size);
}
return sane;
}
/* Number of deliveries attempted by deliver_transient_msg(); reset by the caller before each pass */
static int delivered_transient = 0;

/*
 * g_hash_table_foreach() callback over ipc_client_list.
 *
 * key is the client connection and value the client's pid. The message in
 * user_data is sent to every client whose pid equals the message's
 * destination type.
 */
static void
deliver_transient_msg(gpointer key, gpointer value, gpointer user_data)
{
    AIS_Message *message = user_data;
    int client_pid = GPOINTER_TO_INT(value);
    int rc = 0;

    if (client_pid != message->host.type) {
        return;
    }

    rc = send_client_ipc(key, message);
    delivered_transient++;

    ais_info("Sent message to %s.%d (rc=%d)", ais_dest(&(message->host)), client_pid, rc);
    if (rc != 0) {
        ais_warn("Sending message to %s.%d failed (rc=%d)",
                 ais_dest(&(message->host)), client_pid, rc);
        log_ais_message(LOG_DEBUG, message);
    }
}
/*
 * Route a message to its destination.
 *
 * msg          - message to route; the function works on a private copy
 * local_origin - TRUE when the message was produced on this node
 *
 * Messages addressed to this host are handled by the plugin itself
 * (crm_msg_ais), delivered to matching transient clients, or sent to the
 * child recorded for the destination type. Messages of local origin that are
 * not addressed to this host are forwarded to the cluster.
 *
 * Returns TRUE when rc is still 0 at the end (delivery succeeded or nothing
 * had to be sent), FALSE when a sanity check or a delivery failed.
 */
gboolean
route_ais_message(const AIS_Message * msg, gboolean local_origin)
{
    int rc = 0;
    int dest = msg->host.type;
    const char *reason = "unknown";
    AIS_Message *mutable = ais_msg_copy(msg);
    static int service_id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);

    ais_trace("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)",
              mutable->id, ais_dest(&(mutable->host)), msg_type2text(dest),
              ais_dest(&(mutable->sender)), msg_type2text(mutable->sender.type),
              mutable->sender.pid, local_origin ? "false" : "true", ais_data_len((mutable)));

    /* A remote message with no host name, or with our name, is meant for us */
    if (local_origin == FALSE) {
        if (mutable->host.size == 0 || ais_str_eq(local_uname, mutable->host.uname)) {
            mutable->host.local = TRUE;
        }
    }

    if (check_message_sanity(mutable, mutable->data) == FALSE) {
        /* Don't send this message to anyone */
        rc = 1;
        goto bail;
    }

    if (mutable->host.local) {
        void *conn = NULL;
        const char *lookup = NULL;
        int children_index = 0;

        if (dest == crm_msg_ais) {
            process_ais_message(mutable);
            goto bail;

        } else if (dest == crm_msg_lrmd) {
            /* lrmd messages are routed via the crm */
            dest = crm_msg_crmd;

        } else if (dest == crm_msg_te) {
            /* te messages are routed via the crm */
            dest = crm_msg_crmd;

        } else if (dest >= SIZEOF(pcmk_children)) {
            /* Transient client */
            delivered_transient = 0;
            g_hash_table_foreach(ipc_client_list, deliver_transient_msg, mutable);
            if (delivered_transient) {
                ais_trace("Sent message to %d transient clients: %d", delivered_transient, dest);
                goto bail;

            } else {
                /* try the crmd */
                ais_trace("Sending message to transient client %d via crmd", dest);
                dest = crm_msg_crmd;
            }

        } else if (dest == 0) {
            char *payload = NULL;

            ais_err("Invalid destination: %d", dest);
            log_ais_message(LOG_ERR, mutable);

            /* get_ais_data() hands back a string that every other caller in
             * this file releases with ais_free(); keep it in a local so it is
             * released here too, and never pass NULL to %s
             */
            payload = get_ais_data(mutable);
            log_printf(LOG_ERR, "%s", payload ? payload : "(null)");
            ais_free(payload);

            rc = 1;
            goto bail;
        }

        lookup = msg_type2text(dest);

        /* Entry 7 of pcmk_children is used for crm_msg_pe when its name matches */
        if (dest == crm_msg_pe && ais_str_eq(pcmk_children[7].name, lookup)) {
            children_index = 7;

        } else {
            children_index = dest;
        }

        conn = pcmk_children[children_index].async_conn;

        if (mutable->header.id == service_id) {
            mutable->header.id = 0;     /* reset this back to zero for IPC messages */

        } else if (mutable->header.id != 0) {
            ais_err("reset header id back to zero from %d", mutable->header.id);
            mutable->header.id = 0;     /* reset this back to zero for IPC messages */
        }

        reason = "ipc delivery failed";
        rc = send_client_ipc(conn, mutable);

    } else if (local_origin) {
        /* forward to other hosts */
        ais_trace("Forwarding to cluster");
        reason = "cluster delivery failed";
        rc = send_plugin_msg_raw(mutable);
    }

    if (rc != 0) {
        ais_warn("Sending message to %s.%s failed: %s (rc=%d)",
                 ais_dest(&(mutable->host)), msg_type2text(dest), reason, rc);
        log_ais_message(LOG_DEBUG, mutable);
    }

  bail:
    ais_free(mutable);
    return rc == 0 ? TRUE : FALSE;
}
int
send_plugin_msg_raw(const AIS_Message * ais_msg)
{
int rc = 0;
struct iovec iovec;
static uint32_t msg_id = 0;
AIS_Message *mutable = ais_msg_copy(ais_msg);
AIS_ASSERT(local_nodeid != 0);
AIS_ASSERT(ais_msg->header.size == (sizeof(AIS_Message) + ais_data_len(ais_msg)));
if (mutable->id == 0) {
msg_id++;
AIS_CHECK(msg_id != 0 /* detect wrap-around */ ,
msg_id++; ais_err("Message ID wrapped around"));
mutable->id = msg_id;
}
mutable->header.error = CS_OK;
mutable->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
mutable->sender.id = local_nodeid;
mutable->sender.size = local_uname_len;
memset(mutable->sender.uname, 0, MAX_NAME);
memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
iovec.iov_base = (char *)mutable;
iovec.iov_len = mutable->header.size;
ais_trace("Sending message (size=%u)", (unsigned int)iovec.iov_len);
rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
if (rc == 0 && mutable->is_compressed == FALSE) {
ais_trace("Message sent: %.80s", mutable->data);
}
AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, mutable->data));
ais_free(mutable);
return rc;
}
/*
 * Smaller of x and y.
 *
 * The whole expansion is parenthesized so that the macro can be used inside
 * a larger expression (e.g. min(a, b) + 1). Each argument may be evaluated
 * twice, so do not pass expressions with side effects.
 */
#define min(x,y) (((x) < (y)) ? (x) : (y))
void
send_cluster_id(void)
{
int rc = 0;
int len = 0;
time_t now = time(NULL);
struct iovec iovec;
struct crm_identify_msg_s *msg = NULL;
static time_t started = 0;
static uint64_t first_seq = 0;
AIS_ASSERT(local_nodeid != 0);
if (started == 0) {
started = now;
first_seq = membership_seq;
}
if (local_born_on == 0) {
if (started + 15 < now) {
ais_debug("Born-on set to: " U64T " (age)", first_seq);
local_born_on = first_seq;
} else if (have_reliable_membership_id) {
ais_debug("Born-on set to: " U64T " (peer)", membership_seq);
local_born_on = membership_seq;
} else {
ais_debug("Leaving born-on unset: " U64T, membership_seq);
}
}
ais_malloc0(msg, sizeof(struct crm_identify_msg_s));
msg->header.size = sizeof(struct crm_identify_msg_s);
msg->id = local_nodeid;
/* msg->header.error = CS_OK; */
msg->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 1);
len = min(local_uname_len, MAX_NAME - 1);
memset(msg->uname, 0, MAX_NAME);
memcpy(msg->uname, local_uname, len);
len = min(strlen(VERSION), MAX_NAME - 1);
memset(msg->version, 0, MAX_NAME);
memcpy(msg->version, VERSION, len);
msg->votes = 1;
msg->pid = getpid();
msg->processes = get_process_list();
msg->born_on = local_born_on;
ais_debug("Local update: id=%u, born=" U64T ", seq=" U64T "",
local_nodeid, local_born_on, membership_seq);
update_member(local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL,
NULL, VERSION);
iovec.iov_base = (char *)msg;
iovec.iov_len = msg->header.size;
rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc));
ais_free(msg);
}
/*
 * g_hash_table_foreach_remove() callback: send the current quorum details
 * and then the peer-removal notice in data to the connection stored as
 * value. Returns TRUE (remove the entry) when the removal notice could not
 * be sent, FALSE otherwise.
 */
static gboolean
ghash_send_removal(gpointer key, gpointer value, gpointer data)
{
    int rc = 0;

    send_quorum_details(value);
    rc = send_client_msg(value, crm_class_rmpeer, crm_msg_none, data);

    return (rc != 0) ? TRUE : FALSE;
}
/*
 * Remove the peer whose numeric id is given as text in node_id from
 * membership_list, provided it is known and not in the CRM_NODE_MEMBER
 * state. On success the expected votes are decremented and the subscribed
 * children are told about the removal.
 */
void
ais_remove_peer(char *node_id)
{
    uint32_t id = ais_get_int(node_id, NULL);
    crm_node_t *peer = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));

    if (peer == NULL) {
        ais_info("Peer %u is unknown", id);
        return;
    }

    if (ais_str_eq(CRM_NODE_MEMBER, peer->state)) {
        ais_warn("Peer %u/%s is still active", id, peer->uname);
        return;
    }

    if (!g_hash_table_remove(membership_list, GUINT_TO_POINTER(id))) {
        ais_warn("Peer %u/%s was not removed", id, peer->uname);
        return;
    }

    plugin_expected_votes--;
    ais_notice("Removed dead peer %u from the membership list", id);
    ais_info("Sending removal of %u to %d children",
             id, g_hash_table_size(membership_notify_list));
    g_hash_table_foreach_remove(membership_notify_list, ghash_send_removal, node_id);
}
/*
 * Remove every peer whose uname equals node_name.
 *
 * The ids of all matching nodes are collected first; ais_remove_peer() is
 * called for each of them only after the iteration over membership_list has
 * finished.
 */
void
ais_remove_peer_by_name(const char *node_name)
{
    GHashTableIter iter;
    gpointer key = 0;
    crm_node_t *node = NULL;
    GList *matches = NULL;
    GList *cursor = NULL;

    g_hash_table_iter_init(&iter, membership_list);
    while (g_hash_table_iter_next(&iter, &key, (void **)&node)) {
        uint32_t node_id = 0;
        char *id_text = NULL;

        if (!ais_str_eq(node_name, node->uname)) {
            continue;
        }

        /* Keep the id as text: that is the form ais_remove_peer() takes */
        node_id = GPOINTER_TO_UINT(key);
        ais_malloc0(id_text, 32);
        snprintf(id_text, 31, "%u", node_id);
        matches = g_list_append(matches, id_text);
    }

    if (matches == NULL) {
        ais_warn("Peer %s is unkown", node_name);
        return;
    }

    for (cursor = matches; cursor != NULL; cursor = cursor->next) {
        char *id_text = cursor->data;

        ais_remove_peer(id_text);
    }
    g_list_free_full(matches, free);
}
/*
 * Handle a message addressed to the plugin itself.
 *
 * The message is logged at debug level; a payload of the form
 * "remove-peer:<name>" removes every peer with that uname. Always returns
 * TRUE.
 */
gboolean
process_ais_message(const AIS_Message * msg)
{
    int len = ais_data_len(msg);
    char *data = get_ais_data(msg);

    /* Compare the sender's name with ours by content. The previous pointer
     * comparison against local_uname could never match the name stored in
     * the message, so "remote" was always reported as "true".
     */
    const char *remote = ais_str_eq(local_uname, msg->sender.uname) ? "false" : "true";

    do_ais_log(LOG_DEBUG,
               "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s",
               msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
               ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
               msg->sender.pid,
               remote, ais_data_len(msg), data);

    /* 12 is the length of the "remove-peer:" prefix; the name follows it */
    if (data && len > 12 && strncmp("remove-peer:", data, 12) == 0) {
        char *node = data + 12;

        ais_remove_peer_by_name(node);
    }

    ais_free(data);
    return TRUE;
}
/*
 * g_hash_table_foreach() callback: log one membership entry at info level.
 * A missing uname, addr or version is printed as "-unknown-".
 */
static void
member_dump_fn(gpointer key, gpointer value, gpointer user_data)
{
    const crm_node_t *node = value;
    const char *uname = node->uname ? node->uname : "-unknown-";
    const char *addr = node->addr ? node->addr : "-unknown-";
    const char *version = node->version ? node->version : "-unknown-";

    ais_info(" node id:%u, uname=%s state=%s processes=%.16x born=" U64T " seen=" U64T
             " addr=%s version=%s", node->id, uname, node->state,
             node->processes, node->born, node->last_seen, addr,
             version);
}
/*
 * Re-read the configuration, then log the local node's identity, the quorum
 * state and every entry of membership_list.
 */
void
pcmk_exec_dump(void)
{
    /* Called after SIG_USR2 */
    process_ais_conf();

    /* Local node */
    ais_info("Local id: %u, uname: %s, born: " U64T, local_nodeid, local_uname, local_born_on);

    /* Quorum summary */
    ais_info("Membership id: " U64T ", quorate: %s, expected: %u, actual: %u",
             membership_seq, plugin_has_quorum()? "true" : "false",
             plugin_expected_votes, plugin_has_votes);

    /* One line per known node */
    g_hash_table_foreach(membership_list, member_dump_fn, NULL);
}
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index 74e33be29e..64cf4cc3a4 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,687 +1,693 @@
/*
* Copyright (C) 2004 Andrew Beekhof
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Handle of the current CPG connection; 0 when there is none (set on connect, cleared on disconnect) */
cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
/* Set by pcmk_cpg_membership() when the local node is missing from the member list */
static bool cpg_evicted = FALSE;
/* Optional message dispatch hook; NULL by default and not called in the code visible here */
gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
/*
 * Run 'code' and repeat it while it leaves CS_ERR_TRY_AGAIN or
 * CS_ERR_QUEUE_FULL in the caller's variable 'rc' and 'counter' is below
 * 'max'. Before each repeat 'counter' is incremented and the caller sleeps
 * for 'counter' seconds, so the delay grows with every attempt.
 *
 * 'code' must assign to a variable named 'rc' in the enclosing scope, and
 * 'counter' must be a modifiable lvalue.
 */
#define cs_repeat(counter, max, code) do { \
code; \
if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
counter++; \
crm_debug("Retrying operation after %ds", counter); \
sleep(counter); \
} else { \
break; \
} \
} while(counter < max)
/*
 * Leave the CPG group and close the connection held in cluster->cpg_handle,
 * if there is one. The global pcmk_cpg_handle is cleared in every case.
 */
void
cluster_disconnect_cpg(crm_cluster_t *cluster)
{
    pcmk_cpg_handle = 0;

    if (!cluster->cpg_handle) {
        crm_info("No CPG connection");
        return;
    }

    crm_trace("Disconnecting CPG");
    cpg_leave(cluster->cpg_handle, &cluster->group);
    cpg_finalize(cluster->cpg_handle);
    cluster->cpg_handle = 0;
}
/*
 * Return the local node id as reported by the CPG API.
 *
 * handle - an open CPG handle, or 0 to open (and close again) a temporary
 *          connection for the lookup
 *
 * A non-zero result is cached in a static variable and returned directly on
 * later calls. 0 is returned when the lookup failed.
 */
uint32_t get_local_nodeid(cpg_handle_t handle)
{
    static uint32_t local_nodeid = 0;

    int rc = CS_OK;             /* name is required by cs_repeat() */
    int retries = 0;
    cpg_handle_t local_handle = handle;
    cpg_callbacks_t cb = { };

    if (local_nodeid != 0) {
        return local_nodeid;
    }

#if 0
    /* Should not be necessary */
    if(get_cluster_type() == pcmk_cluster_classic_ais) {
        get_ais_details(&local_nodeid, NULL);
        goto done;
    }
#endif

    /* No handle supplied: open a temporary connection */
    if (handle == 0) {
        crm_trace("Creating connection");
        cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
    }

    if (rc == CS_OK) {
        retries = 0;
        crm_trace("Performing lookup");
        cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
    }

    if (rc != CS_OK) {
        crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
    }

    /* Close the temporary connection again */
    if (handle == 0) {
        crm_trace("Closing connection");
        cpg_finalize(local_handle);
    }

    crm_debug("Local nodeid is %u", local_nodeid);
    return local_nodeid;
}
/* Outgoing CPG messages (struct iovec *) waiting to be sent by crm_cs_flush() */
GListPtr cs_message_queue = NULL;
/* Id of the pending retry timer created with g_timeout_add(); 0 when no timer is pending */
int cs_message_timer = 0;
/* Forward declaration: crm_cs_flush_cb() below needs it */
static ssize_t crm_cs_flush(gpointer data);
/*
 * Timer callback installed by crm_cs_flush(): mark the timer as no longer
 * pending and try to flush the queue again. Returns FALSE, so the timer is
 * not repeated; crm_cs_flush() installs a new one when needed.
 */
static gboolean
crm_cs_flush_cb(gpointer data)
{
    cs_message_timer = 0;
    (void) crm_cs_flush(data);
    return FALSE;
}
/* Upper bound on the number of messages sent by one crm_cs_flush() call */
#define CS_SEND_MAX 200
/*
 * Send as many queued CPG messages as possible (at most CS_SEND_MAX).
 *
 * data - pointer to the cpg_handle_t to send with
 *
 * Sent messages are removed from cs_message_queue and freed. If messages
 * remain afterwards, a timer is installed to try again later. Returns
 * pcmk_ok when nothing was attempted (dead connection or timer pending),
 * otherwise the result of the last cpg_mcast_joined() call (0 if the queue
 * was empty).
 */
static ssize_t
crm_cs_flush(gpointer data)
{
int sent = 0;
ssize_t rc = 0;
int queue_len = 0;
/* Running total of messages sent over the lifetime of the process (logging only) */
static unsigned int last_sent = 0;
cpg_handle_t *handle = (cpg_handle_t *)data;
if (*handle == 0) {
crm_trace("Connection is dead");
return pcmk_ok;
}
/* Report queue growth: an error at every multiple of 1000, a warning at exactly CS_SEND_MAX */
queue_len = g_list_length(cs_message_queue);
if ((queue_len % 1000) == 0 && queue_len > 1) {
crm_err("CPG queue has grown to %d", queue_len);
} else if (queue_len == CS_SEND_MAX) {
crm_warn("CPG queue has grown to %d", queue_len);
}
if (cs_message_timer) {
/* There is already a timer, wait until it goes off */
crm_trace("Timer active %d", cs_message_timer);
return pcmk_ok;
}
/* Send from the head of the queue until it is empty, the limit is reached or a send fails */
while (cs_message_queue && sent < CS_SEND_MAX) {
struct iovec *iov = cs_message_queue->data;
errno = 0;
rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
if (rc != CS_OK) {
break;
}
sent++;
last_sent++;
- crm_trace("CPG message sent, size=%zd", iov->iov_len);
+ crm_trace("CPG message sent, size=%llu",
+ (unsigned long long) iov->iov_len);
/* The message was sent: unlink it and release both the buffer and the iovec */
cs_message_queue = g_list_remove(cs_message_queue, iov);
free(iov->iov_base);
free(iov);
}
queue_len -= sent;
/* Log at info level when several messages went out or some remain, otherwise at trace level */
if (sent > 1 || cs_message_queue) {
- crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
- sent, queue_len, last_sent, ais_error2text(rc), rc);
+ crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
+ sent, queue_len, last_sent, ais_error2text(rc),
+ (long long) rc);
} else {
- crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%zd)",
- sent, queue_len, last_sent, ais_error2text(rc), rc);
+ crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
+ sent, queue_len, last_sent, ais_error2text(rc),
+ (long long) rc);
}
/* Messages remain: retry in 100ms, or later if the last send failed */
if (cs_message_queue) {
uint32_t delay_ms = 100;
if(rc != CS_OK) {
/* Proportionally more if sending failed but cap at 1s */
delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
}
cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
}
return rc;
}
/*
 * Append iov to the outgoing CPG queue and try to flush the queue using
 * pcmk_cpg_handle.
 *
 * The queue keeps the pointer: crm_cs_flush() frees iov and iov->iov_base
 * once the message has been sent. Always returns TRUE.
 */
gboolean
send_cpg_iov(struct iovec * iov)
{
/* Number of messages queued so far (used in the trace message only) */
static unsigned int queued = 0;
queued++;
- crm_trace("Queueing CPG message %u (%zd bytes)", queued, iov->iov_len);
+ crm_trace("Queueing CPG message %u (%llu bytes)",
+ queued, (unsigned long long) iov->iov_len);
cs_message_queue = g_list_append(cs_message_queue, iov);
crm_cs_flush(&pcmk_cpg_handle);
return TRUE;
}
/*
 * Mainloop dispatch callback for the CPG file descriptor.
 *
 * Processes one pending CPG event. Returns 0 on success and -1 when the
 * dispatch failed (the stored handle is then cleared) or when the local node
 * was found to have been evicted from the CPG membership.
 */
static int
pcmk_cpg_dispatch(gpointer user_data)
{
    crm_cluster_t *cluster = (crm_cluster_t*) user_data;
    int rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);

    if (rc != CS_OK) {
        crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
        cluster->cpg_handle = 0;
        return -1;
    }

    if (cpg_evicted) {
        crm_err("Evicted from CPG membership");
        return -1;
    }

    return 0;
}
/*
 * Validate a received cluster message and extract its payload.
 *
 * handle  - CPG handle the message arrived on, or 0 if it did not come from CPG
 * nodeid  - id of the sending node as reported by the transport
 * pid     - pid of the sending process as reported by the transport
 * content - the raw message (an AIS_Message)
 * kind    - if not NULL, receives msg->header.id
 * from    - if not NULL, receives a pointer to the sender name inside content
 *
 * Returns a newly allocated copy of the (decompressed) payload, or NULL when
 * the message is not for this node, is invalid, or was consumed here.
 */
char *
pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
uint32_t *kind, const char **from)
{
char *data = NULL;
AIS_Message *msg = (AIS_Message *) content;
if(handle) {
/* 'msg' came from CPG not the plugin
* Do filtering and field massaging
*/
uint32_t local_nodeid = get_local_nodeid(handle);
const char *local_name = get_local_node_name();
/* Reject messages that claim a different sender id than the transport reported */
if (msg->sender.id > 0 && msg->sender.id != nodeid) {
crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
return NULL;
} else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
/* Not for us */
crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
return NULL;
} else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
/* Not for us */
crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
return NULL;
}
msg->sender.id = nodeid;
/* The sender did not fill in its name: take it from the peer cache if known */
if (msg->sender.size == 0) {
crm_node_t *peer = crm_get_peer(nodeid, NULL);
if (peer == NULL) {
crm_err("Peer with nodeid=%u is unknown", nodeid);
} else if (peer->uname == NULL) {
crm_err("No uname for peer with nodeid=%u", nodeid);
} else {
crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
msg->sender.size = strlen(peer->uname);
memset(msg->sender.uname, 0, MAX_NAME);
memcpy(msg->sender.uname, peer->uname, msg->sender.size);
}
}
}
crm_trace("Got new%s message (size=%d, %d, %d)",
msg->is_compressed ? " compressed" : "",
ais_data_len(msg), msg->size, msg->compressed_size);
if (kind != NULL) {
*kind = msg->header.id;
}
if (from != NULL) {
*from = msg->sender.uname;
}
if (msg->is_compressed && msg->size > 0) {
int rc = BZ_OK;
char *uncompressed = NULL;
/* One extra byte; calloc() zeroes the buffer, so the result is NUL-terminated */
unsigned int new_size = msg->size + 1;
if (check_message_sanity(msg, NULL) == FALSE) {
goto badmsg;
}
crm_trace("Decompressing message data");
/* NOTE(review): the calloc() result is not checked before use */
uncompressed = calloc(1, new_size);
rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
if (rc != BZ_OK) {
crm_err("Decompression failed: %d", rc);
free(uncompressed);
goto badmsg;
}
CRM_ASSERT(rc == BZ_OK);
CRM_ASSERT(new_size == msg->size);
data = uncompressed;
/* NOTE(review): 'data' is still NULL in the two branches below, so the sanity check
 * skips its string-length test and the "identify" comparison looks at NULL, not at
 * msg->data - confirm whether that is intended
 */
} else if (check_message_sanity(msg, data) == FALSE) {
goto badmsg;
} else if (safe_str_eq("identify", data)) {
int pid = getpid();
char *pid_s = crm_itoa(pid);
send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
free(pid_s);
return NULL;
} else {
data = strdup(msg->data);
}
if (msg->header.id != crm_class_members) {
/* Is this even needed anymore? */
crm_get_peer(msg->sender.id, msg->sender.uname);
}
/* Peer-removal notices are handled here and not passed on to the caller */
if (msg->header.id == crm_class_rmpeer) {
uint32_t id = crm_int_helper(data, NULL);
crm_info("Removing peer %s/%u", data, id);
reap_crm_member(id, NULL);
free(data);
return NULL;
#if SUPPORT_PLUGIN
} else if (is_classic_ais_cluster()) {
plugin_handle_membership(msg);
#endif
}
crm_trace("Payload: %.200s", data);
return data;
/* Common exit for messages that failed validation or decompression */
badmsg:
crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
" min=%d, total=%d, size=%d, bz2_size=%d",
msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, (int)sizeof(AIS_Message),
msg->header.size, msg->size, msg->compressed_size);
free(data);
return NULL;
}
/*
 * CPG configuration-change handler.
 *
 * Marks peers that left the group as offline for crm_proc_cpg, logs the
 * peers that joined, marks all current members as online for crm_proc_cpg,
 * and sets cpg_evicted when the local node is not among the members.
 * 'counter' numbers the invocations and is only used in log messages.
 */
void
pcmk_cpg_membership(cpg_handle_t handle,
                    const struct cpg_name *groupName,
                    const struct cpg_address *member_list, size_t member_list_entries,
                    const struct cpg_address *left_list, size_t left_list_entries,
                    const struct cpg_address *joined_list, size_t joined_list_entries)
{
    static int counter = 0;

    int idx;
    gboolean still_member = FALSE;
    uint32_t local_nodeid = get_local_nodeid(handle);

    /* Peers that left */
    for (idx = 0; idx < left_list_entries; idx++) {
        crm_node_t *gone = crm_find_peer(left_list[idx].nodeid, NULL);

        crm_info("Node %u left group %s (peer=%s, counter=%d.%d)",
                 left_list[idx].nodeid, groupName->value,
                 (gone? gone->uname : ""), counter, idx);
        if (gone) {
            crm_update_peer_proc(__FUNCTION__, gone, crm_proc_cpg, OFFLINESTATUS);
        }
    }

    /* Peers that joined are only logged here */
    for (idx = 0; idx < joined_list_entries; idx++) {
        crm_info("Node %u joined group %s (counter=%d.%d)",
                 joined_list[idx].nodeid, groupName->value, counter, idx);
    }

    /* Current members */
    for (idx = 0; idx < member_list_entries; idx++) {
        crm_node_t *peer = crm_get_peer(member_list[idx].nodeid, NULL);

        crm_info("Node %u still member of group %s (peer=%s, counter=%d.%d)",
                 member_list[idx].nodeid, groupName->value,
                 (peer? peer->uname : ""), counter, idx);

        /* Anyone that is sending us CPG messages must also be a _CPG_ member.
         * But it's _not_ safe to assume it's in the quorum membership.
         * We may have just found out it's dead and are processing the last couple of messages it sent
         */
        peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);

        if (peer && peer->state && crm_is_peer_active(peer) == FALSE) {
            time_t now = time(NULL);

            /* Co-opt the otherwise unused votes field */
            if (peer->votes == 0) {
                /* First sighting of an inactive peer in CPG: remember when */
                peer->votes = now;

            } else if (now > (60 + peer->votes)) {
                /* On the other hand, if we're still getting messages, at a certain point
                 * we need to acknowledge our internal cache is probably wrong
                 *
                 * Set the threshold to 1 minute
                 */
                crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
                if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) {
                    peer->votes = 0;
                }
            }
        }

        if (local_nodeid == member_list[idx].nodeid) {
            still_member = TRUE;
        }
    }

    if (!still_member) {
        crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
        cpg_evicted = TRUE;
    }

    counter++;
}
/*
 * Connect to the Corosync CPG API and join the group named after
 * crm_system_name ("unknown" when that is not set).
 *
 * On success the handle is stored in both cluster->cpg_handle and
 * pcmk_cpg_handle, cluster->nodeid is set, the CPG file descriptor is added
 * to the mainloop and the local peer is marked online for crm_proc_cpg.
 *
 * Returns TRUE on success. On any failure the handle is finalized and FALSE
 * is returned.
 */
gboolean
cluster_connect_cpg(crm_cluster_t *cluster)
{
    int rc = -1;
    int fd = 0;
    int retries = 0;
    uint32_t id = 0;
    crm_node_t *peer = NULL;
    cpg_handle_t handle = 0;

    struct mainloop_fd_callbacks cpg_fd_callbacks = {
        .dispatch = pcmk_cpg_dispatch,
        .destroy = cluster->destroy,
    };

    cpg_callbacks_t cpg_callbacks = {
        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
        /* .cpg_deliver_fn = pcmk_cpg_deliver, */
        /* .cpg_confchg_fn = pcmk_cpg_membership, */
    };

    cpg_evicted = FALSE;
    cluster->group.length = 0;
    cluster->group.value[0] = 0;

    /* group.value is char[128] */
    strncpy(cluster->group.value, crm_system_name?crm_system_name:"unknown", 127);
    cluster->group.value[127] = 0;
    /* The length includes the terminating NUL */
    cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));

    cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
    if (rc != CS_OK) {
        crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
        goto bail;
    }

    id = get_local_nodeid(handle);
    if (id == 0) {
        crm_err("Could not get local node id from the CPG API");
        /* rc still holds CS_OK from cpg_initialize(); mark the failure so
         * that the cleanup below finalizes the handle and reports FALSE
         * instead of falling through to the success path with node id 0
         */
        rc = -1;
        goto bail;
    }
    cluster->nodeid = id;

    retries = 0;
    cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
    if (rc != CS_OK) {
        crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
        goto bail;
    }

    rc = cpg_fd_get(handle, &fd);
    if (rc != CS_OK) {
        crm_err("Could not obtain the CPG API connection: %d\n", rc);
        goto bail;
    }

    pcmk_cpg_handle = handle;
    cluster->cpg_handle = handle;
    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);

  bail:
    if (rc != CS_OK) {
        cpg_finalize(handle);
        return FALSE;
    }

    peer = crm_get_peer(id, NULL);
    crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
    return TRUE;
}
/*
 * Serialize an XML message to text and send it as a crm_class_cluster
 * message. Returns the result of send_cluster_text().
 */
gboolean
send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
{
    char *xml_text = dump_xml_unformatted(msg);
    gboolean sent = send_cluster_text(crm_class_cluster, xml_text, local, node, dest);

    free(xml_text);
    return sent;
}
/*
 * Wrap a text payload in an AIS_Message and queue it for sending.
 *
 * class - message class; must be below 6
 * data  - payload text; NULL is treated as an empty string
 * local - stored in the message's host.local field
 * node  - destination node, or NULL to address all nodes
 * dest  - destination subsystem type
 *
 * Payloads of CRM_BZ2_THRESHOLD bytes or more are compressed when
 * compression succeeds. Returns FALSE when an argument check fails; otherwise
 * the result of send_plugin_text() for the classic plugin, or TRUE once the
 * message has been handed to send_cpg_iov().
 */
gboolean
send_cluster_text(int class, const char *data,
gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
{
/* Values looked up once and reused by later calls */
static int msg_id = 0;
static int local_pid = 0;
static int local_name_len = 0;
static const char *local_name = NULL;
char *target = NULL;
struct iovec *iov;
AIS_Message *msg = NULL;
enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
/* There are only 6 handlers registered to crm_lib_service in plugin.c */
CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
return FALSE);
#if !SUPPORT_PLUGIN
CRM_CHECK(dest != crm_msg_ais, return FALSE);
#endif
if(local_name == NULL) {
local_name = get_local_node_name();
}
if(local_name_len == 0 && local_name) {
local_name_len = strlen(local_name);
}
if (data == NULL) {
data = "";
}
if (local_pid == 0) {
local_pid = getpid();
}
/* Processes without a known message type identify themselves by pid */
if (sender == crm_msg_none) {
sender = local_pid;
}
/* NOTE(review): the calloc() result is not checked before use */
msg = calloc(1, sizeof(AIS_Message));
msg_id++;
msg->id = msg_id;
msg->header.id = class;
msg->header.error = CS_OK;
msg->host.type = dest;
msg->host.local = local;
/* Destination: a named node, a node known only by id, or everyone.
 * 'target' is only used in the trace messages below.
 */
if (node) {
if (node->uname) {
target = strdup(node->uname);
msg->host.size = strlen(node->uname);
memset(msg->host.uname, 0, MAX_NAME);
memcpy(msg->host.uname, node->uname, msg->host.size);
} else {
target = crm_strdup_printf("%u", node->id);
}
msg->host.id = node->id;
} else {
target = strdup("all");
}
msg->sender.id = 0;
msg->sender.type = sender;
msg->sender.pid = local_pid;
msg->sender.size = local_name_len;
memset(msg->sender.uname, 0, MAX_NAME);
if(local_name && msg->sender.size) {
memcpy(msg->sender.uname, local_name, msg->sender.size);
}
/* Payload size includes the terminating NUL */
msg->size = 1 + strlen(data);
msg->header.size = sizeof(AIS_Message) + msg->size;
if (msg->size < CRM_BZ2_THRESHOLD) {
/* Small payload: grow the message and append the text as-is */
msg = realloc_safe(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
} else {
char *compressed = NULL;
unsigned int new_size = 0;
char *uncompressed = strdup(data);
if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
/* header.size now reflects the compressed length; msg->size keeps the original length */
msg->header.size = sizeof(AIS_Message) + new_size;
msg = realloc_safe(msg, msg->header.size);
memcpy(msg->data, compressed, new_size);
msg->is_compressed = TRUE;
msg->compressed_size = new_size;
} else {
/* Compression failed: send the text uncompressed */
msg = realloc_safe(msg, msg->header.size);
memcpy(msg->data, data, msg->size);
}
free(uncompressed);
free(compressed);
}
/* Wrap the message in a heap-allocated iovec; it is handed to the send path below */
iov = calloc(1, sizeof(struct iovec));
iov->iov_base = msg;
iov->iov_len = msg->header.size;
if (msg->compressed_size) {
- crm_trace("Queueing CPG message %u to %s (%zd bytes, %d bytes compressed payload): %.200s",
- msg->id, target, iov->iov_len, msg->compressed_size, data);
+ crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
+ msg->id, target, (unsigned long long) iov->iov_len,
+ msg->compressed_size, data);
} else {
- crm_trace("Queueing CPG message %u to %s (%zd bytes, %d bytes payload): %.200s",
- msg->id, target, iov->iov_len, msg->size, data);
+ crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
+ msg->id, target, (unsigned long long) iov->iov_len,
+ msg->size, data);
}
free(target);
#if SUPPORT_PLUGIN
/* The plugin is the only time we dont use CPG messaging */
if(get_cluster_type() == pcmk_cluster_classic_ais) {
return send_plugin_text(class, iov);
}
#endif
send_cpg_iov(iov);
return TRUE;
}
/*!
 * \brief Translate a subsystem name into a cluster message type
 *
 * \param[in] text  Subsystem name, or the decimal pid of a transient client
 *
 * \return The matching message type; for unrecognized names, the number parsed
 *         from \p text if it is greater than crm_msg_stonith_ng, otherwise
 *         crm_msg_none (also returned when \p text is NULL)
 */
enum crm_ais_msg_types
text2msg_type(const char *text)
{
    /* Checked in order; the first matching name wins */
    const struct {
        const char *name;
        int msg_type;
    } known[] = {
        { "ais",               crm_msg_ais },
        { "crm_plugin",        crm_msg_ais },
        { CRM_SYSTEM_CIB,      crm_msg_cib },
        { CRM_SYSTEM_CRMD,     crm_msg_crmd },
        { CRM_SYSTEM_DC,       crm_msg_crmd },
        { CRM_SYSTEM_TENGINE,  crm_msg_te },
        { CRM_SYSTEM_PENGINE,  crm_msg_pe },
        { CRM_SYSTEM_LRMD,     crm_msg_lrmd },
        { CRM_SYSTEM_STONITHD, crm_msg_stonithd },
        { "stonith-ng",        crm_msg_stonith_ng },
        { "attrd",             crm_msg_attrd },
    };
    int result = crm_msg_none;
    size_t lpc = 0;

    CRM_CHECK(text != NULL, return result);

    for (lpc = 0; lpc < sizeof(known) / sizeof(known[0]); lpc++) {
        if (safe_str_eq(text, known[lpc].name)) {
            return known[lpc].msg_type;
        }
    }

    /* This will normally be a transient client rather than a cluster daemon,
     * in which case the type is the pid of the client
     */
    if ((sscanf(text, "%d", &result) != 1) || (result <= crm_msg_stonith_ng)) {
        /* Not a sane pid */
        result = crm_msg_none;
    }
    return result;
}
diff --git a/lib/common/ipc.c b/lib/common/ipc.c
index 4342707f94..6d6d3cd15d 100644
--- a/lib/common/ipc.c
+++ b/lib/common/ipc.c
@@ -1,1280 +1,1283 @@
/*
* Copyright (C) 2004 Andrew Beekhof
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Highest IPC message version this code accepts; messages whose header
 * carries a greater version are rejected by the receive paths in this file
 */
#define PCMK_IPC_VERSION 1
/* Header placed in front of every IPC payload (the libqb header comes first) */
struct crm_ipc_response_header {
struct qb_ipc_response_header qb;
uint32_t size_uncompressed; /* Payload size in bytes, including the trailing NUL */
uint32_t size_compressed; /* Non-zero only when the payload is compressed */
uint32_t flags;
uint8_t version; /* Protect against version changes for anyone that might bother to statically link us */
};
/* Size of struct crm_ipc_response_header; set on first use by crm_ipc_init() */
static int hdr_offset = 0;
/* Default maximum IPC buffer size; set on first use by crm_ipc_init() */
static unsigned int ipc_buffer_max = 0;
/* Defined later in this file */
static unsigned int pick_ipc_buffer(unsigned int max);
/*!
 * \internal
 * \brief Set the cached header size and default buffer limit on first use
 */
static inline void
crm_ipc_init(void)
{
    if (ipc_buffer_max == 0) {
        ipc_buffer_max = pick_ipc_buffer(0);
    }

    if (hdr_offset == 0) {
        hdr_offset = sizeof(struct crm_ipc_response_header);
    }
}
/*!
 * \brief Get the IPC buffer size used when no explicit maximum is requested
 *
 * \return Result of pick_ipc_buffer() for a requested size of 0
 */
unsigned int
crm_ipc_default_buffer_size(void)
{
    unsigned int default_size = pick_ipc_buffer(0);

    return default_size;
}
/*!
 * \internal
 * \brief Build a message reference of the form "<custom1>-<custom2>-<time>-<counter>"
 *
 * \param[in] custom1  First component ("_empty_" is used if NULL)
 * \param[in] custom2  Second component ("_empty_" is used if NULL)
 *
 * \return Newly allocated reference string (caller must free it), or NULL if
 *         memory could not be allocated
 *
 * \note The counter is a function-local static that increments on every
 *       successfully built reference.
 */
static char *
generateReference(const char *custom1, const char *custom2)
{
    /* "unsigned int" rather than the non-standard "uint" alias */
    static unsigned int ref_counter = 0;

    const char *local_cust1 = (custom1 != NULL) ? custom1 : "_empty_";
    const char *local_cust2 = (custom2 != NULL) ? custom2 : "_empty_";

    /* 3 separators + NUL, plus generous room for the timestamp (20) and the
     * counter (40), plus both custom strings
     */
    size_t reference_len = 4 + 20 + 40 + strlen(local_cust1) + strlen(local_cust2);
    char *reference = calloc(1, reference_len);

    if (reference != NULL) {
        /* Bounded write: can never run past the buffer sized above */
        snprintf(reference, reference_len, "%s-%s-%lu-%u",
                 local_cust1, local_cust2, (unsigned long) time(NULL), ref_counter++);
    }
    return reference;
}
/*!
 * \brief Create a CRM request message
 *
 * \param[in] task       Operation name, stored as F_CRM_TASK
 * \param[in] msg_data   Optional XML added under F_CRM_DATA
 * \param[in] host_to    Optional destination host (ignored if NULL or empty)
 * \param[in] sys_to     Destination subsystem
 * \param[in] sys_from   Originating subsystem
 * \param[in] uuid_from  Optional sender UUID; if set, it is combined with
 *                       \p sys_from via generate_hash_key()
 * \param[in] origin     Stored as F_CRM_ORIGIN
 *
 * \return Newly created request XML
 */
xmlNode *
create_request_adv(const char *task, xmlNode * msg_data,
                   const char *host_to, const char *sys_to,
                   const char *sys_from, const char *uuid_from, const char *origin)
{
    xmlNode *msg = NULL;
    char *sender = NULL;
    char *msg_ref = generateReference(task, sys_from);

    if (uuid_from != NULL) {
        sender = generate_hash_key(sys_from, uuid_from);

    } else if (sys_from == NULL) {
        crm_err("No sys from specified");

    } else {
        sender = strdup(sys_from);
    }

    /* host_from will get set for us if necessary by CRMd when routed */
    msg = create_xml_node(NULL, __FUNCTION__);

    crm_xml_add(msg, F_CRM_ORIGIN, origin);
    crm_xml_add(msg, F_TYPE, T_CRM);
    crm_xml_add(msg, F_CRM_VERSION, CRM_FEATURE_SET);
    crm_xml_add(msg, F_CRM_MSG_TYPE, XML_ATTR_REQUEST);
    crm_xml_add(msg, F_CRM_REFERENCE, msg_ref);
    crm_xml_add(msg, F_CRM_TASK, task);
    crm_xml_add(msg, F_CRM_SYS_TO, sys_to);
    crm_xml_add(msg, F_CRM_SYS_FROM, sender);

    /* HOSTTO will be ignored if it is to the DC anyway. */
    if ((host_to != NULL) && (host_to[0] != '\0')) {
        crm_xml_add(msg, F_CRM_HOST_TO, host_to);
    }

    if (msg_data != NULL) {
        add_message_xml(msg, F_CRM_DATA, msg_data);
    }

    free(msg_ref);
    free(sender);
    return msg;
}
/*!
 * \brief Create a reply to a CRM request
 *
 * The reply reuses the request's reference and task, and swaps the request's
 * "from" and "to" subsystems. Response data is added via add_message_xml().
 *
 * \param[in] original_request   Request being answered
 * \param[in] xml_response_data  Optional XML added under F_CRM_DATA
 * \param[in] origin             Stored as F_CRM_ORIGIN
 *
 * \return Newly created reply XML, or NULL on error
 */
xmlNode *
create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data, const char *origin)
{
    xmlNode *response = NULL;

    const char *req_host = crm_element_value(original_request, F_CRM_HOST_FROM);
    const char *req_sys_from = crm_element_value(original_request, F_CRM_SYS_FROM);
    const char *req_sys_to = crm_element_value(original_request, F_CRM_SYS_TO);
    const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE);
    const char *req_task = crm_element_value(original_request, F_CRM_TASK);
    const char *req_ref = crm_element_value(original_request, F_CRM_REFERENCE);

    if (type == NULL) {
        crm_err("Cannot create new_message, no message type in original message");
        CRM_ASSERT(type != NULL);
        return NULL;
    }

    response = create_xml_node(NULL, __FUNCTION__);
    if (response == NULL) {
        crm_err("Cannot create new_message, malloc failed");
        return NULL;
    }

    crm_xml_add(response, F_CRM_ORIGIN, origin);
    crm_xml_add(response, F_TYPE, T_CRM);
    crm_xml_add(response, F_CRM_VERSION, CRM_FEATURE_SET);
    crm_xml_add(response, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE);
    crm_xml_add(response, F_CRM_REFERENCE, req_ref);
    crm_xml_add(response, F_CRM_TASK, req_task);

    /* since this is a reply, we reverse the from and to */
    crm_xml_add(response, F_CRM_SYS_TO, req_sys_from);
    crm_xml_add(response, F_CRM_SYS_FROM, req_sys_to);

    /* HOSTTO will be ignored if it is to the DC anyway. */
    if ((req_host != NULL) && (req_host[0] != '\0')) {
        crm_xml_add(response, F_CRM_HOST_TO, req_host);
    }

    if (xml_response_data != NULL) {
        add_message_xml(response, F_CRM_DATA, xml_response_data);
    }

    return response;
}
/* Libqb based IPC */
/* Server... */
/* Table of known clients. crm_client_init() creates it on demand with direct
 * (pointer) hashing, crm_client_new() inserts entries keyed by the client's
 * qb_ipcs_connection_t pointer, and crm_client_destroy() removes entries by
 * that pointer or, for clients without an IPC connection, by client id.
 */
GHashTable *client_connections = NULL;
/*!
 * \brief Find the client object registered for an IPC connection
 *
 * \param[in] c  libqb connection to look up
 *
 * \return Matching client, or NULL if none is registered (or if the client
 *         table has not been created)
 */
crm_client_t *
crm_client_get(qb_ipcs_connection_t * c)
{
    if (client_connections == NULL) {
        crm_trace("No client found for %p", c);
        return NULL;
    }
    return g_hash_table_lookup(client_connections, c);
}
/*!
 * \brief Find a registered client by its id string
 *
 * \param[in] id  Client id to search for
 *
 * \return First client whose id equals \p id, or NULL if there is none
 */
crm_client_t *
crm_client_get_by_id(const char *id)
{
    if ((client_connections != NULL) && (id != NULL)) {
        GHashTableIter iter;
        gpointer key = NULL;
        gpointer value = NULL;

        g_hash_table_iter_init(&iter, client_connections);
        while (g_hash_table_iter_next(&iter, &key, &value)) {
            crm_client_t *candidate = value;

            if (strcmp(candidate->id, id) == 0) {
                return candidate;
            }
        }
    }

    crm_trace("No client found with id=%s", id);
    return NULL;
}
/*!
 * \brief Get a printable name for a client
 *
 * \param[in] c  Client (may be NULL)
 *
 * \return "null" for a NULL client; otherwise the client's name if set, else
 *         its id if set, else "unknown"
 */
const char *
crm_client_name(crm_client_t * c)
{
    if (c == NULL) {
        return "null";
    }
    if (c->name != NULL) {
        return c->name;
    }
    if (c->id != NULL) {
        return c->id;
    }
    return "unknown";
}
/*!
 * \brief Create the client table if it does not exist yet
 */
void
crm_client_init(void)
{
    if (client_connections != NULL) {
        return;
    }

    crm_trace("Creating client hash table");
    client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
}
/*!
 * \brief Destroy the client table, logging an error if clients remain in it
 */
void
crm_client_cleanup(void)
{
    int still_active = 0;

    if (client_connections == NULL) {
        return;
    }

    still_active = g_hash_table_size(client_connections);
    if (still_active != 0) {
        crm_err("Exiting with %d active connections", still_active);
    }

    g_hash_table_destroy(client_connections);
    client_connections = NULL;
}
/*!
 * \brief Disconnect every client connected to an IPC service
 *
 * \param[in] service  IPC service whose connections should be dropped
 *                     (nothing is done if NULL)
 */
void
crm_client_disconnect_all(qb_ipcs_service_t *service)
{
    qb_ipcs_connection_t *conn = NULL;
    qb_ipcs_connection_t *upcoming = NULL;

    if (service == NULL) {
        return;
    }

    for (conn = qb_ipcs_connection_first_get(service); conn != NULL; conn = upcoming) {
        /* Fetch the successor before the current connection is dropped */
        upcoming = qb_ipcs_connection_next_get(service, conn);

        /* There really shouldn't be anyone connected at this point */
        crm_notice("Disconnecting client %p, pid=%d...", conn, crm_ipcs_client_pid(conn));
        qb_ipcs_disconnect(conn);
        qb_ipcs_connection_unref(conn);
    }
}
/*!
 * \brief Create and register a client object for a new IPC connection
 *
 * \param[in] c           New libqb connection
 * \param[in] uid_client  User id of the connecting client
 * \param[in] gid_client  Group id of the connecting client
 *
 * \return Newly allocated client, already inserted into the client table
 *         under \p c, or NULL if \p c is NULL
 */
crm_client_t *
crm_client_new(qb_ipcs_connection_t * c, uid_t uid_client, gid_t gid_client)
{
    /* Looked up on first use and cached */
    static uid_t uid_server = 0;
    static gid_t gid_cluster = 0;

    crm_client_t *new_client = NULL;

    CRM_LOG_ASSERT(c);
    if (c == NULL) {
        return NULL;
    }

    if (gid_cluster == 0) {
        static bool have_error = FALSE;

        uid_server = getuid();
        if ((crm_user_lookup(CRM_DAEMON_USER, NULL, &gid_cluster) < 0)
            && (have_error == FALSE)) {
            /* Only warn once */
            crm_warn("Could not find group for user %s", CRM_DAEMON_USER);
            have_error = TRUE;
        }
    }

    if ((gid_cluster != 0) && (gid_client != 0)) {
        uid_t best_uid = -1; /* Passing -1 to chown(2) means don't change */

        if ((uid_client == 0) || (uid_server == 0)) {
            /* Someone is privileged, but the other may not be */
            best_uid = QB_MAX(uid_client, uid_server);
            crm_trace("Allowing user %u to clean up after disconnect", best_uid);
        }

        crm_trace("Giving access to group %u", gid_cluster);
        qb_ipcs_connection_auth_set(c, best_uid, gid_cluster,
                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    }

    crm_client_init();

    /* TODO: Do our own auth checking, return NULL if unauthorized */
    new_client = calloc(1, sizeof(crm_client_t));

    new_client->ipcs = c;
    new_client->kind = CRM_CLIENT_IPC;
    new_client->pid = crm_ipcs_client_pid(c);
    new_client->id = crm_generate_uuid();

    crm_debug("Connecting %p for uid=%d gid=%d pid=%u id=%s", c, uid_client, gid_client, new_client->pid, new_client->id);

#if ENABLE_ACL
    new_client->user = uid2username(uid_client);
#endif

    g_hash_table_insert(client_connections, c, new_client);
    return new_client;
}
/*!
 * \brief Unregister a client and free everything it owns
 *
 * Removes the client from the client table, cancels its pending timers,
 * discards queued events, and frees the client object.
 *
 * \param[in] c  Client to destroy (nothing is done if NULL)
 */
void
crm_client_destroy(crm_client_t * c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections != NULL) {
        if (c->ipcs == NULL) {
            crm_trace("Destroying remote connection %p (%d remaining)",
                      c, crm_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->id);

        } else {
            crm_trace("Destroying %p/%p (%d remaining)",
                      c, c->ipcs, crm_hash_table_size(client_connections) - 1);
            g_hash_table_remove(client_connections, c->ipcs);
        }
    }

    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    crm_debug("Destroying %d events", g_list_length(c->event_queue));
    while (c->event_queue != NULL) {
        /* Each queued event is a two-element iovec: header and payload */
        struct iovec *pending = c->event_queue->data;

        c->event_queue = g_list_remove(c->event_queue, pending);
        free(pending[0].iov_base);
        free(pending[1].iov_base);
        free(pending);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->remote != NULL) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        free(c->remote->buffer);
        free(c->remote);
    }

    free(c);
}
/*!
 * \brief Get the process id of the client behind an IPC connection
 *
 * \param[in] c  libqb connection
 *
 * \return client_pid from the connection statistics (pre-set to 0 before
 *         the statistics are requested)
 */
int
crm_ipcs_client_pid(qb_ipcs_connection_t * c)
{
    struct qb_ipcs_connection_stats conn_stats;

    /* Pre-set in case the statistics call does not fill it in */
    conn_stats.client_pid = 0;
    qb_ipcs_connection_stats_get(c, &conn_stats, 0);

    return conn_stats.client_pid;
}
/*!
 * \brief Parse an IPC message received from a client into XML
 *
 * \param[in,out] c      Client that sent the message (flagged as proxied if
 *                       the message header carries crm_ipc_proxied)
 * \param[in]     data   Raw message: a crm_ipc_response_header followed by
 *                       the (possibly compressed) payload
 * \param[in]     size   Not used by this function
 * \param[out]    id     If not NULL, set to the message id from the header
 * \param[out]    flags  If not NULL, set to the flags from the header
 *
 * \return Parsed XML, or NULL if the message version is unsupported or
 *         decompression fails
 */
xmlNode *
crm_ipcs_recv(crm_client_t * c, void *data, size_t size, uint32_t * id, uint32_t * flags)
{
    struct crm_ipc_response_header *hdr = data;
    char *payload = ((char *) data) + sizeof(struct crm_ipc_response_header);
    char *inflated = NULL;
    xmlNode *parsed = NULL;

    if (id != NULL) {
        /* The libqb header is the first member of our header */
        *id = hdr->qb.id;
    }
    if (flags != NULL) {
        *flags = hdr->flags;
    }

    if (is_set(hdr->flags, crm_ipc_proxied)) {
        /* mark this client as being the endpoint of a proxy connection.
         * Proxy connections responses are sent on the event channel to avoid
         * blocking the proxy daemon (crmd) */
        c->flags |= crm_client_flag_ipc_proxied;
    }

    if (hdr->version > PCMK_IPC_VERSION) {
        crm_err("Filtering incompatible v%d IPC message, we only support versions <= %d",
                hdr->version, PCMK_IPC_VERSION);
        return NULL;
    }

    if (hdr->size_compressed) {
        int bz_rc = 0;
        unsigned int inflated_len = 1 + hdr->size_uncompressed;

        inflated = calloc(1, inflated_len);
        crm_trace("Decompressing message data %u bytes into %u bytes",
                  hdr->size_compressed, inflated_len);

        bz_rc = BZ2_bzBuffToBuffDecompress(inflated, &inflated_len, payload,
                                           hdr->size_compressed, 1, 0);
        payload = inflated;

        if (bz_rc != BZ_OK) {
            crm_err("Decompression failed: %s (%d)", bz2_strerror(bz_rc), bz_rc);
            free(inflated);
            return NULL;
        }
    }

    /* The payload must be NUL-terminated text */
    CRM_ASSERT(payload[hdr->size_uncompressed - 1] == 0);

    crm_trace("Received %.200s", payload);
    parsed = string2xml(payload);

    free(inflated);
    return parsed;
}
ssize_t crm_ipcs_flush_events(crm_client_t * c);

/*!
 * \internal
 * \brief Timer callback that retries sending a client's queued events
 *
 * \param[in] data  The crm_client_t whose queue should be flushed
 *
 * \return FALSE always
 */
static gboolean
crm_ipcs_flush_events_cb(gpointer data)
{
    crm_client_t *client = data;

    /* Clear the timer id first so that the flush does not think one is pending */
    client->event_timer = 0;
    crm_ipcs_flush_events(client);

    return FALSE;
}
/*!
 * \brief Send as many of a client's queued events as possible
 *
 * At most 100 events are sent per call. Events that were sent are removed
 * from the queue and freed. If events remain afterwards, a timer is set to
 * call this function again via crm_ipcs_flush_events_cb().
 *
 * \param[in] c  Client whose event queue should be flushed
 *
 * \return pcmk_ok if \p c is NULL or a retry timer is already pending;
 *         otherwise the result of the last qb_ipcs_event_sendv() call
 *         (0 if nothing was queued)
 */
ssize_t
crm_ipcs_flush_events(crm_client_t * c)
{
int sent = 0;
ssize_t rc = 0;
int queue_len = 0;
if (c == NULL) {
return pcmk_ok;
} else if (c->event_timer) {
/* There is already a timer, wait until it goes off */
crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
return pcmk_ok;
}
queue_len = g_list_length(c->event_queue);
while (c->event_queue && sent < 100) {
struct crm_ipc_response_header *header = NULL;
struct iovec *event = c->event_queue->data;
/* Each event is a two-element iovec: header, then payload */
rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
if (rc < 0) {
/* Leave this event (and everything behind it) queued for a later attempt */
break;
}
sent++;
header = event[0].iov_base;
if (header->size_compressed) {
- crm_trace("Event %d to %p[%d] (%zu compressed bytes) sent",
- header->qb.id, c->ipcs, c->pid, rc);
+ crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
+ header->qb.id, c->ipcs, c->pid, (long long) rc);
} else {
- crm_trace("Event %d to %p[%d] (%zu bytes) sent: %.120s",
- header->qb.id, c->ipcs, c->pid, rc, (char *)(event[1].iov_base));
+ crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
+ header->qb.id, c->ipcs, c->pid, (long long) rc,
+ (char *) (event[1].iov_base));
}
c->event_queue = g_list_remove(c->event_queue, event);
free(event[0].iov_base);
free(event[1].iov_base);
free(event);
}
/* queue_len now holds the number of events still waiting */
queue_len -= sent;
if (sent > 0 || c->event_queue) {
- crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%zd)",
- sent, queue_len, c->ipcs, c->pid, pcmk_strerror(rc < 0 ? rc : 0), rc);
+ crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
+ sent, queue_len, c->ipcs, c->pid,
+ pcmk_strerror(rc < 0 ? rc : 0), (long long) rc);
}
if (c->event_queue) {
/* NOTE(review): because the eviction test is an "else if", a backlog that
 * is an exact multiple of 100 only warns, even when it is above 500
 */
if (queue_len % 100 == 0 && queue_len > 99) {
crm_warn("Event queue for %p[%d] has grown to %d", c->ipcs, c->pid, queue_len);
} else if (queue_len > 500) {
crm_err("Evicting slow client %p[%d]: event queue reached %d entries",
c->ipcs, c->pid, queue_len);
qb_ipcs_disconnect(c->ipcs);
return rc;
}
/* Retry later; the interval grows with the size of the backlog */
c->event_timer = g_timeout_add(1000 + 100 * queue_len, crm_ipcs_flush_events_cb, c);
}
return rc;
}
/*!
 * \brief Serialize an XML message into a header + payload iovec pair
 *
 * The payload is sent as plain text if header plus text is smaller than the
 * size limit; otherwise it is compressed.
 *
 * \param[in]  request        Id stored in the message header
 * \param[in]  message        XML to send
 * \param[out] result         Set to a newly allocated two-element iovec
 *                            (header, payload) on success, NULL on failure
 * \param[in]  max_send_size  Size limit (0 selects the default limit)
 *
 * \return Total message size (header plus payload) on success, or -EMSGSIZE
 *         if the message cannot be compressed to fit the limit
 */
ssize_t
crm_ipc_prepare(uint32_t request, xmlNode * message, struct iovec ** result, uint32_t max_send_size)
{
    /* Largest payload seen so far; used to suggest a better buffer size */
    static unsigned int biggest = 0;

    struct iovec *parts = NULL;
    unsigned int needed = 0;
    char *packed = NULL;
    char *xml_text = dump_xml_unformatted(message);
    struct crm_ipc_response_header *hdr = calloc(1, sizeof(struct crm_ipc_response_header));

    CRM_ASSERT(result != NULL);

    crm_ipc_init();

    if (max_send_size == 0) {
        max_send_size = ipc_buffer_max;
    }
    CRM_LOG_ASSERT(max_send_size != 0);

    *result = NULL;
    parts = calloc(2, sizeof(struct iovec));
    parts[0].iov_len = hdr_offset;
    parts[0].iov_base = hdr;

    hdr->version = PCMK_IPC_VERSION;
    hdr->size_uncompressed = 1 + strlen(xml_text);
    needed = parts[0].iov_len + hdr->size_uncompressed;

    if (needed >= max_send_size) {
        unsigned int packed_len = 0;

        if (!crm_compress_string(xml_text, hdr->size_uncompressed, max_send_size,
                                 &packed, &packed_len)) {
            ssize_t rc = -EMSGSIZE;

            crm_log_xml_trace(message, "EMSGSIZE");
            biggest = QB_MAX(hdr->size_uncompressed, biggest);

            crm_err
                ("Could not compress the message (%u bytes) into less than the configured ipc limit (%u bytes). "
                 "Set PCMK_ipc_buffer to a higher value (%u bytes suggested)",
                 hdr->size_uncompressed, max_send_size, 4 * biggest);

            free(packed);
            free(xml_text);
            free(hdr);
            free(parts);
            return rc;
        }

        /* Compression succeeded: the compressed buffer becomes the payload */
        hdr->flags |= crm_ipc_compressed;
        hdr->size_compressed = packed_len;

        parts[1].iov_len = hdr->size_compressed;
        parts[1].iov_base = packed;

        free(xml_text);

        biggest = QB_MAX(hdr->size_compressed, biggest);

    } else {
        /* Small enough to send as-is; the text buffer becomes the payload */
        parts[1].iov_base = xml_text;
        parts[1].iov_len = hdr->size_uncompressed;
    }

    hdr->qb.size = parts[0].iov_len + parts[1].iov_len;
    hdr->qb.id = (int32_t)request;    /* Replying to a specific request */

    *result = parts;
    CRM_ASSERT(hdr->qb.size > 0);
    return hdr->qb.size;
}
/*!
 * \brief Send a prepared message (header + payload iovec pair) to a client
 *
 * Messages flagged crm_ipc_server_event are appended to the client's event
 * queue, which is then flushed. Anything else is sent directly as a response.
 * With crm_ipc_server_free, ownership of \p iov passes to this function;
 * otherwise an event is queued as a deep copy.
 *
 * \param[in] c      Client to send to
 * \param[in] iov    Two-element iovec as built by crm_ipc_prepare()
 * \param[in] flags  Group of enum crm_ipc_flags
 *
 * \return For events, the result of crm_ipcs_flush_events(); for responses,
 *         the result of qb_ipcs_response_sendv()
 */
ssize_t
crm_ipcs_sendv(crm_client_t * c, struct iovec * iov, enum crm_ipc_flags flags)
{
ssize_t rc;
/* Id assigned to outgoing events, incremented per event */
static uint32_t id = 1;
struct crm_ipc_response_header *header = iov[0].iov_base;
if (c->flags & crm_client_flag_ipc_proxied) {
/* _ALL_ replies to proxied connections need to be sent as events */
if (is_not_set(flags, crm_ipc_server_event)) {
flags |= crm_ipc_server_event;
/* this flag lets us know this was originally meant to be a response.
* even though we're sending it over the event channel. */
flags |= crm_ipc_proxied_relay_response;
}
}
header->flags |= flags;
if (flags & crm_ipc_server_event) {
header->qb.id = id++; /* We don't really use it, but doesn't hurt to set one */
if (flags & crm_ipc_server_free) {
/* The queue takes ownership of the caller's iovec */
crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
c->event_queue = g_list_append(c->event_queue, iov);
} else {
/* The caller keeps its iovec, so queue a deep copy of header and payload */
struct iovec *iov_copy = calloc(2, sizeof(struct iovec));
crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
iov_copy[0].iov_len = iov[0].iov_len;
iov_copy[0].iov_base = malloc(iov[0].iov_len);
memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
iov_copy[1].iov_len = iov[1].iov_len;
iov_copy[1].iov_base = malloc(iov[1].iov_len);
memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
c->event_queue = g_list_append(c->event_queue, iov_copy);
}
} else {
CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */
rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
/* NOTE(review): rc is an ssize_t but is logged with %d in the failure message below */
if (rc < header->qb.size) {
crm_notice("Response %d to %p[%d] (%u bytes) failed: %s (%d)",
header->qb.id, c->ipcs, c->pid, header->qb.size, pcmk_strerror(rc), rc);
} else {
- crm_trace("Response %d sent, %zd bytes to %p[%d]", header->qb.id, rc, c->ipcs, c->pid);
+ crm_trace("Response %d sent, %lld bytes to %p[%d]",
+ header->qb.id, (long long) rc, c->ipcs, c->pid);
}
if (flags & crm_ipc_server_free) {
free(iov[0].iov_base);
free(iov[1].iov_base);
free(iov);
}
}
/* The event queue is flushed in both cases, but the flush result only
 * becomes the return value when this message was itself an event
 */
if (flags & crm_ipc_server_event) {
rc = crm_ipcs_flush_events(c);
} else {
crm_ipcs_flush_events(c);
}
if (rc == -EPIPE || rc == -ENOTCONN) {
crm_trace("Client %p disconnected", c->ipcs);
}
return rc;
}
/*!
 * \brief Serialize an XML message and send it to a client
 *
 * \param[in] c        Client to send to
 * \param[in] request  Id of the request being answered
 * \param[in] message  XML to send
 * \param[in] flags    Group of enum crm_ipc_flags (crm_ipc_server_free is
 *                     always added, since the prepared buffers are owned here)
 *
 * \return -EDESTADDRREQ if \p c is NULL, a negative error code if the message
 *         could not be prepared, otherwise the result of crm_ipcs_sendv()
 */
ssize_t
crm_ipcs_send(crm_client_t * c, uint32_t request, xmlNode * message,
              enum crm_ipc_flags flags)
{
    struct iovec *iov = NULL;
    ssize_t rc = 0;

    if (c == NULL) {
        return -EDESTADDRREQ;
    }

    crm_ipc_init();

    rc = crm_ipc_prepare(request, message, &iov, ipc_buffer_max);
    if (rc > 0) {
        rc = crm_ipcs_sendv(c, iov, flags | crm_ipc_server_free);

    } else {
        free(iov);
        /* rc is an ssize_t, so print it via long long rather than %d, which
         * would mismatch the argument size where ssize_t is wider than int
         */
        crm_notice("Message to %p[%d] failed: %s (%lld)",
                   c->ipcs, c->pid, pcmk_strerror(rc), (long long) rc);
    }

    return rc;
}
/*!
 * \brief Acknowledge a client request, if the client asked for a response
 *
 * Nothing is sent unless \p flags contains crm_ipc_client_response.
 *
 * \param[in] c         Client to acknowledge
 * \param[in] request   Id of the request being acknowledged
 * \param[in] flags     Flags received with the request
 * \param[in] tag       Element name of the acknowledgement
 * \param[in] function  Recorded in the ack's "function" attribute
 * \param[in] line      Recorded in the ack's "line" attribute
 */
void
crm_ipcs_send_ack(crm_client_t * c, uint32_t request, uint32_t flags, const char *tag, const char *function,
                  int line)
{
    xmlNode *reply = NULL;

    if ((flags & crm_ipc_client_response) == 0) {
        return;
    }

    reply = create_xml_node(NULL, tag);

    crm_trace("Ack'ing msg from %s (%p)", crm_client_name(c), c);
    c->request_id = 0;

    crm_xml_add(reply, "function", function);
    crm_xml_add_int(reply, "line", line);

    crm_ipcs_send(c, request, reply, flags);
    free_xml(reply);
}
/* Client... */
/* Smallest buffer size ever used for IPC */
#define MIN_MSG_SIZE 12336 /* sizeof(struct qb_ipc_connection_response) */
/* Parenthesized so that the macro expands safely inside larger expressions
 * (for example as a divisor or next to a higher-precedence operator)
 */
#define MAX_MSG_SIZE (128 * 1024) /* 128k default */
/* Client-side state of one IPC connection (see crm_ipc_new()) */
struct crm_ipc_s {
/* Descriptor and events used by crm_ipc_ready() to poll the connection */
struct pollfd pfd;
/* the max size we can send/receive over ipc */
unsigned int max_buf_size;
/* Size of the allocated 'buffer' */
unsigned int buf_size;
/* Result of the most recent event receive in crm_ipc_read() */
int msg_size;
/* Set by crm_ipc_send() when an expected reply did not arrive; while set,
 * crm_ipc_send() tries to collect that reply before sending anything new
 */
int need_reply;
/* Receive buffer: a crm_ipc_response_header followed by the payload */
char *buffer;
/* Name passed to qb_ipcc_connect() to select the service */
char *name;
/* NOTE(review): not referenced by the code visible in this part of the file */
uint32_t buffer_flags;
/* Underlying libqb connection; NULL when not connected */
qb_ipcc_connection_t *ipc;
};
/*!
 * \internal
 * \brief Choose an IPC buffer size
 *
 * The global limit is determined once: a positive PCMK_ipc_buffer environment
 * value is used (but never less than MIN_MSG_SIZE), otherwise MAX_MSG_SIZE.
 *
 * \param[in] max  Size requested by the caller
 *
 * \return The larger of \p max and the global limit
 */
static unsigned int
pick_ipc_buffer(unsigned int max)
{
    static unsigned int global_max = 0;

    if (global_max == 0) {
        const char *env = getenv("PCMK_ipc_buffer");

        /* Default, unless the environment supplies a usable value */
        global_max = MAX_MSG_SIZE;

        if (env != NULL) {
            int env_max = crm_parse_int(env, "0");

            if (env_max > 0) {
                global_max = QB_MAX(MIN_MSG_SIZE, env_max);
            }
        }
    }

    return QB_MAX(max, global_max);
}
/*!
 * \brief Create a new (not yet connected) client IPC object
 *
 * \param[in] name      Name of the IPC service to connect to later
 * \param[in] max_size  Requested buffer size; the size actually used is
 *                      chosen by pick_ipc_buffer()
 *
 * \return Newly allocated object (free with crm_ipc_destroy()), or NULL if
 *         \p name is NULL or memory could not be allocated
 */
crm_ipc_t *
crm_ipc_new(const char *name, size_t max_size)
{
    crm_ipc_t *client = NULL;

    if (name == NULL) {
        /* strdup(NULL) is undefined behavior, so refuse up front */
        crm_err("Cannot create IPC connection object without a name");
        return NULL;
    }

    client = calloc(1, sizeof(crm_ipc_t));
    if (client == NULL) {
        crm_err("Could not create IPC connection object for %s: out of memory", name);
        return NULL;
    }

    client->name = strdup(name);
    client->buf_size = pick_ipc_buffer(max_size);
    client->buffer = malloc(client->buf_size);

    if ((client->name == NULL) || (client->buffer == NULL)) {
        crm_err("Could not create IPC connection object for %s: out of memory", name);
        free(client->buffer);
        free(client->name);
        free(client);
        return NULL;
    }

    /* Clients initiating connection pick the max buf size */
    client->max_buf_size = client->buf_size;

    client->pfd.fd = -1;
    client->pfd.events = POLLIN;
    client->pfd.revents = 0;

    return client;
}
/*!
 * \brief Establish an IPC connection to a Pacemaker component
 *
 * \param[in] client  Connection instance obtained from crm_ipc_new()
 *
 * \return TRUE on success, FALSE otherwise (in which case errno will be set)
 *
 * \note If the connection is established but cannot be fully set up, it is
 *       closed again before FALSE is returned.
 */
bool
crm_ipc_connect(crm_ipc_t * client)
{
    int saved_errno = 0;

    client->need_reply = FALSE;
    client->ipc = qb_ipcc_connect(client->name, client->buf_size);

    if (client->ipc == NULL) {
        /* Logging may change errno, so preserve it for the caller */
        saved_errno = errno;
        crm_debug("Could not establish %s connection: %s (%d)", client->name,
                  pcmk_strerror(saved_errno), saved_errno);
        errno = saved_errno;
        return FALSE;
    }

    client->pfd.fd = crm_ipc_get_fd(client);
    if (client->pfd.fd < 0) {
        saved_errno = errno;
        crm_debug("Could not obtain file descriptor for %s connection: %s (%d)",
                  client->name, pcmk_strerror(saved_errno), saved_errno);

        /* Don't leave a half-initialized connection open behind a failure */
        crm_ipc_close(client);
        errno = saved_errno;
        return FALSE;
    }

    qb_ipcc_context_set(client->ipc, client);

#ifdef HAVE_IPCS_GET_BUFFER_SIZE
    client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc);
    if (client->max_buf_size > client->buf_size) {
        /* Allocate the larger buffer before releasing the old one, so that the
         * client never ends up with buf_size describing a missing buffer
         */
        char *bigger = calloc(1, client->max_buf_size);

        if (bigger == NULL) {
            crm_ipc_close(client);
            errno = ENOMEM;
            return FALSE;
        }
        free(client->buffer);
        client->buffer = bigger;
        client->buf_size = client->max_buf_size;
    }
#endif

    return TRUE;
}
/*!
 * \brief Disconnect a client IPC connection, if it is connected
 *
 * \param[in] client  Connection to close (nothing is done if NULL)
 */
void
crm_ipc_close(crm_ipc_t * client)
{
    qb_ipcc_connection_t *conn = NULL;

    if (client == NULL) {
        return;
    }

    crm_trace("Disconnecting %s IPC connection %p (%p)", client->name, client, client->ipc);

    conn = client->ipc;
    if (conn != NULL) {
        /* Clear our reference before disconnecting */
        client->ipc = NULL;
        qb_ipcc_disconnect(conn);
    }
}
/*!
 * \brief Free a client IPC object
 *
 * \param[in] client  Object to free (nothing is done if NULL)
 *
 * \note A connection that is still active is only reported, not closed.
 */
void
crm_ipc_destroy(crm_ipc_t * client)
{
    if (client == NULL) {
        return;
    }

    if ((client->ipc != NULL) && qb_ipcc_is_connected(client->ipc)) {
        crm_notice("Destroying an active IPC connection to %s", client->name);
        /* Calling crm_ipc_close(client) here would be basically unsafe, so it
         * is deliberately not done.
         *
         * If this connection was attached to mainloop and mainloop is active,
         * the 'disconnected' callback will end up back here and we'll end
         * up free'ing the memory twice - something that can still happen
         * even without this if we destroy a connection and it closes before
         * we call exit
         */
    }

    crm_trace("Destroying IPC connection to %s: %p", client->name, client);
    free(client->buffer);
    free(client->name);
    free(client);
}
/*!
 * \brief Get the file descriptor of a client IPC connection
 *
 * \param[in] client  Connection to query
 *
 * \return The descriptor on success; otherwise errno is set to EINVAL, an
 *         error is logged, and -errno is returned
 */
int
crm_ipc_get_fd(crm_ipc_t * client)
{
    int descriptor = 0;

    if ((client == NULL) || (client->ipc == NULL)
        || (qb_ipcc_fd_get(client->ipc, &descriptor) != 0)) {

        errno = EINVAL;
        crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s",
                   (client? client->name : "unspecified client"));
        return -errno;
    }

    return descriptor;
}
/*!
 * \brief Check whether a client IPC connection is currently usable
 *
 * \param[in] client  Connection to check
 *
 * \return FALSE if \p client is NULL, has no libqb connection, or has a
 *         negative descriptor; otherwise the result of qb_ipcc_is_connected()
 *
 * \note When libqb reports the connection as gone, the stored descriptor is
 *       set to -EINVAL.
 */
bool
crm_ipc_connected(crm_ipc_t * client)
{
    bool still_up = FALSE;

    if (client == NULL) {
        crm_trace("No client");
        return FALSE;
    }
    if (client->ipc == NULL) {
        crm_trace("No connection");
        return FALSE;
    }
    if (client->pfd.fd < 0) {
        crm_trace("Bad descriptor");
        return FALSE;
    }

    still_up = qb_ipcc_is_connected(client->ipc);
    if (still_up == FALSE) {
        client->pfd.fd = -EINVAL;
    }
    return still_up;
}
/*!
 * \brief Poll a client IPC connection without blocking
 *
 * \param[in] client  Connection to poll (must not be NULL)
 *
 * \return -ENOTCONN if the connection is not usable, otherwise the result of
 *         poll() on the connection's descriptor with a zero timeout
 */
int
crm_ipc_ready(crm_ipc_t * client)
{
    int poll_rc = 0;

    CRM_ASSERT(client != NULL);

    if (crm_ipc_connected(client) == FALSE) {
        return -ENOTCONN;
    }

    client->pfd.revents = 0;
    poll_rc = poll(&(client->pfd), 1, 0);
    return poll_rc;
}
/*!
 * \internal
 * \brief Decompress the message in a client's buffer, if it is compressed
 *
 * On successful decompression the client's buffer is replaced by a new one
 * holding the original header followed by the uncompressed payload.
 *
 * \param[in,out] client  Connection whose buffer holds the received message
 *
 * \return pcmk_ok on success, -EILSEQ if decompression fails
 */
static int
crm_ipc_decompress(crm_ipc_t * client)
{
    struct crm_ipc_response_header *hdr = (struct crm_ipc_response_header *)(void*)client->buffer;

    if (hdr->size_compressed) {
        int bz_rc = 0;
        unsigned int inflated_len = 1 + hdr->size_uncompressed;

        /* never let buf size fall below our max size required for ipc reads. */
        unsigned int alloc_len = QB_MAX((hdr_offset + inflated_len), client->max_buf_size);
        char *inflated = calloc(1, alloc_len);

        crm_trace("Decompressing message data %u bytes into %u bytes",
                  hdr->size_compressed, inflated_len);

        bz_rc = BZ2_bzBuffToBuffDecompress(inflated + hdr_offset, &inflated_len,
                                           client->buffer + hdr_offset,
                                           hdr->size_compressed, 1, 0);
        if (bz_rc != BZ_OK) {
            crm_err("Decompression failed: %s (%d)", bz2_strerror(bz_rc), bz_rc);
            free(inflated);
            return -EILSEQ;
        }

        /*
         * It is not asserted here that (hdr->size_uncompressed + hdr_offset)
         * is at least ipc_buffer_max. For an identical msg, some clients may
         * require compression, and others may not. If that same msg (event)
         * is sent to multiple clients, it could result in some clients
         * receiving a compressed msg even though compression was not
         * explicitly required for them.
         */

        CRM_ASSERT(inflated_len == hdr->size_uncompressed);

        memcpy(inflated, client->buffer, hdr_offset);       /* Preserve the header */
        hdr = (struct crm_ipc_response_header *)(void*)inflated;

        /* Swap the new buffer in for the old one */
        free(client->buffer);
        client->buf_size = alloc_len;
        client->buffer = inflated;
    }

    /* The payload must be NUL-terminated text */
    CRM_ASSERT(client->buffer[hdr_offset + hdr->size_uncompressed - 1] == 0);
    return pcmk_ok;
}
/*!
 * \brief Receive one event from the server into the client's buffer
 *
 * \param[in,out] client  Connected client (must have a connection and buffer)
 *
 * \return Size of the uncompressed payload (excluding the header) on success,
 *         -ENOMSG if nothing was received, -EBADMSG if the message version is
 *         unsupported, or the error from decompression
 */
long
crm_ipc_read(crm_ipc_t * client)
{
    struct crm_ipc_response_header *hdr = NULL;

    CRM_ASSERT(client != NULL);
    CRM_ASSERT(client->ipc != NULL);
    CRM_ASSERT(client->buffer != NULL);

    crm_ipc_init();

    client->buffer[0] = 0;
    client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size - 1, 0);

    if (client->msg_size < 0) {
        crm_trace("No message from %s received: %s", client->name, pcmk_strerror(client->msg_size));

    } else {
        int rc = crm_ipc_decompress(client);

        if (rc != pcmk_ok) {
            return rc;
        }

        hdr = (struct crm_ipc_response_header *)(void*)client->buffer;
        if (hdr->version > PCMK_IPC_VERSION) {
            crm_err("Filtering incompatible v%d IPC message, we only support versions <= %d",
                    hdr->version, PCMK_IPC_VERSION);
            return -EBADMSG;
        }

        crm_trace("Received %s event %d, size=%u, rc=%d, text: %.100s",
                  client->name, hdr->qb.id, hdr->qb.size, client->msg_size,
                  client->buffer + hdr_offset);
    }

    if (crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) {
        crm_err("Connection to %s failed", client->name);
    }

    if (hdr == NULL) {
        return -ENOMSG;
    }

    /* Data excluding the header */
    return hdr->size_uncompressed;
}
/*!
 * \brief Get the payload of the message in a client's buffer
 *
 * \param[in] client  Client (must not be NULL)
 *
 * \return Pointer just past the IPC header at the start of the buffer
 */
const char *
crm_ipc_buffer(crm_ipc_t * client)
{
    const char *payload = NULL;

    CRM_ASSERT(client != NULL);

    payload = client->buffer + sizeof(struct crm_ipc_response_header);
    return payload;
}
/*!
 * \brief Get the flags of the message in a client's buffer
 *
 * \param[in] client  Client (must not be NULL)
 *
 * \return Flags from the IPC header at the start of the buffer, or 0 if the
 *         client has no buffer
 */
uint32_t
crm_ipc_buffer_flags(crm_ipc_t * client)
{
    CRM_ASSERT(client != NULL);

    if (client->buffer != NULL) {
        struct crm_ipc_response_header *hdr = (struct crm_ipc_response_header *)(void*)client->buffer;

        return hdr->flags;
    }
    return 0;
}
/*!
 * \brief Get the service name a client IPC object was created for
 *
 * \param[in] client  Client (must not be NULL)
 *
 * \return The client's name
 */
const char *
crm_ipc_name(crm_ipc_t * client)
{
    const char *service = NULL;

    CRM_ASSERT(client != NULL);

    service = client->name;
    return service;
}
/*!
 * \internal
 * \brief Send a request and wait for its reply in a single libqb call
 *
 * The call is repeated for as long as it returns -EAGAIN and the client is
 * still connected.
 *
 * \param[in] client  Connected client; the reply is stored in its buffer
 * \param[in] iov     Two-element iovec (header, payload) to send
 *
 * \return Result of the last qb_ipcc_sendv_recv() call
 */
static int
internal_ipc_send_recv(crm_ipc_t * client, const void *iov)
{
    int rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1);

    while ((rc == -EAGAIN) && crm_ipc_connected(client)) {
        rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1);
    }
    return rc;
}
/*!
 * \internal
 * \brief Send a request, retrying on -EAGAIN until a deadline passes
 *
 * \param[in] client      Connected client
 * \param[in] iov         Two-element iovec (header, payload) to send
 * \param[in] ms_timeout  Retry budget in milliseconds (rounded to whole
 *                        seconds, plus one second)
 *
 * \return Result of the last qb_ipcc_sendv() call
 */
static int
internal_ipc_send_request(crm_ipc_t * client, const void *iov, int ms_timeout)
{
    time_t deadline = time(NULL) + 1 + (ms_timeout / 1000);
    int rc = qb_ipcc_sendv(client->ipc, iov, 2);

    while ((rc == -EAGAIN) && (time(NULL) < deadline) && crm_ipc_connected(client)) {
        rc = qb_ipcc_sendv(client->ipc, iov, 2);
    }
    return rc;
}
/*!
 * \internal
 * \brief Wait for the reply to a specific request
 *
 * Replies with a lower id than the one wanted are logged and discarded.
 * A reply with a higher id is logged and then trips an assertion.
 *
 * \param[in,out] client      Connected client; the reply lands in its buffer
 * \param[in]     request_id  Id of the request whose reply is wanted
 * \param[in]     ms_timeout  How long to wait, in milliseconds (rounded to
 *                            whole seconds, plus one second)
 *
 * \return Result of the last qb_ipcc_recv() call, or the error returned by
 *         crm_ipc_decompress() if a received message could not be decompressed
 */
static int
internal_ipc_get_reply(crm_ipc_t * client, int request_id, int ms_timeout)
{
    time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
    int rc = 0;

    crm_ipc_init();

    /* get the reply */
    crm_trace("client %s waiting on reply to msg id %d", client->name, request_id);
    do {

        rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000);
        if (rc > 0) {
            struct crm_ipc_response_header *hdr = NULL;

            /* Kept separate from rc so the receive result is not shadowed */
            int decompress_rc = crm_ipc_decompress(client);

            if (decompress_rc != pcmk_ok) {
                return decompress_rc;
            }

            hdr = (struct crm_ipc_response_header *)(void*)client->buffer;
            if (hdr->qb.id == request_id) {
                /* Got it */
                break;

            } else if (hdr->qb.id < request_id) {
                xmlNode *bad = string2xml(crm_ipc_buffer(client));

                crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id);
                crm_log_xml_notice(bad, "OldIpcReply");
                free_xml(bad);  /* Only parsed for logging; don't leak it */

            } else {
                xmlNode *bad = string2xml(crm_ipc_buffer(client));

                crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id);
                crm_log_xml_notice(bad, "ImpossibleReply");
                free_xml(bad);  /* Only parsed for logging; don't leak it */
                CRM_ASSERT(hdr->qb.id <= request_id);
            }

        } else if (crm_ipc_connected(client) == FALSE) {
            crm_err("Server disconnected client %s while waiting for msg id %d", client->name,
                    request_id);
            break;
        }

        /* NOTE(review): if the deadline passes right after an old reply was
         * discarded, rc is still positive when returned -- confirm callers
         * cannot mistake that for the wanted reply
         */
    } while (time(NULL) < timeout);

    return rc;
}
/*!
 * \brief Send an XML message to the server, optionally waiting for a reply
 *
 * \param[in]  client      Connected client
 * \param[in]  message     XML to send
 * \param[in]  flags       Group of enum crm_ipc_flags; a reply is only waited
 *                         for when crm_ipc_client_response is set (and
 *                         crm_ipc_proxied is not)
 * \param[in]  ms_timeout  Timeout in milliseconds (0 means 5000; a negative
 *                         value with crm_ipc_client_response set uses the
 *                         combined send/receive call instead)
 * \param[out] reply       If not NULL and a reply is received, set to the
 *                         reply parsed as XML
 *
 * \return A positive value on success, otherwise a negative error code:
 *         -ENOTCONN if there is no usable connection, -EALREADY if an earlier
 *         reply is still outstanding, or the error from preparing, sending or
 *         receiving the message
 */
int
crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout,
xmlNode ** reply)
{
long rc = 0;
struct iovec *iov;
/* Request id, incremented for every message sent by this process */
static uint32_t id = 0;
/* Threshold (in tenths of the buffer size) for the next "message is getting large" notice */
static int factor = 8;
struct crm_ipc_response_header *header;
crm_ipc_init();
if (client == NULL) {
crm_notice("Invalid connection");
return -ENOTCONN;
} else if (crm_ipc_connected(client) == FALSE) {
/* Don't even bother */
crm_notice("Connection to %s closed", client->name);
return -ENOTCONN;
}
if (ms_timeout == 0) {
ms_timeout = 5000;
}
/* A previous request never got its reply: try once more to collect it
 * before anything new is sent
 */
if (client->need_reply) {
crm_trace("Trying again to obtain pending reply from %s", client->name);
rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout);
if (rc < 0) {
crm_warn("Sending to %s (%p) is disabled until pending reply is received", client->name,
client->ipc);
return -EALREADY;
} else {
crm_notice("Lost reply from %s (%p) finally arrived, sending re-enabled", client->name,
client->ipc);
client->need_reply = FALSE;
}
}
id++;
CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */
rc = crm_ipc_prepare(id, message, &iov, client->max_buf_size);
if(rc < 0) {
/* Nothing to clean up: crm_ipc_prepare() frees its buffers on failure */
return rc;
}
header = iov[0].iov_base;
header->flags |= flags;
if(is_set(flags, crm_ipc_proxied)) {
/* Don't look for a synchronous response */
clear_bit(flags, crm_ipc_client_response);
}
if(header->size_compressed) {
/* Suggest a larger buffer once compressed messages approach the limit;
 * factor is raised each time so the notice is not repeated at the same level
 */
if(factor < 10 && (client->max_buf_size / 10) < (rc / factor)) {
crm_notice("Compressed message exceeds %d0%% of the configured ipc limit (%u bytes), "
"consider setting PCMK_ipc_buffer to %u or higher",
factor, client->max_buf_size, 2 * client->max_buf_size);
factor++;
}
}
crm_trace("Sending from client: %s request id: %d bytes: %u timeout:%d msg...",
client->name, header->qb.id, header->qb.size, ms_timeout);
if (ms_timeout > 0 || is_not_set(flags, crm_ipc_client_response)) {
/* Send first, then (only if a response was requested) wait for the reply */
rc = internal_ipc_send_request(client, iov, ms_timeout);
if (rc <= 0) {
crm_trace("Failed to send from client %s request %d with %u bytes...",
client->name, header->qb.id, header->qb.size);
goto send_cleanup;
} else if (is_not_set(flags, crm_ipc_client_response)) {
crm_trace("Message sent, not waiting for reply to %d from %s to %u bytes...",
header->qb.id, client->name, header->qb.size);
goto send_cleanup;
}
rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout);
if (rc < 0) {
/* No reply, for now, disable sending
*
* The alternative is to close the connection since we don't know
* how to detect and discard out-of-sequence replies
*
* TODO - implement the above
*/
client->need_reply = TRUE;
}
} else {
/* Negative timeout with a response requested: send and receive in one call */
rc = internal_ipc_send_recv(client, iov);
}
if (rc > 0) {
struct crm_ipc_response_header *hdr = (struct crm_ipc_response_header *)(void*)client->buffer;
crm_trace("Received response %d, size=%u, rc=%ld, text: %.200s", hdr->qb.id, hdr->qb.size,
rc, crm_ipc_buffer(client));
if (reply) {
*reply = string2xml(crm_ipc_buffer(client));
}
} else {
crm_trace("Response not received: rc=%ld, errno=%d", rc, errno);
}
/* Common exit: report failures, then free the header and payload buffers
 * that crm_ipc_prepare() allocated
 */
send_cleanup:
if (crm_ipc_connected(client) == FALSE) {
crm_notice("Connection to %s closed: %s (%ld)", client->name, pcmk_strerror(rc), rc);
} else if (rc == -ETIMEDOUT) {
crm_warn("Request %d to %s (%p) failed: %s (%ld) after %dms",
header->qb.id, client->name, client->ipc, pcmk_strerror(rc), rc, ms_timeout);
crm_write_blackbox(0, NULL);
} else if (rc <= 0) {
crm_warn("Request %d to %s (%p) failed: %s (%ld)",
header->qb.id, client->name, client->ipc, pcmk_strerror(rc), rc);
}
free(header);
free(iov[1].iov_base);
free(iov);
return rc;
}
/* Utils */
/*
 * Build the CRM_OP_HELLO request a client sends to identify itself.
 *
 * All four arguments must be non-NULL, non-empty strings; otherwise an
 * error is logged and NULL is returned.  On success the returned request
 * belongs to the caller.
 */
xmlNode *
create_hello_message(const char *uuid,
                     const char *client_name, const char *major_version, const char *minor_version)
{
    xmlNode *options = NULL;
    xmlNode *request = NULL;

    /* Reject any missing or empty identification field up front */
    if (!uuid || uuid[0] == '\0'
        || !client_name || client_name[0] == '\0'
        || !major_version || major_version[0] == '\0'
        || !minor_version || minor_version[0] == '\0') {
        crm_err("Missing fields, Hello message will not be valid.");
        return NULL;
    }

    /* Temporary options node carrying the identification attributes */
    options = create_xml_node(NULL, XML_TAG_OPTIONS);
    crm_xml_add(options, "major_version", major_version);
    crm_xml_add(options, "minor_version", minor_version);
    crm_xml_add(options, "client_name", client_name);
    crm_xml_add(options, "client_uuid", uuid);

    crm_trace("creating hello message");
    request = create_request(CRM_OP_HELLO, options, NULL, NULL, client_name, uuid);

    /* The options node is released here once the request has been built */
    free_xml(options);
    return request;
}
diff --git a/lib/common/remote.c b/lib/common/remote.c
index 85e12ea07a..915453d42d 100644
--- a/lib/common/remote.c
+++ b/lib/common/remote.c
@@ -1,922 +1,930 @@
/*
* Copyright (c) 2008 Andrew Beekhof
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include
/*
 * Key-exchange preference order for PSK sessions, handed to
 * gnutls_kx_set_priority() when gnutls_priority_set_direct() is unavailable.
 * That API reads the list until it finds a 0 entry, so the list must be
 * zero-terminated.
 */
const int psk_tls_kx_order[] = {
    GNUTLS_KX_DHE_PSK,
    GNUTLS_KX_PSK,
    0
};
/*
* Zero-terminated key-exchange preference order for anonymous sessions,
* handed to gnutls_kx_set_priority() when gnutls_priority_set_direct()
* is unavailable.
*/
const int anon_tls_kx_order[] = {
GNUTLS_KX_ANON_DH,
GNUTLS_KX_DHE_RSA,
GNUTLS_KX_DHE_DSS,
GNUTLS_KX_RSA,
0
};
#endif
/* Swab macros from linux/swab.h */
#ifdef HAVE_LINUX_SWAB_H
# include
#else
/*
* Casts are necessary for constants, because we never know for sure
* how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
*/
/*
 * Fallback byte-order reversal macros for 16-, 32- and 64-bit values.
 * Each byte is shifted into its mirrored position and then masked.
 * The argument is evaluated more than once, so it must be free of side effects.
 */
#define __swab16(x) ((uint16_t)( \
    (((uint16_t)(x) << 8) & (uint16_t)0xff00U) | \
    (((uint16_t)(x) >> 8) & (uint16_t)0x00ffU)))
#define __swab32(x) ((uint32_t)( \
    (((uint32_t)(x) << 24) & (uint32_t)0xff000000UL) | \
    (((uint32_t)(x) << 8) & (uint32_t)0x00ff0000UL) | \
    (((uint32_t)(x) >> 8) & (uint32_t)0x0000ff00UL) | \
    (((uint32_t)(x) >> 24) & (uint32_t)0x000000ffUL)))
#define __swab64(x) ((uint64_t)( \
    (((uint64_t)(x) << 56) & (uint64_t)0xff00000000000000ULL) | \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    (((uint64_t)(x) << 8) & (uint64_t)0x000000ff00000000ULL) | \
    (((uint64_t)(x) >> 8) & (uint64_t)0x00000000ff000000ULL) | \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    (((uint64_t)(x) >> 56) & (uint64_t)0x00000000000000ffULL)))
#endif
#define REMOTE_MSG_VERSION 1
#define ENDIAN_LOCAL 0xBADADBBD
/*
* Fixed-size header placed in front of every remote message.
* The struct is packed; crm_remote_header() byte-swaps every field in
* place when the sender's endian marker is the swapped form of ENDIAN_LOCAL.
*/
struct crm_remote_header_v0
{
uint32_t endian; /* Detect messages from hosts with different endian-ness */
uint32_t version; /* Sender sets this to REMOTE_MSG_VERSION */
uint64_t id; /* Sender increments this for every message it sends */
uint64_t flags; /* Left zero by the sender code in this file */
uint32_t size_total; /* Header length plus payload length, in bytes */
uint32_t payload_offset; /* Offset of the payload from the start of the message */
uint32_t payload_compressed; /* Non-zero: compressed payload size; receiver bzip2-decompresses */
uint32_t payload_uncompressed; /* Payload length including its terminating NUL */
/* New fields get added here */
} __attribute__ ((packed));
/*
 * Return a pointer to the message header at the start of remote->buffer.
 *
 * Returns NULL when there is no buffer, when fewer bytes than a full header
 * have been received so far, or when the endian marker is neither
 * ENDIAN_LOCAL nor its byte-swapped form.
 *
 * If the marker is the byte-swapped form, every header field is converted
 * to local byte order in place (including the marker itself, so a later
 * call sees an already-converted header).
 */
static struct crm_remote_header_v0 *
crm_remote_header(crm_remote_t * remote)
{
    struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;

    if (remote->buffer == NULL
        || remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
        return NULL;

    } else if(header->endian != ENDIAN_LOCAL) {
        uint32_t endian = __swab32(header->endian);

        CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
        if(endian != ENDIAN_LOCAL) {
            /* The values are 32-bit; cast so they match the %lx conversions */
            crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
                    (unsigned long) ENDIAN_LOCAL, (unsigned long) header->endian,
                    (unsigned long) endian);
            return NULL;
        }

        header->id = __swab64(header->id);
        header->flags = __swab64(header->flags);
        header->endian = __swab32(header->endian);

        header->version = __swab32(header->version);
        header->size_total = __swab32(header->size_total);
        header->payload_offset = __swab32(header->payload_offset);
        header->payload_compressed = __swab32(header->payload_compressed);
        header->payload_uncompressed = __swab32(header->payload_uncompressed);
    }

    return header;
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * Drive the client side of a TLS handshake on remote->tls_session.
 *
 * While gnutls_handshake() asks to be retried (interrupted / try again),
 * wait up to one second for the socket to become readable and try again,
 * until timeout_ms (rounded down to whole seconds) has elapsed.
 *
 * Returns the last gnutls_handshake() result, or -1 if polling failed.
 */
int
crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
{
    int rc = 0;
    time_t start = time(NULL);

    for (;;) {
        rc = gnutls_handshake(*remote->tls_session);
        if (rc != GNUTLS_E_INTERRUPTED && rc != GNUTLS_E_AGAIN) {
            /* Finished, successfully or with a hard error */
            break;
        }

        if (crm_remote_ready(remote, 1000) < 0) {
            /* poll returned error, there is no hope */
            rc = -1;
            break;
        }

        if ((time(NULL) - start) >= (timeout_ms / 1000)) {
            /* Out of time; report the retry code to the caller */
            break;
        }
    }

    if (rc < 0) {
        crm_trace("gnutls_handshake() failed with %d", rc);
    }
    return rc;
}
void *
crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
void *credentials)
{
gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
gnutls_init(session, type);
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
/* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
/* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
# else
gnutls_set_default_priority(*session);
gnutls_kx_set_priority(*session, anon_tls_kx_order);
# endif
gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
switch (type) {
case GNUTLS_SERVER:
gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
(gnutls_anon_server_credentials_t) credentials);
break;
case GNUTLS_CLIENT:
gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
(gnutls_anon_client_credentials_t) credentials);
break;
}
return session;
}
void *
create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
{
gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
gnutls_init(session, type);
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
# else
gnutls_set_default_priority(*session);
gnutls_kx_set_priority(*session, psk_tls_kx_order);
# endif
gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
switch (type) {
case GNUTLS_SERVER:
gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
(gnutls_psk_server_credentials_t) credentials);
break;
case GNUTLS_CLIENT:
gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
(gnutls_psk_client_credentials_t) credentials);
break;
}
return session;
}
/*
* Send len bytes from buf over a TLS session, calling gnutls_record_send()
* repeatedly until everything has been accepted or a hard error occurs.
*
* Returns the original length (converted to int) on success, the negative
* gnutls error code on failure, or -1 when buf is NULL.
*/
static int
crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
{
const char *unsent = buf;
int rc = 0;
int total_send;
if (buf == NULL) {
return -1;
}
/* Remember the full length; len shrinks as partial sends complete */
total_send = len;
- crm_trace("Message size: %zd", len);
+ crm_trace("Message size: %llu", (unsigned long long) len);
while (TRUE) {
rc = gnutls_record_send(*session, unsent, len);
if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
/* Transient condition: loop and send the same data again */
crm_debug("Retry");
} else if (rc < 0) {
crm_err("Connection terminated rc = %d", rc);
break;
} else if (rc < len) {
- crm_debug("Sent %d of %zd bytes", rc, len);
+ crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
/* Partial send: advance past the bytes that were accepted */
len -= rc;
unsent += rc;
} else {
crm_trace("Sent all %d bytes", rc);
break;
}
}
return rc < 0 ? rc : total_send;
}
#endif
/*
* Write len bytes from buf to the socket sock, retrying on EINTR/EAGAIN
* and after partial writes until all bytes have been written.
*
* Returns the original length (converted to int) on success, the negative
* write() result on any other error, or -1 when buf is NULL.
*/
static int
crm_send_plaintext(int sock, const char *buf, size_t len)
{
int rc = 0;
const char *unsent = buf;
int total_send;
if (buf == NULL) {
return -1;
}
/* Remember the full length; len shrinks as partial writes complete */
total_send = len;
- crm_trace("Message on socket %d: size=%zd", sock, len);
+ crm_trace("Message on socket %d: size=%llu",
+ sock, (unsigned long long) len);
retry:
rc = write(sock, unsent, len);
if (rc < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
/* Transient condition: attempt the same write again */
crm_trace("Retry");
goto retry;
default:
crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
break;
}
} else if (rc < len) {
- crm_trace("Only sent %d of %zd remaining bytes", rc, len);
+ crm_trace("Only sent %d of %llu remaining bytes",
+ rc, (unsigned long long) len);
/* Partial write: advance past the bytes that were written */
len -= rc;
unsent += rc;
goto retry;
} else {
crm_trace("Sent %d bytes: %.100s", rc, buf);
}
return rc < 0 ? rc : total_send;
}
/*
 * Send each of the iovs buffers in iov over the remote connection, in
 * order, using TLS when a TLS session is present and the plain TCP socket
 * otherwise.
 *
 * Sending stops at the first buffer that fails, so a later success cannot
 * mask an earlier error and no payload is sent after a failed header.
 *
 * Returns the result of the last send attempted (negative on failure), or
 * -ESOCKTNOSUPPORT when there is nothing to send or the connection has
 * neither a TLS session nor a TCP socket.
 */
static int
crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
{
    int lpc = 0;
    int rc = -ESOCKTNOSUPPORT;

    for(; lpc < iovs; lpc++) {

#ifdef HAVE_GNUTLS_GNUTLS_H
        if (remote->tls_session) {
            rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
        } else if (remote->tcp_socket) {
#else
        if (remote->tcp_socket) {
#endif
            rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);

        } else {
            crm_err("Unsupported connection type");
            rc = -ESOCKTNOSUPPORT;
        }

        if (rc < 0) {
            /* Do not send the remaining pieces of a message that already failed */
            break;
        }
    }
    return rc;
}
/*
 * Serialize msg to unformatted XML text and send it over the remote
 * connection, preceded by a crm_remote_header_v0 describing it.
 *
 * The payload is sent uncompressed and includes its terminating NUL.
 *
 * Returns the result of crm_remote_sendv() (negative on failure), -1 if
 * the XML could not be serialized, or -ENOMEM if the header could not be
 * allocated.
 */
int
crm_remote_send(crm_remote_t * remote, xmlNode * msg)
{
    int rc = -1;
    static uint64_t id = 0;
    char *xml_text = dump_xml_unformatted(msg);

    struct iovec iov[2];
    struct crm_remote_header_v0 *header;

    if (xml_text == NULL) {
        crm_err("Invalid XML, can not send msg");
        return -1;
    }

    header = calloc(1, sizeof(struct crm_remote_header_v0));
    if (header == NULL) {
        crm_err("Could not allocate memory for remote message header");
        free(xml_text);
        return -ENOMEM;
    }

    iov[0].iov_base = header;
    iov[0].iov_len = sizeof(struct crm_remote_header_v0);

    iov[1].iov_base = xml_text;
    iov[1].iov_len = 1 + strlen(xml_text);

    /* Message ids are per-process and start at 1 */
    id++;
    header->id = id;
    header->endian = ENDIAN_LOCAL;
    header->version = REMOTE_MSG_VERSION;
    header->payload_offset = iov[0].iov_len;
    header->payload_uncompressed = iov[1].iov_len;
    header->size_total = iov[0].iov_len + iov[1].iov_len;

    crm_trace("Sending len[0]=%d, start=%x\n",
              (int)iov[0].iov_len, *(int*)(void*)xml_text);
    rc = crm_remote_sendv(remote, iov, 2);
    if (rc < 0) {
        crm_err("Failed to send remote msg, rc = %d", rc);
    }

    free(iov[0].iov_base);
    free(iov[1].iov_base);
    return rc;
}
/*!
 * \internal
 * \brief Parse the message currently held in the remote receive buffer
 *
 * If the header marks the payload as compressed, the payload is first
 * bzip2-decompressed into a new buffer (header preserved) which replaces
 * remote->buffer.  The buffer offset is then reset so the next receive
 * starts a new message.
 *
 * \param[in,out] remote  Connection whose buffer holds a received message
 *
 * \return Parsed XML, or NULL if there is no complete header, memory could
 *         not be allocated, decompression failed, or the payload could not
 *         be parsed
 */
xmlNode *
crm_remote_parse_buffer(crm_remote_t * remote)
{
    xmlNode *xml = NULL;
    struct crm_remote_header_v0 *header = crm_remote_header(remote);

    if (remote->buffer == NULL || header == NULL) {
        return NULL;
    }

    /* Support compression on the receiving end now, in case we ever want to add it later */
    if (header->payload_compressed) {
        int rc = 0;
        unsigned int size_u = 1 + header->payload_uncompressed;
        char *uncompressed = calloc(1, header->payload_offset + size_u);

        if (uncompressed == NULL) {
            crm_err("Could not allocate %u bytes to decompress message",
                    header->payload_offset + size_u);
            return NULL;
        }

        crm_trace("Decompressing message data %d bytes into %d bytes",
                 header->payload_compressed, size_u);

        rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
                                        remote->buffer + header->payload_offset,
                                        header->payload_compressed, 1, 0);

        if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
            crm_warn("Couldn't decompress v%d message, we only understand v%d",
                     header->version, REMOTE_MSG_VERSION);
            free(uncompressed);
            return NULL;

        } else if (rc != BZ_OK) {
            crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
            free(uncompressed);
            return NULL;
        }

        CRM_ASSERT(size_u == header->payload_uncompressed);

        memcpy(uncompressed, remote->buffer, header->payload_offset);       /* Preserve the header */
        remote->buffer_size = header->payload_offset + size_u;

        free(remote->buffer);
        remote->buffer = uncompressed;
        /* The old header pointed into the buffer that was just freed */
        header = crm_remote_header(remote);
    }

    /* take ownership of the buffer */
    remote->buffer_offset = 0;

    CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);

    xml = string2xml(remote->buffer + header->payload_offset);
    if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
        crm_warn("Couldn't parse v%d message, we only understand v%d",
                 header->version, REMOTE_MSG_VERSION);

    } else if (xml == NULL) {
        crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
    }

    return xml;
}
/*!
 * \internal
 * \brief Determine if a remote session has data to read
 *
 * Polls the connection's socket for readability.  If poll() is interrupted
 * by a signal it is restarted with the time still left of the requested
 * timeout (but never less than one second).
 *
 * \param[in] remote   Connection to check
 * \param[in] timeout  Maximum time to wait, in milliseconds (passed to poll())
 *
 * \retval 0, timeout occurred.
 * \retval positive, data is ready to be read
 * \retval negative, session has ended
 */
int
crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
{
    struct pollfd fds = { 0, };
    int sock = 0;
    int rc = 0;
    int remaining = timeout;
    time_t start;

#ifdef HAVE_GNUTLS_GNUTLS_H
    if (remote->tls_session) {
        void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);

        sock = GPOINTER_TO_INT(sock_ptr);
    } else if (remote->tcp_socket) {
#else
    if (remote->tcp_socket) {
#endif
        sock = remote->tcp_socket;
    } else {
        crm_err("Unsupported connection type");
    }

    if (sock <= 0) {
        crm_trace("No longer connected");
        return -ENOTCONN;
    }

    start = time(NULL);
    errno = 0;
    do {
        fds.fd = sock;
        fds.events = POLLIN;

        /* If we got an EINTR while polling, and we have a
         * specific timeout we are trying to honor, attempt
         * to adjust the timeout to the closest second.
         *
         * The adjustment is always computed from the originally requested
         * timeout, so repeated interruptions do not subtract the elapsed
         * time more than once. */
        if (errno == EINTR && (timeout > 0)) {
            remaining = timeout - ((time(NULL) - start) * 1000);
            if (remaining < 1000) {
                remaining = 1000;
            }
        }

        rc = poll(&fds, 1, remaining);
    } while (rc < 0 && errno == EINTR);

    return rc;
}
/*!
* \internal
* \brief Read bytes off non blocking remote connection.
*
* \note only use with NON-Blocking sockets. Should only be used after polling socket.
* This function will return once max_size is met, the socket read buffer
* is empty, or an error is encountered.
*
* \retval number of bytes received
*
* The visible code returns remote->buffer_offset once a full message
* (header->size_total bytes) is buffered, -EAGAIN when more data is still
* needed, -ENOTCONN on EOF or a read error, and -ESOCKTNOSUPPORT when the
* connection has neither a TLS session nor a TCP socket.
*
* NOTE(review): the return type is size_t although negative errno values
* are returned; the caller in this file stores the result in an int.
*/
static size_t
crm_remote_recv_once(crm_remote_t * remote)
{
int rc = 0;
/* Until a complete header is buffered, aim to read just a header */
size_t read_len = sizeof(struct crm_remote_header_v0);
struct crm_remote_header_v0 *header = crm_remote_header(remote);
if(header) {
/* Stop at the end of the current message */
read_len = header->size_total;
}
/* automatically grow the buffer when needed */
if(remote->buffer_size < read_len) {
remote->buffer_size = 2 * read_len;
- crm_trace("Expanding buffer to %zu bytes", remote->buffer_size);
+ crm_trace("Expanding buffer to %llu bytes",
+ (unsigned long long) remote->buffer_size);
/* One extra byte is allocated for the NUL terminator written below */
remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
CRM_ASSERT(remote->buffer != NULL);
}
#ifdef HAVE_GNUTLS_GNUTLS_H
if (remote->tls_session) {
rc = gnutls_record_recv(*(remote->tls_session),
remote->buffer + remote->buffer_offset,
remote->buffer_size - remote->buffer_offset);
/* Map gnutls retry codes onto their errno equivalents */
if (rc == GNUTLS_E_INTERRUPTED) {
rc = -EINTR;
} else if (rc == GNUTLS_E_AGAIN) {
rc = -EAGAIN;
} else if (rc < 0) {
crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
rc = -pcmk_err_generic;
}
} else if (remote->tcp_socket) {
#else
if (remote->tcp_socket) {
#endif
errno = 0;
rc = read(remote->tcp_socket,
remote->buffer + remote->buffer_offset,
remote->buffer_size - remote->buffer_offset);
if(rc < 0) {
rc = -errno;
}
} else {
crm_err("Unsupported connection type");
return -ESOCKTNOSUPPORT;
}
/* process any errors. */
if (rc > 0) {
remote->buffer_offset += rc;
/* always null terminate buffer, the +1 to alloc always allows for this. */
remote->buffer[remote->buffer_offset] = '\0';
- crm_trace("Received %u more bytes, %zu total", rc, remote->buffer_offset);
+ crm_trace("Received %u more bytes, %llu total",
+ rc, (unsigned long long) remote->buffer_offset);
} else if (rc == -EINTR || rc == -EAGAIN) {
crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
} else if (rc == 0) {
- crm_debug("EOF encoutered after %zu bytes", remote->buffer_offset);
+ crm_debug("EOF encoutered after %llu bytes",
+ (unsigned long long) remote->buffer_offset);
return -ENOTCONN;
} else {
- crm_debug("Error receiving message after %zu bytes: %s (%d)",
- remote->buffer_offset, pcmk_strerror(rc), rc);
+ crm_debug("Error receiving message after %llu bytes: %s (%d)",
+ (unsigned long long) remote->buffer_offset,
+ pcmk_strerror(rc), rc);
return -ENOTCONN;
}
/* Re-read the header: it may only now be complete */
header = crm_remote_header(remote);
if(header) {
if(remote->buffer_offset < header->size_total) {
- crm_trace("Read less than the advertised length: %zu < %u bytes",
- remote->buffer_offset, header->size_total);
+ crm_trace("Read less than the advertised length: %llu < %u bytes",
+ (unsigned long long) remote->buffer_offset,
+ header->size_total);
} else {
- crm_trace("Read full message of %zu bytes", remote->buffer_offset);
+ crm_trace("Read full message of %llu bytes",
+ (unsigned long long) remote->buffer_offset);
return remote->buffer_offset;
}
}
return -EAGAIN;
}
/*!
 * \internal
 * \brief Read data off the socket until at least one full message is present or timeout occurs.
 *
 * \param[in]  remote         Connection to read from
 * \param[in]  total_timeout  Overall time limit in milliseconds; 0 selects
 *                            10000ms and a negative value selects 60000ms
 * \param[out] disconnected   Set to 1 if the connection was found to be
 *                            closed, otherwise 0
 *
 * \retval TRUE message read
 * \retval FALSE full message not read
 */
gboolean
crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
{
    int rc;
    time_t start = time(NULL);
    int remaining_timeout = 0;

    if (total_timeout == 0) {
        total_timeout = 10000;
    } else if (total_timeout < 0) {
        total_timeout = 60000;
    }
    *disconnected = 0;

    remaining_timeout = total_timeout;
    while ((remaining_timeout > 0) && !(*disconnected)) {

        /* read some more off the tls buffer if we still have time left. */
        crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
                  total_timeout, remaining_timeout);
        rc = crm_remote_ready(remote, remaining_timeout);

        if (rc == 0) {
            crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
            return FALSE;

        } else if (rc == -EAGAIN) {
            crm_trace("waiting for remote connection data (up to %dms)",
                      remaining_timeout);

        } else if(rc < 0) {
            crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);

        } else {
            rc = crm_remote_recv_once(remote);
            if(rc > 0) {
                return TRUE;
            } else if (rc < 0) {
                crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
            }
        }

        if(rc == -ENOTCONN) {
            *disconnected = 1;
            return FALSE;
        }

        /* Elapsed time is measured from the start of the call, so it must be
         * subtracted from the total budget, not from the previous remainder */
        remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
    }

    return FALSE;
}
/* State carried between internal_tcp_connect_async() and its periodic
* check_connect_finished() timer callback, which frees it when done. */
struct tcp_async_cb_data {
gboolean success; /* TRUE when connect() already succeeded immediately */
int sock; /* socket being connected */
void *userdata; /* passed back unchanged as the callback's first argument */
void (*callback) (void *userdata, int sock); /* invoked once with the final result */
int timeout; /*ms */
time_t start; /* time the connect attempt began */
};
/*
 * Timer callback that checks whether a non-blocking connect() has finished.
 *
 * userdata is the struct tcp_async_cb_data created by
 * internal_tcp_connect_async().  While the connection is still pending and
 * the timeout has not expired, TRUE is returned so the timer fires again.
 * Otherwise the user's callback (if any) is invoked exactly once with
 * either the connected socket or a negative error value, the socket is
 * closed on failure, the callback data is freed and FALSE is returned.
 */
static gboolean
check_connect_finished(gpointer userdata)
{
    struct tcp_async_cb_data *cb_data = userdata;
    int rc = 0;
    int sock = cb_data->sock;
    int error = 0;

    fd_set rset, wset;
    socklen_t len = sizeof(error);
    struct timeval ts = { 0, };

    if (cb_data->success == TRUE) {
        /* connect() completed immediately; nothing to wait for */
        goto dispatch_done;
    }

    FD_ZERO(&rset);
    FD_SET(sock, &rset);
    wset = rset;

    crm_trace("fd %d: checking to see if connect finished", sock);
    rc = select(sock + 1, &rset, &wset, NULL, &ts);

    if (rc < 0) {
        int select_errno = errno;

        /* The callback treats a positive value as a connected socket, so
         * failures must always be reported as negative values */
        rc = (select_errno > 0) ? -select_errno : -1;
        if ((select_errno == EINPROGRESS) || (select_errno == EAGAIN)) {
            /* reschedule if there is still time left */
            if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
                goto reschedule;
            } else {
                rc = -ETIMEDOUT;
            }
        }
        crm_trace("fd %d: select failed %d connect dispatch ", sock, rc);
        goto dispatch_done;
    } else if (rc == 0) {
        if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
            goto reschedule;
        }
        crm_debug("fd %d: timeout during select", sock);
        rc = -ETIMEDOUT;
        goto dispatch_done;
    } else {
        crm_trace("fd %d: select returned success", sock);
        rc = 0;
    }

    /* can we read or write to the socket now? */
    if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
        if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
            crm_trace("fd %d: call to getsockopt failed", sock);
            rc = -1;
            goto dispatch_done;
        }

        if (error) {
            crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
            rc = -1;
            goto dispatch_done;
        }
    } else {
        crm_trace("neither read nor write set after select");
        rc = -1;
        goto dispatch_done;
    }

  dispatch_done:
    if (!rc) {
        crm_trace("fd %d: connected", sock);
        /* Success, set the return code to the sock to report to the callback */
        rc = cb_data->sock;
        cb_data->sock = 0;
    } else {
        close(sock);
    }

    if (cb_data->callback) {
        cb_data->callback(cb_data->userdata, rc);
    }
    free(cb_data);
    return FALSE;

  reschedule:

    /* will check again next interval */
    return TRUE;
}
/*
 * Start a non-blocking connect() on sock and schedule a glib timer that
 * polls for its completion via check_connect_finished().
 *
 * timeout is in milliseconds.  If timer_id is not NULL it receives the id
 * of the scheduled timer.  The callback is later invoked with userdata and
 * the final result.
 *
 * Returns 0 when the attempt was started (the result arrives through the
 * callback), or -1 if the socket could not be made non-blocking, connect()
 * failed outright, or memory could not be allocated.  The socket is not
 * closed here on failure.
 */
static int
internal_tcp_connect_async(int sock,
                           const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
                           int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
{
    int rc = 0;
    int flag = 0;
    int interval = 500;
    int timer;
    struct tcp_async_cb_data *cb_data = NULL;

    if ((flag = fcntl(sock, F_GETFL)) >= 0) {
        if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
            crm_err("fcntl() write failed");
            return -1;
        }
    }

    rc = connect(sock, addr, addrlen);

    if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
        return -1;
    }

    cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
    if (cb_data == NULL) {
        crm_err("Could not allocate memory for asynchronous connect");
        return -1;
    }
    cb_data->userdata = userdata;
    cb_data->callback = callback;
    cb_data->sock = sock;
    cb_data->timeout = timeout;
    cb_data->start = time(NULL);

    if (rc == 0) {
        /* The connect was successful immediately, we still return to mainloop
         * and let this callback get called later. This avoids the user of this api
         * to have to account for the fact the callback could be invoked within this
         * function before returning. */
        cb_data->success = TRUE;
        interval = 1;
    }

    /* Check connect finished is mostly doing a non-block poll on the socket
     * to see if we can read/write to it. Once we can, the connect has completed.
     * This method allows us to connect to the server without blocking mainloop.
     *
     * This is a poor man's way of polling to see when the connection finished.
     * At some point we should figure out a way to use a mainloop fd callback for this.
     * Something about the way mainloop is currently polling prevents this from working at the
     * moment though. */
    crm_trace("fd %d: scheduling to check if connect finished in %dms second", sock, interval);
    timer = g_timeout_add(interval, check_connect_finished, cb_data);
    if (timer_id) {
        *timer_id = timer;
    }

    return 0;
}
/*
 * Blocking connect() on sock; once connected, switch the socket to
 * non-blocking mode.
 *
 * Returns the connect() result, or -1 if setting O_NONBLOCK failed.
 */
static int
internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
{
    int status_flags = 0;
    int rc = connect(sock, addr, addrlen);

    if (rc != 0) {
        /* Connection failed; hand the connect() result back unchanged */
        return rc;
    }

    status_flags = fcntl(sock, F_GETFL);
    if (status_flags < 0) {
        /* Flags could not be read: leave the socket as it is */
        return rc;
    }

    if (fcntl(sock, F_SETFL, status_flags | O_NONBLOCK) < 0) {
        crm_err("fcntl() write failed");
        return -1;
    }
    return rc;
}
/*!
* \internal
* \brief tcp connection to server at specified port
*
* Resolves host, then tries each returned address in turn. With a callback,
* the first address whose asynchronous connect could be started ends the
* search and the result is delivered later through the callback; without a
* callback, each connect is attempted synchronously.
*
* \retval negative, failed to connect.
* \retval positive, sock fd
*/
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout, /*ms */
int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
{
char buffer[256];
struct addrinfo *res = NULL;
struct addrinfo *rp = NULL;
struct addrinfo hints;
const char *server = host;
int ret_ga;
int sock = -1;
/* getaddrinfo */
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_CANONNAME;
crm_debug("Looking up %s", server);
ret_ga = getaddrinfo(server, NULL, &hints, &res);
if (ret_ga) {
crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
return -1;
}
if (!res || !res->ai_addr) {
crm_err("getaddrinfo failed");
goto async_cleanup;
}
for (rp = res; rp != NULL; rp = rp->ai_next) {
struct sockaddr *addr = rp->ai_addr;
if (!addr) {
continue;
}
/* NOTE(review): the test is on rp but the name is taken from res (the first entry) */
if (rp->ai_canonname) {
server = res->ai_canonname;
}
crm_debug("Got address %s for %s", server, host);
/* create socket */
sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
crm_err("Socket creation failed for remote client connection.");
continue;
}
/* Fill in the port (the lookup used no service name) and format the address for logging */
memset(buffer, 0, DIMOF(buffer));
if (addr->sa_family == AF_INET6) {
struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *)(void*)addr;
addr_in->sin6_port = htons(port);
inet_ntop(addr->sa_family, &addr_in->sin6_addr, buffer, DIMOF(buffer));
} else {
/* Every family other than AF_INET6 is handled as IPv4 here */
struct sockaddr_in *addr_in = (struct sockaddr_in *)(void*)addr;
addr_in->sin_port = htons(port);
inet_ntop(addr->sa_family, &addr_in->sin_addr, buffer, DIMOF(buffer));
}
crm_info("Attempting to connect to remote server at %s:%d", buffer, port);
if (callback) {
if (internal_tcp_connect_async
(sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
goto async_cleanup; /* Success for now, we'll hear back later in the callback */
}
} else {
if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
break; /* Success */
}
}
/* This address failed: release the socket and try the next one */
close(sock);
sock = -1;
}
async_cleanup:
if (res) {
freeaddrinfo(res);
}
return sock;
}
/*
 * Synchronous variant of crm_remote_tcp_connect_async(): connect to
 * host:port and return the connected socket, or a negative value on failure.
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    /* Without a callback the connect is synchronous; the timeout is only
     * used on the callback path */
    const int unused_timeout_ms = -1;

    return crm_remote_tcp_connect_async(host, port, unused_timeout_ms, NULL, NULL, NULL);
}
diff --git a/lib/lrmd/lrmd_client.c b/lib/lrmd/lrmd_client.c
index 3208d8e436..d214a22512 100644
--- a/lib/lrmd/lrmd_client.c
+++ b/lib/lrmd/lrmd_client.c
@@ -1,2229 +1,2231 @@
/*
* Copyright (c) 2012 David Vossel
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include
#endif
#include
#include
#include
#include
#include
#define MAX_TLS_RECV_WAIT 10000
CRM_TRACE_INIT_DATA(lrmd);
static int lrmd_api_disconnect(lrmd_t * lrmd);
static int lrmd_api_is_connected(lrmd_t * lrmd);
/* IPC proxy functions */
int lrmd_internal_proxy_send(lrmd_t * lrmd, xmlNode *msg);
static void lrmd_internal_proxy_dispatch(lrmd_t *lrmd, xmlNode *msg);
void lrmd_internal_set_proxy_callback(lrmd_t * lrmd, void *userdata, void (*callback)(lrmd_t *lrmd, void *userdata, xmlNode *msg));
#ifdef HAVE_GNUTLS_GNUTLS_H
# define LRMD_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */
gnutls_psk_client_credentials_t psk_cred_s;
int lrmd_tls_set_key(gnutls_datum_t * key);
static void lrmd_tls_disconnect(lrmd_t * lrmd);
static int global_remote_msg_id = 0;
int lrmd_tls_send_msg(crm_remote_t * session, xmlNode * msg, uint32_t id, const char *msg_type);
static void lrmd_tls_connection_destroy(gpointer userdata);
#endif
/* Client-side connection state, reached through lrmd->private */
typedef struct lrmd_private_s {
enum client_type type;
char *token;
mainloop_io_t *source;
/* IPC parameters */
crm_ipc_t *ipc;
crm_remote_t *remote;
/* Extra TLS parameters */
/* Copied into each event's remote_nodename before the event callback runs */
char *remote_nodename;
#ifdef HAVE_GNUTLS_GNUTLS_H
char *server;
int port;
gnutls_psk_client_credentials_t psk_cred_c;
/* while the async connection is occurring, this is the id
* of the connection timeout timer. */
int async_timer;
int sock;
/* since tls requires a round trip across the network for a
* request/reply, there are times where we just want to be able
* to send a request from the client and not wait around (or even care
* about) what the reply is. */
int expected_late_replies;
GList *pending_notify;
crm_trigger_t *process_notify;
#endif
/* Invoked with each decoded notification event */
lrmd_event_callback callback;
/* Internal IPC proxy msg passing for remote guests */
void (*proxy_callback)(lrmd_t *lrmd, void *userdata, xmlNode *msg);
void *proxy_callback_userdata;
char *peer_version;
} lrmd_private_t;
/*
 * Append a copy of value to the end of the list starting at head.
 *
 * Returns the list head: the new node when head was NULL, otherwise head.
 */
static lrmd_list_t *
lrmd_list_add(lrmd_list_t * head, const char *value)
{
    lrmd_list_t *tail = head;
    lrmd_list_t *node = calloc(1, sizeof(lrmd_list_t));

    node->val = strdup(value);

    if (tail == NULL) {
        /* Empty list: the new node becomes the head */
        return node;
    }

    /* Walk to the last node and link the new one after it */
    while (tail->next != NULL) {
        tail = tail->next;
    }
    tail->next = node;

    return head;
}
/* Free every node of the list starting at head, including each node's value. */
void
lrmd_list_freeall(lrmd_list_t * head)
{
    lrmd_list_t *next = NULL;

    for (; head != NULL; head = next) {
        /* Grab the successor before the current node is released */
        next = head->next;
        free((char *)head->val);
        free(head);
    }
}
/*
 * Append a copy of the key/value pair to the end of the list starting at
 * head.
 *
 * Returns the list head: the new node when head was NULL, otherwise head.
 */
lrmd_key_value_t *
lrmd_key_value_add(lrmd_key_value_t * head, const char *key, const char *value)
{
    lrmd_key_value_t *tail = head;
    lrmd_key_value_t *node = calloc(1, sizeof(lrmd_key_value_t));

    node->key = strdup(key);
    node->value = strdup(value);

    if (tail == NULL) {
        /* Empty list: the new node becomes the head */
        return node;
    }

    /* Walk to the last node and link the new one after it */
    while (tail->next != NULL) {
        tail = tail->next;
    }
    tail->next = node;

    return head;
}
/* Free every node of the list starting at head, including each key and value. */
void
lrmd_key_value_freeall(lrmd_key_value_t * head)
{
    lrmd_key_value_t *next = NULL;

    for (; head != NULL; head = next) {
        /* Grab the successor before the current node is released */
        next = head->next;
        free(head->key);
        free(head->value);
        free(head);
    }
}
/*
 * g_hash_table_foreach() callback: store copies of key and value in the
 * hash table passed as user_data.
 */
static void
dup_attr(gpointer key, gpointer value, gpointer user_data)
{
    GHashTable *target = user_data;
    char *key_copy = strdup(key);
    char *value_copy = strdup(value);

    g_hash_table_replace(target, key_copy, value_copy);
}
/*
 * Create a deep copy of event: scalar members are copied as-is, every
 * string member is duplicated, and the parameter table (if any) is rebuilt
 * with duplicated keys and values.
 */
lrmd_event_data_t *
lrmd_copy_event(lrmd_event_data_t * event)
{
    lrmd_event_data_t *copy = calloc(1, sizeof(lrmd_event_data_t));

    /* This will get all the int values.
     * we just have to be careful not to leave any
     * dangling pointers to strings. */
    memcpy(copy, event, sizeof(lrmd_event_data_t));

    /* A NULL member was already copied as NULL; only non-NULL strings
     * need their own duplicate */
    if (event->rsc_id) {
        copy->rsc_id = strdup(event->rsc_id);
    }
    if (event->op_type) {
        copy->op_type = strdup(event->op_type);
    }
    if (event->user_data) {
        copy->user_data = strdup(event->user_data);
    }
    if (event->output) {
        copy->output = strdup(event->output);
    }
    if (event->exit_reason) {
        copy->exit_reason = strdup(event->exit_reason);
    }
    if (event->remote_nodename) {
        copy->remote_nodename = strdup(event->remote_nodename);
    }

    if (event->params == NULL) {
        return copy;
    }

    copy->params = g_hash_table_new_full(crm_str_hash,
                                         g_str_equal, g_hash_destroy_str, g_hash_destroy_str);
    if (copy->params != NULL) {
        g_hash_table_foreach(event->params, dup_attr, copy->params);
    }

    return copy;
}
/*
 * Release an event together with its strings and parameter table.
 * A NULL event is ignored.
 */
void
lrmd_free_event(lrmd_event_data_t * event)
{
    if (event != NULL) {
        /* free gives me grief if i try to cast */
        free((char *)event->rsc_id);
        free((char *)event->op_type);
        free((char *)event->user_data);
        free((char *)event->output);
        free((char *)event->exit_reason);
        free((char *)event->remote_nodename);

        if (event->params != NULL) {
            g_hash_table_destroy(event->params);
        }
        free(event);
    }
}
/*
* Decode one incoming notification message and deliver it.
*
* Messages carrying an IPC proxy session id are handed to
* lrmd_internal_proxy_dispatch(). Otherwise the message is decoded into a
* stack-allocated lrmd_event_data_t and passed to the client's event
* callback; messages with an unrecognized operation are ignored.
*
* Always returns 1.
*/
static int
lrmd_dispatch_internal(lrmd_t * lrmd, xmlNode * msg)
{
const char *type;
const char *proxy_session = crm_element_value(msg, F_LRMD_IPC_SESSION);
lrmd_private_t *native = lrmd->private;
lrmd_event_data_t event = { 0, };
if (proxy_session != NULL) {
/* this is proxy business */
lrmd_internal_proxy_dispatch(lrmd, msg);
return 1;
} else if (!native->callback) {
/* no callback set */
crm_trace("notify event received but client has not set callback");
return 1;
}
/* Fields common to every event type */
event.remote_nodename = native->remote_nodename;
type = crm_element_value(msg, F_LRMD_OPERATION);
crm_element_value_int(msg, F_LRMD_CALLID, &event.call_id);
event.rsc_id = crm_element_value(msg, F_LRMD_RSC_ID);
if (crm_str_eq(type, LRMD_OP_RSC_REG, TRUE)) {
event.type = lrmd_event_register;
} else if (crm_str_eq(type, LRMD_OP_RSC_UNREG, TRUE)) {
event.type = lrmd_event_unregister;
} else if (crm_str_eq(type, LRMD_OP_RSC_EXEC, TRUE)) {
/* Execution results carry the full set of operation details */
crm_element_value_int(msg, F_LRMD_TIMEOUT, &event.timeout);
crm_element_value_int(msg, F_LRMD_RSC_INTERVAL, &event.interval);
crm_element_value_int(msg, F_LRMD_RSC_START_DELAY, &event.start_delay);
crm_element_value_int(msg, F_LRMD_EXEC_RC, (int *)&event.rc);
crm_element_value_int(msg, F_LRMD_OP_STATUS, &event.op_status);
crm_element_value_int(msg, F_LRMD_RSC_DELETED, &event.rsc_deleted);
crm_element_value_int(msg, F_LRMD_RSC_RUN_TIME, (int *)&event.t_run);
crm_element_value_int(msg, F_LRMD_RSC_RCCHANGE_TIME, (int *)&event.t_rcchange);
crm_element_value_int(msg, F_LRMD_RSC_EXEC_TIME, (int *)&event.exec_time);
crm_element_value_int(msg, F_LRMD_RSC_QUEUE_TIME, (int *)&event.queue_time);
event.op_type = crm_element_value(msg, F_LRMD_RSC_ACTION);
event.user_data = crm_element_value(msg, F_LRMD_RSC_USERDATA_STR);
event.output = crm_element_value(msg, F_LRMD_RSC_OUTPUT);
event.exit_reason = crm_element_value(msg, F_LRMD_RSC_EXIT_REASON);
event.type = lrmd_event_exec_complete;
event.params = xml2list(msg);
} else if (crm_str_eq(type, LRMD_OP_NEW_CLIENT, TRUE)) {
event.type = lrmd_event_new_client;
} else if (crm_str_eq(type, LRMD_OP_POKE, TRUE)) {
event.type = lrmd_event_poke;
} else {
/* Unrecognized operation: nothing to deliver */
return 1;
}
crm_trace("op %s notify event received", type);
native->callback(&event);
/* The parameter table is only created for exec-complete events above */
if (event.params) {
g_hash_table_destroy(event.params);
}
return 1;
}
/*
 * \internal
 * \brief IPC dispatch callback: parse a received buffer and process it
 *
 * \param[in] buffer    Text of the received message
 * \param[in] length    Length of buffer (unused)
 * \param[in] userdata  The lrmd_t connection object
 *
 * \return 1 if no client callback is set, otherwise the result of
 *         lrmd_dispatch_internal()
 */
static int
lrmd_ipc_dispatch(const char *buffer, ssize_t length, gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *native = lrmd->private;
    int rc = 1;

    /* Without a callback there is nobody to deliver to, so skip the parse */
    if (native->callback != NULL) {
        xmlNode *parsed = string2xml(buffer);

        rc = lrmd_dispatch_internal(lrmd, parsed);
        free_xml(parsed);
    }
    return rc;
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * \internal
 * \brief Destroy-notify adapter so XML nodes stored in a GList can be freed
 *
 * \param[in] userdata  XML node to free
 */
static void
lrmd_free_xml(gpointer userdata)
{
    xmlNode *xml = userdata;

    free_xml(xml);
}
/*
 * \internal
 * \brief Check whether a TLS session currently exists for a connection
 *
 * \param[in] lrmd  Connection to check
 *
 * \return TRUE if native->remote->tls_session is set, FALSE otherwise
 */
static int
lrmd_tls_connected(lrmd_t * lrmd)
{
    lrmd_private_t *native = lrmd->private;

    return (native->remote->tls_session) ? TRUE : FALSE;
}
/*
 * \internal
 * \brief Process queued and newly readable messages on a TLS connection
 *
 * Queued notifications are delivered first, then any complete messages
 * available from the remote buffer. Notifications are dispatched; replies are
 * only counted against the number of expected late replies.
 *
 * \param[in] userdata  The lrmd_t connection object
 *
 * \return 0 if the connection is (or becomes) disconnected, 1 otherwise
 */
static int
lrmd_tls_dispatch(gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *native = lrmd->private;
    xmlNode *xml = NULL;
    int ready = 0;
    int disconnected = 0;

    if (!lrmd_tls_connected(lrmd)) {
        crm_trace("tls dispatch triggered after disconnect");
        return 0;
    }

    crm_trace("tls_dispatch triggered");

    /* First check if there are any pending notifies to process that came
     * while we were waiting for replies earlier. */
    if (native->pending_notify) {
        GList *item = native->pending_notify;

        crm_trace("Processing pending notifies");
        while (item != NULL) {
            lrmd_dispatch_internal(lrmd, item->data);
            item = item->next;
        }
        g_list_free_full(native->pending_notify, lrmd_free_xml);
        native->pending_notify = NULL;
    }

    /* Next read the current buffer and see if there are any messages to handle. */
    ready = crm_remote_ready(native->remote, 0);
    if (ready < 0) {
        disconnected = 1;
    } else {
        if (ready > 0) {
            crm_remote_recv(native->remote, -1, &disconnected);
        }
        /* with nothing new to read, full messages may still be in the buffer */
        xml = crm_remote_parse_buffer(native->remote);
    }

    for (; xml != NULL; xml = crm_remote_parse_buffer(native->remote)) {
        const char *msg_type = crm_element_value(xml, F_LRMD_REMOTE_MSG_TYPE);

        if (safe_str_eq(msg_type, "notify")) {
            lrmd_dispatch_internal(lrmd, xml);

        } else if (safe_str_eq(msg_type, "reply")) {
            if (native->expected_late_replies > 0) {
                native->expected_late_replies--;
            } else {
                int reply_id = 0;

                crm_element_value_int(xml, F_LRMD_CALLID, &reply_id);
                /* if this happens, we want to know about it */
                crm_err("Got outdated reply %d", reply_id);
            }
        }
        free_xml(xml);
    }

    if (disconnected) {
        crm_info("Server disconnected while reading remote server msg.");
        lrmd_tls_disconnect(lrmd);
        return 0;
    }
    return 1;
}
#endif
/*
 * \brief Check whether a connection has something to process
 *
 * Not used with mainloop.
 *
 * \param[in] lrmd     Connection to check
 * \param[in] timeout  Currently unused
 *
 * \return Result of the transport's readiness check (1 right away for a TLS
 *         connection with queued notifications), or 0 for an unsupported
 *         connection type
 */
int
lrmd_poll(lrmd_t * lrmd, int timeout)
{
    lrmd_private_t *native = lrmd->private;
    int ready = 0;

    switch (native->type) {
        case CRM_CLIENT_IPC:
            ready = crm_ipc_ready(native->ipc);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            if (native->pending_notify) {
                ready = 1;
            } else {
                ready = crm_remote_ready(native->remote, 0);
            }
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", native->type);
            break;
    }

    return ready;
}
/*
 * \brief Process everything currently available on a connection
 *
 * Not used with mainloop.
 *
 * \param[in] lrmd  Connection to process (must not be NULL)
 *
 * \return TRUE if the connection is still up afterwards, FALSE otherwise
 */
bool
lrmd_dispatch(lrmd_t * lrmd)
{
    lrmd_private_t *private = NULL;

    CRM_ASSERT(lrmd != NULL);
    private = lrmd->private;

    switch (private->type) {
        case CRM_CLIENT_IPC:
            while (crm_ipc_ready(private->ipc)) {
                const char *buffer = NULL;

                if (crm_ipc_read(private->ipc) <= 0) {
                    continue;
                }
                buffer = crm_ipc_buffer(private->ipc);
                lrmd_ipc_dispatch(buffer, strlen(buffer), lrmd);
            }
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            lrmd_tls_dispatch(lrmd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", private->type);
            break;
    }

    if (lrmd_api_is_connected(lrmd)) {
        return TRUE;
    }

    crm_err("Connection closed");
    return FALSE;
}
/*
 * \internal
 * \brief Build an "lrmd_command" request message
 *
 * \param[in] token    Callback token identifying this client (required)
 * \param[in] op       Operation name to store in the request
 * \param[in] data     Optional payload, attached under F_LRMD_CALLDATA
 * \param[in] options  Call options to store in the request
 *
 * \return Newly created request (caller frees with free_xml()), or NULL if
 *         token is NULL or the node could not be created
 */
static xmlNode *
lrmd_create_op(const char *token, const char *op, xmlNode * data, enum lrmd_call_options options)
{
    xmlNode *op_msg = NULL;

    /* Validate before allocating, so a missing token cannot leak the node */
    CRM_CHECK(token != NULL, return NULL);

    op_msg = create_xml_node(NULL, "lrmd_command");
    CRM_CHECK(op_msg != NULL, return NULL);

    crm_xml_add(op_msg, F_XML_TAGNAME, "lrmd_command");

    crm_xml_add(op_msg, F_TYPE, T_LRMD);
    crm_xml_add(op_msg, F_LRMD_CALLBACK_TOKEN, token);
    crm_xml_add(op_msg, F_LRMD_OPERATION, op);
    crm_trace("Sending call options: %.8lx, %d", (long)options, options);
    crm_xml_add_int(op_msg, F_LRMD_CALLOPTS, options);

    if (data != NULL) {
        add_message_xml(op_msg, F_LRMD_CALLDATA, data);
    }

    return op_msg;
}
/*
 * \internal
 * \brief IPC destroy callback: forget the connection and notify the client
 *
 * \param[in] userdata  The lrmd_t connection object
 */
static void
lrmd_ipc_connection_destroy(gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *native = lrmd->private;

    crm_info("IPC connection destroyed");

    /* Prevent these from being cleaned up in lrmd_api_disconnect() */
    native->source = NULL;
    native->ipc = NULL;

    if (native->callback == NULL) {
        return;
    }

    {
        lrmd_event_data_t disconnect_event = { 0, };

        disconnect_event.remote_nodename = native->remote_nodename;
        disconnect_event.type = lrmd_event_disconnect;
        native->callback(&disconnect_event);
    }
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * \internal
 * \brief Tear down all TLS connection state and notify the client
 *
 * Closes the TLS session, frees the PSK credentials, closes the socket,
 * removes the notify trigger, drops queued notifications and the receive
 * buffer, then delivers an lrmd_event_disconnect event if a callback is set.
 *
 * \param[in] userdata  The lrmd_t connection object
 */
static void
lrmd_tls_connection_destroy(gpointer userdata)
{
    lrmd_t *lrmd = userdata;
    lrmd_private_t *native = lrmd->private;

    crm_info("TLS connection destroyed");

    /* Each resource is released and its handle reset in the same place */
    if (native->remote->tls_session) {
        gnutls_bye(*native->remote->tls_session, GNUTLS_SHUT_RDWR);
        gnutls_deinit(*native->remote->tls_session);
        gnutls_free(native->remote->tls_session);
        native->remote->tls_session = NULL;
    }

    if (native->psk_cred_c) {
        gnutls_psk_free_client_credentials(native->psk_cred_c);
        native->psk_cred_c = NULL;
    }

    if (native->sock) {
        close(native->sock);
        native->sock = 0;
    }

    if (native->process_notify) {
        mainloop_destroy_trigger(native->process_notify);
        native->process_notify = NULL;
    }

    if (native->pending_notify) {
        g_list_free_full(native->pending_notify, lrmd_free_xml);
        native->pending_notify = NULL;
    }

    free(native->remote->buffer);
    native->remote->buffer = NULL;
    native->source = 0;

    if (native->callback) {
        lrmd_event_data_t disconnect_event = { 0, };

        disconnect_event.remote_nodename = native->remote_nodename;
        disconnect_event.type = lrmd_event_disconnect;
        native->callback(&disconnect_event);
    }
}
/*
 * \brief Tag a message with an ID and type, then send it over a remote session
 *
 * \param[in] session   Remote session to send on
 * \param[in] msg       Message to send (the ID and type are added to it)
 * \param[in] id        Value stored as F_LRMD_REMOTE_MSG_ID
 * \param[in] msg_type  Value stored as F_LRMD_REMOTE_MSG_TYPE
 *
 * \return Result of crm_remote_send(); negative values are logged as errors
 */
int
lrmd_tls_send_msg(crm_remote_t * session, xmlNode * msg, uint32_t id, const char *msg_type)
{
    int rc = -1;

    crm_xml_add_int(msg, F_LRMD_REMOTE_MSG_ID, id);
    crm_xml_add(msg, F_LRMD_REMOTE_MSG_TYPE, msg_type);

    rc = crm_remote_send(session, msg);
    if (rc < 0) {
        crm_err("Failed to send remote lrmd tls msg, rc = %d", rc);
    }
    return rc;
}
/*
 * \internal
 * \brief Wait for the reply to a specific request on a TLS connection
 *
 * Messages are read until a "reply" with the expected ID arrives.
 * Notifications received in the meantime are queued on
 * native->pending_notify for later processing; anything else is discarded.
 *
 * \param[in]  lrmd               Connection to read from
 * \param[in]  total_timeout      Milliseconds to wait in total; values <= 0 or
 *                                above MAX_TLS_RECV_WAIT are replaced by
 *                                MAX_TLS_RECV_WAIT
 * \param[in]  expected_reply_id  Message ID the reply must carry
 * \param[out] disconnected       Set to TRUE when the wait fails
 *
 * \return The reply (caller frees with free_xml()), or NULL on failure
 */
static xmlNode *
lrmd_tls_recv_reply(lrmd_t * lrmd, int total_timeout, int expected_reply_id, int *disconnected)
{
    lrmd_private_t *native = lrmd->private;
    xmlNode *xml = NULL;
    time_t start = time(NULL);
    const char *msg_type = NULL;
    int reply_id = 0;
    int remaining_timeout = 0;

    /* A timeout of 0 here makes no sense.  We have to wait a period of time
     * for the response to come back.  If -1 or 0, fall back to the maximum. */
    if (total_timeout <= 0 || total_timeout > MAX_TLS_RECV_WAIT) {
        total_timeout = MAX_TLS_RECV_WAIT;
    }

    while (!xml) {

        xml = crm_remote_parse_buffer(native->remote);
        if (!xml) {
            /* read some more off the tls buffer if we still have time left. */
            if (remaining_timeout) {
                /* Elapsed time is measured from the very start, so it must be
                 * taken off the total, not off the previous remainder */
                remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
            } else {
                remaining_timeout = total_timeout;
            }
            if (remaining_timeout <= 0) {
                crm_err("Never received the expected reply during the timeout period, disconnecting.");
                *disconnected = TRUE;
                return NULL;
            }

            crm_remote_recv(native->remote, remaining_timeout, disconnected);
            xml = crm_remote_parse_buffer(native->remote);
            if (!xml) {
                crm_err("Unable to receive expected reply, disconnecting.");
                *disconnected = TRUE;
                return NULL;
            } else if (*disconnected) {
                /* The message is being dropped; do not leak it */
                free_xml(xml);
                return NULL;
            }
        }

        CRM_ASSERT(xml != NULL);

        crm_element_value_int(xml, F_LRMD_REMOTE_MSG_ID, &reply_id);
        msg_type = crm_element_value(xml, F_LRMD_REMOTE_MSG_TYPE);

        if (!msg_type) {
            crm_err("Empty msg type received while waiting for reply");
            free_xml(xml);
            xml = NULL;
        } else if (safe_str_eq(msg_type, "notify")) {
            /* got a notify while waiting for reply, trigger the notify to be processed later */
            crm_info("queueing notify");
            native->pending_notify = g_list_append(native->pending_notify, xml);
            if (native->process_notify) {
                crm_info("notify trigger set.");
                mainloop_set_trigger(native->process_notify);
            }
            /* ownership moved to the pending list */
            xml = NULL;
        } else if (safe_str_neq(msg_type, "reply")) {
            /* msg isn't a reply, make some noise */
            crm_err("Expected a reply, got %s", msg_type);
            free_xml(xml);
            xml = NULL;
        } else if (reply_id != expected_reply_id) {
            if (native->expected_late_replies > 0) {
                native->expected_late_replies--;
            } else {
                crm_err("Got outdated reply, expected id %d got id %d", expected_reply_id, reply_id);
            }
            free_xml(xml);
            xml = NULL;
        }
    }

    /* Data left in the buffer gets handled by the notify trigger */
    if (native->remote->buffer && native->process_notify) {
        mainloop_set_trigger(native->process_notify);
    }

    return xml;
}
/*
 * \internal
 * \brief Send a request over TLS using the next global message ID
 *
 * The global ID counter is advanced first and reset to 1 whenever it is not
 * positive. A failed send disconnects the TLS connection.
 *
 * \param[in] lrmd  Connection to send on
 * \param[in] msg   Request to send
 *
 * \return pcmk_ok on success, -ENOTCONN if sending failed
 */
static int
lrmd_tls_send(lrmd_t * lrmd, xmlNode * msg)
{
    lrmd_private_t *native = lrmd->private;
    int sent = 0;

    global_remote_msg_id++;
    if (global_remote_msg_id <= 0) {
        global_remote_msg_id = 1;
    }

    sent = lrmd_tls_send_msg(native->remote, msg, global_remote_msg_id, "request");
    if (sent > 0) {
        return pcmk_ok;
    }

    crm_err("Remote lrmd send failed, disconnecting");
    lrmd_tls_disconnect(lrmd);
    return -ENOTCONN;
}
/*
 * \internal
 * \brief Send a request over TLS and wait for the matching reply
 *
 * \param[in]  lrmd     Connection to use
 * \param[in]  msg      Request to send
 * \param[in]  timeout  How long to wait for the reply, in milliseconds
 * \param[out] reply    If not NULL, receives the reply (may be NULL on
 *                      failure); otherwise the reply is freed here
 *
 * \return -1 if there is no TLS session, the send error if sending failed,
 *         -ENOTCONN if the server disconnected while waiting, -ECOMM if no
 *         reply arrived, otherwise the result of the send
 */
static int
lrmd_tls_send_recv(lrmd_t * lrmd, xmlNode * msg, int timeout, xmlNode ** reply)
{
    int rc = 0;
    int disconnected = 0;
    xmlNode *answer = NULL;

    if (!lrmd_tls_connected(lrmd)) {
        return -1;
    }

    rc = lrmd_tls_send(lrmd, msg);
    if (rc < 0) {
        return rc;
    }

    answer = lrmd_tls_recv_reply(lrmd, timeout, global_remote_msg_id, &disconnected);

    if (disconnected) {
        crm_err("Remote lrmd server disconnected while waiting for reply with id %d. ",
                global_remote_msg_id);
        lrmd_tls_disconnect(lrmd);
        rc = -ENOTCONN;
    } else if (answer == NULL) {
        crm_err("Remote lrmd never received reply for request id %d. timeout: %dms ",
                global_remote_msg_id, timeout);
        rc = -ECOMM;
    }

    if (reply == NULL) {
        free_xml(answer);
    } else {
        *reply = answer;
    }

    return rc;
}
#endif
/*
 * \internal
 * \brief Send a request over whichever transport the connection uses and
 *        collect the reply
 *
 * \param[in]  lrmd     Connection to use
 * \param[in]  msg      Request to send
 * \param[in]  timeout  Timeout passed through to the transport
 * \param[out] reply    Where the transport stores the reply
 *
 * \return Result of the transport's send call, or -1 for an unsupported
 *         connection type
 */
static int
lrmd_send_xml(lrmd_t * lrmd, xmlNode * msg, int timeout, xmlNode ** reply)
{
    lrmd_private_t *native = lrmd->private;
    int rc = -1;

    if (native->type == CRM_CLIENT_IPC) {
        rc = crm_ipc_send(native->ipc, msg, crm_ipc_client_response, timeout, reply);

#ifdef HAVE_GNUTLS_GNUTLS_H
    } else if (native->type == CRM_CLIENT_TLS) {
        rc = lrmd_tls_send_recv(lrmd, msg, timeout, reply);
#endif

    } else {
        crm_err("Unsupported connection type: %d", native->type);
    }

    return rc;
}
/*
 * \internal
 * \brief Send a request without waiting for its reply
 *
 * \param[in] lrmd  Connection to use
 * \param[in] msg   Request to send
 *
 * \return Result of the transport's send call, or -1 for an unsupported
 *         connection type
 */
static int
lrmd_send_xml_no_reply(lrmd_t * lrmd, xmlNode * msg)
{
    lrmd_private_t *native = lrmd->private;
    int rc = -1;

    if (native->type == CRM_CLIENT_IPC) {
        rc = crm_ipc_send(native->ipc, msg, crm_ipc_flags_none, 0, NULL);

#ifdef HAVE_GNUTLS_GNUTLS_H
    } else if (native->type == CRM_CLIENT_TLS) {
        rc = lrmd_tls_send(lrmd, msg);
        if (rc == pcmk_ok) {
            /* we don't want to wait around for the reply, but
             * since the request/reply protocol needs to behave the same
             * as libqb, a reply will eventually come later anyway. */
            native->expected_late_replies++;
        }
#endif

    } else {
        crm_err("Unsupported connection type: %d", native->type);
    }

    return rc;
}
/*
 * \internal
 * \brief Check whether a connection is currently established
 *
 * \param[in] lrmd  Connection to check
 *
 * \return Result of the transport-specific check, or 0 for an unsupported
 *         connection type
 */
static int
lrmd_api_is_connected(lrmd_t * lrmd)
{
    lrmd_private_t *native = lrmd->private;
    int connected = 0;

    switch (native->type) {
        case CRM_CLIENT_IPC:
            connected = crm_ipc_connected(native->ipc);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            connected = lrmd_tls_connected(lrmd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", native->type);
            break;
    }

    return connected;
}
/*
 * \internal
 * \brief Build a request for an operation, send it, and evaluate the reply
 *
 * \param[in]  lrmd          Connection to use
 * \param[in]  op            Operation name (required)
 * \param[in]  data          Optional payload for the request
 * \param[out] output_data   If not NULL, receives the reply on success; the
 *                           caller then owns it
 * \param[in]  timeout       Timeout in ms, also stored in the request
 * \param[in]  options       Call options stored in the request
 * \param[in]  expect_reply  If FALSE, the request is sent without waiting
 *
 * \return -ENOTCONN if not connected, -EINVAL for a missing operation or
 *         request, -ECOMM if sending failed, -ENOMSG if the reply is missing
 *         or carries no F_LRMD_RC; otherwise the F_LRMD_RC value of the reply
 *         (or the send result when no reply is expected)
 */
static int
lrmd_send_command(lrmd_t * lrmd, const char *op, xmlNode * data, xmlNode ** output_data, int timeout,   /* ms. defaults to 1000 if set to 0 */
                  enum lrmd_call_options options, gboolean expect_reply)
{                               /* TODO we need to reduce usage of this boolean */
    int rc = pcmk_ok;
    int reply_id = -1;
    lrmd_private_t *native = lrmd->private;
    xmlNode *request = NULL;
    xmlNode *response = NULL;

    if (!lrmd_api_is_connected(lrmd)) {
        return -ENOTCONN;
    }

    if (op == NULL) {
        crm_err("No operation specified");
        return -EINVAL;
    }

    /* Only logged; a missing token makes lrmd_create_op() fail below */
    CRM_CHECK(native->token != NULL,;
        );
    crm_trace("sending %s op to lrmd", op);

    request = lrmd_create_op(native->token, op, data, options);
    if (request == NULL) {
        return -EINVAL;
    }

    crm_xml_add_int(request, F_LRMD_TIMEOUT, timeout);

    if (!expect_reply) {
        rc = lrmd_send_xml_no_reply(lrmd, request);
        goto done;
    }

    rc = lrmd_send_xml(lrmd, request, timeout, &response);
    if (rc < 0) {
        crm_perror(LOG_ERR, "Couldn't perform %s operation (timeout=%d): %d", op, timeout, rc);
        rc = -ECOMM;
        goto done;
    }
    if (response == NULL) {
        rc = -ENOMSG;
        goto done;
    }

    rc = pcmk_ok;
    crm_element_value_int(response, F_LRMD_CALLID, &reply_id);
    crm_trace("%s op reply received", op);
    if (crm_element_value_int(response, F_LRMD_RC, &rc) != 0) {
        rc = -ENOMSG;
        goto done;
    }

    crm_log_xml_trace(response, "Reply");

    if (output_data) {
        *output_data = response;
        response = NULL;        /* Prevent subsequent free */
    }

  done:
    if (!lrmd_api_is_connected(lrmd)) {
        crm_err("LRMD disconnected");
    }

    free_xml(request);
    free_xml(response);
    return rc;
}
/*
 * \internal
 * \brief Send an LRMD_OP_POKE request over the connection
 *
 * A reply is awaited only for IPC connections.
 *
 * \param[in] lrmd  Connection to poke
 *
 * \return Negative error code from sending, otherwise pcmk_ok
 */
static int
lrmd_api_poke_connection(lrmd_t * lrmd)
{
    lrmd_private_t *native = lrmd->private;
    xmlNode *payload = create_xml_node(NULL, F_LRMD_RSC);
    int wait_for_reply = (native->type == CRM_CLIENT_IPC) ? TRUE : FALSE;
    int rc;

    crm_xml_add(payload, F_LRMD_ORIGIN, __FUNCTION__);
    rc = lrmd_send_command(lrmd, LRMD_OP_POKE, payload, NULL, 0, 0, wait_for_reply);
    free_xml(payload);

    if (rc < 0) {
        return rc;
    }
    return pcmk_ok;
}
/*
 * \brief Send an LRMD_OP_CHECK request carrying the watchdog timeout
 *
 * The "stonith-watchdog-timeout" entry of the given table is sent as
 * F_LRMD_WATCHDOG. A reply is awaited only for IPC connections.
 *
 * \param[in] lrmd  Connection to use
 * \param[in] hash  Table in which "stonith-watchdog-timeout" is looked up
 *
 * \return Negative error code from sending, otherwise pcmk_ok
 */
int
remote_proxy_check(lrmd_t * lrmd, GHashTable *hash)
{
    lrmd_private_t *native = lrmd->private;
    xmlNode *payload = create_xml_node(NULL, F_LRMD_OPERATION);
    int wait_for_reply = (native->type == CRM_CLIENT_IPC) ? TRUE : FALSE;
    const char *watchdog_timeout = NULL;
    int rc;

    crm_xml_add(payload, F_LRMD_ORIGIN, __FUNCTION__);

    watchdog_timeout = g_hash_table_lookup(hash, "stonith-watchdog-timeout");
    crm_xml_add(payload, F_LRMD_WATCHDOG, watchdog_timeout);

    rc = lrmd_send_command(lrmd, LRMD_OP_CHECK, payload, NULL, 0, 0, wait_for_reply);
    free_xml(payload);

    if (rc < 0) {
        return rc;
    }
    return pcmk_ok;
}
/*
 * \internal
 * \brief Register this client with the server on a fresh connection
 *
 * Sends a CRM_OP_REGISTER request and, on success, stores the returned client
 * ID as native->token and the server's protocol version as
 * native->peer_version. Any failure disconnects the connection.
 *
 * \param[in] lrmd  Connection to register on
 * \param[in] name  Client name to announce
 *
 * \return pcmk_ok on success, -ECOMM if the request could not be sent,
 *         -EPROTO for a missing or invalid reply, otherwise the F_LRMD_RC
 *         value of the reply
 */
static int
lrmd_handshake(lrmd_t * lrmd, const char *name)
{
    int rc = pcmk_ok;
    lrmd_private_t *native = lrmd->private;
    xmlNode *reply = NULL;
    xmlNode *hello = create_xml_node(NULL, "lrmd_command");

    crm_xml_add(hello, F_TYPE, T_LRMD);
    crm_xml_add(hello, F_LRMD_OPERATION, CRM_OP_REGISTER);
    crm_xml_add(hello, F_LRMD_CLIENTNAME, name);
    crm_xml_add(hello, F_LRMD_PROTOCOL_VERSION, LRMD_PROTOCOL_VERSION);

    /* advertise that we are a proxy provider */
    if (native->proxy_callback) {
        crm_xml_add(hello, F_LRMD_IS_IPC_PROVIDER, "true");
    }

    rc = lrmd_send_xml(lrmd, hello, -1, &reply);

    if (rc < 0) {
        crm_perror(LOG_DEBUG, "Couldn't complete registration with the lrmd API: %d", rc);
        rc = -ECOMM;

    } else if (reply == NULL) {
        crm_err("Did not receive registration reply");
        rc = -EPROTO;

    } else {
        const char *server_version = crm_element_value(reply, F_LRMD_PROTOCOL_VERSION);
        const char *reply_op = crm_element_value(reply, F_LRMD_OPERATION);
        const char *client_id = crm_element_value(reply, F_LRMD_CLIENTID);

        crm_element_value_int(reply, F_LRMD_RC, &rc);

        if (rc == -EPROTO) {
            crm_err("LRMD protocol mismatch client version %s, server version %s",
                LRMD_PROTOCOL_VERSION, server_version);
            crm_log_xml_err(reply, "Protocol Error");

        } else if (safe_str_neq(reply_op, CRM_OP_REGISTER)) {
            crm_err("Invalid registration message: %s", reply_op);
            crm_log_xml_err(reply, "Bad reply");
            rc = -EPROTO;

        } else if (client_id == NULL) {
            crm_err("No registration token provided");
            crm_log_xml_err(reply, "Bad reply");
            rc = -EPROTO;

        } else {
            crm_trace("Obtained registration token: %s", client_id);
            native->token = strdup(client_id);

            /* Included since 1.1 */
            if (server_version == NULL) {
                server_version = "1.0";
            }
            native->peer_version = strdup(server_version);
            rc = pcmk_ok;
        }
    }

    free_xml(reply);
    free_xml(hello);

    if (rc != pcmk_ok) {
        lrmd_api_disconnect(lrmd);
    }
    return rc;
}
/*
 * \internal
 * \brief Open the IPC connection to the lrmd
 *
 * \param[in]  lrmd  Connection object to set up
 * \param[out] fd    If not NULL, the connection is made without mainloop and
 *                   its file descriptor is stored here; if NULL, the
 *                   connection is added to the mainloop
 *
 * \return pcmk_ok on success, -ENOTCONN otherwise
 */
static int
lrmd_ipc_connect(lrmd_t * lrmd, int *fd)
{
    int rc = pcmk_ok;
    lrmd_private_t *native = lrmd->private;

    static struct ipc_client_callbacks lrmd_callbacks = {
        .dispatch = lrmd_ipc_dispatch,
        .destroy = lrmd_ipc_connection_destroy
    };

    crm_info("Connecting to lrmd");

    if (fd == NULL) {
        /* Driven by mainloop */
        native->source = mainloop_add_ipc_client(CRM_SYSTEM_LRMD, G_PRIORITY_HIGH, 0, lrmd, &lrmd_callbacks);
        native->ipc = mainloop_get_ipc_client(native->source);

    } else {
        /* No mainloop */
        native->ipc = crm_ipc_new(CRM_SYSTEM_LRMD, 0);
        if (native->ipc != NULL) {
            if (crm_ipc_connect(native->ipc)) {
                *fd = crm_ipc_get_fd(native->ipc);
            } else {
                crm_perror(LOG_ERR, "Connection to local resource manager failed");
                rc = -ENOTCONN;
            }
        }
    }

    if (native->ipc == NULL) {
        crm_debug("Could not connect to the LRMD API");
        rc = -ENOTCONN;
    }

    return rc;
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * \internal
 * \brief Load the remote authentication key from a file
 *
 * The most recently loaded key is cached in static storage and reused for
 * calls made less than 60 seconds after it was read. The cache is not keyed
 * by location, so a cached key is returned whatever location is given.
 *
 * \param[out] key       Receives a gnutls_malloc()ed copy of the key; the
 *                       caller releases key->data with gnutls_free()
 * \param[in]  location  Path of the key file
 *
 * \return 0 on success, -1 if location is NULL, the file cannot be opened or
 *         read, the file is empty, or memory cannot be allocated
 */
static int
set_key(gnutls_datum_t * key, const char *location)
{
    FILE *stream;
    int read_len = 256;
    int cur_len = 0;
    int buf_len = read_len;
    int failed = 0;
    static char *key_cache = NULL;
    static size_t key_cache_len = 0;
    static time_t key_cache_updated;

    if (location == NULL) {
        return -1;
    }

    if (key_cache) {
        time_t now = time(NULL);

        if ((now - key_cache_updated) < 60) {
            key->data = gnutls_malloc(key_cache_len + 1);
            if (key->data == NULL) {
                return -1;
            }
            key->size = key_cache_len;
            memcpy(key->data, key_cache, key_cache_len);

            crm_debug("using cached LRMD key");
            return 0;
        } else {
            key_cache_len = 0;
            key_cache_updated = 0;
            free(key_cache);
            key_cache = NULL;
            crm_debug("clearing lrmd key cache");
        }
    }

    stream = fopen(location, "r");
    if (!stream) {
        return -1;
    }

    key->data = gnutls_malloc(read_len);
    if (key->data == NULL) {
        fclose(stream);
        return -1;
    }

    for (;;) {
        int next = fgetc(stream);

        if (next == EOF) {
            /* EOF is returned for both end-of-file and read errors; stop in
             * either case so a failing stream cannot loop forever */
            failed = ferror(stream);
            break;
        }

        if (cur_len == buf_len) {
            /* Grow through a temporary so the old buffer is not lost */
            void *bigger = gnutls_realloc(key->data, buf_len + read_len);

            if (bigger == NULL) {
                failed = 1;
                break;
            }
            key->data = bigger;
            buf_len += read_len;
        }

        key->data[cur_len] = next;
        cur_len++;
    }
    fclose(stream);

    if (failed || !cur_len) {
        gnutls_free(key->data);
        key->data = 0;
        key->size = 0;
        return -1;
    }
    key->size = cur_len;

    if (!key_cache) {
        key_cache = calloc(1, key->size + 1);
        if (key_cache != NULL) {
            /* Caching is best effort; the key itself is already loaded */
            memcpy(key_cache, key->data, key->size);

            key_cache_len = key->size;
            key_cache_updated = time(NULL);
        }
    }

    return 0;
}
/*
 * \brief Load the remote authentication key from the first usable location
 *
 * Tries the file named by the PCMK_authkey_location environment variable,
 * then DEFAULT_REMOTE_KEY_LOCATION, then ALT_REMOTE_KEY_LOCATION.
 *
 * \param[out] key  Receives the key
 *
 * \return 0 if a key was loaded, -1 otherwise
 */
int
lrmd_tls_set_key(gnutls_datum_t * key)
{
    const char *specific_location = getenv("PCMK_authkey_location");

    if (set_key(key, specific_location) == 0) {
        crm_debug("Using custom authkey location %s", specific_location);
        return 0;
    }

    if (specific_location) {
        crm_err("No valid lrmd remote key found at %s, trying default location", specific_location);
    }

    if (set_key(key, DEFAULT_REMOTE_KEY_LOCATION) == 0) {
        return 0;
    }
    if (set_key(key, ALT_REMOTE_KEY_LOCATION) == 0) {
        return 0;
    }

    crm_err("No valid lrmd remote key found at %s", DEFAULT_REMOTE_KEY_LOCATION);
    return -1;
}
/*
 * \internal
 * \brief Run crm_gnutls_global_init() on the first call only
 */
static void
lrmd_gnutls_global_init(void)
{
    static int gnutls_init = 0;

    if (gnutls_init) {
        return;
    }
    crm_gnutls_global_init();
    gnutls_init = 1;
}
#endif
/*
 * \internal
 * \brief Deliver an lrmd_event_connect event carrying a connection result
 *
 * Nothing happens if the client has not set a callback.
 *
 * \param[in] lrmd  Connection the result belongs to
 * \param[in] rc    Result to report as the event's connection_rc
 */
static void
report_async_connection_result(lrmd_t * lrmd, int rc)
{
    lrmd_private_t *native = lrmd->private;
    lrmd_event_data_t connect_event = { 0, };

    if (native->callback == NULL) {
        return;
    }

    connect_event.connection_rc = rc;
    connect_event.remote_nodename = native->remote_nodename;
    connect_event.type = lrmd_event_connect;
    native->callback(&connect_event);
}
#ifdef HAVE_GNUTLS_GNUTLS_H
static void
lrmd_tcp_connect_cb(void *userdata, int sock)
{
lrmd_t *lrmd = userdata;
lrmd_private_t *native = lrmd->private;
char name[256] = { 0, };
static struct mainloop_fd_callbacks lrmd_tls_callbacks = {
.dispatch = lrmd_tls_dispatch,
.destroy = lrmd_tls_connection_destroy,
};
int rc = sock;
gnutls_datum_t psk_key = { NULL, 0 };
native->async_timer = 0;
if (rc < 0) {
lrmd_tls_connection_destroy(lrmd);
crm_info("remote lrmd connect to %s at port %d failed", native->server, native->port);
report_async_connection_result(lrmd, rc);
return;
}
/* TODO continue with tls stuff now that tcp connect passed. make this async as well soon
* to avoid all blocking code in the client. */
native->sock = sock;
if (lrmd_tls_set_key(&psk_key) != 0) {
lrmd_tls_connection_destroy(lrmd);
return;
}
gnutls_psk_allocate_client_credentials(&native->psk_cred_c);
gnutls_psk_set_client_credentials(native->psk_cred_c, DEFAULT_REMOTE_USERNAME, &psk_key, GNUTLS_PSK_KEY_RAW);
gnutls_free(psk_key.data);
native->remote->tls_session = create_psk_tls_session(sock, GNUTLS_CLIENT, native->psk_cred_c);
if (crm_initiate_client_tls_handshake(native->remote, LRMD_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
crm_warn("Client tls handshake failed for server %s:%d. Disconnecting", native->server,
native->port);
gnutls_deinit(*native->remote->tls_session);
gnutls_free(native->remote->tls_session);
native->remote->tls_session = NULL;
lrmd_tls_connection_destroy(lrmd);
report_async_connection_result(lrmd, -1);
return;
}
crm_info("Remote lrmd client TLS connection established with server %s:%d", native->server,
native->port);
snprintf(name, 128, "remote-lrmd-%s:%d", native->server, native->port);
native->process_notify = mainloop_add_trigger(G_PRIORITY_HIGH, lrmd_tls_dispatch, lrmd);
native->source =
mainloop_add_fd(name, G_PRIORITY_HIGH, native->sock, lrmd, &lrmd_tls_callbacks);
rc = lrmd_handshake(lrmd, name);
report_async_connection_result(lrmd, rc);
return;
}
/*
 * \internal
 * \brief Start an asynchronous TCP connection for a TLS session
 *
 * lrmd_tcp_connect_cb() is registered as the completion callback.
 *
 * \param[in] lrmd     Connection object to set up
 * \param[in] timeout  Connection timeout in milliseconds
 *
 * \return 0 if the attempt was started, -1 otherwise
 */
static int
lrmd_tls_connect_async(lrmd_t * lrmd, int timeout /*ms */ )
{
    lrmd_private_t *native = lrmd->private;
    int timer_id = 0;
    int sock = 0;

    lrmd_gnutls_global_init();

    sock = crm_remote_tcp_connect_async(native->server, native->port, timeout, &timer_id, lrmd,
                                        lrmd_tcp_connect_cb);
    if (sock == -1) {
        return -1;
    }

    native->sock = sock;
    native->async_timer = timer_id;
    return 0;
}
/*
 * \internal
 * \brief Synchronously establish the TCP connection and TLS session
 *
 * \param[in]  lrmd  Connection object to set up
 * \param[out] fd    If not NULL, receives the socket and the connection is
 *                   not attached to the mainloop; if NULL, the socket and a
 *                   notify trigger are added to the mainloop
 *
 * \return pcmk_ok on success, -ENOTCONN if the TCP connection failed, -1 if
 *         the key could not be loaded or the TLS handshake failed
 */
static int
lrmd_tls_connect(lrmd_t * lrmd, int *fd)
{
    static struct mainloop_fd_callbacks lrmd_tls_callbacks = {
        .dispatch = lrmd_tls_dispatch,
        .destroy = lrmd_tls_connection_destroy,
    };

    lrmd_private_t *native = lrmd->private;
    gnutls_datum_t psk_key = { NULL, 0 };
    char name[256] = { 0, };
    int sock;

    lrmd_gnutls_global_init();

    sock = crm_remote_tcp_connect(native->server, native->port);
    if (sock < 0) {
        crm_warn("Could not establish remote lrmd connection to %s", native->server);
        lrmd_tls_connection_destroy(lrmd);
        return -ENOTCONN;
    }

    native->sock = sock;

    if (lrmd_tls_set_key(&psk_key) != 0) {
        lrmd_tls_connection_destroy(lrmd);
        return -1;
    }

    gnutls_psk_allocate_client_credentials(&native->psk_cred_c);
    gnutls_psk_set_client_credentials(native->psk_cred_c, DEFAULT_REMOTE_USERNAME, &psk_key, GNUTLS_PSK_KEY_RAW);
    gnutls_free(psk_key.data);

    native->remote->tls_session = create_psk_tls_session(sock, GNUTLS_CLIENT, native->psk_cred_c);

    if (crm_initiate_client_tls_handshake(native->remote, LRMD_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
        crm_err("Session creation for %s:%d failed", native->server, native->port);
        /* Free the session here, without gnutls_bye(), before the general
         * teardown runs */
        gnutls_deinit(*native->remote->tls_session);
        gnutls_free(native->remote->tls_session);
        native->remote->tls_session = NULL;
        lrmd_tls_connection_destroy(lrmd);
        return -1;
    }

    crm_info("Remote lrmd client TLS connection established with server %s:%d", native->server,
             native->port);

    if (fd != NULL) {
        /* Caller polls the socket itself */
        *fd = sock;
        return pcmk_ok;
    }

    snprintf(name, 128, "remote-lrmd-%s:%d", native->server, native->port);

    native->process_notify = mainloop_add_trigger(G_PRIORITY_HIGH, lrmd_tls_dispatch, lrmd);
    native->source =
        mainloop_add_fd(name, G_PRIORITY_HIGH, native->sock, lrmd, &lrmd_tls_callbacks);

    return pcmk_ok;
}
#endif
/*
 * \internal
 * \brief Connect to the lrmd and register this client
 *
 * \param[in]  lrmd  Connection object
 * \param[in]  name  Client name used for registration
 * \param[out] fd    Passed through to the transport-specific connect function
 *
 * \return pcmk_ok on success, -ENOTCONN for an unsupported connection type,
 *         otherwise the error from connecting or from the handshake
 */
static int
lrmd_api_connect(lrmd_t * lrmd, const char *name, int *fd)
{
    lrmd_private_t *native = lrmd->private;
    int rc = -ENOTCONN;

    switch (native->type) {
        case CRM_CLIENT_IPC:
            rc = lrmd_ipc_connect(lrmd, fd);
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            rc = lrmd_tls_connect(lrmd, fd);
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", native->type);
            break;
    }

    if (rc != pcmk_ok) {
        return rc;
    }
    return lrmd_handshake(lrmd, name);
}
/*
 * \internal
 * \brief Connect to the lrmd, reporting the outcome through the callback
 *
 * IPC connections are made synchronously and only a successful result is
 * reported through the callback. TLS connections are started asynchronously;
 * a failure to start is reported immediately.
 *
 * \param[in] lrmd     Connection object (a client callback must be set)
 * \param[in] name     Client name used for registration
 * \param[in] timeout  Connection timeout in ms (used for TLS only)
 *
 * \return -1 if no callback is set, otherwise the transport's connect result
 *         (0 for an unsupported connection type)
 */
static int
lrmd_api_connect_async(lrmd_t * lrmd, const char *name, int timeout)
{
    lrmd_private_t *native = lrmd->private;
    int rc = 0;

    if (native->callback == NULL) {
        crm_err("Async connect not possible, no lrmd client callback set.");
        return -1;
    }

    switch (native->type) {
        case CRM_CLIENT_IPC:
            /* fake async connection with ipc.  it should be fast
             * enough that we gain very little from async */
            rc = lrmd_api_connect(lrmd, name, NULL);
            if (rc == 0) {
                report_async_connection_result(lrmd, rc);
            }
            break;

#ifdef HAVE_GNUTLS_GNUTLS_H
        case CRM_CLIENT_TLS:
            rc = lrmd_tls_connect_async(lrmd, timeout);
            if (rc != 0) {
                /* connection failed, report rc now */
                report_async_connection_result(lrmd, rc);
            }
            break;
#endif

        default:
            crm_err("Unsupported connection type: %d", native->type);
            break;
    }

    return rc;
}
/*
 * \internal
 * \brief Close the IPC connection, whether or not it is attached to mainloop
 *
 * \param[in] lrmd  Connection to close
 */
static void
lrmd_ipc_disconnect(lrmd_t * lrmd)
{
    lrmd_private_t *native = lrmd->private;

    if (native->source != NULL) {
        /* Attached to mainloop */
        mainloop_del_ipc_client(native->source);
        native->source = NULL;
        native->ipc = NULL;
        return;
    }

    if (native->ipc != NULL) {
        /* Not attached to mainloop */
        crm_ipc_t *old_ipc = native->ipc;

        /* The member is cleared before the connection is closed */
        native->ipc = NULL;
        crm_ipc_close(old_ipc);
        crm_ipc_destroy(old_ipc);
    }
}
#ifdef HAVE_GNUTLS_GNUTLS_H
/*
 * \internal
 * \brief Shut down the TLS session and release connection resources
 *
 * Ends the TLS session, cancels a pending async connect timer, removes the
 * mainloop source (or closes the socket if there is none), and drops queued
 * notifications.
 *
 * \param[in] lrmd  Connection to close
 */
static void
lrmd_tls_disconnect(lrmd_t * lrmd)
{
    lrmd_private_t *native = lrmd->private;

    if (native->remote->tls_session != NULL) {
        gnutls_bye(*native->remote->tls_session, GNUTLS_SHUT_RDWR);
        gnutls_deinit(*native->remote->tls_session);
        gnutls_free(native->remote->tls_session);
        native->remote->tls_session = 0;
    }

    if (native->async_timer != 0) {
        g_source_remove(native->async_timer);
        native->async_timer = 0;
    }

    if (native->source != NULL) {
        /* Attached to mainloop */
        mainloop_del_ipc_client(native->source);
        native->source = NULL;

    } else if (native->sock != 0) {
        close(native->sock);
        native->sock = 0;
    }

    if (native->pending_notify != NULL) {
        g_list_free_full(native->pending_notify, lrmd_free_xml);
        native->pending_notify = NULL;
    }
}
#endif
/*
 * \internal
 * \brief Disconnect from the lrmd and forget the registration details
 *
 * \param[in] lrmd  Connection to close
 *
 * \return Always 0
 */
static int
lrmd_api_disconnect(lrmd_t * lrmd)
{
    lrmd_private_t *native = lrmd->private;

    crm_info("Disconnecting from %d lrmd service", native->type);

    if (native->type == CRM_CLIENT_IPC) {
        lrmd_ipc_disconnect(lrmd);

#ifdef HAVE_GNUTLS_GNUTLS_H
    } else if (native->type == CRM_CLIENT_TLS) {
        lrmd_tls_disconnect(lrmd);
#endif

    } else {
        crm_err("Unsupported connection type: %d", native->type);
    }

    /* The token and peer version belong to the registration just ended */
    free(native->token);
    native->token = NULL;

    free(native->peer_version);
    native->peer_version = NULL;

    return 0;
}
/*
 * \internal
 * \brief Ask the lrmd to register a resource
 *
 * \param[in] lrmd      Connection to use
 * \param[in] rsc_id    ID of the resource (required)
 * \param[in] class     Resource class (required)
 * \param[in] provider  Resource provider (required when class is "ocf")
 * \param[in] type      Resource type (required)
 * \param[in] options   Call options for the request
 *
 * \return -EINVAL if a required argument is missing, otherwise the result of
 *         sending the LRMD_OP_RSC_REG command
 */
static int
lrmd_api_register_rsc(lrmd_t * lrmd,
                      const char *rsc_id,
                      const char *class,
                      const char *provider, const char *type, enum lrmd_call_options options)
{
    int rc = pcmk_ok;
    xmlNode *request_data = NULL;

    if ((class == NULL) || (type == NULL) || (rsc_id == NULL)
        || (safe_str_eq(class, "ocf") && (provider == NULL))) {
        return -EINVAL;
    }

    request_data = create_xml_node(NULL, F_LRMD_RSC);

    crm_xml_add(request_data, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request_data, F_LRMD_RSC_ID, rsc_id);
    crm_xml_add(request_data, F_LRMD_CLASS, class);
    crm_xml_add(request_data, F_LRMD_PROVIDER, provider);
    crm_xml_add(request_data, F_LRMD_TYPE, type);

    rc = lrmd_send_command(lrmd, LRMD_OP_RSC_REG, request_data, NULL, 0, options, TRUE);
    free_xml(request_data);

    return rc;
}
/*
 * \internal
 * \brief Ask the lrmd to unregister a resource
 *
 * \param[in] lrmd     Connection to use
 * \param[in] rsc_id   ID of the resource
 * \param[in] options  Call options for the request
 *
 * \return Result of sending the LRMD_OP_RSC_UNREG command
 */
static int
lrmd_api_unregister_rsc(lrmd_t * lrmd, const char *rsc_id, enum lrmd_call_options options)
{
    xmlNode *request_data = create_xml_node(NULL, F_LRMD_RSC);
    int rc = pcmk_ok;

    crm_xml_add(request_data, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request_data, F_LRMD_RSC_ID, rsc_id);

    rc = lrmd_send_command(lrmd, LRMD_OP_RSC_UNREG, request_data, NULL, 0, options, TRUE);
    free_xml(request_data);

    return rc;
}
/*
 * \brief Create a deep copy of a resource info object
 *
 * \param[in] rsc_info  Object to copy
 *
 * \return Newly allocated copy (release with lrmd_free_rsc_info()), or NULL
 *         if rsc_info is NULL or memory could not be allocated. Members that
 *         are NULL in the original are NULL in the copy.
 */
lrmd_rsc_info_t *
lrmd_copy_rsc_info(lrmd_rsc_info_t * rsc_info)
{
    lrmd_rsc_info_t *copy = NULL;

    if (rsc_info == NULL) {
        return NULL;
    }

    copy = calloc(1, sizeof(lrmd_rsc_info_t));
    if (copy == NULL) {
        return NULL;
    }

    /* strdup(NULL) is undefined, so every member is guarded like provider */
    if (rsc_info->id) {
        copy->id = strdup(rsc_info->id);
    }
    if (rsc_info->type) {
        copy->type = strdup(rsc_info->type);
    }
    if (rsc_info->class) {
        copy->class = strdup(rsc_info->class);
    }
    if (rsc_info->provider) {
        copy->provider = strdup(rsc_info->provider);
    }

    return copy;
}
/*
 * \brief Free a resource info object and its string members
 *
 * \param[in] rsc_info  Object to free (may be NULL, in which case nothing
 *                      happens)
 */
void
lrmd_free_rsc_info(lrmd_rsc_info_t * rsc_info)
{
    if (rsc_info != NULL) {
        free(rsc_info->provider);
        free(rsc_info->class);
        free(rsc_info->type);
        free(rsc_info->id);
        free(rsc_info);
    }
}
/*
 * \internal
 * \brief Query the lrmd for a registered resource's class, provider and type
 *
 * \param[in] lrmd     Connection to use
 * \param[in] rsc_id   ID of the resource to look up
 * \param[in] options  Call options for the request
 *
 * \return Newly allocated resource info (release with lrmd_free_rsc_info()),
 *         or NULL if no reply was received, the reply lacks a class or type
 *         (or a provider for class "ocf"), or memory could not be allocated
 */
static lrmd_rsc_info_t *
lrmd_api_get_rsc_info(lrmd_t * lrmd, const char *rsc_id, enum lrmd_call_options options)
{
    lrmd_rsc_info_t *rsc_info = NULL;
    xmlNode *data = create_xml_node(NULL, F_LRMD_RSC);
    xmlNode *output = NULL;
    const char *class = NULL;
    const char *provider = NULL;
    const char *type = NULL;

    crm_xml_add(data, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(data, F_LRMD_RSC_ID, rsc_id);
    lrmd_send_command(lrmd, LRMD_OP_RSC_INFO, data, &output, 0, options, TRUE);
    free_xml(data);

    if (!output) {
        return NULL;
    }

    class = crm_element_value(output, F_LRMD_CLASS);
    provider = crm_element_value(output, F_LRMD_PROVIDER);
    type = crm_element_value(output, F_LRMD_TYPE);

    if (!class || !type) {
        free_xml(output);
        return NULL;
    } else if (safe_str_eq(class, "ocf") && !provider) {
        free_xml(output);
        return NULL;
    }

    rsc_info = calloc(1, sizeof(lrmd_rsc_info_t));
    if (rsc_info == NULL) {
        /* Out of memory: nothing can be filled in */
        free_xml(output);
        return NULL;
    }

    /* Copy the strings; they are looked up on output, which is freed below */
    rsc_info->id = strdup(rsc_id);
    rsc_info->class = strdup(class);
    if (provider) {
        rsc_info->provider = strdup(provider);
    }
    rsc_info->type = strdup(type);

    free_xml(output);
    return rsc_info;
}
/*
 * \internal
 * \brief Set the function that receives events for a connection
 *
 * \param[in] lrmd      Connection to configure
 * \param[in] callback  Event callback to store
 */
static void
lrmd_api_set_callback(lrmd_t * lrmd, lrmd_event_callback callback)
{
    lrmd_private_t *priv = lrmd->private;

    priv->callback = callback;
}
/*
 * \brief Set the function that receives proxied messages for a connection
 *
 * \param[in] lrmd      Connection to configure
 * \param[in] userdata  Opaque pointer handed back to callback on each call
 * \param[in] callback  Function invoked by lrmd_internal_proxy_dispatch()
 */
void
lrmd_internal_set_proxy_callback(lrmd_t * lrmd, void *userdata, void (*callback)(lrmd_t *lrmd, void *userdata, xmlNode *msg))
{
    lrmd_private_t *priv = lrmd->private;

    priv->proxy_callback_userdata = userdata;
    priv->proxy_callback = callback;
}
/*
 * \brief Hand a proxied message to the registered proxy callback
 *
 * The message is silently ignored when no proxy callback is set.
 *
 * \param[in] lrmd  Connection the message arrived on
 * \param[in] msg   Message to pass on
 */
void
lrmd_internal_proxy_dispatch(lrmd_t *lrmd, xmlNode *msg)
{
    lrmd_private_t *priv = lrmd->private;

    if (priv->proxy_callback == NULL) {
        return;
    }

    crm_log_xml_trace(msg, "PROXY_INBOUND");
    priv->proxy_callback(lrmd, priv->proxy_callback_userdata, msg);
}
/*
 * \brief Send a proxied message to the lrmd without waiting for a reply
 *
 * The message is marked with operation CRM_OP_IPC_FWD before being sent.
 *
 * \param[in] lrmd  Connection to use
 * \param[in] msg   Message to forward (modified to carry the operation)
 *
 * \return -ENOTCONN if lrmd is NULL, otherwise the result of the send
 */
int
lrmd_internal_proxy_send(lrmd_t * lrmd, xmlNode *msg)
{
    int rc = -ENOTCONN;

    if (lrmd != NULL) {
        crm_xml_add(msg, F_LRMD_OPERATION, CRM_OP_IPC_FWD);

        crm_log_xml_trace(msg, "PROXY_OUTBOUND");
        rc = lrmd_send_xml_no_reply(lrmd, msg);
    }
    return rc;
}
/*
 * \internal
 * \brief Fetch agent metadata through a temporary stonith API object
 *
 * \param[in]  provider  Passed to the stonith metadata call
 * \param[in]  type      Passed to the stonith metadata call
 * \param[out] output    Receives the metadata from the stonith call; success
 *                       is judged solely by whether *output is non-NULL
 *                       afterwards
 *
 * \return pcmk_ok if *output is non-NULL afterwards, -EIO otherwise
 */
static int
stonith_get_metadata(const char *provider, const char *type, char **output)
{
    stonith_t *stonith_api = stonith_api_new();

    if (stonith_api != NULL) {
        stonith_api->cmds->metadata(stonith_api, st_opt_sync_call, type, provider, output, 0);
        stonith_api->cmds->free(stonith_api);
    }

    return (*output == NULL) ? -EIO : pcmk_ok;
}
/* printf-style template with nine %s placeholders, presumably used to build
 * the metadata returned for LSB init scripts (its use is not visible here).
 * NOTE(review): the literal text below contains no XML markup, which looks
 * like it was lost somewhere along the way -- verify against the intended
 * resource-agent XML before relying on this template. */
#define lsb_metadata_template \
"\n" \
"\n" \
"\n" \
" 1.0\n" \
" \n" \
" %s\n" \
" \n" \
" %s\n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" \n" \
" %s\n" \
" %s\n" \
" %s\n" \
" %s\n" \
" %s\n" \
" %s\n" \
" %s\n" \
" \n" \
"\n"
/* Strings looked for in LSB init scripts. The header-tag strings (PROVIDES
 * through SHORT_DSCR, plus DESCRIPTION) are matched case-insensitively against
 * the start of each line by lsb_get_metadata().
 * NOTE(review): the two *_TAG markers presumably delimit the init-info comment
 * block; their use is not visible in this part of the file. */
#define LSB_INITSCRIPT_INFOBEGIN_TAG "### BEGIN INIT INFO"
#define LSB_INITSCRIPT_INFOEND_TAG "### END INIT INFO"
#define PROVIDES "# Provides:"
#define REQ_START "# Required-Start:"
#define REQ_STOP "# Required-Stop:"
#define SHLD_START "# Should-Start:"
#define SHLD_STOP "# Should-Stop:"
#define DFLT_START "# Default-Start:"
#define DFLT_STOP "# Default-Stop:"
#define SHORT_DSCR "# Short-Description:"
#define DESCRIPTION "# Description:"
/* Free a non-NULL value with xmlFree() and reset the variable to NULL.
 * The argument is expanded more than once, so pass a plain variable with no
 * side effects. */
#define lsb_meta_helper_free_value(m) \
do { \
if ((m) != NULL) { \
xmlFree(m); \
(m) = NULL; \
} \
} while(0)
/*
 * \internal
 * \brief Extract the value of one LSB header tag from an init script line
 *
 * \param[in]     line    Line read from LSB init script
 * \param[in,out] value   Left untouched if already set; otherwise receives an
 *                        XML-safe copy of the text that follows the prefix
 * \param[in]     prefix  Tag the line must start with (case-insensitive)
 *
 * \return TRUE if value was set, FALSE otherwise
 */
static inline gboolean
lsb_meta_helper_get_value(const char *line, char **value, const char *prefix)
{
    size_t prefix_len = 0;

    /* The first match wins; later lines never overwrite a value */
    if (*value != NULL) {
        return FALSE;
    }

    prefix_len = strlen(prefix);
    if (strncasecmp(line, prefix, prefix_len) != 0) {
        return FALSE;
    }

    *value = (char *)xmlEncodeEntitiesReentrant(NULL, BAD_CAST line + prefix_len);
    return TRUE;
}
static int
lsb_get_metadata(const char *type, char **output)
{
char ra_pathname[PATH_MAX] = { 0, };
FILE *fp;
char buffer[1024];
char *provides = NULL;
char *req_start = NULL;
char *req_stop = NULL;
char *shld_start = NULL;
char *shld_stop = NULL;
char *dflt_start = NULL;
char *dflt_stop = NULL;
char *s_dscrpt = NULL;
char *xml_l_dscrpt = NULL;
int offset = 0;
int max = 2048;
char description[max];
if(type[0] == '/') {
snprintf(ra_pathname, sizeof(ra_pathname), "%s", type);
} else {
snprintf(ra_pathname, sizeof(ra_pathname), "%s/%s", LSB_ROOT_DIR, type);
}
crm_trace("Looking into %s", ra_pathname);
if (!(fp = fopen(ra_pathname, "r"))) {
return -errno;
}
/* Enter into the lsb-compliant comment block */
while (fgets(buffer, sizeof(buffer), fp)) {
/* Now suppose each of the following eight arguments contain only one line */
if (lsb_meta_helper_get_value(buffer, &provides, PROVIDES)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &req_start, REQ_START)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &req_stop, REQ_STOP)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &shld_start, SHLD_START)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &shld_stop, SHLD_STOP)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &dflt_start, DFLT_START)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &dflt_stop, DFLT_STOP)) {
continue;
}
if (lsb_meta_helper_get_value(buffer, &s_dscrpt, SHORT_DSCR)) {
continue;
}
/* Long description may cross multiple lines */
if (offset == 0 && (0 == strncasecmp(buffer, DESCRIPTION, strlen(DESCRIPTION)))) {
/* Between # and keyword, more than one space, or a tab
* character, indicates the continuation line.
*
* Extracted from LSB init script standard
*/
while (fgets(buffer, sizeof(buffer), fp)) {
if (!strncmp(buffer, "# ", 3) || !strncmp(buffer, "#\t", 2)) {
buffer[0] = ' ';
offset += snprintf(description+offset, max-offset, "%s", buffer);
} else {
fputs(buffer, fp);
break; /* Long description ends */
}
}
continue;
}
if (xml_l_dscrpt == NULL && offset > 0) {
xml_l_dscrpt = (char *)xmlEncodeEntitiesReentrant(NULL, BAD_CAST(description));
}
if (!strncasecmp(buffer, LSB_INITSCRIPT_INFOEND_TAG, strlen(LSB_INITSCRIPT_INFOEND_TAG))) {
/* Get to the out border of LSB comment block */
break;
}
if (buffer[0] != '#') {
break; /* Out of comment block in the beginning */
}
}
fclose(fp);
*output = crm_strdup_printf(lsb_metadata_template, type,
(xml_l_dscrpt == NULL) ? type : xml_l_dscrpt,
(s_dscrpt == NULL) ? type : s_dscrpt, (provides == NULL) ? "" : provides,
(req_start == NULL) ? "" : req_start, (req_stop == NULL) ? "" : req_stop,
(shld_start == NULL) ? "" : shld_start, (shld_stop == NULL) ? "" : shld_stop,
(dflt_start == NULL) ? "" : dflt_start, (dflt_stop == NULL) ? "" : dflt_stop);
lsb_meta_helper_free_value(xml_l_dscrpt);
lsb_meta_helper_free_value(s_dscrpt);
lsb_meta_helper_free_value(provides);
lsb_meta_helper_free_value(req_start);
lsb_meta_helper_free_value(req_stop);
lsb_meta_helper_free_value(shld_start);
lsb_meta_helper_free_value(shld_stop);
lsb_meta_helper_free_value(dflt_start);
lsb_meta_helper_free_value(dflt_stop);
- crm_trace("Created fake metadata: %zd", strlen(*output));
+ crm_trace("Created fake metadata: %llu",
+ (unsigned long long) strlen(*output));
return pcmk_ok;
}
#if SUPPORT_NAGIOS
/*!
 * \internal
 * \brief Read the metadata file shipped for a nagios plugin
 *
 * \param[in]     type    Plugin name; the file read is
 *                        NAGIOS_METADATA_DIR/<type>.xml
 * \param[in,out] output  Set to a newly allocated copy of the file contents
 *                        on success, or NULL if the file is empty or cannot
 *                        be read completely
 *
 * \return pcmk_ok on success, -ENOMEM if memory could not be allocated,
 *         -EIO if the file could not be opened, was empty, or could not be
 *         read completely
 * \note NOTE(review): the empty-file path frees the incoming *output, so
 *       callers presumably initialize it (e.g. to NULL) -- confirm.
 */
static int
nagios_get_metadata(const char *type, char **output)
{
    int rc = pcmk_ok;
    FILE *file_strm = NULL;
    int start = 0, length = 0, read_len = 0;
    char *metadata_file = NULL;
    size_t path_len = 0;

    /* directory + '/' + type + ".xml" + terminating NUL */
    path_len = strlen(NAGIOS_METADATA_DIR) + 1 + strlen(type) + 4 + 1;

    metadata_file = calloc(1, path_len);
    CRM_CHECK(metadata_file != NULL, return -ENOMEM);

    snprintf(metadata_file, path_len, "%s/%s.xml", NAGIOS_METADATA_DIR, type);
    file_strm = fopen(metadata_file, "r");
    if (file_strm == NULL) {
        crm_err("Metadata file %s does not exist", metadata_file);
        free(metadata_file);
        return -EIO;
    }

    /* see how big the file is */
    start = ftell(file_strm);
    fseek(file_strm, 0L, SEEK_END);
    length = ftell(file_strm);
    /* Go back to where we started: the saved position is the offset, and
     * SEEK_SET is the origin.
     */
    fseek(file_strm, start, SEEK_SET);

    CRM_ASSERT(length >= 0);
    CRM_ASSERT(start == ftell(file_strm));

    if (length <= 0) {
        crm_info("%s was not valid", metadata_file);
        free(*output);
        *output = NULL;
        rc = -EIO;

    } else {
        crm_trace("Reading %d bytes from file", length);
        *output = calloc(1, (length + 1));
        if (*output == NULL) {
            rc = -ENOMEM;

        } else {
            read_len = fread(*output, 1, length, file_strm);
            if (read_len != length) {
                crm_err("Calculated and read bytes differ: %d vs. %d", length, read_len);
                free(*output);
                *output = NULL;
                rc = -EIO;
            }
        }
    }

    fclose(file_strm);
    free(metadata_file);
    return rc;
}
#endif
#if SUPPORT_HEARTBEAT
/* strictly speaking, support for class=heartbeat style scripts
* does not require "heartbeat support" to be enabled.
* But since those scripts are part of the "heartbeat" package usually,
* and are very unlikely to be present in any other deployment,
* I leave it inside this ifdef.
*
* Yes, I know, these are legacy and should die,
* or at least be rewritten to be a proper OCF style agent.
* But they exist, and custom scripts following these rules do, too.
*
* Taken from the old "glue" lrmd, see
* http://hg.linux-ha.org/glue/file/0a7add1d9996/lib/plugins/lrm/raexechb.c#l49
* http://hg.linux-ha.org/glue/file/0a7add1d9996/lib/plugins/lrm/raexechb.c#l393
*/
/* printf-style template that heartbeat_get_metadata() fills in with the agent
 * type; its text describes five positional arguments, argv[1] to argv[5].
 */
static const char hb_metadata_template[] =
"\n"
"\n"
"\n"
"1.0\n"
"\n"
"%s"
"\n"
"%s\n"
"\n"
"\n"
"\n"
"This argument will be passed as the first argument to the "
"heartbeat resource agent (assuming it supports one)\n"
"\n"
"argv[1]\n"
"\n"
"\n"
"\n"
"\n"
"This argument will be passed as the second argument to the "
"heartbeat resource agent (assuming it supports one)\n"
"\n"
"argv[2]\n"
"\n"
"\n"
"\n"
"\n"
"This argument will be passed as the third argument to the "
"heartbeat resource agent (assuming it supports one)\n"
"\n"
"argv[3]\n"
"\n"
"\n"
"\n"
"\n"
"This argument will be passed as the fourth argument to the "
"heartbeat resource agent (assuming it supports one)\n"
"\n"
"argv[4]\n"
"\n"
"\n"
"\n"
"\n"
"This argument will be passed as the fifth argument to the "
"heartbeat resource agent (assuming it supports one)\n"
"\n"
"argv[5]\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n"
"\n";
/*!
 * \internal
 * \brief Generate metadata for a heartbeat-class resource agent
 *
 * \param[in]  type    Agent name substituted into hb_metadata_template
 * \param[out] output  Set to a newly allocated metadata string
 *
 * \return pcmk_ok
 */
static int
heartbeat_get_metadata(const char *type, char **output)
{
    *output = crm_strdup_printf(hb_metadata_template, type, type, type);
    /* size_t is printed through an explicit cast instead of a 'z' length
     * modifier
     */
    crm_trace("Created fake metadata: %llu",
              (unsigned long long) strlen(*output));
    return pcmk_ok;
}
#endif
/*!
 * \internal
 * \brief Obtain metadata by synchronously running an agent's "meta-data"
 *        action
 *
 * \param[in]  standard  Resource standard
 * \param[in]  provider  Resource provider
 * \param[in]  type      Agent name
 * \param[out] output    Set to a copy of the action's standard output
 *                       (only on success)
 *
 * \return pcmk_ok on success, -EINVAL if the action could not be created,
 *         -EIO if it failed to run or produced no output
 */
static int
generic_get_metadata(const char *standard, const char *provider, const char *type, char **output)
{
    int rc = pcmk_ok;
    svc_action_t *op = resources_action_create(type, standard, provider, type,
                                               "meta-data", 0, 30000, NULL, 0);

    if (op == NULL) {
        crm_err("Unable to retrieve meta-data for %s:%s:%s", standard, provider, type);
        rc = -EINVAL;

    } else if (!(services_action_sync(op))) {
        crm_err("Failed to retrieve meta-data for %s:%s:%s", standard, provider, type);
        rc = -EIO;

    } else if (op->stdout_data == NULL) {
        crm_err("Failed to receive meta-data for %s:%s:%s", standard, provider, type);
        rc = -EIO;

    } else {
        *output = strdup(op->stdout_data);
    }

    /* Every path, including the failed creation, releases the action */
    services_action_free(op);
    return rc;
}
/*!
 * \internal
 * \brief Get resource agent metadata, choosing the retrieval method by class
 *
 * \param[in]  lrmd      Not used by this function
 * \param[in]  class     Resource class; "service" is first resolved to the
 *                       concrete class via resources_find_service_class()
 * \param[in]  provider  Resource provider
 * \param[in]  type      Agent name
 * \param[out] output    Where the class-specific helper stores the metadata
 * \param[in]  options   Not used by this function
 *
 * \return -EINVAL if \p class or \p type is NULL, otherwise the result of the
 *         class-specific helper
 */
static int
lrmd_api_get_metadata(lrmd_t * lrmd,
                      const char *class,
                      const char *provider,
                      const char *type, char **output, enum lrmd_call_options options)
{
    if ((class == NULL) || (type == NULL)) {
        return -EINVAL;
    }

    if (safe_str_eq(class, "service")) {
        class = resources_find_service_class(type);
    }

    if (safe_str_eq(class, "stonith")) {
        return stonith_get_metadata(provider, type, output);
    }
    if (safe_str_eq(class, "lsb")) {
        return lsb_get_metadata(type, output);
    }
#if SUPPORT_NAGIOS
    if (safe_str_eq(class, "nagios")) {
        return nagios_get_metadata(type, output);
    }
#endif
#if SUPPORT_HEARTBEAT
    if (safe_str_eq(class, "heartbeat")) {
        return heartbeat_get_metadata(type, output);
    }
#endif

    /* Anything else is asked for its own metadata */
    return generic_get_metadata(class, provider, type, output);
}
/*!
 * \internal
 * \brief Ask the lrmd to execute an action on a resource
 *
 * \param[in] lrmd         Connection to send the request over
 * \param[in] rsc_id       ID of the resource to act on
 * \param[in] action       Name of the action
 * \param[in] userdata     Caller-supplied string stored in the request
 * \param[in] interval     Action interval (ms)
 * \param[in] timeout      Action timeout (ms)
 * \param[in] start_delay  Start delay (ms)
 * \param[in] options      Call options passed to lrmd_send_command()
 * \param[in] params       Action parameters; always freed by this function
 *
 * \return Result of lrmd_send_command()
 */
static int
lrmd_api_exec(lrmd_t * lrmd, const char *rsc_id, const char *action, const char *userdata,
              int interval,     /* ms */
              int timeout,      /* ms */
              int start_delay,  /* ms */
              enum lrmd_call_options options, lrmd_key_value_t * params)
{
    int rc = pcmk_ok;
    lrmd_key_value_t *param = params;
    xmlNode *request = create_xml_node(NULL, F_LRMD_RSC);
    xmlNode *attrs = create_xml_node(request, XML_TAG_ATTRS);

    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);
    crm_xml_add(request, F_LRMD_RSC_ACTION, action);
    crm_xml_add(request, F_LRMD_RSC_USERDATA_STR, userdata);
    crm_xml_add_int(request, F_LRMD_RSC_INTERVAL, interval);
    crm_xml_add_int(request, F_LRMD_TIMEOUT, timeout);
    crm_xml_add_int(request, F_LRMD_RSC_START_DELAY, start_delay);

    /* Copy every key/value pair into the attributes child element */
    while (param != NULL) {
        hash2smartfield((gpointer) param->key, (gpointer) param->value, attrs);
        param = param->next;
    }

    rc = lrmd_send_command(lrmd, LRMD_OP_RSC_EXEC, request, NULL, timeout, options, TRUE);

    free_xml(request);
    lrmd_key_value_freeall(params);
    return rc;
}
/*!
 * \internal
 * \brief Ask the lrmd to cancel an action on a resource
 *
 * \param[in] lrmd      Connection to send the request over
 * \param[in] rsc_id    ID of the resource
 * \param[in] action    Name of the action to cancel
 * \param[in] interval  Interval of the action to cancel
 *
 * \return Result of lrmd_send_command()
 */
static int
lrmd_api_cancel(lrmd_t * lrmd, const char *rsc_id, const char *action, int interval)
{
    int result;
    xmlNode *request = create_xml_node(NULL, F_LRMD_RSC);

    crm_xml_add(request, F_LRMD_ORIGIN, __FUNCTION__);
    crm_xml_add(request, F_LRMD_RSC_ACTION, action);
    crm_xml_add(request, F_LRMD_RSC_ID, rsc_id);
    crm_xml_add_int(request, F_LRMD_RSC_INTERVAL, interval);

    result = lrmd_send_command(lrmd, LRMD_OP_RSC_CANCEL, request, NULL, 0, 0, TRUE);

    free_xml(request);
    return result;
}
/*!
 * \internal
 * \brief Count the fence agents reported by the fencing API, optionally
 *        collecting their names
 *
 * \param[in,out] resources  If not NULL, each agent name is added to this list
 *
 * \return Number of agents found (0 if no API object could be created)
 */
static int
list_stonith_agents(lrmd_list_t ** resources)
{
    int count = 0;
    stonith_key_value_t *agents = NULL;
    stonith_key_value_t *agent = NULL;
    stonith_t *api = stonith_api_new();

    if (api != NULL) {
        api->cmds->list_agents(api, st_opt_sync_call, NULL, &agents, 0);
        api->cmds->free(api);
    }

    agent = agents;
    while (agent != NULL) {
        count++;
        if (resources != NULL) {
            *resources = lrmd_list_add(*resources, agent->value);
        }
        agent = agent->next;
    }

    stonith_key_value_freeall(agents, 1, 0);
    return count;
}
/*!
 * \internal
 * \brief List the resource agents available for a class and provider
 *
 * \param[in]     lrmd       Not used by this function
 * \param[in,out] resources  List that agent names are added to
 * \param[in]     class      Resource class; "stonith" lists fence agents only,
 *                           NULL adds fence agents to the general listing
 * \param[in]     provider   Provider passed to resources_list_agents()
 *
 * \return Number of agents found, or -EPROTONOSUPPORT if there were none
 */
static int
lrmd_api_list_agents(lrmd_t * lrmd, lrmd_list_t ** resources, const char *class,
                     const char *provider)
{
    int count = 0;

    if (!safe_str_eq(class, "stonith")) {
        GList *agents = resources_list_agents(class, provider);
        GListPtr item = agents;

        while (item != NULL) {
            *resources = lrmd_list_add(*resources, (const char *)item->data);
            count++;
            item = item->next;
        }
        g_list_free_full(agents, free);

        /* No class given means every class, fence agents included */
        if (class == NULL) {
            count += list_stonith_agents(resources);
        }

    } else {
        count += list_stonith_agents(resources);
    }

    if (count == 0) {
        crm_notice("No agents found for class %s", class);
        return -EPROTONOSUPPORT;
    }
    return count;
}
/*!
 * \internal
 * \brief Check whether a provider offers a particular agent
 *
 * \param[in] agent     Agent name to look for
 * \param[in] provider  Provider whose agents are listed
 * \param[in] class     Resource class whose agents are listed
 *
 * \return 1 if the agent is in the provider's agent list, 0 otherwise
 */
static int
does_provider_have_agent(const char *agent, const char *provider, const char *class)
{
    int matched = 0;
    GList *candidates = resources_list_agents(class, provider);
    GListPtr item = candidates;

    while (item != NULL) {
        if (safe_str_eq(agent, item->data)) {
            matched = 1;
        }
        item = item->next;
    }

    g_list_free_full(candidates, free);
    return matched;
}
/*!
 * \internal
 * \brief List OCF providers, optionally only those offering a given agent
 *
 * \param[in]     lrmd       Not used by this function
 * \param[in]     agent      If not NULL, only providers that have this agent
 *                           are listed
 * \param[in,out] providers  List that provider names are added to
 *
 * \return Number of providers added
 */
static int
lrmd_api_list_ocf_providers(lrmd_t * lrmd, const char *agent, lrmd_list_t ** providers)
{
    int count = pcmk_ok;
    GList *all_providers = resources_list_providers("ocf");
    GListPtr item = NULL;

    for (item = all_providers; item != NULL; item = item->next) {
        char *name = item->data;

        if ((agent != NULL) && !does_provider_have_agent(agent, name, "ocf")) {
            continue;           /* This provider does not ship the agent */
        }
        *providers = lrmd_list_add(*providers, (const char *)item->data);
        count++;
    }

    g_list_free_full(all_providers, free);
    return count;
}
/*!
 * \internal
 * \brief List the supported resource standards
 *
 * \param[in]     lrmd       Not used by this function
 * \param[in,out] supported  List that standard names are added to; "stonith"
 *                           is added when at least one fence agent is found
 *
 * \return Number of standards added
 */
static int
lrmd_api_list_standards(lrmd_t * lrmd, lrmd_list_t ** supported)
{
    int count = 0;
    GList *standards = resources_list_standards();
    GListPtr item = standards;

    while (item != NULL) {
        *supported = lrmd_list_add(*supported, (const char *)item->data);
        count++;
        item = item->next;
    }

    if (list_stonith_agents(NULL) > 0) {
        *supported = lrmd_list_add(*supported, "stonith");
        count++;
    }

    g_list_free_full(standards, free);
    return count;
}
lrmd_t *
lrmd_api_new(void)
{
lrmd_t *new_lrmd = NULL;
lrmd_private_t *pvt = NULL;
new_lrmd = calloc(1, sizeof(lrmd_t));
pvt = calloc(1, sizeof(lrmd_private_t));
pvt->remote = calloc(1, sizeof(crm_remote_t));
new_lrmd->cmds = calloc(1, sizeof(lrmd_api_operations_t));
pvt->type = CRM_CLIENT_IPC;
new_lrmd->private = pvt;
new_lrmd->cmds->connect = lrmd_api_connect;
new_lrmd->cmds->connect_async = lrmd_api_connect_async;
new_lrmd->cmds->is_connected = lrmd_api_is_connected;
new_lrmd->cmds->poke_connection = lrmd_api_poke_connection;
new_lrmd->cmds->disconnect = lrmd_api_disconnect;
new_lrmd->cmds->register_rsc = lrmd_api_register_rsc;
new_lrmd->cmds->unregister_rsc = lrmd_api_unregister_rsc;
new_lrmd->cmds->get_rsc_info = lrmd_api_get_rsc_info;
new_lrmd->cmds->set_callback = lrmd_api_set_callback;
new_lrmd->cmds->get_metadata = lrmd_api_get_metadata;
new_lrmd->cmds->exec = lrmd_api_exec;
new_lrmd->cmds->cancel = lrmd_api_cancel;
new_lrmd->cmds->list_agents = lrmd_api_list_agents;
new_lrmd->cmds->list_ocf_providers = lrmd_api_list_ocf_providers;
new_lrmd->cmds->list_standards = lrmd_api_list_standards;
return new_lrmd;
}
/*!
 * \brief Create a new lrmd connection object of TLS client type
 *
 * \param[in] nodename  Remote node name (defaults to \p server if NULL)
 * \param[in] server    Server to connect to (defaults to \p nodename if NULL)
 * \param[in] port      Port to connect to; 0 means use the PCMK_remote_port
 *                      environment variable if set, otherwise
 *                      DEFAULT_REMOTE_PORT
 *
 * \return Newly allocated connection object, or NULL if both \p nodename and
 *         \p server are NULL, memory could not be allocated, or the build
 *         has no GnuTLS support
 */
lrmd_t *
lrmd_remote_api_new(const char *nodename, const char *server, int port)
{
#ifdef HAVE_GNUTLS_GNUTLS_H
    lrmd_t *new_lrmd = NULL;
    lrmd_private_t *native = NULL;

    /* Validate first so nothing is allocated only to be torn down again */
    if (!nodename && !server) {
        return NULL;
    }

    new_lrmd = lrmd_api_new();
    if (new_lrmd == NULL) {
        return NULL;
    }
    native = new_lrmd->private;

    native->remote_nodename = nodename ? strdup(nodename) : strdup(server);
    native->server = server ? strdup(server) : strdup(nodename);
    if ((native->remote_nodename == NULL) || (native->server == NULL)) {
        /* The object is still of the type lrmd_api_new() gave it here, so
         * this deletion behaves like the one for a plain new object
         */
        lrmd_api_delete(new_lrmd);
        return NULL;
    }

    native->type = CRM_CLIENT_TLS;
    native->port = port;
    if (native->port == 0) {
        const char *remote_port_str = getenv("PCMK_remote_port");
        native->port = remote_port_str ? atoi(remote_port_str) : DEFAULT_REMOTE_PORT;
    }

    return new_lrmd;
#else
    crm_err("GNUTLS is not enabled for this build, remote LRMD client can not be created");
    return NULL;
#endif
}
/*!
 * \brief Disconnect and free an lrmd connection object
 *
 * \param[in] lrmd  Connection object to free (NULL is ignored)
 */
void
lrmd_api_delete(lrmd_t * lrmd)
{
    lrmd_private_t *pvt = NULL;

    if (lrmd == NULL) {
        return;
    }

    lrmd->cmds->disconnect(lrmd);       /* no-op if already disconnected */
    free(lrmd->cmds);

    pvt = lrmd->private;
    if (pvt != NULL) {
#ifdef HAVE_GNUTLS_GNUTLS_H
        free(pvt->server);
#endif
        free(pvt->remote_nodename);
        free(pvt->remote);
        free(pvt->token);
        free(pvt->peer_version);
    }

    free(pvt);
    free(lrmd);
}