diff --git a/crmd/lrm.c b/crmd/lrm.c
index 95ab6fb7c2..ef0723b7cf 100644
--- a/crmd/lrm.c
+++ b/crmd/lrm.c
@@ -1,2269 +1,2271 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public
  * License as published by the Free Software Foundation; either
  * version 2 of the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * General Public License for more details.
  *
  * You should have received a copy of the GNU General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 #include <crm_internal.h>
 
 #include <sys/param.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 
 #include <crm/crm.h>
 #include <crm/services.h>
 
 #include <crm/msg_xml.h>
 #include <crm/common/xml.h>
 
 #include <crmd.h>
 #include <crmd_fsa.h>
 #include <crmd_messages.h>
 #include <crmd_callbacks.h>
 #include <crmd_lrm.h>
 
 #define START_DELAY_THRESHOLD 5 * 60 * 1000
 #define MAX_LRM_REG_FAILS 30
 
 struct delete_event_s {
     int rc;
     const char *rsc;
     lrm_state_t *lrm_state;
 };
 
 gboolean process_lrm_event(lrm_state_t * lrm_state, lrmd_event_data_t * op);
 static gboolean is_rsc_active(lrm_state_t * lrm_state, const char *rsc_id);
 static gboolean build_active_RAs(lrm_state_t * lrm_state, xmlNode * rsc_list);
 static gboolean stop_recurring_actions(gpointer key, gpointer value, gpointer user_data);
 static int delete_rsc_status(lrm_state_t * lrm_state, const char *rsc_id, int call_options,
                              const char *user_name);
 
 static lrmd_event_data_t *construct_op(lrm_state_t * lrm_state, xmlNode * rsc_op,
                                        const char *rsc_id, const char *operation);
 static void do_lrm_rsc_op(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, const char *operation,
                           xmlNode * msg, xmlNode * request);
 
 void send_direct_ack(const char *to_host, const char *to_sys,
                      lrmd_rsc_info_t * rsc, lrmd_event_data_t * op, const char *rsc_id);
 
 static gboolean lrm_state_verify_stopped(lrm_state_t * lrm_state, enum crmd_fsa_state cur_state,
                                          int log_level);
 
 static void
 lrm_connection_destroy(void)
 {
     if (is_set(fsa_input_register, R_LRM_CONNECTED)) {
         crm_crit("LRM Connection failed");
         register_fsa_input(C_FSA_INTERNAL, I_ERROR, NULL);
         clear_bit(fsa_input_register, R_LRM_CONNECTED);
 
     } else {
         crm_info("LRM Connection disconnected");
     }
 
 }
 
 static char *
 make_stop_id(const char *rsc, int call_id)
 {
     char *op_id = NULL;
 
     op_id = calloc(1, strlen(rsc) + 34);
     if (op_id != NULL) {
         snprintf(op_id, strlen(rsc) + 34, "%s:%d", rsc, call_id);
     }
     return op_id;
 }
 
 static void
 copy_instance_keys(gpointer key, gpointer value, gpointer user_data)
 {
     if (strstr(key, CRM_META "_") == NULL) {
         g_hash_table_replace(user_data, strdup((const char *)key), strdup((const char *)value));
     }
 }
 
 static void
 copy_meta_keys(gpointer key, gpointer value, gpointer user_data)
 {
     if (strstr(key, CRM_META "_") != NULL) {
         g_hash_table_replace(user_data, strdup((const char *)key), strdup((const char *)value));
     }
 }
 
 static void
 update_history_cache(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, lrmd_event_data_t * op)
 {
     int target_rc = 0;
     rsc_history_t *entry = NULL;
 
     if (op->rsc_deleted) {
         crm_debug("Purged history for '%s' after %s", op->rsc_id, op->op_type);
         delete_rsc_status(lrm_state, op->rsc_id, cib_quorum_override, NULL);
         return;
     }
 
     if (safe_str_eq(op->op_type, RSC_NOTIFY)) {
         return;
     }
 
     crm_debug("Updating history for '%s' with %s op", op->rsc_id, op->op_type);
 
     entry = g_hash_table_lookup(lrm_state->resource_history, op->rsc_id);
     if (entry == NULL && rsc) {
         entry = calloc(1, sizeof(rsc_history_t));
         entry->id = strdup(op->rsc_id);
         g_hash_table_insert(lrm_state->resource_history, entry->id, entry);
 
         entry->rsc.id = entry->id;
         entry->rsc.type = strdup(rsc->type);
         entry->rsc.class = strdup(rsc->class);
         if (rsc->provider) {
             entry->rsc.provider = strdup(rsc->provider);
         } else {
             entry->rsc.provider = NULL;
         }
 
     } else if (entry == NULL) {
         crm_info("Resource %s no longer exists, not updating cache", op->rsc_id);
         return;
     }
 
     entry->last_callid = op->call_id;
     target_rc = rsc_op_expected_rc(op);
     if (op->op_status == PCMK_LRM_OP_CANCELLED) {
         if (op->interval > 0) {
             GList *gIter, *gIterNext;
 
             crm_trace("Removing cancelled recurring op: %s_%s_%d", op->rsc_id, op->op_type,
                       op->interval);
 
             for (gIter = entry->recurring_op_list; gIter != NULL; gIter = gIterNext) {
                 lrmd_event_data_t *existing = gIter->data;
 
                 gIterNext = gIter->next;
 
                 if (crm_str_eq(op->rsc_id, existing->rsc_id, TRUE)
                     && safe_str_eq(op->op_type, existing->op_type)
                     && op->interval == existing->interval) {
                     lrmd_free_event(existing);
                     entry->recurring_op_list = g_list_delete_link(entry->recurring_op_list, gIter);
                 }
             }
             return;
 
         } else {
             crm_trace("Skipping %s_%s_%d rc=%d, status=%d", op->rsc_id, op->op_type, op->interval,
                       op->rc, op->op_status);
         }
 
     } else if (did_rsc_op_fail(op, target_rc)) {
         /* We must store failed monitors here
          * - otherwise the block below will cause them to be forgetten them when a stop happens
          */
         if (entry->failed) {
             lrmd_free_event(entry->failed);
         }
         entry->failed = lrmd_copy_event(op);
 
     } else if (op->interval == 0) {
         if (entry->last) {
             lrmd_free_event(entry->last);
         }
         entry->last = lrmd_copy_event(op);
 
         if (op->params &&
             (safe_str_eq(CRMD_ACTION_START, op->op_type) ||
              safe_str_eq("reload", op->op_type) ||
              safe_str_eq(CRMD_ACTION_STATUS, op->op_type))) {
 
             if (entry->stop_params) {
                 g_hash_table_destroy(entry->stop_params);
             }
             entry->stop_params = g_hash_table_new_full(crm_str_hash,
                                                        g_str_equal, g_hash_destroy_str,
                                                        g_hash_destroy_str);
 
             g_hash_table_foreach(op->params, copy_instance_keys, entry->stop_params);
         }
     }
 
     if (op->interval > 0) {
         GListPtr iter = NULL;
 
         for(iter = entry->recurring_op_list; iter; iter = iter->next) {
             lrmd_event_data_t *o = iter->data;
 
             /* op->rsc_id is implied */
             if(op->interval == o->interval && strcmp(op->op_type, o->op_type) == 0) {
                 crm_trace("Removing existing recurring op entry: %s_%s_%d", op->rsc_id, op->op_type, op->interval);
                 entry->recurring_op_list = g_list_remove(entry->recurring_op_list, o);
                 break;
             }
         }
 
         crm_trace("Adding recurring op: %s_%s_%d", op->rsc_id, op->op_type, op->interval);
         entry->recurring_op_list = g_list_prepend(entry->recurring_op_list, lrmd_copy_event(op));
 
     } else if (entry->recurring_op_list && safe_str_eq(op->op_type, RSC_STATUS) == FALSE) {
         GList *gIter = entry->recurring_op_list;
 
         crm_trace("Dropping %d recurring ops because of: %s_%s_%d",
                   g_list_length(gIter), op->rsc_id, op->op_type, op->interval);
         for (; gIter != NULL; gIter = gIter->next) {
             lrmd_free_event(gIter->data);
         }
         g_list_free(entry->recurring_op_list);
         entry->recurring_op_list = NULL;
     }
 }
 
 void
 lrm_op_callback(lrmd_event_data_t * op)
 {
     const char *nodename = NULL;
     lrm_state_t *lrm_state = NULL;
 
     CRM_CHECK(op != NULL, return);
 
     /* determine the node name for this connection. */
     nodename = op->remote_nodename ? op->remote_nodename : fsa_our_uname;
 
     if (op->type == lrmd_event_disconnect && (safe_str_eq(nodename, fsa_our_uname))) {
         /* if this is the local lrmd ipc connection, set the right bits in the
          * crmd when the connection goes down */
         lrm_connection_destroy();
         return;
     } else if (op->type != lrmd_event_exec_complete) {
         /* we only need to process execution results */
         return;
     }
 
     lrm_state = lrm_state_find(nodename);
     CRM_ASSERT(lrm_state != NULL);
 
     process_lrm_event(lrm_state, op);
 }
 
 /*	 A_LRM_CONNECT	*/
 void
 do_lrm_control(long long action,
                enum crmd_fsa_cause cause,
                enum crmd_fsa_state cur_state,
                enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     /* This only pertains to local lrmd connections.  Remote connections are handled as
      * resources within the pengine.  Connecting and disconnecting from remote lrmd instances
      * handled differently than the local. */
 
     lrm_state_t *lrm_state = NULL;
 
     if(fsa_our_uname == NULL) {
         return; /* Nothing to do */
     }
     lrm_state = lrm_state_find_or_create(fsa_our_uname);
     if (lrm_state == NULL) {
         register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL);
         return;
     }
 
     if (action & A_LRM_DISCONNECT) {
         if (lrm_state_verify_stopped(lrm_state, cur_state, LOG_INFO) == FALSE) {
             if (action == A_LRM_DISCONNECT) {
                 crmd_fsa_stall(FALSE);
                 return;
             }
         }
 
         clear_bit(fsa_input_register, R_LRM_CONNECTED);
         crm_info("Disconnecting from the LRM");
         lrm_state_disconnect(lrm_state);
         lrm_state_reset_tables(lrm_state);
         crm_notice("Disconnected from the LRM");
     }
 
     if (action & A_LRM_CONNECT) {
         int ret = pcmk_ok;
 
         crm_debug("Connecting to the LRM");
         ret = lrm_state_ipc_connect(lrm_state);
 
         if (ret != pcmk_ok) {
             if (lrm_state->num_lrm_register_fails < MAX_LRM_REG_FAILS) {
                 crm_warn("Failed to sign on to the LRM %d"
                          " (%d max) times", lrm_state->num_lrm_register_fails, MAX_LRM_REG_FAILS);
 
                 crm_timer_start(wait_timer);
                 crmd_fsa_stall(FALSE);
                 return;
             }
         }
 
         if (ret != pcmk_ok) {
             crm_err("Failed to sign on to the LRM %d" " (max) times",
                     lrm_state->num_lrm_register_fails);
             register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL);
             return;
         }
 
         set_bit(fsa_input_register, R_LRM_CONNECTED);
         crm_info("LRM connection established");
     }
 
     if (action & ~(A_LRM_CONNECT | A_LRM_DISCONNECT)) {
         crm_err("Unexpected action %s in %s", fsa_action2string(action), __FUNCTION__);
     }
 }
 
 static gboolean
 lrm_state_verify_stopped(lrm_state_t * lrm_state, enum crmd_fsa_state cur_state, int log_level)
 {
     int counter = 0;
     gboolean rc = TRUE;
     const char *when = "lrm disconnect";
 
     GHashTableIter gIter;
     const char *key = NULL;
     rsc_history_t *entry = NULL;
     struct recurring_op_s *pending = NULL;
 
     crm_debug("Checking for active resources before exit");
 
     if (cur_state == S_TERMINATE) {
         log_level = LOG_ERR;
         when = "shutdown";
 
     } else if (is_set(fsa_input_register, R_SHUTDOWN)) {
         when = "shutdown... waiting";
     }
 
     if (lrm_state->pending_ops && lrm_state_is_connected(lrm_state) == TRUE) {
         guint removed = g_hash_table_foreach_remove(
             lrm_state->pending_ops, stop_recurring_actions, lrm_state);
 
         crm_notice("Stopped %u recurring operations at %s (%u ops remaining)",
                    removed, when, g_hash_table_size(lrm_state->pending_ops));
     }
 
     if (lrm_state->pending_ops) {
         g_hash_table_iter_init(&gIter, lrm_state->pending_ops);
         while (g_hash_table_iter_next(&gIter, NULL, (void **)&pending)) {
             /* Ignore recurring actions in the shutdown calculations */
             if (pending->interval == 0) {
                 counter++;
             }
         }
     }
 
     if (counter > 0) {
         do_crm_log(log_level, "%d pending LRM operations at %s", counter, when);
 
         if (cur_state == S_TERMINATE || !is_set(fsa_input_register, R_SENT_RSC_STOP)) {
             g_hash_table_iter_init(&gIter, lrm_state->pending_ops);
             while (g_hash_table_iter_next(&gIter, (gpointer*)&key, (gpointer*)&pending)) {
                 do_crm_log(log_level, "Pending action: %s (%s)", key, pending->op_key);
             }
 
         } else {
             rc = FALSE;
         }
         return rc;
     }
 
     if (lrm_state->resource_history == NULL) {
         return rc;
     }
 
     if (cur_state == S_TERMINATE || is_set(fsa_input_register, R_SHUTDOWN)) {
         /* At this point we're not waiting, we're just shutting down */
         when = "shutdown";
     }
 
     counter = 0;
     g_hash_table_iter_init(&gIter, lrm_state->resource_history);
     while (g_hash_table_iter_next(&gIter, NULL, (gpointer*)&entry)) {
         if (is_rsc_active(lrm_state, entry->id) == FALSE) {
             continue;
         }
 
         counter++;
         crm_trace("Found %s active", entry->id);
         if (lrm_state->pending_ops) {
             GHashTableIter hIter;
 
             g_hash_table_iter_init(&hIter, lrm_state->pending_ops);
             while (g_hash_table_iter_next(&hIter, (gpointer*)&key, (gpointer*)&pending)) {
                 if (crm_str_eq(entry->id, pending->rsc_id, TRUE)) {
                     crm_notice("%sction %s (%s) incomplete at %s",
                                pending->interval == 0 ? "A" : "Recurring a",
                                key, pending->op_key, when);
                 }
             }
         }
     }
 
     if (counter) {
         crm_err("%d resources were active at %s.", counter, when);
     }
 
     return rc;
 }
 
 static char *
 get_rsc_metadata(const char *type, const char *class, const char *provider)
 {
     int rc = 0;
     char *metadata = NULL;
 
     /* Always use a local connection for this operation */
     lrm_state_t *lrm_state = lrm_state_find(fsa_our_uname);
 
     CRM_CHECK(type != NULL, return NULL);
     CRM_CHECK(class != NULL, return NULL);
     CRM_CHECK(lrm_state != NULL, return NULL);
 
     if (provider == NULL) {
         provider = "heartbeat";
     }
 
     crm_trace("Retreiving metadata for %s::%s:%s", type, class, provider);
     rc = lrm_state_get_metadata(lrm_state, class, provider, type, &metadata, 0);
 
     if (metadata == NULL) {
         crm_warn("No metadata found for %s::%s:%s: %s (%d)", type, class, provider, pcmk_strerror(rc), rc);
     }
 
     return metadata;
 }
 
 typedef struct reload_data_s {
     char *key;
     char *metadata;
     time_t last_query;
     gboolean can_reload;
     GListPtr restart_list;
 } reload_data_t;
 
 static void
 g_hash_destroy_reload(gpointer data)
 {
     reload_data_t *reload = data;
 
     free(reload->key);
     free(reload->metadata);
     g_list_free_full(reload->restart_list, free);
     free(reload);
 }
 
 GHashTable *reload_hash = NULL;
 static GListPtr
 get_rsc_restart_list(lrmd_rsc_info_t * rsc, lrmd_event_data_t * op)
 {
     int len = 0;
     char *key = NULL;
     char *copy = NULL;
     const char *value = NULL;
     const char *provider = NULL;
 
     xmlNode *param = NULL;
     xmlNode *params = NULL;
     xmlNode *actions = NULL;
     xmlNode *metadata = NULL;
 
     time_t now = time(NULL);
     reload_data_t *reload = NULL;
 
     if (reload_hash == NULL) {
         reload_hash = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, g_hash_destroy_reload);
     }
 
     provider = rsc->provider;
     if (provider == NULL) {
         provider = "heartbeat";
     }
 
     len = strlen(rsc->type) + strlen(rsc->class) + strlen(provider) + 4;
     key = malloc(len);
     if(key) {
         snprintf(key, len, "%s::%s:%s", rsc->type, rsc->class, provider);
         reload = g_hash_table_lookup(reload_hash, key);
     }
 
     if (reload && ((now - 9) > reload->last_query)
         && safe_str_eq(op->op_type, RSC_START)) {
         reload = NULL;          /* re-query */
     }
 
     if (reload == NULL) {
         xmlNode *action = NULL;
 
         reload = calloc(1, sizeof(reload_data_t));
         g_hash_table_replace(reload_hash, key, reload);
 
         reload->last_query = now;
         reload->key = key;
         key = NULL;
         reload->metadata = get_rsc_metadata(rsc->type, rsc->class, provider);
 
         if(reload->metadata == NULL) {
             goto cleanup;
         }
 
         metadata = string2xml(reload->metadata);
         if (metadata == NULL) {
             crm_err("Metadata for %s::%s:%s is not valid XML",
                     rsc->provider, rsc->class, rsc->type);
             goto cleanup;
         }
 
         actions = find_xml_node(metadata, "actions", TRUE);
 
         for (action = __xml_first_child(actions); action != NULL; action = __xml_next(action)) {
             if (crm_str_eq((const char *)action->name, "action", TRUE)) {
                 value = crm_element_value(action, "name");
                 if (safe_str_eq("reload", value)) {
                     reload->can_reload = TRUE;
                     break;
                 }
             }
         }
 
         if (reload->can_reload == FALSE) {
             goto cleanup;
         }
 
         params = find_xml_node(metadata, "parameters", TRUE);
         for (param = __xml_first_child(params); param != NULL; param = __xml_next(param)) {
             if (crm_str_eq((const char *)param->name, "parameter", TRUE)) {
                 value = crm_element_value(param, "unique");
                 if (crm_is_true(value)) {
                     value = crm_element_value(param, "name");
                     if (value == NULL) {
                         crm_err("%s: NULL param", key);
                         continue;
                     }
                     crm_debug("Attr %s is not reloadable", value);
                     copy = strdup(value);
                     CRM_LOG_ASSERT(copy != NULL);
                     if(copy == NULL) { continue; };
                     reload->restart_list = g_list_append(reload->restart_list, copy);
                 }
             }
         }
     }
 
   cleanup:
     free(key);
     free_xml(metadata);
     return reload->restart_list;
 }
 
 static void
 append_restart_list(lrmd_rsc_info_t * rsc, lrmd_event_data_t * op, xmlNode * update,
                     const char *version)
 {
     int len = 0;
     char *list = NULL;
     char *digest = NULL;
     const char *value = NULL;
     xmlNode *restart = NULL;
     GListPtr restart_list = NULL;
     GListPtr lpc = NULL;
 
     if (op->interval > 0) {
         /* monitors are not reloadable */
         return;
 
     } else if (op->params == NULL) {
         crm_debug("%s has no parameters", ID(update));
         return;
 
     } else if (rsc == NULL) {
         return;
 
     } else if (crm_str_eq(CRMD_ACTION_STOP, op->op_type, TRUE)) {
         /* Stopped resources don't need to be reloaded */
         return;
 
     } else if (compare_version("1.0.8", version) > 0) {
         /* Caller version does not support reloads */
         return;
     }
 
     restart_list = get_rsc_restart_list(rsc, op);
     if (restart_list == NULL) {
         /* Resource does not support reloads */
         return;
     }
 
     restart = create_xml_node(NULL, XML_TAG_PARAMS);
     for (lpc = restart_list; lpc != NULL; lpc = lpc->next) {
         const char *param = (const char *)lpc->data;
 
         int start = len;
 
         CRM_LOG_ASSERT(param != NULL);
         if(param == NULL) {  continue; };
 
         value = g_hash_table_lookup(op->params, param);
         if (value != NULL) {
             crm_xml_add(restart, param, value);
         }
         len += strlen(param) + 2;
         list = realloc_safe(list, len + 1);
         sprintf(list + start, " %s ", param);
     }
 
     digest = calculate_operation_digest(restart, version);
     crm_xml_add(update, XML_LRM_ATTR_OP_RESTART, list);
     crm_xml_add(update, XML_LRM_ATTR_RESTART_DIGEST, digest);
 
     crm_trace("%s: %s, %s", rsc->id, digest, list);
     crm_log_xml_trace(restart, "restart digest source");
 
     free_xml(restart);
     free(digest);
     free(list);
 }
 
 static gboolean
 build_operation_update(xmlNode * parent, lrmd_rsc_info_t * rsc, lrmd_event_data_t * op,
                        const char *src)
 {
     int target_rc = 0;
     xmlNode *xml_op = NULL;
     const char *caller_version = CRM_FEATURE_SET;
 
     if (op == NULL) {
         return FALSE;
 
     } else if (AM_I_DC) {
 
     } else if (fsa_our_dc_version != NULL) {
         caller_version = fsa_our_dc_version;
     } else if (op->params == NULL) {
         caller_version = fsa_our_dc_version;
     } else {
         /* there is a small risk in formerly mixed clusters that
          *   it will be sub-optimal.
          * however with our upgrade policy, the update we send
          *   should still be completely supported anyway
          */
         caller_version = g_hash_table_lookup(op->params, XML_ATTR_CRM_VERSION);
         crm_debug("Falling back to operation originator version: %s", caller_version);
     }
 
     target_rc = rsc_op_expected_rc(op);
     xml_op = create_operation_update(parent, op, caller_version, target_rc, fsa_our_uname, src, LOG_DEBUG);
 
     if (xml_op) {
         append_restart_list(rsc, op, xml_op, caller_version);
     }
     return TRUE;
 }
 
 static gboolean
 is_rsc_active(lrm_state_t * lrm_state, const char *rsc_id)
 {
     rsc_history_t *entry = NULL;
 
     entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
     if (entry == NULL || entry->last == NULL) {
         return FALSE;
     }
 
     crm_trace("Processing %s: %s.%d=%d",
               rsc_id, entry->last->op_type, entry->last->interval, entry->last->rc);
     if (entry->last->rc == PCMK_OCF_OK && safe_str_eq(entry->last->op_type, CRMD_ACTION_STOP)) {
         return FALSE;
 
     } else if (entry->last->rc == PCMK_OCF_OK
                && safe_str_eq(entry->last->op_type, CRMD_ACTION_MIGRATE)) {
         /* a stricter check is too complex...
          * leave that to the PE
          */
         return FALSE;
 
     } else if (entry->last->rc == PCMK_OCF_NOT_RUNNING) {
         return FALSE;
 
     } else if (entry->last->interval == 0 && entry->last->rc == PCMK_OCF_NOT_CONFIGURED) {
         /* Badly configured resources can't be reliably stopped */
         return FALSE;
     }
 
     return TRUE;
 }
 
 static gboolean
 build_active_RAs(lrm_state_t * lrm_state, xmlNode * rsc_list)
 {
     GHashTableIter iter;
     rsc_history_t *entry = NULL;
 
     g_hash_table_iter_init(&iter, lrm_state->resource_history);
     while (g_hash_table_iter_next(&iter, NULL, (void **)&entry)) {
 
         GList *gIter = NULL;
         xmlNode *xml_rsc = create_xml_node(rsc_list, XML_LRM_TAG_RESOURCE);
 
         crm_xml_add(xml_rsc, XML_ATTR_ID, entry->id);
         crm_xml_add(xml_rsc, XML_ATTR_TYPE, entry->rsc.type);
         crm_xml_add(xml_rsc, XML_AGENT_ATTR_CLASS, entry->rsc.class);
         crm_xml_add(xml_rsc, XML_AGENT_ATTR_PROVIDER, entry->rsc.provider);
 
         if (entry->last && entry->last->params) {
             const char *container = g_hash_table_lookup(entry->last->params, CRM_META"_"XML_RSC_ATTR_CONTAINER);
             if (container) {
                 crm_trace("Resource %s is a part of container resource %s", entry->id, container);
                 crm_xml_add(xml_rsc, XML_RSC_ATTR_CONTAINER, container);
             }
         }
         build_operation_update(xml_rsc, &(entry->rsc), entry->failed, __FUNCTION__);
         build_operation_update(xml_rsc, &(entry->rsc), entry->last, __FUNCTION__);
         for (gIter = entry->recurring_op_list; gIter != NULL; gIter = gIter->next) {
             build_operation_update(xml_rsc, &(entry->rsc), gIter->data, __FUNCTION__);
         }
     }
 
     return FALSE;
 }
 
 xmlNode *
 do_lrm_query_internal(lrm_state_t * lrm_state, gboolean is_replace)
 {
     xmlNode *xml_state = NULL;
     xmlNode *xml_data = NULL;
     xmlNode *rsc_list = NULL;
     const char *uuid = NULL;
 
     if (safe_str_eq(lrm_state->node_name, fsa_our_uname)) {
         crm_node_t *peer = crm_get_peer(0, lrm_state->node_name);
         xml_state = do_update_node_cib(peer, node_update_cluster|node_update_peer, NULL, __FUNCTION__);
         /* The next two lines shouldn't be necessary for newer DCs */
         crm_xml_add(xml_state, XML_NODE_JOIN_STATE, CRMD_JOINSTATE_MEMBER);
         crm_xml_add(xml_state, XML_NODE_EXPECTED, CRMD_JOINSTATE_MEMBER);
         uuid = fsa_our_uuid;
 
     } else {
         xml_state = create_xml_node(NULL, XML_CIB_TAG_STATE);
         crm_xml_add(xml_state, XML_NODE_IS_REMOTE, "true");
         crm_xml_add(xml_state, XML_ATTR_ID, lrm_state->node_name);
         crm_xml_add(xml_state, XML_ATTR_UNAME, lrm_state->node_name);
         uuid = lrm_state->node_name;
     }
 
     xml_data = create_xml_node(xml_state, XML_CIB_TAG_LRM);
     crm_xml_add(xml_data, XML_ATTR_ID, uuid);
     rsc_list = create_xml_node(xml_data, XML_LRM_TAG_RESOURCES);
 
     /* Build a list of active (not always running) resources */
     build_active_RAs(lrm_state, rsc_list);
 
     crm_log_xml_trace(xml_state, "Current state of the LRM");
 
     return xml_state;
 }
 
 xmlNode *
 do_lrm_query(gboolean is_replace, const char *node_name)
 {
     lrm_state_t *lrm_state = lrm_state_find(node_name);
 
     if (!lrm_state) {
         crm_err("Could not query lrm state for lrmd node %s", node_name);
         return NULL;
     }
     return do_lrm_query_internal(lrm_state, is_replace);
 }
 
 static void
 notify_deleted(lrm_state_t * lrm_state, ha_msg_input_t * input, const char *rsc_id, int rc)
 {
     lrmd_event_data_t *op = NULL;
     const char *from_sys = crm_element_value(input->msg, F_CRM_SYS_FROM);
     const char *from_host = crm_element_value(input->msg, F_CRM_HOST_FROM);
 
     crm_info("Notifying %s on %s that %s was%s deleted",
              from_sys, from_host, rsc_id, rc == pcmk_ok ? "" : " not");
 
     op = construct_op(lrm_state, input->xml, rsc_id, CRMD_ACTION_DELETE);
     CRM_ASSERT(op != NULL);
 
     if (rc == pcmk_ok) {
         op->op_status = PCMK_LRM_OP_DONE;
         op->rc = PCMK_OCF_OK;
     } else {
         op->op_status = PCMK_LRM_OP_ERROR;
         op->rc = PCMK_OCF_UNKNOWN_ERROR;
     }
 
     send_direct_ack(from_host, from_sys, NULL, op, rsc_id);
     lrmd_free_event(op);
 
     if (safe_str_neq(from_sys, CRM_SYSTEM_TENGINE)) {
         /* this isn't expected - trigger a new transition */
         time_t now = time(NULL);
         char *now_s = crm_itoa(now);
 
         crm_debug("Triggering a refresh after %s deleted %s from the LRM", from_sys, rsc_id);
 
         update_attr_delegate(fsa_cib_conn, cib_none, XML_CIB_TAG_CRMCONFIG, NULL, NULL, NULL, NULL,
                              "last-lrm-refresh", now_s, FALSE, NULL, NULL);
 
         free(now_s);
     }
 }
 
 static gboolean
 lrm_remove_deleted_rsc(gpointer key, gpointer value, gpointer user_data)
 {
     struct delete_event_s *event = user_data;
     struct pending_deletion_op_s *op = value;
 
     if (crm_str_eq(event->rsc, op->rsc, TRUE)) {
         notify_deleted(event->lrm_state, op->input, event->rsc, event->rc);
         return TRUE;
     }
     return FALSE;
 }
 
 static gboolean
 lrm_remove_deleted_op(gpointer key, gpointer value, gpointer user_data)
 {
     const char *rsc = user_data;
     struct recurring_op_s *pending = value;
 
     if (crm_str_eq(rsc, pending->rsc_id, TRUE)) {
         crm_info("Removing op %s:%d for deleted resource %s",
                  pending->op_key, pending->call_id, rsc);
         return TRUE;
     }
     return FALSE;
 }
 
 /*
  * Remove the rsc from the CIB
  *
  * Avoids refreshing the entire LRM section of this host
  */
 #define rsc_template "//"XML_CIB_TAG_STATE"[@uname='%s']//"XML_LRM_TAG_RESOURCE"[@id='%s']"
 
 static int
 delete_rsc_status(lrm_state_t * lrm_state, const char *rsc_id, int call_options,
                   const char *user_name)
 {
     char *rsc_xpath = NULL;
     int max = 0;
     int rc = pcmk_ok;
 
     CRM_CHECK(rsc_id != NULL, return -ENXIO);
 
     max = strlen(rsc_template) + strlen(rsc_id) + strlen(lrm_state->node_name) + 1;
     rsc_xpath = calloc(1, max);
     snprintf(rsc_xpath, max, rsc_template, lrm_state->node_name, rsc_id);
 
     rc = cib_internal_op(fsa_cib_conn, CIB_OP_DELETE, NULL, rsc_xpath,
                          NULL, NULL, call_options | cib_xpath, user_name);
 
     free(rsc_xpath);
     return rc;
 }
 
 static void
 delete_rsc_entry(lrm_state_t * lrm_state, ha_msg_input_t * input, const char *rsc_id,
                  GHashTableIter * rsc_gIter, int rc, const char *user_name)
 {
     struct delete_event_s event;
 
     CRM_CHECK(rsc_id != NULL, return);
 
     if (rc == pcmk_ok) {
         char *rsc_id_copy = strdup(rsc_id);
 
         if (rsc_gIter)
             g_hash_table_iter_remove(rsc_gIter);
         else
             g_hash_table_remove(lrm_state->resource_history, rsc_id_copy);
         crm_debug("sync: Sending delete op for %s", rsc_id_copy);
         delete_rsc_status(lrm_state, rsc_id_copy, cib_quorum_override, user_name);
 
         g_hash_table_foreach_remove(lrm_state->pending_ops, lrm_remove_deleted_op, rsc_id_copy);
         free(rsc_id_copy);
     }
 
     if (input) {
         notify_deleted(lrm_state, input, rsc_id, rc);
     }
 
     event.rc = rc;
     event.rsc = rsc_id;
     event.lrm_state = lrm_state;
     g_hash_table_foreach_remove(lrm_state->deletion_ops, lrm_remove_deleted_rsc, &event);
 }
 
 /*
  * Remove the op from the CIB
  *
  * Avoids refreshing the entire LRM section of this host
  */
 
 #define op_template "//"XML_CIB_TAG_STATE"[@uname='%s']//"XML_LRM_TAG_RESOURCE"[@id='%s']/"XML_LRM_TAG_RSC_OP"[@id='%s']"
 #define op_call_template "//"XML_CIB_TAG_STATE"[@uname='%s']//"XML_LRM_TAG_RESOURCE"[@id='%s']/"XML_LRM_TAG_RSC_OP"[@id='%s' and @"XML_LRM_ATTR_CALLID"='%d']"
 
 static void
 delete_op_entry(lrm_state_t * lrm_state, lrmd_event_data_t * op, const char *rsc_id,
                 const char *key, int call_id)
 {
     xmlNode *xml_top = NULL;
 
     if (op != NULL) {
         xml_top = create_xml_node(NULL, XML_LRM_TAG_RSC_OP);
         crm_xml_add_int(xml_top, XML_LRM_ATTR_CALLID, op->call_id);
         crm_xml_add(xml_top, XML_ATTR_TRANSITION_KEY, op->user_data);
 
         if (op->interval > 0) {
             char *op_id = generate_op_key(op->rsc_id, op->op_type, op->interval);
 
             /* Avoid deleting last_failure too (if it was a result of this recurring op failing) */
             crm_xml_add(xml_top, XML_ATTR_ID, op_id);
             free(op_id);
         }
 
         crm_debug("async: Sending delete op for %s_%s_%d (call=%d)",
                   op->rsc_id, op->op_type, op->interval, op->call_id);
 
         fsa_cib_conn->cmds->delete(fsa_cib_conn, XML_CIB_TAG_STATUS, xml_top, cib_quorum_override);
 
     } else if (rsc_id != NULL && key != NULL) {
         int max = 0;
         char *op_xpath = NULL;
 
         if (call_id > 0) {
             max =
                 strlen(op_call_template) + strlen(rsc_id) + strlen(lrm_state->node_name) +
                 strlen(key) + 10;
             op_xpath = calloc(1, max);
             snprintf(op_xpath, max, op_call_template, lrm_state->node_name, rsc_id, key, call_id);
 
         } else {
             max =
                 strlen(op_template) + strlen(rsc_id) + strlen(lrm_state->node_name) + strlen(key) +
                 1;
             op_xpath = calloc(1, max);
             snprintf(op_xpath, max, op_template, lrm_state->node_name, rsc_id, key);
         }
 
         crm_debug("sync: Sending delete op for %s (call=%d)", rsc_id, call_id);
         fsa_cib_conn->cmds->delete(fsa_cib_conn, op_xpath, NULL, cib_quorum_override | cib_xpath);
 
         free(op_xpath);
 
     } else {
         crm_err("Not enough information to delete op entry: rsc=%p key=%p", rsc_id, key);
         return;
     }
 
     crm_log_xml_trace(xml_top, "op:cancel");
     free_xml(xml_top);
 }
 
 void
 lrm_clear_last_failure(const char *rsc_id, const char *node_name)
 {
     char *attr = NULL;
     GHashTableIter iter;
     GList *lrm_state_list = lrm_state_get_list();
     GList *state_entry;
     rsc_history_t *entry = NULL;
 
     attr = generate_op_key(rsc_id, "last_failure", 0);
 
     /* This clears last failure for every lrm state that has this rsc.*/
     for (state_entry = lrm_state_list; state_entry != NULL; state_entry = state_entry->next) {
         lrm_state_t *lrm_state = state_entry->data;
 
         if (node_name != NULL) {
             if (strcmp(node_name, lrm_state->node_name) != 0) {
                 /* filter by node_name if node_name is present */
                 continue;
             }
         }
 
         delete_op_entry(lrm_state, NULL, rsc_id, attr, 0);
 
         if (!lrm_state->resource_history) {
             continue;
         }
 
         g_hash_table_iter_init(&iter, lrm_state->resource_history);
         while (g_hash_table_iter_next(&iter, NULL, (void **)&entry)) {
             if (crm_str_eq(rsc_id, entry->id, TRUE)) {
                 lrmd_free_event(entry->failed);
                 entry->failed = NULL;
             }
         }
     }
     free(attr);
     g_list_free(lrm_state_list);
 }
 
 /* Returns: gboolean - cancellation is in progress */
 static gboolean
 cancel_op(lrm_state_t * lrm_state, const char *rsc_id, const char *key, int op, gboolean remove)
 {
     int rc = pcmk_ok;
     char *local_key = NULL;
     struct recurring_op_s *pending = NULL;
 
     CRM_CHECK(op != 0, return FALSE);
     CRM_CHECK(rsc_id != NULL, return FALSE);
     if (key == NULL) {
         local_key = make_stop_id(rsc_id, op);
         key = local_key;
     }
     pending = g_hash_table_lookup(lrm_state->pending_ops, key);
 
     if (pending) {
         if (remove && pending->remove == FALSE) {
             pending->remove = TRUE;
             crm_debug("Scheduling %s for removal", key);
         }
 
         if (pending->cancelled) {
             crm_debug("Operation %s already cancelled", key);
             free(local_key);
             return FALSE;
         }
 
         pending->cancelled = TRUE;
 
     } else {
         crm_info("No pending op found for %s", key);
         free(local_key);
         return FALSE;
     }
 
     crm_debug("Cancelling op %d for %s (%s)", op, rsc_id, key);
     rc = lrm_state_cancel(lrm_state, pending->rsc_id, pending->op_type, pending->interval);
     if (rc == pcmk_ok) {
         crm_debug("Op %d for %s (%s): cancelled", op, rsc_id, key);
         free(local_key);
         return TRUE;
     }
 
     crm_debug("Op %d for %s (%s): Nothing to cancel", op, rsc_id, key);
     /* The caller needs to make sure the entry is
      * removed from the pending_ops list
      *
      * Usually by returning TRUE inside the worker function
      * supplied to g_hash_table_foreach_remove()
      *
      * Not removing the entry from pending_ops will block
      * the node from shutting down
      */
     free(local_key);
     return FALSE;
 }
 
 struct cancel_data {
     gboolean done;
     gboolean remove;
     const char *key;
     lrmd_rsc_info_t *rsc;
     lrm_state_t *lrm_state;
 };
 
 static gboolean
 cancel_action_by_key(gpointer key, gpointer value, gpointer user_data)
 {
     gboolean remove = FALSE;
     struct cancel_data *data = user_data;
     struct recurring_op_s *op = (struct recurring_op_s *)value;
 
     if (crm_str_eq(op->op_key, data->key, TRUE)) {
         data->done = TRUE;
         remove = !cancel_op(data->lrm_state, data->rsc->id, key, op->call_id, data->remove);
     }
     return remove;
 }
 
 static gboolean
 cancel_op_key(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, const char *key, gboolean remove)
 {
     guint removed = 0;
     struct cancel_data data;
 
     CRM_CHECK(rsc != NULL, return FALSE);
     CRM_CHECK(key != NULL, return FALSE);
 
     data.key = key;
     data.rsc = rsc;
     data.done = FALSE;
     data.remove = remove;
     data.lrm_state = lrm_state;
 
     removed = g_hash_table_foreach_remove(lrm_state->pending_ops, cancel_action_by_key, &data);
     crm_trace("Removed %u op cache entries, new size: %u",
               removed, g_hash_table_size(lrm_state->pending_ops));
     return data.done;
 }
 
 static lrmd_rsc_info_t *
 get_lrm_resource(lrm_state_t * lrm_state, xmlNode * resource, xmlNode * op_msg, gboolean do_create)
 {
     lrmd_rsc_info_t *rsc = NULL;
     const char *id = ID(resource);
     const char *type = crm_element_value(resource, XML_ATTR_TYPE);
     const char *class = crm_element_value(resource, XML_AGENT_ATTR_CLASS);
     const char *provider = crm_element_value(resource, XML_AGENT_ATTR_PROVIDER);
     const char *long_id = crm_element_value(resource, XML_ATTR_ID_LONG);
 
     crm_trace("Retrieving %s from the LRM.", id);
     CRM_CHECK(id != NULL, return NULL);
 
     rsc = lrm_state_get_rsc_info(lrm_state, id, 0);
 
     if (!rsc && long_id) {
         rsc = lrm_state_get_rsc_info(lrm_state, long_id, 0);
     }
 
     if (!rsc && do_create) {
         CRM_CHECK(class != NULL, return NULL);
         CRM_CHECK(type != NULL, return NULL);
 
         crm_trace("Adding rsc %s before operation", id);
 
         lrm_state_register_rsc(lrm_state, id, class, provider, type, lrmd_opt_drop_recurring);
 
         rsc = lrm_state_get_rsc_info(lrm_state, id, 0);
 
         if (!rsc) {
             fsa_data_t *msg_data = NULL;
 
             crm_err("Could not add resource %s to LRM %s", id, lrm_state->node_name);
             /* only register this as a internal error if this involves the local
              * lrmd. Otherwise we're likely dealing with an unresponsive remote-node
              * which is not a FSA failure. */
             if (lrm_state_is_local(lrm_state) == TRUE) {
                 register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL);
             }
         }
     }
 
     return rsc;
 }
 
 static void
 delete_resource(lrm_state_t * lrm_state,
                 const char *id,
                 lrmd_rsc_info_t * rsc,
                 GHashTableIter * gIter,
                 const char *sys,
                 const char *host,
                 const char *user,
                 ha_msg_input_t * request,
                 gboolean unregister)
 {
     int rc = pcmk_ok;
 
     crm_info("Removing resource %s for %s (%s) on %s", id, sys, user ? user : "internal", host);
 
     if (rsc && unregister) {
         rc = lrm_state_unregister_rsc(lrm_state, id, 0);
     }
 
     if (rc == pcmk_ok) {
         crm_trace("Resource '%s' deleted", id);
     } else if (rc == -EINPROGRESS) {
         crm_info("Deletion of resource '%s' pending", id);
         if (request) {
             struct pending_deletion_op_s *op = NULL;
             char *ref = crm_element_value_copy(request->msg, XML_ATTR_REFERENCE);
 
             op = calloc(1, sizeof(struct pending_deletion_op_s));
             op->rsc = strdup(rsc->id);
             op->input = copy_ha_msg_input(request);
             g_hash_table_insert(lrm_state->deletion_ops, ref, op);
         }
         return;
     } else {
         crm_warn("Deletion of resource '%s' for %s (%s) on %s failed: %d",
                  id, sys, user ? user : "internal", host, rc);
     }
 
     delete_rsc_entry(lrm_state, request, id, gIter, rc, user);
 }
 
 static int
 get_fake_call_id(lrm_state_t *lrm_state, const char *rsc_id)
 {
     int call_id = 0;
     rsc_history_t *entry = NULL;
 
     entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
 
     /* Make sure the call id is greater than the last successful operation,
      * otherwise the failure will not result in a possible recovery of the resource
      * as it could appear the failure occurred before the successful start */
     if (entry) {
         call_id = entry->last_callid + 1;
     }
 
     if (call_id < 0) {
         call_id = 1;
     }
     return call_id;
 }
 
 static void
 force_reprobe(lrm_state_t *lrm_state, const char *from_sys, const char *from_host, const char *user_name, gboolean is_remote_node)
 {
         GHashTableIter gIter;
         rsc_history_t *entry = NULL;
 
 
         crm_info("clearing resource history on node %s", lrm_state->node_name);
         g_hash_table_iter_init(&gIter, lrm_state->resource_history);
         while (g_hash_table_iter_next(&gIter, NULL, (void **)&entry)) {
             /* only unregister the resource during a reprobe if it is not a remote connection
              * resource. otherwise unregistering the connection will terminate remote-node
              * membership */
             gboolean unregister = TRUE;
 
             if (is_remote_lrmd_ra(NULL, NULL, entry->id)) {
                 lrm_state_t *remote_lrm_state = lrm_state_find(entry->id);
                 if (remote_lrm_state) {
                     /* when forcing a reprobe, make sure to clear remote node before
                      * clearing the remote node's connection resource */ 
                     force_reprobe(remote_lrm_state, from_sys, from_host, user_name, TRUE);
                 }
                 unregister = FALSE;
             }
 
             delete_resource(lrm_state, entry->id, &entry->rsc, &gIter, from_sys, from_host,
                             user_name, NULL, unregister);
         }
 
         /* Now delete the copy in the CIB */
         erase_status_tag(lrm_state->node_name, XML_CIB_TAG_LRM, cib_scope_local);
 
         /* And finally, _delete_ the value in attrd
          * Setting it to FALSE results in the PE sending us back here again
          */
         update_attrd(lrm_state->node_name, CRM_OP_PROBED, NULL, user_name, is_remote_node);
 }
 
 
 /*	 A_LRM_INVOKE	*/
 void
 do_lrm_invoke(long long action,
               enum crmd_fsa_cause cause,
               enum crmd_fsa_state cur_state,
               enum crmd_fsa_input current_input, fsa_data_t * msg_data)
 {
     gboolean create_rsc = TRUE;
     lrm_state_t *lrm_state = NULL;
     const char *crm_op = NULL;
     const char *from_sys = NULL;
     const char *from_host = NULL;
     const char *operation = NULL;
     ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg);
     const char *user_name = NULL;
     const char *target_node = NULL;
     gboolean is_remote_node = FALSE;
     gboolean crm_rsc_delete = FALSE;
 
     if (input->xml != NULL) {
         /* Remote node operations are routed here to their remote connections */
         target_node = crm_element_value(input->xml, XML_LRM_ATTR_TARGET);
     }
     if (target_node == NULL) {
         target_node = fsa_our_uname;
     } else if (safe_str_neq(target_node, fsa_our_uname)) {
         is_remote_node = TRUE;
     }
 
     lrm_state = lrm_state_find(target_node);
 
     if (lrm_state == NULL && is_remote_node) {
         crm_err("no lrmd connection for remote node %s found on cluster node %s. Can not process request.",
             target_node, fsa_our_uname);
         return;
     }
 
     CRM_ASSERT(lrm_state != NULL);
 
 #if ENABLE_ACL
     user_name = crm_acl_get_set_user(input->msg, F_CRM_USER, NULL);
     crm_trace("LRM command from user '%s'", user_name);
 #endif
 
     crm_op = crm_element_value(input->msg, F_CRM_TASK);
     from_sys = crm_element_value(input->msg, F_CRM_SYS_FROM);
     if (safe_str_neq(from_sys, CRM_SYSTEM_TENGINE)) {
         from_host = crm_element_value(input->msg, F_CRM_HOST_FROM);
     }
 
     crm_trace("LRM command from: %s", from_sys);
 
     if (safe_str_eq(crm_op, CRM_OP_LRM_DELETE)) {
         /* remember this delete op came from crm_resource */
         crm_rsc_delete = TRUE;
         operation = CRMD_ACTION_DELETE;
 
     } else if (safe_str_eq(crm_op, CRM_OP_LRM_REFRESH)) {
         operation = CRM_OP_LRM_REFRESH;
 
     } else if (safe_str_eq(crm_op, CRM_OP_LRM_FAIL)) {
         lrmd_event_data_t *op = NULL;
         lrmd_rsc_info_t *rsc = NULL;
         xmlNode *xml_rsc = find_xml_node(input->xml, XML_CIB_TAG_RESOURCE, TRUE);
 
         CRM_CHECK(xml_rsc != NULL, return);
 
         /* The lrmd can not fail a resource, it does not understand the
          * concept of success or failure in relation to a resource, it simply
          * executes operations and reports the results. We determine what a failure is.
          * Becaues of this, if we want to fail a resource we have to fake what we
          * understand a failure to look like.
          *
          * To do this we create a fake lrmd operation event for the resource
          * we want to fail.  We then pass that event to the lrmd client callback
          * so it will be processed as if it actually came from the lrmd. */
         op = construct_op(lrm_state, input->xml, ID(xml_rsc), "asyncmon");
         CRM_ASSERT(op != NULL);
 
         free((char *)op->user_data);
         op->user_data = NULL;
         op->call_id = get_fake_call_id(lrm_state, op->rsc_id);
         op->interval = 0;
         op->op_status = PCMK_LRM_OP_DONE;
         op->rc = PCMK_OCF_UNKNOWN_ERROR;
         op->t_run = time(NULL);
         op->t_rcchange = op->t_run;
 
 #if ENABLE_ACL
         if (user_name && is_privileged(user_name) == FALSE) {
             crm_err("%s does not have permission to fail %s", user_name, ID(xml_rsc));
             send_direct_ack(from_host, from_sys, NULL, op, ID(xml_rsc));
             lrmd_free_event(op);
             return;
         }
 #endif
 
         rsc = get_lrm_resource(lrm_state, xml_rsc, input->xml, create_rsc);
         if (rsc) {
             crm_info("Failing resource %s...", rsc->id);
             process_lrm_event(lrm_state, op);
             op->op_status = PCMK_LRM_OP_DONE;
             op->rc = PCMK_OCF_OK;
             lrmd_free_rsc_info(rsc);
         } else {
             crm_info("Cannot find/create resource in order to fail it...");
             crm_log_xml_warn(input->msg, "bad input");
         }
 
         send_direct_ack(from_host, from_sys, NULL, op, ID(xml_rsc));
         lrmd_free_event(op);
         return;
 
     } else if (input->xml != NULL) {
         operation = crm_element_value(input->xml, XML_LRM_ATTR_TASK);
     }
 
     if (safe_str_eq(crm_op, CRM_OP_LRM_REFRESH)) {
         int rc = pcmk_ok;
         xmlNode *fragment = do_lrm_query_internal(lrm_state, TRUE);
 
         fsa_cib_update(XML_CIB_TAG_STATUS, fragment, cib_quorum_override, rc, user_name);
         crm_info("Forced a local LRM refresh: call=%d", rc);
 
         if(strcmp(CRM_SYSTEM_CRMD, from_sys) != 0) {
             xmlNode *reply = create_request(
                 CRM_OP_INVOKE_LRM, fragment,
                 from_host, from_sys, CRM_SYSTEM_LRMD, fsa_our_uuid);
 
             crm_debug("ACK'ing refresh from %s (%s)", from_sys, from_host);
 
             if (relay_message(reply, TRUE) == FALSE) {
                 crm_log_xml_err(reply, "Unable to route reply");
             }
             free_xml(reply);
         }
 
         free_xml(fragment);
 
     } else if (safe_str_eq(crm_op, CRM_OP_LRM_QUERY)) {
         xmlNode *data = do_lrm_query_internal(lrm_state, FALSE);
         xmlNode *reply = create_reply(input->msg, data);
 
         if (relay_message(reply, TRUE) == FALSE) {
             crm_err("Unable to route reply");
             crm_log_xml_err(reply, "reply");
         }
         free_xml(reply);
         free_xml(data);
 
     } else if (safe_str_eq(operation, CRM_OP_PROBED)) {
         update_attrd(lrm_state->node_name, CRM_OP_PROBED, XML_BOOLEAN_TRUE, user_name, is_remote_node);
 
     } else if (safe_str_eq(operation, CRM_OP_REPROBE) || safe_str_eq(crm_op, CRM_OP_REPROBE)) {
         crm_notice("Forcing the status of all resources to be redetected");
 
         force_reprobe(lrm_state, from_sys, from_host, user_name, is_remote_node);
 
         if(strcmp(CRM_SYSTEM_TENGINE, from_sys) != 0
            && strcmp(CRM_SYSTEM_TENGINE, from_sys) != 0) {
             xmlNode *reply = create_request(
                 CRM_OP_INVOKE_LRM, NULL,
                 from_host, from_sys, CRM_SYSTEM_LRMD, fsa_our_uuid);
 
             crm_debug("ACK'ing re-probe from %s (%s)", from_sys, from_host);
 
             if (relay_message(reply, TRUE) == FALSE) {
                 crm_log_xml_err(reply, "Unable to route reply");
             }
             free_xml(reply);
         }
 
     } else if (operation != NULL) {
         lrmd_rsc_info_t *rsc = NULL;
         xmlNode *params = NULL;
         xmlNode *xml_rsc = find_xml_node(input->xml, XML_CIB_TAG_RESOURCE, TRUE);
 
         CRM_CHECK(xml_rsc != NULL, return);
 
         /* only the first 16 chars are used by the LRM */
         params = find_xml_node(input->xml, XML_TAG_ATTRS, TRUE);
 
         if (safe_str_eq(operation, CRMD_ACTION_DELETE)) {
             create_rsc = FALSE;
         }
 
         rsc = get_lrm_resource(lrm_state, xml_rsc, input->xml, create_rsc);
 
         if (rsc == NULL && create_rsc) {
             lrmd_event_data_t *op = NULL;
 
             /* if the operation couldn't complete because we can't register
              * the resource, return a generic error */
             op = construct_op(lrm_state, input->xml, ID(xml_rsc), operation);
             CRM_ASSERT(op != NULL);
 
             op->op_status = PCMK_LRM_OP_DONE;
             op->rc = PCMK_OCF_UNKNOWN_ERROR;
             op->t_run = time(NULL);
             op->t_rcchange = op->t_run;
 
             send_direct_ack(from_host, from_sys, NULL, op, ID(xml_rsc));
             lrmd_free_event(op);
 
             crm_err("Invalid resource definition");
             crm_log_xml_warn(input->msg, "bad input");
 
         } else if (rsc == NULL) {
             lrmd_event_data_t *op = NULL;
 
             crm_notice("Not creating resource for a %s event: %s", operation, ID(input->xml));
             delete_rsc_entry(lrm_state, input, ID(xml_rsc), NULL, pcmk_ok, user_name);
 
             op = construct_op(lrm_state, input->xml, ID(xml_rsc), operation);
             op->op_status = PCMK_LRM_OP_DONE;
             op->rc = PCMK_OCF_OK;
             CRM_ASSERT(op != NULL);
             send_direct_ack(from_host, from_sys, NULL, op, ID(xml_rsc));
             lrmd_free_event(op);
 
         } else if (safe_str_eq(operation, CRMD_ACTION_CANCEL)) {
             char *op_key = NULL;
             char *meta_key = NULL;
             int call = 0;
             const char *call_id = NULL;
             const char *op_task = NULL;
             const char *op_interval = NULL;
             gboolean in_progress = FALSE;
 
             CRM_CHECK(params != NULL, crm_log_xml_warn(input->xml, "Bad command");
                       return);
 
             meta_key = crm_meta_name(XML_LRM_ATTR_INTERVAL);
             op_interval = crm_element_value(params, meta_key);
             free(meta_key);
 
             meta_key = crm_meta_name(XML_LRM_ATTR_TASK);
             op_task = crm_element_value(params, meta_key);
             free(meta_key);
 
             meta_key = crm_meta_name(XML_LRM_ATTR_CALLID);
             call_id = crm_element_value(params, meta_key);
             free(meta_key);
 
             CRM_CHECK(op_task != NULL, crm_log_xml_warn(input->xml, "Bad command");
                       return);
             CRM_CHECK(op_interval != NULL, crm_log_xml_warn(input->xml, "Bad command");
                       return);
 
             op_key = generate_op_key(rsc->id, op_task, crm_parse_int(op_interval, "0"));
 
             crm_debug("PE requested op %s (call=%s) be cancelled",
                       op_key, call_id ? call_id : "NA");
             call = crm_parse_int(call_id, "0");
             if (call == 0) {
                 /* the normal case when the PE cancels a recurring op */
                 in_progress = cancel_op_key(lrm_state, rsc, op_key, TRUE);
 
             } else {
                 /* the normal case when the PE cancels an orphan op */
                 in_progress = cancel_op(lrm_state, rsc->id, NULL, call, TRUE);
             }
 
             if (in_progress == FALSE) {
                 lrmd_event_data_t *op = construct_op(lrm_state, input->xml, rsc->id, op_task);
 
                 crm_info("Nothing known about operation %d for %s", call, op_key);
                 delete_op_entry(lrm_state, NULL, rsc->id, op_key, call);
 
                 CRM_ASSERT(op != NULL);
 
                 op->rc = PCMK_OCF_OK;
                 op->op_status = PCMK_LRM_OP_DONE;
                 send_direct_ack(from_host, from_sys, rsc, op, rsc->id);
                 lrmd_free_event(op);
 
                 /* needed?? surely not otherwise the cancel_op_(_key) wouldn't
                  * have failed in the first place
                  */
                 g_hash_table_remove(lrm_state->pending_ops, op_key);
             }
 
             free(op_key);
 
         } else if (rsc != NULL && safe_str_eq(operation, CRMD_ACTION_DELETE)) {
             gboolean unregister = TRUE;
 
 #if ENABLE_ACL
             int cib_rc = delete_rsc_status(lrm_state, rsc->id, cib_dryrun | cib_sync_call, user_name);
             if (cib_rc != pcmk_ok) {
                 lrmd_event_data_t *op = NULL;
 
                 crm_err
                     ("Attempted deletion of resource status '%s' from CIB for %s (user=%s) on %s failed: (rc=%d) %s",
                      rsc->id, from_sys, user_name ? user_name : "unknown", from_host, cib_rc,
                      pcmk_strerror(cib_rc));
 
                 op = construct_op(lrm_state, input->xml, rsc->id, operation);
                 op->op_status = PCMK_LRM_OP_ERROR;
 
                 if (cib_rc == -EACCES) {
                     op->rc = PCMK_OCF_INSUFFICIENT_PRIV;
                 } else {
                     op->rc = PCMK_OCF_UNKNOWN_ERROR;
                 }
                 send_direct_ack(from_host, from_sys, NULL, op, rsc->id);
                 lrmd_free_event(op);
                 return;
             }
 #endif
             if (crm_rsc_delete == TRUE && is_remote_lrmd_ra(NULL, NULL, rsc->id)) {
                 unregister = FALSE;
             }
 
             delete_resource(lrm_state, rsc->id, rsc, NULL, from_sys, from_host, user_name, input, unregister);
 
         } else if (rsc != NULL) {
             do_lrm_rsc_op(lrm_state, rsc, operation, input->xml, input->msg);
         }
 
         lrmd_free_rsc_info(rsc);
 
     } else {
         crm_err("Operation was neither a lrm_query, nor a rsc op.  %s", crm_str(crm_op));
         register_fsa_error(C_FSA_INTERNAL, I_ERROR, NULL);
     }
 }
 
 static lrmd_event_data_t *
 construct_op(lrm_state_t * lrm_state, xmlNode * rsc_op, const char *rsc_id, const char *operation)
 {
     lrmd_event_data_t *op = NULL;
     const char *op_delay = NULL;
     const char *op_timeout = NULL;
     const char *op_interval = NULL;
     GHashTable *params = NULL;
 
     const char *transition = NULL;
 
     CRM_ASSERT(rsc_id != NULL);
 
     op = calloc(1, sizeof(lrmd_event_data_t));
     op->type = lrmd_event_exec_complete;
     op->op_type = strdup(operation);
     op->op_status = PCMK_LRM_OP_PENDING;
     op->rc = -1;
     op->rsc_id = strdup(rsc_id);
     op->interval = 0;
     op->timeout = 0;
     op->start_delay = 0;
 
     if (rsc_op == NULL) {
         CRM_LOG_ASSERT(safe_str_eq(CRMD_ACTION_STOP, operation));
         op->user_data = NULL;
         /* the stop_all_resources() case
          * by definition there is no DC (or they'd be shutting
          *   us down).
          * So we should put our version here.
          */
         op->params = g_hash_table_new_full(crm_str_hash, g_str_equal,
                                            g_hash_destroy_str, g_hash_destroy_str);
 
         g_hash_table_insert(op->params, strdup(XML_ATTR_CRM_VERSION), strdup(CRM_FEATURE_SET));
 
         crm_trace("Constructed %s op for %s", operation, rsc_id);
         return op;
     }
 
     params = xml2list(rsc_op);
     g_hash_table_remove(params, CRM_META "_op_target_rc");
 
     op_delay = crm_meta_value(params, XML_OP_ATTR_START_DELAY);
     op_timeout = crm_meta_value(params, XML_ATTR_TIMEOUT);
     op_interval = crm_meta_value(params, XML_LRM_ATTR_INTERVAL);
 
     op->interval = crm_parse_int(op_interval, "0");
     op->timeout = crm_parse_int(op_timeout, "0");
     op->start_delay = crm_parse_int(op_delay, "0");
 
     if (safe_str_neq(operation, RSC_STOP)) {
         op->params = params;
 
     } else {
         rsc_history_t *entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
 
         /* If we do not have stop parameters cached, use
          * whatever we are given */
         if (!entry || !entry->stop_params) {
             op->params = params;
         } else {
             /* Copy the cached parameter list so that we stop the resource
              * with the old attributes, not the new ones */
             op->params = g_hash_table_new_full(crm_str_hash, g_str_equal,
                                                g_hash_destroy_str, g_hash_destroy_str);
 
             g_hash_table_foreach(params, copy_meta_keys, op->params);
             g_hash_table_foreach(entry->stop_params, copy_instance_keys, op->params);
             g_hash_table_destroy(params);
             params = NULL;
         }
     }
 
     /* sanity */
     if (op->interval < 0) {
         op->interval = 0;
     }
     if (op->timeout <= 0) {
         op->timeout = op->interval;
     }
     if (op->start_delay < 0) {
         op->start_delay = 0;
     }
 
     transition = crm_element_value(rsc_op, XML_ATTR_TRANSITION_KEY);
     CRM_CHECK(transition != NULL, return op);
 
     op->user_data = strdup(transition);
 
     if (op->interval != 0) {
         if (safe_str_eq(operation, CRMD_ACTION_START)
             || safe_str_eq(operation, CRMD_ACTION_STOP)) {
             crm_err("Start and Stop actions cannot have an interval: %d", op->interval);
             op->interval = 0;
         }
     }
 
     crm_trace("Constructed %s op for %s: interval=%d", operation, rsc_id, op->interval);
 
     return op;
 }
 
 void
 send_direct_ack(const char *to_host, const char *to_sys,
                 lrmd_rsc_info_t * rsc, lrmd_event_data_t * op, const char *rsc_id)
 {
     xmlNode *reply = NULL;
     xmlNode *update, *iter;
     crm_node_t *peer = NULL;
 
     CRM_CHECK(op != NULL, return);
     if (op->rsc_id == NULL) {
         CRM_ASSERT(rsc_id != NULL);
         op->rsc_id = strdup(rsc_id);
     }
     if (to_sys == NULL) {
         to_sys = CRM_SYSTEM_TENGINE;
     }
 
     peer = crm_get_peer(0, fsa_our_uname);
     update = do_update_node_cib(peer, node_update_none, NULL, __FUNCTION__);
 
     iter = create_xml_node(update, XML_CIB_TAG_LRM);
     crm_xml_add(iter, XML_ATTR_ID, fsa_our_uuid);
     iter = create_xml_node(iter, XML_LRM_TAG_RESOURCES);
     iter = create_xml_node(iter, XML_LRM_TAG_RESOURCE);
 
     crm_xml_add(iter, XML_ATTR_ID, op->rsc_id);
 
     build_operation_update(iter, rsc, op, __FUNCTION__);
     reply = create_request(CRM_OP_INVOKE_LRM, update, to_host, to_sys, CRM_SYSTEM_LRMD, NULL);
 
     crm_log_xml_trace(update, "ACK Update");
 
     crm_debug("ACK'ing resource op %s_%s_%d from %s: %s",
               op->rsc_id, op->op_type, op->interval, op->user_data,
               crm_element_value(reply, XML_ATTR_REFERENCE));
 
     if (relay_message(reply, TRUE) == FALSE) {
         crm_log_xml_err(reply, "Unable to route reply");
     }
 
     free_xml(update);
     free_xml(reply);
 }
 
 gboolean
 verify_stopped(enum crmd_fsa_state cur_state, int log_level)
 {
     gboolean res = TRUE;
     GList *lrm_state_list = lrm_state_get_list();
     GList *state_entry;
 
     for (state_entry = lrm_state_list; state_entry != NULL; state_entry = state_entry->next) {
         lrm_state_t *lrm_state = state_entry->data;
 
         if (!lrm_state_verify_stopped(lrm_state, cur_state, log_level)) {
             /* keep iterating through all even when false is returned */
             res = FALSE;
         }
     }
 
     set_bit(fsa_input_register, R_SENT_RSC_STOP);
     g_list_free(lrm_state_list); lrm_state_list = NULL;
     return res;
 }
 
 struct stop_recurring_action_s {
     lrmd_rsc_info_t *rsc;
     lrm_state_t *lrm_state;
 };
 
 static gboolean
 stop_recurring_action_by_rsc(gpointer key, gpointer value, gpointer user_data)
 {
     gboolean remove = FALSE;
     struct stop_recurring_action_s *event = user_data;
     struct recurring_op_s *op = (struct recurring_op_s *)value;
 
     if (op->interval != 0 && crm_str_eq(op->rsc_id, event->rsc->id, TRUE)) {
         crm_debug("Cancelling op %d for %s (%s)", op->call_id, op->rsc_id, key);
         remove = !cancel_op(event->lrm_state, event->rsc->id, key, op->call_id, FALSE);
     }
 
     return remove;
 }
 
 static gboolean
 stop_recurring_actions(gpointer key, gpointer value, gpointer user_data)
 {
     gboolean remove = FALSE;
     lrm_state_t *lrm_state = user_data;
     struct recurring_op_s *op = (struct recurring_op_s *)value;
 
     if (op->interval != 0) {
         crm_info("Cancelling op %d for %s (%s)", op->call_id, op->rsc_id, key);
         remove = !cancel_op(lrm_state, op->rsc_id, key, op->call_id, FALSE);
     }
 
     return remove;
 }
 
 static void
 do_lrm_rsc_op(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, const char *operation, xmlNode * msg,
               xmlNode * request)
 {
     int call_id = 0;
     char *op_id = NULL;
     lrmd_event_data_t *op = NULL;
     lrmd_key_value_t *params = NULL;
     fsa_data_t *msg_data = NULL;
     const char *transition = NULL;
     gboolean stop_recurring = FALSE;
 
     CRM_CHECK(rsc != NULL, return);
     CRM_CHECK(operation != NULL, return);
 
     if (msg != NULL) {
         transition = crm_element_value(msg, XML_ATTR_TRANSITION_KEY);
         if (transition == NULL) {
             crm_log_xml_err(msg, "Missing transition number");
         }
     }
 
     op = construct_op(lrm_state, msg, rsc->id, operation);
     CRM_CHECK(op != NULL, return);
 
     if (is_remote_lrmd_ra(NULL, NULL, rsc->id)
         && op->interval == 0
         && strcmp(operation, CRMD_ACTION_MIGRATE) == 0) {
 
         /* pcmk remote connections are a special use case.
          * We never ever want to stop monitoring a connection resource until
          * the entire migration has completed. If the connection is ever unexpected
          * severed, even during a migration, this is an event we must detect.*/
         stop_recurring = FALSE;
 
     } else if (op->interval == 0
         && strcmp(operation, CRMD_ACTION_STATUS) != 0
         && strcmp(operation, CRMD_ACTION_NOTIFY) != 0) {
 
         /* stop any previous monitor operations before changing the resource state */
         stop_recurring = TRUE;
     }
 
     if (stop_recurring == TRUE) {
         guint removed = 0;
         struct stop_recurring_action_s data;
 
         data.rsc = rsc;
         data.lrm_state = lrm_state;
         removed = g_hash_table_foreach_remove(
             lrm_state->pending_ops, stop_recurring_action_by_rsc, &data);
 
         crm_debug("Stopped %u recurring operations in preparation for %s_%s_%d",
                   removed, rsc->id, operation, op->interval);
     }
 
     /* now do the op */
     crm_info("Performing key=%s op=%s_%s_%d", transition, rsc->id, operation, op->interval);
 
     if (fsa_state != S_NOT_DC && fsa_state != S_POLICY_ENGINE && fsa_state != S_TRANSITION_ENGINE) {
         if (safe_str_neq(operation, "fail")
             && safe_str_neq(operation, CRMD_ACTION_STOP)) {
             crm_info("Discarding attempt to perform action %s on %s in state %s",
                      operation, rsc->id, fsa_state2string(fsa_state));
             op->rc = 99;
             op->op_status = PCMK_LRM_OP_ERROR;
             send_direct_ack(NULL, NULL, rsc, op, rsc->id);
             lrmd_free_event(op);
             free(op_id);
             return;
         }
     }
 
     op_id = generate_op_key(rsc->id, op->op_type, op->interval);
 
     if (op->interval > 0) {
         /* cancel it so we can then restart it without conflict */
         cancel_op_key(lrm_state, rsc, op_id, FALSE);
     }
 
     if (op->params) {
         char *key = NULL;
         char *value = NULL;
         GHashTableIter iter;
 
         g_hash_table_iter_init(&iter, op->params);
         while (g_hash_table_iter_next(&iter, (gpointer *) & key, (gpointer *) & value)) {
             params = lrmd_key_value_add(params, key, value);
         }
     }
 
     call_id = lrm_state_exec(lrm_state,
                              rsc->id,
                              op->op_type,
                              op->user_data, op->interval, op->timeout, op->start_delay, params);
 
     if (call_id <= 0 && lrm_state_is_local(lrm_state)) {
         crm_err("Operation %s on %s failed: %d", operation, rsc->id, call_id);
         register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL);
 
     } else if (call_id <= 0) {
 
         crm_err("Operation %s on resource %s failed to execute on remote node %s: %d", operation, rsc->id, lrm_state->node_name, call_id);
         op->call_id = get_fake_call_id(lrm_state, rsc->id);
         op->op_status = PCMK_LRM_OP_DONE;
         op->rc = PCMK_OCF_UNKNOWN_ERROR;
         op->t_run = time(NULL);
         op->t_rcchange = op->t_run;
         process_lrm_event(lrm_state, op);
 
     } else {
         /* record all operations so we can wait
          * for them to complete during shutdown
          */
         char *call_id_s = make_stop_id(rsc->id, call_id);
         struct recurring_op_s *pending = NULL;
 
         pending = calloc(1, sizeof(struct recurring_op_s));
         crm_trace("Recording pending op: %d - %s %s", call_id, op_id, call_id_s);
 
         pending->call_id = call_id;
         pending->interval = op->interval;
         pending->op_type = strdup(operation);
         pending->op_key = strdup(op_id);
         pending->rsc_id = strdup(rsc->id);
         g_hash_table_replace(lrm_state->pending_ops, call_id_s, pending);
 
         if (op->interval > 0 && op->start_delay > START_DELAY_THRESHOLD) {
             char *uuid = NULL;
             int dummy = 0, target_rc = 0;
 
             crm_info("Faking confirmation of %s: execution postponed for over 5 minutes", op_id);
 
             decode_transition_key(op->user_data, &uuid, &dummy, &dummy, &target_rc);
             free(uuid);
 
             op->rc = target_rc;
             op->op_status = PCMK_LRM_OP_DONE;
             send_direct_ack(NULL, NULL, rsc, op, rsc->id);
         }
     }
 
     free(op_id);
     lrmd_free_event(op);
     return;
 }
 
 int last_resource_update = 0;
 
 static void
 cib_rsc_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data)
 {
     switch (rc) {
         case pcmk_ok:
         case -pcmk_err_diff_failed:
         case -pcmk_err_diff_resync:
             crm_trace("Resource update %d complete: rc=%d", call_id, rc);
             break;
         default:
             crm_warn("Resource update %d failed: (rc=%d) %s", call_id, rc, pcmk_strerror(rc));
     }
 
     if (call_id == last_resource_update) {
         last_resource_update = 0;
         trigger_fsa(fsa_source);
     }
 }
 
 static void
 remote_node_init_status(const char *node_name, int call_opt)
 {
     int call_id = 0;
     xmlNode *update = create_xml_node(NULL, XML_CIB_TAG_STATUS);
+    xmlNode *state;
 
-    simple_remote_node_status(node_name, update,__FUNCTION__);
+    state = simple_remote_node_status(node_name, update,__FUNCTION__);
+    crm_xml_add(state, XML_NODE_IS_REMOTE_FENCED, "false");
 
     fsa_cib_update(XML_CIB_TAG_STATUS, update, call_opt, call_id, NULL);
     if (call_id != pcmk_ok) {
         crm_debug("Failed to init status section for remote-node %s", node_name);
     }
     free_xml(update);
 }
 
 static void
 remote_node_clear_status(const char *node_name, int call_opt)
 {
     if (node_name == NULL) {
         return;
     }
     remote_node_init_status(node_name, call_opt);
     erase_status_tag(node_name, XML_CIB_TAG_LRM, call_opt);
     erase_status_tag(node_name, XML_TAG_TRANSIENT_NODEATTRS, call_opt);
 }
 
 static int
 do_update_resource(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, lrmd_event_data_t * op)
 {
 /*
   <status>
   <nodes_status id=uname>
   <lrm>
   <lrm_resources>
   <lrm_resource id=...>
   </...>
 */
     int rc = pcmk_ok;
     xmlNode *update, *iter = NULL;
     int call_opt = cib_quorum_override;
     const char *uuid = NULL;
 
     CRM_CHECK(op != NULL, return 0);
 
     if (fsa_state == S_ELECTION || fsa_state == S_PENDING) {
         crm_info("Sending update to local CIB in state: %s", fsa_state2string(fsa_state));
         call_opt |= cib_scope_local;
     }
 
     iter = create_xml_node(iter, XML_CIB_TAG_STATUS);
     update = iter;
     iter = create_xml_node(iter, XML_CIB_TAG_STATE);
 
     if (safe_str_eq(lrm_state->node_name, fsa_our_uname)) {
         uuid = fsa_our_uuid;
 
     } else {
         /* remote nodes uuid and uname are equal */
         uuid = lrm_state->node_name;
         crm_xml_add(iter, XML_NODE_IS_REMOTE, "true");
     }
 
     CRM_LOG_ASSERT(uuid != NULL);
     if(uuid == NULL) {
         rc = -EINVAL;
         goto done;
     }
 
     crm_xml_add(iter, XML_ATTR_UUID,  uuid);
     crm_xml_add(iter, XML_ATTR_UNAME, lrm_state->node_name);
     crm_xml_add(iter, XML_ATTR_ORIGIN, __FUNCTION__);
 
     iter = create_xml_node(iter, XML_CIB_TAG_LRM);
     crm_xml_add(iter, XML_ATTR_ID, uuid);
 
     iter = create_xml_node(iter, XML_LRM_TAG_RESOURCES);
     iter = create_xml_node(iter, XML_LRM_TAG_RESOURCE);
     crm_xml_add(iter, XML_ATTR_ID, op->rsc_id);
 
     build_operation_update(iter, rsc, op, __FUNCTION__);
 
     if (rsc) {
         const char *container = NULL;
 
         crm_xml_add(iter, XML_ATTR_TYPE, rsc->type);
         crm_xml_add(iter, XML_AGENT_ATTR_CLASS, rsc->class);
         crm_xml_add(iter, XML_AGENT_ATTR_PROVIDER, rsc->provider);
 
         if (op->params) {
             container = g_hash_table_lookup(op->params, CRM_META"_"XML_RSC_ATTR_CONTAINER);
         }
         if (container) {
             crm_trace("Resource %s is a part of container resource %s", op->rsc_id, container);
             crm_xml_add(iter, XML_RSC_ATTR_CONTAINER, container);
         }
 
         CRM_CHECK(rsc->type != NULL, crm_err("Resource %s has no value for type", op->rsc_id));
         CRM_CHECK(rsc->class != NULL, crm_err("Resource %s has no value for class", op->rsc_id));
 
         /* check to see if we need to initialize remote-node related status sections */
         if (safe_str_eq(op->op_type, "start") && op->rc == 0 && op->op_status == PCMK_LRM_OP_DONE) {
             const char *remote_node = g_hash_table_lookup(op->params, CRM_META"_remote_node");
 
             if (remote_node) {
                 /* A container for a remote-node has started, initalize remote-node's status */
                 crm_info("Initalizing lrm status for container remote-node %s. Container successfully started.", remote_node);
                 remote_node_clear_status(remote_node, call_opt);
             } else if (container == FALSE && safe_str_eq(rsc->type, "remote") && safe_str_eq(rsc->provider, "pacemaker")) {
                 /* baremetal remote node connection resource has started, initalize remote-node's status */
                 crm_info("Initializing lrm status for baremetal remote-node %s", rsc->id);
                 remote_node_clear_status(rsc->id, call_opt);
             }
         }
 
     } else {
         crm_warn("Resource %s no longer exists in the lrmd", op->rsc_id);
         send_direct_ack(NULL, NULL, rsc, op, op->rsc_id);
         goto cleanup;
     }
 
     crm_log_xml_trace(update, __FUNCTION__);
 
     /* make it an asyncronous call and be done with it
      *
      * Best case:
      *   the resource state will be discovered during
      *   the next signup or election.
      *
      * Bad case:
      *   we are shutting down and there is no DC at the time,
      *   but then why were we shutting down then anyway?
      *   (probably because of an internal error)
      *
      * Worst case:
      *   we get shot for having resources "running" when the really weren't
      *
      * the alternative however means blocking here for too long, which
      * isnt acceptable
      */
     fsa_cib_update(XML_CIB_TAG_STATUS, update, call_opt, rc, NULL);
 
     if (rc > 0) {
         last_resource_update = rc;
     }
   done:
     /* the return code is a call number, not an error code */
     crm_trace("Sent resource state update message: %d for %s=%d on %s", rc,
               op->op_type, op->interval, op->rsc_id);
     fsa_register_cib_callback(rc, FALSE, NULL, cib_rsc_callback);
 
   cleanup:
     free_xml(update);
     return rc;
 }
 
 void
 do_lrm_event(long long action,
              enum crmd_fsa_cause cause,
              enum crmd_fsa_state cur_state, enum crmd_fsa_input cur_input, fsa_data_t * msg_data)
 {
     CRM_CHECK(FALSE, return);
 }
 
 gboolean
 process_lrm_event(lrm_state_t * lrm_state, lrmd_event_data_t * op)
 {
     char *op_id = NULL;
     char *op_key = NULL;
 
     int update_id = 0;
     gboolean removed = FALSE;
     lrmd_rsc_info_t *rsc = NULL;
 
     struct recurring_op_s *pending = NULL;
 
     CRM_CHECK(op != NULL, return FALSE);
 
     CRM_CHECK(op->rsc_id != NULL, return FALSE);
     op_id = make_stop_id(op->rsc_id, op->call_id);
     pending = g_hash_table_lookup(lrm_state->pending_ops, op_id);
     op_key = generate_op_key(op->rsc_id, op->op_type, op->interval);
     rsc = lrm_state_get_rsc_info(lrm_state, op->rsc_id, 0);
 
     if (op->op_status == PCMK_LRM_OP_ERROR) {
         switch(op->rc) {
             case PCMK_OCF_NOT_RUNNING:
             case PCMK_OCF_RUNNING_MASTER:
             case PCMK_OCF_DEGRADED:
             case PCMK_OCF_DEGRADED_MASTER:
                 /* Leave it up to the TE/PE to decide if this is an error */
                 op->op_status = PCMK_LRM_OP_DONE;
                 break;
             default:
                 /* Nothing to do */
                 break;
         }
     }
 
     if (op->op_status != PCMK_LRM_OP_CANCELLED) {
         if (safe_str_eq(op->op_type, RSC_NOTIFY)) {
             /* Keep notify ops out of the CIB */
             send_direct_ack(NULL, NULL, NULL, op, op->rsc_id);
         } else {
             update_id = do_update_resource(lrm_state, rsc, op);
         }
     } else if (op->interval == 0) {
         /* This will occur when "crm resource cleanup" is called while actions are in-flight */
         crm_err("Op %s (call=%d): Cancelled", op_key, op->call_id);
         send_direct_ack(NULL, NULL, NULL, op, op->rsc_id);
 
     } else if (pending == NULL) {
         /* We don't need to do anything for cancelled ops
          * that are not in our pending op list. There are no
          * transition actions waiting on these operations. */
 
     } else if (op->user_data == NULL) {
         /* At this point we have a pending entry, but no transition
          * key present in the user_data field. report this */
         crm_err("Op %s (call=%d): No user data", op_key, op->call_id);
 
     } else if (pending->remove) {
         /* The tengine canceled this op, we have been waiting for the cancel to finish. */
         delete_op_entry(lrm_state, op, op->rsc_id, op_key, op->call_id);
 
     } else if (pending && op->rsc_deleted) {
         /* The tengine initiated this op, but it was cancelled outside of the
          * tengine's control during a resource cleanup/re-probe request. The tengine
          * must be alerted that this operation completed, otherwise the tengine
          * will continue waiting for this update to occur until it is timed out.
          * We don't want this update going to the cib though, so use a direct ack. */
         crm_trace("Op %s (call=%d): cancelled due to rsc deletion", op_key, op->call_id);
         send_direct_ack(NULL, NULL, NULL, op, op->rsc_id);
 
     } else {
         /* Before a stop is called, no need to direct ack */
         crm_trace("Op %s (call=%d): no delete event required", op_key, op->call_id);
     }
 
     if ((op->interval == 0) && g_hash_table_remove(lrm_state->pending_ops, op_id)) {
         removed = TRUE;
         crm_trace("Op %s (call=%d, stop-id=%s, remaining=%u): Confirmed",
                   op_key, op->call_id, op_id, g_hash_table_size(lrm_state->pending_ops));
 
     } else if(op->interval != 0 && op->op_status == PCMK_LRM_OP_CANCELLED) {
         removed = TRUE;
         g_hash_table_remove(lrm_state->pending_ops, op_id);
     }
 
     switch (op->op_status) {
         case PCMK_LRM_OP_CANCELLED:
             crm_info("Operation %s: %s (node=%s, call=%d, confirmed=%s)",
                      op_key, services_lrm_status_str(op->op_status), lrm_state->node_name,
                      op->call_id, removed ? "true" : "false");
             break;
 
         case PCMK_LRM_OP_DONE:
             crm_notice("Operation %s: %s (node=%s, call=%d, rc=%d, cib-update=%d, confirmed=%s)",
                        op_key, services_ocf_exitcode_str(op->rc), lrm_state->node_name,
                        op->call_id, op->rc, update_id, removed ? "true" : "false");
             break;
 
         case PCMK_LRM_OP_TIMEOUT:
             crm_err("Operation %s: %s (node=%s, call=%d, timeout=%dms)",
                     op_key, services_lrm_status_str(op->op_status), lrm_state->node_name, op->call_id, op->timeout);
             break;
 
         default:
             crm_err("Operation %s (node=%s, call=%d, status=%d, cib-update=%d, confirmed=%s) %s",
                     op_key, lrm_state->node_name, op->call_id, op->op_status, update_id, removed ? "true" : "false",
                     services_lrm_status_str(op->op_status));
     }
 
     if (op->output) {
         char *prefix =
             crm_strdup_printf("%s-%s_%s_%d:%d", lrm_state->node_name, op->rsc_id, op->op_type, op->interval, op->call_id);
 
         if (op->rc) {
             crm_log_output(LOG_NOTICE, prefix, op->output);
         } else {
             crm_log_output(LOG_DEBUG, prefix, op->output);
         }
         free(prefix);
     }
 
     if (op->rsc_deleted) {
         crm_info("Deletion of resource '%s' complete after %s", op->rsc_id, op_key);
         delete_rsc_entry(lrm_state, NULL, op->rsc_id, NULL, pcmk_ok, NULL);
     }
 
     /* If a shutdown was escalated while operations were pending,
      * then the FSA will be stalled right now... allow it to continue
      */
     mainloop_set_trigger(fsa_source);
     update_history_cache(lrm_state, rsc, op);
 
     lrmd_free_rsc_info(rsc);
     free(op_key);
     free(op_id);
 
     return TRUE;
 }
diff --git a/crmd/te_actions.c b/crmd/te_actions.c
index c001b60886..db65681478 100644
--- a/crmd/te_actions.c
+++ b/crmd/te_actions.c
@@ -1,733 +1,739 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public
  * License as published by the Free Software Foundation; either
  * version 2 of the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * General Public License for more details.
  *
  * You should have received a copy of the GNU General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 #include <crm_internal.h>
 
 #include <sys/param.h>
 #include <crm/crm.h>
 #include <crm/cib.h>
 #include <crm/msg_xml.h>
 
 #include <crm/common/xml.h>
 #include <tengine.h>
 
 #include <crmd_fsa.h>
 #include <crmd_messages.h>
 #include <crm/cluster.h>
 #include <throttle.h>
 
 char *te_uuid = NULL;
 GHashTable *te_targets = NULL;
 void send_rsc_command(crm_action_t * action);
 static void te_update_job_count(crm_action_t * action, int offset);
 
 static void
 te_start_action_timer(crm_graph_t * graph, crm_action_t * action)
 {
     action->timer = calloc(1, sizeof(crm_action_timer_t));
     action->timer->timeout = action->timeout;
     action->timer->reason = timeout_action;
     action->timer->action = action;
     action->timer->source_id = g_timeout_add(action->timer->timeout + graph->network_delay,
                                              action_timer_callback, (void *)action->timer);
 
     CRM_ASSERT(action->timer->source_id != 0);
 }
 
 static gboolean
 te_pseudo_action(crm_graph_t * graph, crm_action_t * pseudo)
 {
     crm_debug("Pseudo action %d fired and confirmed", pseudo->id);
     te_action_confirmed(pseudo);
     update_graph(graph, pseudo);
     trigger_graph();
     return TRUE;
 }
 
 void
 send_stonith_update(crm_action_t * action, const char *target, const char *uuid)
 {
     int rc = pcmk_ok;
     crm_node_t *peer = NULL;
 
     /* zero out the node-status & remove all LRM status info */
     xmlNode *node_state = NULL;
 
     CRM_CHECK(target != NULL, return);
     CRM_CHECK(uuid != NULL, return);
 
     /* Make sure the membership and join caches are accurate */
     peer = crm_get_peer_full(0, target, CRM_GET_PEER_CLUSTER | CRM_GET_PEER_REMOTE);
 
     CRM_CHECK(peer != NULL, return);
 
     if (peer->uuid == NULL) {
         crm_info("Recording uuid '%s' for node '%s'", uuid, target);
         peer->uuid = strdup(uuid);
     }
 
     crmd_peer_down(peer, TRUE);
     node_state =
         do_update_node_cib(peer,
                            node_update_cluster | node_update_peer | node_update_join |
                            node_update_expected, NULL, __FUNCTION__);
 
+
+    /* we have to mark whether or not remote nodes have already been fenced */
+    if (peer->flags & crm_remote_node) {
+        crm_xml_add(node_state, XML_NODE_IS_REMOTE_FENCED, "true");
+    }
+
     /* Force our known ID */
     crm_xml_add(node_state, XML_ATTR_UUID, uuid);
 
     rc = fsa_cib_conn->cmds->update(fsa_cib_conn, XML_CIB_TAG_STATUS, node_state,
                                     cib_quorum_override | cib_scope_local | cib_can_create);
 
     /* Delay processing the trigger until the update completes */
     crm_debug("Sending fencing update %d for %s", rc, target);
     fsa_register_cib_callback(rc, FALSE, strdup(target), cib_fencing_updated);
 
     /* Make sure it sticks */
     /* fsa_cib_conn->cmds->bump_epoch(fsa_cib_conn, cib_quorum_override|cib_scope_local);    */
 
     erase_status_tag(peer->uname, XML_CIB_TAG_LRM, cib_scope_local);
     erase_status_tag(peer->uname, XML_TAG_TRANSIENT_NODEATTRS, cib_scope_local);
 
     free_xml(node_state);
     return;
 }
 
 static gboolean
 te_fence_node(crm_graph_t * graph, crm_action_t * action)
 {
     int rc = 0;
     const char *id = NULL;
     const char *uuid = NULL;
     const char *target = NULL;
     const char *type = NULL;
     gboolean invalid_action = FALSE;
     enum stonith_call_options options = st_opt_none;
 
     id = ID(action->xml);
     target = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
     uuid = crm_element_value(action->xml, XML_LRM_ATTR_TARGET_UUID);
     type = crm_meta_value(action->params, "stonith_action");
 
     CRM_CHECK(id != NULL, invalid_action = TRUE);
     CRM_CHECK(uuid != NULL, invalid_action = TRUE);
     CRM_CHECK(type != NULL, invalid_action = TRUE);
     CRM_CHECK(target != NULL, invalid_action = TRUE);
 
     if (invalid_action) {
         crm_log_xml_warn(action->xml, "BadAction");
         return FALSE;
     }
 
     crm_notice("Executing %s fencing operation (%s) on %s (timeout=%d)",
                type, id, target, transition_graph->stonith_timeout);
 
     /* Passing NULL means block until we can connect... */
     te_connect_stonith(NULL);
 
     if (crmd_join_phase_count(crm_join_confirmed) == 1) {
         options |= st_opt_allow_suicide;
     }
 
     rc = stonith_api->cmds->fence(stonith_api, options, target, type,
                                   transition_graph->stonith_timeout / 1000, 0);
 
     stonith_api->cmds->register_callback(stonith_api, rc, transition_graph->stonith_timeout / 1000,
                                          st_opt_timeout_updates,
                                          generate_transition_key(transition_graph->id, action->id,
                                                                  0, te_uuid),
                                          "tengine_stonith_callback", tengine_stonith_callback);
 
     return TRUE;
 }
 
 static int
 get_target_rc(crm_action_t * action)
 {
     const char *target_rc_s = crm_meta_value(action->params, XML_ATTR_TE_TARGET_RC);
 
     if (target_rc_s != NULL) {
         return crm_parse_int(target_rc_s, "0");
     }
     return 0;
 }
 
 static gboolean
 te_crm_command(crm_graph_t * graph, crm_action_t * action)
 {
     char *counter = NULL;
     xmlNode *cmd = NULL;
     gboolean is_local = FALSE;
 
     const char *id = NULL;
     const char *task = NULL;
     const char *value = NULL;
     const char *on_node = NULL;
     const char *router_node = NULL;
 
     gboolean rc = TRUE;
     gboolean no_wait = FALSE;
 
     id = ID(action->xml);
     task = crm_element_value(action->xml, XML_LRM_ATTR_TASK);
     on_node = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
     router_node = crm_element_value(action->xml, XML_LRM_ATTR_ROUTER_NODE);
 
     if (!router_node) {
         router_node = on_node;
     }
 
     CRM_CHECK(on_node != NULL && strlen(on_node) != 0,
               crm_err("Corrupted command (id=%s) %s: no node", crm_str(id), crm_str(task));
               return FALSE);
 
     crm_info("Executing crm-event (%s): %s on %s%s%s",
              crm_str(id), crm_str(task), on_node,
              is_local ? " (local)" : "", no_wait ? " - no waiting" : "");
 
     if (safe_str_eq(router_node, fsa_our_uname)) {
         is_local = TRUE;
     }
 
     value = crm_meta_value(action->params, XML_ATTR_TE_NOWAIT);
     if (crm_is_true(value)) {
         no_wait = TRUE;
     }
 
     if (is_local && safe_str_eq(task, CRM_OP_SHUTDOWN)) {
         /* defer until everything else completes */
         crm_info("crm-event (%s) is a local shutdown", crm_str(id));
         graph->completion_action = tg_shutdown;
         graph->abort_reason = "local shutdown";
         te_action_confirmed(action);
         update_graph(graph, action);
         trigger_graph();
         return TRUE;
 
     } else if (safe_str_eq(task, CRM_OP_SHUTDOWN)) {
         crm_node_t *peer = crm_get_peer(0, router_node);
         crm_update_peer_expected(__FUNCTION__, peer, CRMD_JOINSTATE_DOWN);
     }
 
     cmd = create_request(task, action->xml, router_node, CRM_SYSTEM_CRMD, CRM_SYSTEM_TENGINE, NULL);
 
     counter =
         generate_transition_key(transition_graph->id, action->id, get_target_rc(action), te_uuid);
     crm_xml_add(cmd, XML_ATTR_TRANSITION_KEY, counter);
 
     rc = send_cluster_message(crm_get_peer(0, router_node), crm_msg_crmd, cmd, TRUE);
     free(counter);
     free_xml(cmd);
 
     if (rc == FALSE) {
         crm_err("Action %d failed: send", action->id);
         return FALSE;
 
     } else if (no_wait) {
         te_action_confirmed(action);
         update_graph(graph, action);
         trigger_graph();
 
     } else {
         if (action->timeout <= 0) {
             crm_err("Action %d: %s on %s had an invalid timeout (%dms).  Using %dms instead",
                     action->id, task, on_node, action->timeout, graph->network_delay);
             action->timeout = graph->network_delay;
         }
         te_start_action_timer(graph, action);
     }
 
     return TRUE;
 }
 
 gboolean
 cib_action_update(crm_action_t * action, int status, int op_rc)
 {
     lrmd_event_data_t *op = NULL;
     xmlNode *state = NULL;
     xmlNode *rsc = NULL;
     xmlNode *xml_op = NULL;
     xmlNode *action_rsc = NULL;
 
     int rc = pcmk_ok;
 
     const char *name = NULL;
     const char *value = NULL;
     const char *rsc_id = NULL;
     const char *task = crm_element_value(action->xml, XML_LRM_ATTR_TASK);
     const char *target = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
     const char *task_uuid = crm_element_value(action->xml, XML_LRM_ATTR_TASK_KEY);
     const char *target_uuid = crm_element_value(action->xml, XML_LRM_ATTR_TARGET_UUID);
 
     int call_options = cib_quorum_override | cib_scope_local;
     int target_rc = get_target_rc(action);
 
     if (status == PCMK_LRM_OP_PENDING) {
         crm_debug("%s %d: Recording pending operation %s on %s",
                   crm_element_name(action->xml), action->id, task_uuid, target);
     } else {
         crm_warn("%s %d: %s on %s timed out",
                  crm_element_name(action->xml), action->id, task_uuid, target);
     }
 
     action_rsc = find_xml_node(action->xml, XML_CIB_TAG_RESOURCE, TRUE);
     if (action_rsc == NULL) {
         return FALSE;
     }
 
     rsc_id = ID(action_rsc);
     CRM_CHECK(rsc_id != NULL, crm_log_xml_err(action->xml, "Bad:action");
               return FALSE);
 
 /*
   update the CIB
 
 <node_state id="hadev">
       <lrm>
         <lrm_resources>
           <lrm_resource id="rsc2" last_op="start" op_code="0" target="hadev"/>
 */
 
     state = create_xml_node(NULL, XML_CIB_TAG_STATE);
 
     crm_xml_add(state, XML_ATTR_UUID, target_uuid);
     crm_xml_add(state, XML_ATTR_UNAME, target);
 
     rsc = create_xml_node(state, XML_CIB_TAG_LRM);
     crm_xml_add(rsc, XML_ATTR_ID, target_uuid);
 
     rsc = create_xml_node(rsc, XML_LRM_TAG_RESOURCES);
     rsc = create_xml_node(rsc, XML_LRM_TAG_RESOURCE);
     crm_xml_add(rsc, XML_ATTR_ID, rsc_id);
 
     name = XML_ATTR_TYPE;
     value = crm_element_value(action_rsc, name);
     crm_xml_add(rsc, name, value);
     name = XML_AGENT_ATTR_CLASS;
     value = crm_element_value(action_rsc, name);
     crm_xml_add(rsc, name, value);
     name = XML_AGENT_ATTR_PROVIDER;
     value = crm_element_value(action_rsc, name);
     crm_xml_add(rsc, name, value);
 
     op = convert_graph_action(NULL, action, status, op_rc);
     op->call_id = -1;
     op->user_data = generate_transition_key(transition_graph->id, action->id, target_rc, te_uuid);
 
     xml_op = create_operation_update(rsc, op, CRM_FEATURE_SET, target_rc, target, __FUNCTION__, LOG_INFO);
     lrmd_free_event(op);
 
     crm_trace("Updating CIB with \"%s\" (%s): %s %s on %s",
               status < 0 ? "new action" : XML_ATTR_TIMEOUT,
               crm_element_name(action->xml), crm_str(task), rsc_id, target);
     crm_log_xml_trace(xml_op, "Op");
 
     rc = fsa_cib_conn->cmds->update(fsa_cib_conn, XML_CIB_TAG_STATUS, state, call_options);
 
     crm_trace("Updating CIB with %s action %d: %s on %s (call_id=%d)",
               services_lrm_status_str(status), action->id, task_uuid, target, rc);
 
     fsa_register_cib_callback(rc, FALSE, NULL, cib_action_updated);
     free_xml(state);
 
     action->sent_update = TRUE;
 
     if (rc < pcmk_ok) {
         return FALSE;
     }
 
     return TRUE;
 }
 
 static gboolean
 te_rsc_command(crm_graph_t * graph, crm_action_t * action)
 {
     /* never overwrite stop actions in the CIB with
      *   anything other than completed results
      *
      * Writing pending stops makes it look like the
      *   resource is running again
      */
     xmlNode *cmd = NULL;
     xmlNode *rsc_op = NULL;
 
     gboolean rc = TRUE;
     gboolean no_wait = FALSE;
     gboolean is_local = FALSE;
 
     char *counter = NULL;
     const char *task = NULL;
     const char *value = NULL;
     const char *on_node = NULL;
     const char *router_node = NULL;
     const char *task_uuid = NULL;
 
     CRM_ASSERT(action != NULL);
     CRM_ASSERT(action->xml != NULL);
 
     action->executed = FALSE;
     on_node = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
 
     CRM_CHECK(on_node != NULL && strlen(on_node) != 0,
               crm_err("Corrupted command(id=%s) %s: no node", ID(action->xml), crm_str(task));
               return FALSE);
 
     rsc_op = action->xml;
     task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
     task_uuid = crm_element_value(action->xml, XML_LRM_ATTR_TASK_KEY);
     router_node = crm_element_value(rsc_op, XML_LRM_ATTR_ROUTER_NODE);
 
     if (!router_node) {
         router_node = on_node;
     }
 
     counter =
         generate_transition_key(transition_graph->id, action->id, get_target_rc(action), te_uuid);
     crm_xml_add(rsc_op, XML_ATTR_TRANSITION_KEY, counter);
 
     if (safe_str_eq(router_node, fsa_our_uname)) {
         is_local = TRUE;
     }
 
     value = crm_meta_value(action->params, XML_ATTR_TE_NOWAIT);
     if (crm_is_true(value)) {
         no_wait = TRUE;
     }
 
     crm_notice("Initiating action %d: %s %s on %s%s%s",
                action->id, task, task_uuid, on_node,
                is_local ? " (local)" : "", no_wait ? " - no waiting" : "");
 
     cmd = create_request(CRM_OP_INVOKE_LRM, rsc_op, router_node,
                          CRM_SYSTEM_LRMD, CRM_SYSTEM_TENGINE, NULL);
 
     if (is_local) {
         /* shortcut local resource commands */
         ha_msg_input_t data = {
             .msg = cmd,
             .xml = rsc_op,
         };
 
         fsa_data_t msg = {
             .id = 0,
             .data = &data,
             .data_type = fsa_dt_ha_msg,
             .fsa_input = I_NULL,
             .fsa_cause = C_FSA_INTERNAL,
             .actions = A_LRM_INVOKE,
             .origin = __FUNCTION__,
         };
 
         do_lrm_invoke(A_LRM_INVOKE, C_FSA_INTERNAL, fsa_state, I_NULL, &msg);
 
     } else {
         rc = send_cluster_message(crm_get_peer(0, router_node), crm_msg_lrmd, cmd, TRUE);
     }
 
     free(counter);
     free_xml(cmd);
 
     action->executed = TRUE;
 
     if (rc == FALSE) {
         crm_err("Action %d failed: send", action->id);
         return FALSE;
 
     } else if (no_wait) {
         crm_info("Action %d confirmed - no wait", action->id);
         action->confirmed = TRUE; /* Just mark confirmed.
                                    * Don't bump the job count only to immediately decrement it
                                    */
         update_graph(transition_graph, action);
         trigger_graph();
 
     } else if (action->confirmed == TRUE) {
         crm_debug("Action %d: %s %s on %s(timeout %dms) was already confirmed.",
                   action->id, task, task_uuid, on_node, action->timeout);
     } else {
         if (action->timeout <= 0) {
             crm_err("Action %d: %s %s on %s had an invalid timeout (%dms).  Using %dms instead",
                     action->id, task, task_uuid, on_node, action->timeout, graph->network_delay);
             action->timeout = graph->network_delay;
         }
         te_update_job_count(action, 1);
         te_start_action_timer(graph, action);
     }
 
     value = crm_meta_value(action->params, XML_OP_ATTR_PENDING);
     if (crm_is_true(value)
         && safe_str_neq(task, CRMD_ACTION_CANCEL)
         && safe_str_neq(task, CRMD_ACTION_DELETE)) {
         /* write a "pending" entry to the CIB, inhibit notification */
         crm_debug("Recording pending op %s in the CIB", task_uuid);
         cib_action_update(action, PCMK_LRM_OP_PENDING, PCMK_OCF_UNKNOWN);
     }
 
     return TRUE;
 }
 
 struct te_peer_s
 {
         char *name;
         int jobs;
         int migrate_jobs;
 };
 
 static void te_peer_free(gpointer p)
 {
     struct te_peer_s *peer = p;
 
     free(peer->name);
     free(peer);
 }
 
 void te_reset_job_counts(void)
 {
     GHashTableIter iter;
     struct te_peer_s *peer = NULL;
 
     if(te_targets == NULL) {
         te_targets = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, te_peer_free);
     }
 
     g_hash_table_iter_init(&iter, te_targets);
     while (g_hash_table_iter_next(&iter, NULL, (gpointer *) & peer)) {
         peer->jobs = 0;
         peer->migrate_jobs = 0;
     }
 }
 
 static void
 te_update_job_count_on(const char *target, int offset, bool migrate)
 {
     struct te_peer_s *r = NULL;
 
     if(target == NULL || te_targets == NULL) {
         return;
     }
 
     r = g_hash_table_lookup(te_targets, target);
     if(r == NULL) {
         r = calloc(1, sizeof(struct te_peer_s));
         r->name = strdup(target);
         g_hash_table_insert(te_targets, r->name, r);
     }
 
     r->jobs += offset;
     if(migrate) {
         r->migrate_jobs += offset;
     }
     crm_trace("jobs[%s] = %d", target, r->jobs);
 }
 
 static void
 te_update_job_count(crm_action_t * action, int offset)
 {
     const char *task = crm_element_value(action->xml, XML_LRM_ATTR_TASK);
     const char *target = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
 
     if (action->type != action_type_rsc || target == NULL) {
         /* No limit on these */
         return;
     }
 
     /* if we have a router node, this means the action is performing
      * on a remote node. For now, we count all action occuring on a
      * remote node against the job list on the cluster node hosting
      * the connection resources */
     target = crm_element_value(action->xml, XML_LRM_ATTR_ROUTER_NODE);
 
     if ((target == NULL) &&
         (safe_str_eq(task, CRMD_ACTION_MIGRATE) || safe_str_eq(task, CRMD_ACTION_MIGRATED))) {
 
         const char *t1 = crm_meta_value(action->params, XML_LRM_ATTR_MIGRATE_SOURCE);
         const char *t2 = crm_meta_value(action->params, XML_LRM_ATTR_MIGRATE_TARGET);
 
         te_update_job_count_on(t1, offset, TRUE);
         te_update_job_count_on(t2, offset, TRUE);
         return;
     } else if (target == NULL) {
         target = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
     }
 
     te_update_job_count_on(target, offset, FALSE);
 }
 
 static gboolean
 te_should_perform_action_on(crm_graph_t * graph, crm_action_t * action, const char *target)
 {
     int limit = 0;
     struct te_peer_s *r = NULL;
     const char *task = crm_element_value(action->xml, XML_LRM_ATTR_TASK);
     const char *id = crm_element_value(action->xml, XML_LRM_ATTR_TASK_KEY);
 
     if(target == NULL) {
         /* No limit on these */
         return TRUE;
 
     } else if(te_targets == NULL) {
         return FALSE;
     }
 
     r = g_hash_table_lookup(te_targets, target);
     limit = throttle_get_job_limit(target);
 
     if(r == NULL) {
         r = calloc(1, sizeof(struct te_peer_s));
         r->name = strdup(target);
         g_hash_table_insert(te_targets, r->name, r);
     }
 
     if(limit <= r->jobs) {
         crm_trace("Peer %s is over their job limit of %d (%d): deferring %s",
                   target, limit, r->jobs, id);
         return FALSE;
 
     } else if(graph->migration_limit > 0 && r->migrate_jobs >= graph->migration_limit) {
         if (safe_str_eq(task, CRMD_ACTION_MIGRATE) || safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
             crm_trace("Peer %s is over their migration job limit of %d (%d): deferring %s",
                       target, graph->migration_limit, r->migrate_jobs, id);
             return FALSE;
         }
     }
 
     crm_trace("Peer %s has not hit their limit yet. current jobs = %d limit= %d limit", target, r->jobs, limit);
 
     return TRUE;
 }
 
 static gboolean
 te_should_perform_action(crm_graph_t * graph, crm_action_t * action)
 {
     const char *target = NULL;
     const char *task = crm_element_value(action->xml, XML_LRM_ATTR_TASK);
 
     if (action->type != action_type_rsc) {
         /* No limit on these */
         return TRUE;
     }
 
     /* if we have a router node, this means the action is performing
      * on a remote node. For now, we count all action occuring on a
      * remote node against the job list on the cluster node hosting
      * the connection resources */
     target = crm_element_value(action->xml, XML_LRM_ATTR_ROUTER_NODE);
 
     if ((target == NULL) &&
         (safe_str_eq(task, CRMD_ACTION_MIGRATE) || safe_str_eq(task, CRMD_ACTION_MIGRATED))) {
 
         target = crm_meta_value(action->params, XML_LRM_ATTR_MIGRATE_SOURCE);
         if(te_should_perform_action_on(graph, action, target) == FALSE) {
             return FALSE;
         }
 
         target = crm_meta_value(action->params, XML_LRM_ATTR_MIGRATE_TARGET);
 
     } else if (target == NULL) {
         target = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
     }
 
     return te_should_perform_action_on(graph, action, target);
 }
 
 void
 te_action_confirmed(crm_action_t * action)
 {
     const char *target = crm_element_value(action->xml, XML_LRM_ATTR_TARGET);
 
     if (action->confirmed == FALSE && action->type == action_type_rsc && target != NULL) {
         te_update_job_count(action, -1);
     }
     action->confirmed = TRUE;
 }
 
 
 crm_graph_functions_t te_graph_fns = {
     te_pseudo_action,
     te_rsc_command,
     te_crm_command,
     te_fence_node,
     te_should_perform_action,
 };
 
 void
 notify_crmd(crm_graph_t * graph)
 {
     const char *type = "unknown";
     enum crmd_fsa_input event = I_NULL;
 
     crm_debug("Processing transition completion in state %s", fsa_state2string(fsa_state));
 
     CRM_CHECK(graph->complete, graph->complete = TRUE);
 
     switch (graph->completion_action) {
         case tg_stop:
             type = "stop";
             if (fsa_state == S_TRANSITION_ENGINE) {
                 event = I_TE_SUCCESS;
             }
             break;
         case tg_done:
             type = "done";
             if (fsa_state == S_TRANSITION_ENGINE) {
                 event = I_TE_SUCCESS;
             }
             break;
 
         case tg_restart:
             type = "restart";
             if (fsa_state == S_TRANSITION_ENGINE) {
                 if (too_many_st_failures() == FALSE) {
                     if (transition_timer->period_ms > 0) {
                         crm_timer_stop(transition_timer);
                         crm_timer_start(transition_timer);
                     } else {
                         event = I_PE_CALC;
                     }
                 } else {
                     event = I_TE_SUCCESS;
                 }
 
             } else if (fsa_state == S_POLICY_ENGINE) {
                 register_fsa_action(A_PE_INVOKE);
             }
             break;
 
         case tg_shutdown:
             type = "shutdown";
             if (is_set(fsa_input_register, R_SHUTDOWN)) {
                 event = I_STOP;
 
             } else {
                 crm_err("We didn't ask to be shut down, yet our" " PE is telling us too.");
                 event = I_TERMINATE;
             }
     }
 
     crm_debug("Transition %d status: %s - %s", graph->id, type, crm_str(graph->abort_reason));
 
     graph->abort_reason = NULL;
     graph->completion_action = tg_done;
     clear_bit(fsa_input_register, R_IN_TRANSITION);
 
     if (event != I_NULL) {
         register_fsa_input(C_FSA_INTERNAL, event, NULL);
 
     } else if (fsa_source) {
         mainloop_set_trigger(fsa_source);
     }
 }
diff --git a/include/crm/msg_xml.h b/include/crm/msg_xml.h
index 9d2737b76a..1a2515ff93 100644
--- a/include/crm/msg_xml.h
+++ b/include/crm/msg_xml.h
@@ -1,406 +1,407 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public
  * License as published by the Free Software Foundation; either
  * version 2 of the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * General Public License for more details.
  *
  * You should have received a copy of the GNU General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 #ifndef XML_TAGS__H
 #  define XML_TAGS__H
 
 #  ifndef F_ORIG
 #    define F_ORIG    "src"
 #  endif
 
 #  ifndef F_SEQ
 #    define F_SEQ		"seq"
 #  endif
 
 #  ifndef F_SUBTYPE
 #    define F_SUBTYPE "subt"
 #  endif
 
 #  ifndef F_TYPE
 #    define F_TYPE    "t"
 #  endif
 
 #  ifndef F_CLIENTNAME
 #    define	F_CLIENTNAME	"cn"
 #  endif
 
 #  ifndef F_XML_TAGNAME
 #    define F_XML_TAGNAME	"__name__"
 #  endif
 
 #  ifndef T_CRM
 #    define T_CRM     "crmd"
 #  endif
 
 #  ifndef T_ATTRD
 #    define T_ATTRD     "attrd"
 #  endif
 
 #  define CIB_OPTIONS_FIRST "cib-bootstrap-options"
 
 #  define F_CRM_DATA			"crm_xml"
 #  define F_CRM_TASK			"crm_task"
 #  define F_CRM_HOST_TO			"crm_host_to"
 #  define F_CRM_MSG_TYPE		F_SUBTYPE
 #  define F_CRM_SYS_TO			"crm_sys_to"
 #  define F_CRM_SYS_FROM		"crm_sys_from"
 #  define F_CRM_HOST_FROM		F_ORIG
 #  define F_CRM_REFERENCE		XML_ATTR_REFERENCE
 #  define F_CRM_VERSION			XML_ATTR_VERSION
 #  define F_CRM_ORIGIN			"origin"
 #  define F_CRM_USER			"crm_user"
 #  define F_CRM_JOIN_ID			"join_id"
 #  define F_CRM_ELECTION_ID		"election-id"
 #  define F_CRM_ELECTION_AGE_S		"election-age-sec"
 #  define F_CRM_ELECTION_AGE_US		"election-age-nano-sec"
 #  define F_CRM_ELECTION_OWNER		"election-owner"
 #  define F_CRM_TGRAPH			"crm-tgraph"
 #  define F_CRM_TGRAPH_INPUT		"crm-tgraph-in"
 
 #  define F_CRM_THROTTLE_MODE		"crm-limit-mode"
 #  define F_CRM_THROTTLE_MAX		"crm-limit-max"
 
 /*---- Common tags/attrs */
 #  define XML_DIFF_MARKER		"__crm_diff_marker__"
 #  define XML_ATTR_TAGNAME		F_XML_TAGNAME
 #  define XML_TAG_CIB			"cib"
 #  define XML_TAG_FAILED		"failed"
 
 #  define XML_ATTR_CRM_VERSION		"crm_feature_set"
 #  define XML_ATTR_DIGEST		"digest"
 #  define XML_ATTR_VALIDATION		"validate-with"
 
 #  define XML_ATTR_QUORUM_PANIC		"no-quorum-panic"
 #  define XML_ATTR_HAVE_QUORUM		"have-quorum"
 #  define XML_ATTR_HAVE_WATCHDOG	"have-watchdog"
 #  define XML_ATTR_EXPECTED_VOTES	"expected-quorum-votes"
 #  define XML_ATTR_GENERATION		"epoch"
 #  define XML_ATTR_GENERATION_ADMIN	"admin_epoch"
 #  define XML_ATTR_NUMUPDATES		"num_updates"
 #  define XML_ATTR_TIMEOUT		"timeout"
 #  define XML_ATTR_ORIGIN		"crm-debug-origin"
 #  define XML_ATTR_TSTAMP		"crm-timestamp"
 #  define XML_CIB_ATTR_WRITTEN		"cib-last-written"
 #  define XML_ATTR_VERSION		"version"
 #  define XML_ATTR_DESC			"description"
 #  define XML_ATTR_ID			"id"
 #  define XML_ATTR_IDREF			"id-ref"
 #  define XML_ATTR_ID_LONG		"long-id"
 #  define XML_ATTR_TYPE			"type"
 #  define XML_ATTR_FILTER_TYPE		"type-filter"
 #  define XML_ATTR_FILTER_ID		"id-filter"
 #  define XML_ATTR_FILTER_PRIORITY	"priority-filter"
 #  define XML_ATTR_VERBOSE		"verbose"
 #  define XML_ATTR_OP			"op"
 #  define XML_ATTR_DC			"is_dc"
 #  define XML_ATTR_DC_UUID		"dc-uuid"
 #  define XML_ATTR_UPDATE_ORIG		"update-origin"
 #  define XML_ATTR_UPDATE_CLIENT	"update-client"
 #  define XML_ATTR_UPDATE_USER		"update-user"
 
 #  define XML_BOOLEAN_TRUE		"true"
 #  define XML_BOOLEAN_FALSE		"false"
 #  define XML_BOOLEAN_YES		XML_BOOLEAN_TRUE
 #  define XML_BOOLEAN_NO		XML_BOOLEAN_FALSE
 
 #  define XML_TAG_OPTIONS		"options"
 
 /*---- top level tags/attrs */
 #  define XML_MSG_TAG			"crm_message"
 #  define XML_MSG_TAG_DATA		"msg_data"
 #  define XML_ATTR_REQUEST		"request"
 #  define XML_ATTR_RESPONSE		"response"
 
 #  define XML_ATTR_UNAME		"uname"
 #  define XML_ATTR_UUID			"id"
 #  define XML_ATTR_REFERENCE		"reference"
 
 #  define XML_FAIL_TAG_RESOURCE		"failed_resource"
 
 #  define XML_FAILRES_ATTR_RESID	"resource_id"
 #  define XML_FAILRES_ATTR_REASON	"reason"
 #  define XML_FAILRES_ATTR_RESSTATUS	"resource_status"
 
 #  define XML_CRM_TAG_PING		"ping_response"
 #  define XML_PING_ATTR_STATUS		"result"
 #  define XML_PING_ATTR_SYSFROM		"crm_subsystem"
 
 #  define XML_TAG_FRAGMENT		"cib_fragment"
 #  define XML_ATTR_RESULT		"result"
 #  define XML_ATTR_SECTION		"section"
 
 #  define XML_FAIL_TAG_CIB		"failed_update"
 
 #  define XML_FAILCIB_ATTR_ID		"id"
 #  define XML_FAILCIB_ATTR_OBJTYPE	"object_type"
 #  define XML_FAILCIB_ATTR_OP		"operation"
 #  define XML_FAILCIB_ATTR_REASON	"reason"
 
 /*---- CIB specific tags/attrs */
 #  define XML_CIB_TAG_SECTION_ALL	"all"
 #  define XML_CIB_TAG_CONFIGURATION	"configuration"
 #  define XML_CIB_TAG_STATUS       	"status"
 #  define XML_CIB_TAG_RESOURCES		"resources"
 #  define XML_CIB_TAG_NODES         	"nodes"
 #  define XML_CIB_TAG_DOMAINS         	"domains"
 #  define XML_CIB_TAG_CONSTRAINTS   	"constraints"
 #  define XML_CIB_TAG_CRMCONFIG   	"crm_config"
 #  define XML_CIB_TAG_OPCONFIG		"op_defaults"
 #  define XML_CIB_TAG_RSCCONFIG   	"rsc_defaults"
 #  define XML_CIB_TAG_ACLS   		"acls"
 
 #  define XML_CIB_TAG_STATE         	"node_state"
 #  define XML_CIB_TAG_NODE          	"node"
 #  define XML_CIB_TAG_DOMAIN          	"domain"
 #  define XML_CIB_TAG_CONSTRAINT    	"constraint"
 #  define XML_CIB_TAG_NVPAIR        	"nvpair"
 
 #  define XML_CIB_TAG_PROPSET	   	"cluster_property_set"
 #  define XML_TAG_ATTR_SETS	   	"instance_attributes"
 #  define XML_TAG_META_SETS	   	"meta_attributes"
 #  define XML_TAG_ATTRS			"attributes"
 #  define XML_TAG_PARAMS		"parameters"
 #  define XML_TAG_PARAM			"param"
 #  define XML_TAG_UTILIZATION		"utilization"
 
 #  define XML_TAG_RESOURCE_REF		"resource_ref"
 #  define XML_CIB_TAG_RESOURCE	  	"primitive"
 #  define XML_CIB_TAG_GROUP	  	"group"
 #  define XML_CIB_TAG_INCARNATION	"clone"
 #  define XML_CIB_TAG_MASTER		"master"
 
 #  define XML_CIB_TAG_RSC_TEMPLATE	"template"
 
 #  define XML_RSC_ATTR_ISOLATION_INSTANCE	"isolation-instance"
 #  define XML_RSC_ATTR_ISOLATION_WRAPPER    "isolation-wrapper"
 #  define XML_RSC_ATTR_ISOLATION_HOST   "isolation-host"
 #  define XML_RSC_ATTR_ISOLATION    	"isolation"
 #  define XML_RSC_ATTR_RESTART	  	"restart-type"
 #  define XML_RSC_ATTR_ORDERED		"ordered"
 #  define XML_RSC_ATTR_INTERLEAVE	"interleave"
 #  define XML_RSC_ATTR_INCARNATION	"clone"
 #  define XML_RSC_ATTR_INCARNATION_MAX	"clone-max"
 #  define XML_RSC_ATTR_INCARNATION_NODEMAX	"clone-node-max"
 #  define XML_RSC_ATTR_MASTER_MAX	"master-max"
 #  define XML_RSC_ATTR_MASTER_NODEMAX	"master-node-max"
 #  define XML_RSC_ATTR_STATE		"clone-state"
 #  define XML_RSC_ATTR_MANAGED		"is-managed"
 #  define XML_RSC_ATTR_TARGET_ROLE	"target-role"
 #  define XML_RSC_ATTR_UNIQUE		"globally-unique"
 #  define XML_RSC_ATTR_NOTIFY		"notify"
 #  define XML_RSC_ATTR_STICKINESS	"resource-stickiness"
 #  define XML_RSC_ATTR_FAIL_STICKINESS	"migration-threshold"
 #  define XML_RSC_ATTR_FAIL_TIMEOUT	"failure-timeout"
 #  define XML_RSC_ATTR_MULTIPLE		"multiple-active"
 #  define XML_RSC_ATTR_PRIORITY		"priority"
 #  define XML_RSC_ATTR_REQUIRES		"requires"
 #  define XML_RSC_ATTR_PROVIDES		"provides"
 #  define XML_RSC_ATTR_CONTAINER	"container"
 #  define XML_RSC_ATTR_INTERNAL_RSC	"internal_rsc"
 #  define XML_RSC_ATTR_MAINTENANCE	"maintenance"
 #  define XML_RSC_ATTR_REMOTE_NODE  	"remote-node"
 
 #  define XML_REMOTE_ATTR_RECONNECT_INTERVAL "reconnect_interval"
 
 #  define XML_OP_ATTR_ON_FAIL		"on-fail"
 #  define XML_OP_ATTR_START_DELAY	"start-delay"
 #  define XML_OP_ATTR_ALLOW_MIGRATE	"allow-migrate"
 #  define XML_OP_ATTR_DEPENDENT "dependent-on"
 #  define XML_OP_ATTR_ORIGIN		"interval-origin"
 #  define XML_OP_ATTR_PENDING		"record-pending"
 
 #  define XML_CIB_TAG_LRM		"lrm"
 #  define XML_LRM_TAG_RESOURCES     	"lrm_resources"
 #  define XML_LRM_TAG_RESOURCE     	"lrm_resource"
 #  define XML_LRM_TAG_AGENTS	     	"lrm_agents"
 #  define XML_LRM_TAG_AGENT		"lrm_agent"
 #  define XML_LRM_TAG_RSC_OP		"lrm_rsc_op"
 #  define XML_AGENT_ATTR_CLASS		"class"
 #  define XML_AGENT_ATTR_PROVIDER	"provider"
 #  define XML_LRM_TAG_ATTRIBUTES	"attributes"
 
 #  define XML_CIB_ATTR_REPLACE       	"replace"
 #  define XML_CIB_ATTR_SOURCE       	"source"
 
 #  define XML_CIB_ATTR_HEALTH       	"health"
 #  define XML_CIB_ATTR_WEIGHT       	"weight"
 #  define XML_CIB_ATTR_PRIORITY     	"priority"
 #  define XML_CIB_ATTR_CLEAR        	"clear_on"
 #  define XML_CIB_ATTR_SOURCE       	"source"
 
 #  define XML_NODE_JOIN_STATE    	"join"
 #  define XML_NODE_EXPECTED     	"expected"
 #  define XML_NODE_IN_CLUSTER        	"in_ccm"
 #  define XML_NODE_IS_PEER    	"crmd"
 #  define XML_NODE_IS_REMOTE    	"remote_node"
+#  define XML_NODE_IS_REMOTE_FENCED  	"remote_node_fenced"
 
 #  define XML_CIB_ATTR_SHUTDOWN       	"shutdown"
 #  define XML_CIB_ATTR_STONITH	    	"stonith"
 
 #  define XML_LRM_ATTR_INTERVAL		"interval"
 #  define XML_LRM_ATTR_TASK		"operation"
 #  define XML_LRM_ATTR_TASK_KEY		"operation_key"
 #  define XML_LRM_ATTR_TARGET		"on_node"
 /*! used for remote nodes.
  *  For remote nodes the action is routed to the 'on_node'
  *  node location, and then from there if 'exec_on' is set
  *  the host will execute the action on the remote node
  *  it controls. */
 #  define XML_LRM_ATTR_ROUTER_NODE  "router_node"
 #  define XML_LRM_ATTR_TARGET_UUID	"on_node_uuid"
 #  define XML_LRM_ATTR_RSCID		"rsc-id"
 #  define XML_LRM_ATTR_OPSTATUS		"op-status"
 #  define XML_LRM_ATTR_RC		"rc-code"
 #  define XML_LRM_ATTR_CALLID		"call-id"
 #  define XML_LRM_ATTR_OP_DIGEST	"op-digest"
 #  define XML_LRM_ATTR_OP_RESTART	"op-force-restart"
 #  define XML_LRM_ATTR_RESTART_DIGEST	"op-restart-digest"
 #  define XML_LRM_ATTR_EXIT_REASON		"exit-reason"
 
 #  define XML_RSC_OP_LAST_CHANGE        "last-rc-change"
 #  define XML_RSC_OP_LAST_RUN           "last-run"
 #  define XML_RSC_OP_T_EXEC             "exec-time"
 #  define XML_RSC_OP_T_QUEUE            "queue-time"
 
 #  define XML_LRM_ATTR_MIGRATE_SOURCE	"migrate_source"
 #  define XML_LRM_ATTR_MIGRATE_TARGET	"migrate_target"
 
 #  define XML_TAG_GRAPH			"transition_graph"
 #  define XML_GRAPH_TAG_RSC_OP		"rsc_op"
 #  define XML_GRAPH_TAG_PSEUDO_EVENT	"pseudo_event"
 #  define XML_GRAPH_TAG_CRM_EVENT	"crm_event"
 
 #  define XML_TAG_RULE			"rule"
 #  define XML_RULE_ATTR_SCORE		"score"
 #  define XML_RULE_ATTR_SCORE_ATTRIBUTE	"score-attribute"
 #  define XML_RULE_ATTR_SCORE_MANGLED	"score-attribute-mangled"
 #  define XML_RULE_ATTR_ROLE		"role"
 #  define XML_RULE_ATTR_RESULT		"result"
 #  define XML_RULE_ATTR_BOOLEAN_OP	"boolean-op"
 
 #  define XML_TAG_EXPRESSION		"expression"
 #  define XML_EXPR_ATTR_ATTRIBUTE	"attribute"
 #  define XML_EXPR_ATTR_OPERATION	"operation"
 #  define XML_EXPR_ATTR_VALUE		"value"
 #  define XML_EXPR_ATTR_TYPE		"type"
 
 #  define XML_CONS_TAG_RSC_DEPEND	"rsc_colocation"
 #  define XML_CONS_TAG_RSC_ORDER	"rsc_order"
 #  define XML_CONS_TAG_RSC_LOCATION	"rsc_location"
 #  define XML_CONS_TAG_RSC_TICKET	"rsc_ticket"
 #  define XML_CONS_TAG_RSC_SET		"resource_set"
 #  define XML_CONS_ATTR_SYMMETRICAL	"symmetrical"
 
 #  define XML_LOCATION_ATTR_DISCOVERY	"resource-discovery"
 
 #  define XML_COLOC_ATTR_SOURCE		"rsc"
 #  define XML_COLOC_ATTR_SOURCE_ROLE	"rsc-role"
 #  define XML_COLOC_ATTR_TARGET		"with-rsc"
 #  define XML_COLOC_ATTR_TARGET_ROLE	"with-rsc-role"
 #  define XML_COLOC_ATTR_NODE_ATTR	"node-attribute"
 #  define XML_COLOC_ATTR_SOURCE_INSTANCE	"rsc-instance"
 #  define XML_COLOC_ATTR_TARGET_INSTANCE	"with-rsc-instance"
 
 #  define XML_ORDER_ATTR_FIRST		"first"
 #  define XML_ORDER_ATTR_THEN		"then"
 #  define XML_ORDER_ATTR_FIRST_ACTION	"first-action"
 #  define XML_ORDER_ATTR_THEN_ACTION	"then-action"
 #  define XML_ORDER_ATTR_FIRST_INSTANCE	"first-instance"
 #  define XML_ORDER_ATTR_THEN_INSTANCE	"then-instance"
 #  define XML_ORDER_ATTR_KIND		"kind"
 
 #  define XML_TICKET_ATTR_TICKET	"ticket"
 #  define XML_TICKET_ATTR_LOSS_POLICY	"loss-policy"
 
 #  define XML_NVPAIR_ATTR_NAME        	"name"
 #  define XML_NVPAIR_ATTR_VALUE        	"value"
 
 #  define XML_NODE_ATTR_STATE		"state"
 #  define XML_NODE_ATTR_RSC_DISCOVERY   "resource-discovery-enabled"
 
 #  define XML_CONFIG_ATTR_DC_DEADTIME	"dc-deadtime"
 #  define XML_CONFIG_ATTR_ELECTION_FAIL	"election-timeout"
 #  define XML_CONFIG_ATTR_FORCE_QUIT	"shutdown-escalation"
 #  define XML_CONFIG_ATTR_RECHECK	"cluster-recheck-interval"
 
 #  define XML_CIB_TAG_GENERATION_TUPPLE	"generation_tuple"
 
 #  define XML_ATTR_TRANSITION_MAGIC	"transition-magic"
 #  define XML_ATTR_TRANSITION_KEY	"transition-key"
 
 #  define XML_ATTR_TE_NOWAIT		"op_no_wait"
 #  define XML_ATTR_TE_TARGET_RC		"op_target_rc"
 #  define XML_ATTR_TE_ALLOWFAIL		"op_allow_fail"
 #  define XML_ATTR_LRM_PROBE		"lrm-is-probe"
 #  define XML_TAG_TRANSIENT_NODEATTRS	"transient_attributes"
 
 #  define XML_TAG_DIFF_ADDED		"diff-added"
 #  define XML_TAG_DIFF_REMOVED		"diff-removed"
 
 #  define XML_ACL_TAG_USER		"acl_target"
 #  define XML_ACL_TAG_USERv1		"acl_user"
 #  define XML_ACL_TAG_GROUP		"acl_group"
 #  define XML_ACL_TAG_ROLE		"acl_role"
 #  define XML_ACL_TAG_PERMISSION	"acl_permission"
 #  define XML_ACL_TAG_ROLE_REF 		"role"
 #  define XML_ACL_TAG_ROLE_REFv1	"role_ref"
 #  define XML_ACL_ATTR_KIND		"kind"
 #  define XML_ACL_TAG_READ		"read"
 #  define XML_ACL_TAG_WRITE		"write"
 #  define XML_ACL_TAG_DENY		"deny"
 #  define XML_ACL_ATTR_REF		"reference"
 #  define XML_ACL_ATTR_REFv1		"ref"
 #  define XML_ACL_ATTR_TAG		"object-type"
 #  define XML_ACL_ATTR_TAGv1		"tag"
 #  define XML_ACL_ATTR_XPATH		"xpath"
 #  define XML_ACL_ATTR_ATTRIBUTE	"attribute"
 
 #  define XML_CIB_TAG_TICKETS		"tickets"
 #  define XML_CIB_TAG_TICKET_STATE	"ticket_state"
 
 #  define XML_CIB_TAG_TAGS   		"tags"
 #  define XML_CIB_TAG_TAG   		"tag"
 #  define XML_CIB_TAG_OBJ_REF 		"obj_ref"
 
 #  define XML_TAG_FENCING_TOPOLOGY      "fencing-topology"
 #  define XML_TAG_FENCING_LEVEL         "fencing-level"
 #  define XML_ATTR_STONITH_INDEX        "index"
 #  define XML_ATTR_STONITH_TARGET       "target"
 #  define XML_ATTR_STONITH_DEVICES      "devices"
 
 #  define XML_TAG_DIFF                  "diff"
 #  define XML_DIFF_VERSION              "version"
 #  define XML_DIFF_VSOURCE              "source"
 #  define XML_DIFF_VTARGET              "target"
 #  define XML_DIFF_CHANGE               "change"
 #  define XML_DIFF_LIST                 "change-list"
 #  define XML_DIFF_ATTR                 "change-attr"
 #  define XML_DIFF_RESULT               "change-result"
 #  define XML_DIFF_OP                   "operation"
 #  define XML_DIFF_PATH                 "path"
 #  define XML_DIFF_POSITION             "position"
 
 #  include <crm/common/xml.h>
 
 #  define ID(x) crm_element_value(x, XML_ATTR_ID)
 #  define INSTANCE(x) crm_element_value(x, XML_CIB_ATTR_INSTANCE)
 #  define TSTAMP(x) crm_element_value(x, XML_ATTR_TSTAMP)
 #  define TYPE(x) crm_element_name(x)
 #  define NAME(x) crm_element_value(x, XML_NVPAIR_ATTR_NAME)
 #  define VALUE(x) crm_element_value(x, XML_NVPAIR_ATTR_VALUE)
 
 #endif
diff --git a/include/crm/pengine/status.h b/include/crm/pengine/status.h
index c3ca21d190..7572930946 100644
--- a/include/crm/pengine/status.h
+++ b/include/crm/pengine/status.h
@@ -1,403 +1,404 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public
  * License as published by the Free Software Foundation; either
  * version 2 of the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * General Public License for more details.
  *
  * You should have received a copy of the GNU General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 #ifndef PENGINE_STATUS__H
 #  define PENGINE_STATUS__H
 
 #  include <glib.h>
 #  include <crm/common/iso8601.h>
 #  include <crm/pengine/common.h>
 
 typedef struct node_s pe_node_t;
 typedef struct node_s node_t;
 typedef struct pe_action_s action_t;
 typedef struct pe_action_s pe_action_t;
 typedef struct resource_s resource_t;
 typedef struct ticket_s ticket_t;
 
 typedef enum no_quorum_policy_e {
     no_quorum_freeze,
     no_quorum_stop,
     no_quorum_ignore,
     no_quorum_suicide
 } no_quorum_policy_t;
 
 enum node_type {
     node_ping,
     node_member,
     node_remote
 };
 
 enum pe_restart {
     pe_restart_restart,
     pe_restart_ignore
 };
 
 enum pe_find {
     pe_find_renamed = 0x001,
     pe_find_clone = 0x004,
     pe_find_current = 0x008,
     pe_find_inactive = 0x010,
 };
 
 #  define pe_flag_have_quorum		0x00000001ULL
 #  define pe_flag_symmetric_cluster	0x00000002ULL
 #  define pe_flag_is_managed_default	0x00000004ULL
 #  define pe_flag_maintenance_mode	0x00000008ULL
 
 #  define pe_flag_stonith_enabled	0x00000010ULL
 #  define pe_flag_have_stonith_resource	0x00000020ULL
 #  define pe_flag_enable_unfencing	0x00000040ULL
 
 #  define pe_flag_stop_rsc_orphans	0x00000100ULL
 #  define pe_flag_stop_action_orphans	0x00000200ULL
 #  define pe_flag_stop_everything	0x00000400ULL
 
 #  define pe_flag_start_failure_fatal	0x00001000ULL
 #  define pe_flag_remove_after_stop	0x00002000ULL
 
 #  define pe_flag_startup_probes	0x00010000ULL
 #  define pe_flag_have_status		0x00020000ULL
 #  define pe_flag_have_remote_nodes	0x00040000ULL
 
 #  define pe_flag_quick_location  	0x00100000ULL
 
 typedef struct pe_working_set_s {
     xmlNode *input;
     crm_time_t *now;
 
     /* options extracted from the input */
     char *dc_uuid;
     node_t *dc_node;
     const char *stonith_action;
     const char *placement_strategy;
 
     unsigned long long flags;
 
     int stonith_timeout;
     int default_resource_stickiness;
     no_quorum_policy_t no_quorum_policy;
 
     GHashTable *config_hash;
     GHashTable *tickets;
     GHashTable *singletons; /* Actions for which there can be only one - ie. fence nodeX */
 
     GListPtr nodes;
     GListPtr resources;
     GListPtr placement_constraints;
     GListPtr ordering_constraints;
     GListPtr colocation_constraints;
     GListPtr ticket_constraints;
 
     GListPtr actions;
     xmlNode *failed;
     xmlNode *op_defaults;
     xmlNode *rsc_defaults;
 
     /* stats */
     int num_synapse;
     int max_valid_nodes;
     int order_id;
     int action_id;
 
     /* final output */
     xmlNode *graph;
 
     GHashTable *template_rsc_sets;
     const char *localhost;
     GHashTable *tags;
 
 } pe_working_set_t;
 
 struct node_shared_s {
     const char *id;
     const char *uname;
 /* Make all these flags into a bitfield one day */
     gboolean online;
     gboolean standby;
     gboolean standby_onfail;
     gboolean pending;
     gboolean unclean;
     gboolean unseen;
     gboolean shutdown;
     gboolean expected_up;
     gboolean is_dc;
     gboolean rsc_discovery_enabled;
 
     gboolean remote_requires_reset;
+    gboolean remote_was_fenced;
 
     int num_resources;
     GListPtr running_rsc;       /* resource_t* */
     GListPtr allocated_rsc;     /* resource_t* */
 
     resource_t *remote_rsc;
 
     GHashTable *attrs;          /* char* => char* */
     enum node_type type;
 
     GHashTable *utilization;
 
     /*! cache of calculated rsc digests for this node. */
     GHashTable *digest_cache;
 
     gboolean maintenance;
 };
 
 struct node_s {
     int weight;
     gboolean fixed;
     int rsc_discover_mode;
     int count;
     struct node_shared_s *details;
 };
 
 #  include <crm/pengine/complex.h>
 
 #  define pe_rsc_orphan		0x00000001ULL
 #  define pe_rsc_managed	0x00000002ULL
 #  define pe_rsc_block          0x00000004ULL   /* Further operations are prohibited due to failure policy */
 #  define pe_rsc_orphan_container_filler	0x00000008ULL
 
 #  define pe_rsc_notify		0x00000010ULL
 #  define pe_rsc_unique		0x00000020ULL
 #  define pe_rsc_fence_device   0x00000040ULL
 
 #  define pe_rsc_provisional	0x00000100ULL
 #  define pe_rsc_allocating	0x00000200ULL
 #  define pe_rsc_merging	0x00000400ULL
 #  define pe_rsc_munging	0x00000800ULL
 
 #  define pe_rsc_try_reload     0x00001000ULL
 #  define pe_rsc_reload         0x00002000ULL
 
 #  define pe_rsc_failed		0x00010000ULL
 #  define pe_rsc_shutdown	0x00020000ULL
 #  define pe_rsc_runnable	0x00040000ULL
 #  define pe_rsc_start_pending	0x00080000ULL
 
 #  define pe_rsc_starting       0x00100000ULL
 #  define pe_rsc_stopping       0x00200000ULL
 #  define pe_rsc_migrating      0x00400000ULL
 #  define pe_rsc_allow_migrate  0x00800000ULL
 
 #  define pe_rsc_failure_ignored 0x01000000ULL
 #  define pe_rsc_unexpectedly_running 0x02000000ULL
 #  define pe_rsc_maintenance	 0x04000000ULL
 
 #  define pe_rsc_needs_quorum	 0x10000000ULL
 #  define pe_rsc_needs_fencing	 0x20000000ULL
 #  define pe_rsc_needs_unfencing 0x40000000ULL
 #  define pe_rsc_have_unfencing  0x80000000ULL
 
 enum pe_graph_flags {
     pe_graph_none = 0x00000,
     pe_graph_updated_first = 0x00001,
     pe_graph_updated_then = 0x00002,
     pe_graph_disable = 0x00004,
 };
 
 /* *INDENT-OFF* */
 enum pe_action_flags {
     pe_action_pseudo = 0x00001,
     pe_action_runnable = 0x00002,
     pe_action_optional = 0x00004,
     pe_action_print_always = 0x00008,
 
     pe_action_have_node_attrs = 0x00010,
     pe_action_failure_is_fatal = 0x00020,
     pe_action_implied_by_stonith = 0x00040,
     pe_action_migrate_runnable =   0x00080,
 
     pe_action_dumped = 0x00100,
     pe_action_processed = 0x00200,
     pe_action_clear = 0x00400,
     pe_action_dangle = 0x00800,
 
     pe_action_requires_any = 0x01000, /* This action requires one or mre of its dependancies to be runnable
                                        * We use this to clear the runnable flag before checking dependancies
                                        */
     pe_action_reschedule = 0x02000,
 };
 /* *INDENT-ON* */
 
 struct resource_s {
     char *id;
     char *clone_name;
     xmlNode *xml;
     xmlNode *orig_xml;
     xmlNode *ops_xml;
 
     resource_t *parent;
     void *variant_opaque;
     enum pe_obj_types variant;
     resource_object_functions_t *fns;
     resource_alloc_functions_t *cmds;
 
     enum rsc_recovery_type recovery_type;
     enum pe_restart restart_type;
 
     int priority;
     int stickiness;
     int sort_index;
     int failure_timeout;
     int remote_reconnect_interval;
     int effective_priority;
     int migration_threshold;
 
     gboolean is_remote_node;
     gboolean exclusive_discover;
 
     unsigned long long flags;
 
     GListPtr rsc_cons_lhs;      /* rsc_colocation_t* */
     GListPtr rsc_cons;          /* rsc_colocation_t* */
     GListPtr rsc_location;      /* rsc_to_node_t*    */
     GListPtr actions;           /* action_t*         */
     GListPtr rsc_tickets;       /* rsc_ticket*       */
 
     node_t *allocated_to;
     GListPtr running_on;        /* node_t*   */
     GHashTable *known_on;       /* node_t*   */
     GHashTable *allowed_nodes;  /* node_t*   */
 
     enum rsc_role_e role;
     enum rsc_role_e next_role;
 
     GHashTable *meta;
     GHashTable *parameters;
     GHashTable *utilization;
 
     GListPtr children;          /* resource_t*   */
     GListPtr dangling_migrations;       /* node_t*       */
 
     node_t *partial_migration_target;
     node_t *partial_migration_source;
 
     resource_t *container;
     GListPtr fillers;
 
     char *pending_task;
 
     const char *isolation_wrapper;
 };
 
 struct pe_action_s {
     int id;
     int priority;
 
     resource_t *rsc;
     node_t *node;
     xmlNode *op_entry;
 
     char *task;
     char *uuid;
     char *cancel_task;
 
     enum pe_action_flags flags;
     enum rsc_start_requirement needs;
     enum action_fail_response on_fail;
     enum rsc_role_e fail_role;
 
     action_t *pre_notify;
     action_t *pre_notified;
     action_t *post_notify;
     action_t *post_notified;
 
     int seen_count;
 
     GHashTable *meta;
     GHashTable *extra;
 
     GListPtr actions_before;    /* action_warpper_t* */
     GListPtr actions_after;     /* action_warpper_t* */
 };
 
 struct ticket_s {
     char *id;
     gboolean granted;
     time_t last_granted;
     gboolean standby;
     GHashTable *state;
 };
 
 typedef struct tag_s {
     char *id;
     GListPtr refs;
 } tag_t;
 
 enum pe_link_state {
     pe_link_not_dumped,
     pe_link_dumped,
     pe_link_dup,
 };
 
 /* *INDENT-OFF* */
 enum pe_ordering {
     pe_order_none                  = 0x0,       /* deleted */
     pe_order_optional              = 0x1,       /* pure ordering, nothing implied */
     pe_order_apply_first_non_migratable = 0x2,  /* Only apply this constraint's ordering if first is not migratable. */
 
     pe_order_implies_first         = 0x10,      /* If 'then' is required, ensure 'first' is too */
     pe_order_implies_then          = 0x20,      /* If 'first' is required, ensure 'then' is too */
     pe_order_implies_first_master  = 0x40,      /* Imply 'first' is required when 'then' is required and then's rsc holds Master role. */
 
     /* first requires then to be both runnable and migrate runnable. */
     pe_order_implies_first_migratable  = 0x80,
 
     pe_order_runnable_left         = 0x100,     /* 'then' requires 'first' to be runnable */
 
     pe_order_pseudo_left           = 0x200,     /* 'then' can only be pseudo if 'first' is runnable */
     pe_order_implies_then_on_node  = 0x400,     /* If 'first' is required on 'nodeX',
                                                  * ensure instances of 'then' on 'nodeX' are too.
                                                  * Only really useful if 'then' is a clone and 'first' is not
                                                  */
 
     pe_order_restart               = 0x1000,    /* 'then' is runnable if 'first' is optional or runnable */
     pe_order_stonith_stop          = 0x2000,    /* only applies if the action is non-pseudo */
     pe_order_serialize_only        = 0x4000,    /* serialize */
 
     pe_order_implies_first_printed = 0x10000,   /* Like ..implies_first but only ensures 'first' is printed, not manditory */
     pe_order_implies_then_printed  = 0x20000,   /* Like ..implies_then but only ensures 'then' is printed, not manditory */
 
     pe_order_asymmetrical          = 0x100000,  /* Indicates asymmetrical one way ordering constraint. */
     pe_order_load                  = 0x200000,  /* Only relevant if... */
     pe_order_one_or_more           = 0x400000,  /* 'then' is only runnable if one or more of it's dependancies are too */
     pe_order_anti_colocation       = 0x800000,
 
     pe_order_preserve              = 0x1000000, /* Hack for breaking user ordering constraints with container resources */
     pe_order_trace                 = 0x4000000, /* test marker */
 };
 /* *INDENT-ON* */
 
 typedef struct action_wrapper_s action_wrapper_t;
 struct action_wrapper_s {
     enum pe_ordering type;
     enum pe_link_state state;
     action_t *action;
 };
 
 const char *rsc_printable_id(resource_t *rsc);
 gboolean cluster_status(pe_working_set_t * data_set);
 void set_working_set_defaults(pe_working_set_t * data_set);
 void cleanup_calculations(pe_working_set_t * data_set);
 resource_t *pe_find_resource(GListPtr rsc_list, const char *id_rh);
 node_t *pe_find_node(GListPtr node_list, const char *uname);
 node_t *pe_find_node_id(GListPtr node_list, const char *id);
 node_t *pe_find_node_any(GListPtr node_list, const char *id, const char *uname);
 GListPtr find_operations(const char *rsc, const char *node, gboolean active_filter,
                          pe_working_set_t * data_set);
 #endif
diff --git a/lib/pengine/unpack.c b/lib/pengine/unpack.c
index 6646c8709b..4406578b22 100644
--- a/lib/pengine/unpack.c
+++ b/lib/pengine/unpack.c
@@ -1,3348 +1,3354 @@
 /*
  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License as published by the Free Software Foundation; either
  * version 2.1 of the License, or (at your option) any later version.
  *
  * This library is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  */
 #include <crm_internal.h>
 
 #include <glib.h>
 
 #include <crm/crm.h>
 #include <crm/services.h>
 #include <crm/msg_xml.h>
 #include <crm/common/xml.h>
 
 #include <crm/common/util.h>
 #include <crm/pengine/rules.h>
 #include <crm/pengine/internal.h>
 #include <unpack.h>
 
 CRM_TRACE_INIT_DATA(pe_status);
 
 #define set_config_flag(data_set, option, flag) do {			\
 	const char *tmp = pe_pref(data_set->config_hash, option);	\
 	if(tmp) {							\
 	    if(crm_is_true(tmp)) {					\
 		set_bit(data_set->flags, flag);			\
 	    } else {							\
 		clear_bit(data_set->flags, flag);		\
 	    }								\
 	}								\
     } while(0)
 
 gboolean unpack_rsc_op(resource_t * rsc, node_t * node, xmlNode * xml_op,
                        enum action_fail_response *failed, pe_working_set_t * data_set);
 static gboolean determine_remote_online_status(node_t * this_node);
 
 static gboolean
 is_dangling_container_remote_node(node_t *node)
 {
     /* we are looking for a remote-node that was supposed to be mapped to a
      * container resource, but all traces of that container have disappeared 
      * from both the config and the status section. */
     if (is_remote_node(node) &&
         node->details->remote_rsc &&
         node->details->remote_rsc->container == NULL &&
         is_set(node->details->remote_rsc->flags, pe_rsc_orphan_container_filler)) {
         return TRUE;
     }
 
     return FALSE;
 }
 
 void
 pe_fence_node(pe_working_set_t * data_set, node_t * node, const char *reason)
 {
     CRM_CHECK(node, return);
 
     /* fence remote nodes living in a container by marking the container as failed. */
     if (is_container_remote_node(node)) {
         resource_t *rsc = node->details->remote_rsc->container;
         if (is_set(rsc->flags, pe_rsc_failed) == FALSE) {
             crm_warn("Remote node %s will be fenced by recovering container resource %s",
                 node->details->uname, rsc->id, reason);
             set_bit(rsc->flags, pe_rsc_failed);
         }
     } else if (is_dangling_container_remote_node(node)) {
         crm_info("Fencing remote node %s has already occurred, container no longer exists. cleaning up dangling connection resource:  %s",
                   node->details->uname, reason);
         set_bit(node->details->remote_rsc->flags, pe_rsc_failed);
 
     } else if (is_baremetal_remote_node(node)) {
         if(pe_can_fence(data_set, node)) {
             crm_warn("Node %s will be fenced %s", node->details->uname, reason);
         } else {
             crm_warn("Node %s is unclean %s", node->details->uname, reason);
         }
         node->details->unclean = TRUE;
         node->details->remote_requires_reset = TRUE;
 
     } else if (node->details->unclean == FALSE) {
         if(pe_can_fence(data_set, node)) {
             crm_warn("Node %s will be fenced %s", node->details->uname, reason);
         } else {
             crm_warn("Node %s is unclean %s", node->details->uname, reason);
         }
         node->details->unclean = TRUE;
     } else {
         crm_trace("Huh? %s %s", node->details->uname, reason);
     }
 }
 
 gboolean
 unpack_config(xmlNode * config, pe_working_set_t * data_set)
 {
     const char *value = NULL;
     GHashTable *config_hash =
         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, g_hash_destroy_str);
 
     xmlXPathObjectPtr xpathObj = NULL;
 
     if(is_not_set(data_set->flags, pe_flag_enable_unfencing)) {
         xpathObj = xpath_search(data_set->input, "//nvpair[@name='provides' and @value='unfencing']");
         if(xpathObj && numXpathResults(xpathObj) > 0) {
             set_bit(data_set->flags, pe_flag_enable_unfencing);
         }
         freeXpathObject(xpathObj);
     }
 
     if(is_not_set(data_set->flags, pe_flag_enable_unfencing)) {
         xpathObj = xpath_search(data_set->input, "//nvpair[@name='requires' and @value='unfencing']");
         if(xpathObj && numXpathResults(xpathObj) > 0) {
             set_bit(data_set->flags, pe_flag_enable_unfencing);
         }
         freeXpathObject(xpathObj);
     }
 
 
 #ifdef REDHAT_COMPAT_6
     if(is_not_set(data_set->flags, pe_flag_enable_unfencing)) {
         xpathObj = xpath_search(data_set->input, "//primitive[@type='fence_scsi']");
         if(xpathObj && numXpathResults(xpathObj) > 0) {
             set_bit(data_set->flags, pe_flag_enable_unfencing);
         }
         freeXpathObject(xpathObj);
     }
 #endif
 
     data_set->config_hash = config_hash;
 
     unpack_instance_attributes(data_set->input, config, XML_CIB_TAG_PROPSET, NULL, config_hash,
                                CIB_OPTIONS_FIRST, FALSE, data_set->now);
 
     verify_pe_options(data_set->config_hash);
 
     set_config_flag(data_set, "enable-startup-probes", pe_flag_startup_probes);
     if(is_not_set(data_set->flags, pe_flag_startup_probes)) {
         crm_info("Startup probes: disabled (dangerous)");
     }
 
     value = pe_pref(data_set->config_hash, XML_ATTR_HAVE_WATCHDOG);
     if (value && crm_is_true(value)) {
         crm_notice("Relying on watchdog integration for fencing");
         set_bit(data_set->flags, pe_flag_have_stonith_resource);
     }
 
     value = pe_pref(data_set->config_hash, "stonith-timeout");
     data_set->stonith_timeout = crm_get_msec(value);
     crm_debug("STONITH timeout: %d", data_set->stonith_timeout);
 
     set_config_flag(data_set, "stonith-enabled", pe_flag_stonith_enabled);
     crm_debug("STONITH of failed nodes is %s",
               is_set(data_set->flags, pe_flag_stonith_enabled) ? "enabled" : "disabled");
 
     data_set->stonith_action = pe_pref(data_set->config_hash, "stonith-action");
     crm_trace("STONITH will %s nodes", data_set->stonith_action);
 
     set_config_flag(data_set, "stop-all-resources", pe_flag_stop_everything);
     crm_debug("Stop all active resources: %s",
               is_set(data_set->flags, pe_flag_stop_everything) ? "true" : "false");
 
     set_config_flag(data_set, "symmetric-cluster", pe_flag_symmetric_cluster);
     if (is_set(data_set->flags, pe_flag_symmetric_cluster)) {
         crm_debug("Cluster is symmetric" " - resources can run anywhere by default");
     }
 
     value = pe_pref(data_set->config_hash, "default-resource-stickiness");
     data_set->default_resource_stickiness = char2score(value);
     crm_debug("Default stickiness: %d", data_set->default_resource_stickiness);
 
     value = pe_pref(data_set->config_hash, "no-quorum-policy");
 
     if (safe_str_eq(value, "ignore")) {
         data_set->no_quorum_policy = no_quorum_ignore;
 
     } else if (safe_str_eq(value, "freeze")) {
         data_set->no_quorum_policy = no_quorum_freeze;
 
     } else if (safe_str_eq(value, "suicide")) {
         gboolean do_panic = FALSE;
 
         crm_element_value_int(data_set->input, XML_ATTR_QUORUM_PANIC, &do_panic);
 
         if (is_set(data_set->flags, pe_flag_stonith_enabled) == FALSE) {
             crm_config_err
                 ("Setting no-quorum-policy=suicide makes no sense if stonith-enabled=false");
         }
 
         if (do_panic && is_set(data_set->flags, pe_flag_stonith_enabled)) {
             data_set->no_quorum_policy = no_quorum_suicide;
 
         } else if (is_set(data_set->flags, pe_flag_have_quorum) == FALSE && do_panic == FALSE) {
             crm_notice("Resetting no-quorum-policy to 'stop': The cluster has never had quorum");
             data_set->no_quorum_policy = no_quorum_stop;
         }
 
     } else {
         data_set->no_quorum_policy = no_quorum_stop;
     }
 
     switch (data_set->no_quorum_policy) {
         case no_quorum_freeze:
             crm_debug("On loss of CCM Quorum: Freeze resources");
             break;
         case no_quorum_stop:
             crm_debug("On loss of CCM Quorum: Stop ALL resources");
             break;
         case no_quorum_suicide:
             crm_notice("On loss of CCM Quorum: Fence all remaining nodes");
             break;
         case no_quorum_ignore:
             crm_notice("On loss of CCM Quorum: Ignore");
             break;
     }
 
     set_config_flag(data_set, "stop-orphan-resources", pe_flag_stop_rsc_orphans);
     crm_trace("Orphan resources are %s",
               is_set(data_set->flags, pe_flag_stop_rsc_orphans) ? "stopped" : "ignored");
 
     set_config_flag(data_set, "stop-orphan-actions", pe_flag_stop_action_orphans);
     crm_trace("Orphan resource actions are %s",
               is_set(data_set->flags, pe_flag_stop_action_orphans) ? "stopped" : "ignored");
 
     set_config_flag(data_set, "remove-after-stop", pe_flag_remove_after_stop);
     crm_trace("Stopped resources are removed from the status section: %s",
               is_set(data_set->flags, pe_flag_remove_after_stop) ? "true" : "false");
 
     set_config_flag(data_set, "maintenance-mode", pe_flag_maintenance_mode);
     crm_trace("Maintenance mode: %s",
               is_set(data_set->flags, pe_flag_maintenance_mode) ? "true" : "false");
 
     if (is_set(data_set->flags, pe_flag_maintenance_mode)) {
         clear_bit(data_set->flags, pe_flag_is_managed_default);
     } else {
         set_config_flag(data_set, "is-managed-default", pe_flag_is_managed_default);
     }
     crm_trace("By default resources are %smanaged",
               is_set(data_set->flags, pe_flag_is_managed_default) ? "" : "not ");
 
     set_config_flag(data_set, "start-failure-is-fatal", pe_flag_start_failure_fatal);
     crm_trace("Start failures are %s",
               is_set(data_set->flags,
                      pe_flag_start_failure_fatal) ? "always fatal" : "handled by failcount");
 
     node_score_red = char2score(pe_pref(data_set->config_hash, "node-health-red"));
     node_score_green = char2score(pe_pref(data_set->config_hash, "node-health-green"));
     node_score_yellow = char2score(pe_pref(data_set->config_hash, "node-health-yellow"));
 
     crm_debug("Node scores: 'red' = %s, 'yellow' = %s, 'green' = %s",
              pe_pref(data_set->config_hash, "node-health-red"),
              pe_pref(data_set->config_hash, "node-health-yellow"),
              pe_pref(data_set->config_hash, "node-health-green"));
 
     data_set->placement_strategy = pe_pref(data_set->config_hash, "placement-strategy");
     crm_trace("Placement strategy: %s", data_set->placement_strategy);
 
     return TRUE;
 }
 
 static void
 destroy_digest_cache(gpointer ptr)
 {
     op_digest_cache_t *data = ptr;
 
     free_xml(data->params_all);
     free_xml(data->params_restart);
     free(data->digest_all_calc);
     free(data->digest_restart_calc);
     free(data);
 }
 
 static node_t *
 create_node(const char *id, const char *uname, const char *type, const char *score, pe_working_set_t * data_set)
 {
     node_t *new_node = NULL;
 
     if (pe_find_node(data_set->nodes, uname) != NULL) {
         crm_config_warn("Detected multiple node entries with uname=%s"
                         " - this is rarely intended", uname);
     }
 
     new_node = calloc(1, sizeof(node_t));
     if (new_node == NULL) {
         return NULL;
     }
 
     new_node->weight = char2score(score);
     new_node->fixed = FALSE;
     new_node->details = calloc(1, sizeof(struct node_shared_s));
 
     if (new_node->details == NULL) {
         free(new_node);
         return NULL;
     }
 
     crm_trace("Creating node for entry %s/%s", uname, id);
     new_node->details->id = id;
     new_node->details->uname = uname;
     new_node->details->online = FALSE;
     new_node->details->shutdown = FALSE;
     new_node->details->rsc_discovery_enabled = TRUE;
     new_node->details->running_rsc = NULL;
     new_node->details->type = node_ping;
 
     if (safe_str_eq(type, "remote")) {
         new_node->details->type = node_remote;
         set_bit(data_set->flags, pe_flag_have_remote_nodes);
     } else if (type == NULL || safe_str_eq(type, "member")
         || safe_str_eq(type, NORMALNODE)) {
         new_node->details->type = node_member;
     }
 
     new_node->details->attrs = g_hash_table_new_full(crm_str_hash, g_str_equal,
                                                      g_hash_destroy_str,
                                                      g_hash_destroy_str);
 
     if (is_remote_node(new_node)) {
         g_hash_table_insert(new_node->details->attrs, strdup("#kind"), strdup("remote"));
     } else {
         g_hash_table_insert(new_node->details->attrs, strdup("#kind"), strdup("cluster"));
     }
 
     new_node->details->utilization =
         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str,
                               g_hash_destroy_str);
 
     new_node->details->digest_cache =
         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str,
                               destroy_digest_cache);
 
     data_set->nodes = g_list_insert_sorted(data_set->nodes, new_node, sort_node_uname);
     return new_node;
 }
 
 static const char *
 expand_remote_rsc_meta(xmlNode *xml_obj, xmlNode *parent, GHashTable **rsc_name_check)
 {
     xmlNode *xml_rsc = NULL;
     xmlNode *xml_tmp = NULL;
     xmlNode *attr_set = NULL;
     xmlNode *attr = NULL;
 
     const char *container_id = ID(xml_obj);
     const char *remote_name = NULL;
     const char *remote_server = NULL;
     const char *remote_port = NULL;
     const char *connect_timeout = "60s";
     const char *remote_allow_migrate=NULL;
     char *tmp_id = NULL;
 
     for (attr_set = __xml_first_child(xml_obj); attr_set != NULL; attr_set = __xml_next(attr_set)) {
         if (safe_str_neq((const char *)attr_set->name, XML_TAG_META_SETS)) {
             continue;
         }
 
         for (attr = __xml_first_child(attr_set); attr != NULL; attr = __xml_next(attr)) {
             const char *value = crm_element_value(attr, XML_NVPAIR_ATTR_VALUE);
             const char *name = crm_element_value(attr, XML_NVPAIR_ATTR_NAME);
 
             if (safe_str_eq(name, XML_RSC_ATTR_REMOTE_NODE)) {
                 remote_name = value;
             } else if (safe_str_eq(name, "remote-addr")) {
                 remote_server = value;
             } else if (safe_str_eq(name, "remote-port")) {
                 remote_port = value;
             } else if (safe_str_eq(name, "remote-connect-timeout")) {
                 connect_timeout = value;
             } else if (safe_str_eq(name, "remote-allow-migrate")) {
                 remote_allow_migrate=value;
             }
         }
     }
 
     if (remote_name == NULL) {
         return NULL;
     }
 
     if (*rsc_name_check == NULL) {
         *rsc_name_check = g_hash_table_new(crm_str_hash, g_str_equal);
         for (xml_rsc = __xml_first_child(parent); xml_rsc != NULL; xml_rsc = __xml_next(xml_rsc)) {
             const char *id = ID(xml_rsc);
 
             /* avoiding heap allocation here because we know the duration of this hashtable allows us to */
             g_hash_table_insert(*rsc_name_check, (char *) id, (char *) id);
         }
     }
 
     if (g_hash_table_lookup(*rsc_name_check, remote_name)) {
 
         crm_err("Naming conflict with remote-node=%s.  remote-nodes can not have the same name as a resource.",
                 remote_name);
         return NULL;
     }
 
     xml_rsc = create_xml_node(parent, XML_CIB_TAG_RESOURCE);
 
     crm_xml_add(xml_rsc, XML_ATTR_ID, remote_name);
     crm_xml_add(xml_rsc, XML_AGENT_ATTR_CLASS, "ocf");
     crm_xml_add(xml_rsc, XML_AGENT_ATTR_PROVIDER, "pacemaker");
     crm_xml_add(xml_rsc, XML_ATTR_TYPE, "remote");
 
     xml_tmp = create_xml_node(xml_rsc, XML_TAG_META_SETS);
     tmp_id = crm_concat(remote_name, XML_TAG_META_SETS, '_');
     crm_xml_add(xml_tmp, XML_ATTR_ID, tmp_id);
     free(tmp_id);
 
     attr = create_xml_node(xml_tmp, XML_CIB_TAG_NVPAIR);
     tmp_id = crm_concat(remote_name, "meta-attributes-container", '_');
     crm_xml_add(attr, XML_ATTR_ID, tmp_id);
     crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, XML_RSC_ATTR_CONTAINER);
     crm_xml_add(attr, XML_NVPAIR_ATTR_VALUE, container_id);
     free(tmp_id);
 
     attr = create_xml_node(xml_tmp, XML_CIB_TAG_NVPAIR);
     tmp_id = crm_concat(remote_name, "meta-attributes-internal", '_');
     crm_xml_add(attr, XML_ATTR_ID, tmp_id);
     crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, XML_RSC_ATTR_INTERNAL_RSC);
     crm_xml_add(attr, XML_NVPAIR_ATTR_VALUE, "true");
     free(tmp_id);
 
     if (remote_allow_migrate) {
         attr = create_xml_node(xml_tmp, XML_CIB_TAG_NVPAIR);
         tmp_id = crm_concat(remote_name, "meta-attributes-container", '_');
         crm_xml_add(attr, XML_ATTR_ID, tmp_id);
         crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, XML_OP_ATTR_ALLOW_MIGRATE);
         crm_xml_add(attr, XML_NVPAIR_ATTR_VALUE, remote_allow_migrate);
         free(tmp_id);
     }
 
     xml_tmp = create_xml_node(xml_rsc, "operations");
     attr = create_xml_node(xml_tmp, XML_ATTR_OP);
     tmp_id = crm_concat(remote_name, "monitor-interval-30s", '_');
     crm_xml_add(attr, XML_ATTR_ID, tmp_id);
     crm_xml_add(attr, XML_ATTR_TIMEOUT, "30s");
     crm_xml_add(attr, XML_LRM_ATTR_INTERVAL, "30s");
     crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, "monitor");
     free(tmp_id);
 
     if (connect_timeout) {
         attr = create_xml_node(xml_tmp, XML_ATTR_OP);
         tmp_id = crm_concat(remote_name, "start-interval-0", '_');
         crm_xml_add(attr, XML_ATTR_ID, tmp_id);
         crm_xml_add(attr, XML_ATTR_TIMEOUT, connect_timeout);
         crm_xml_add(attr, XML_LRM_ATTR_INTERVAL, "0");
         crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, "start");
         free(tmp_id);
     }
 
     if (remote_port || remote_server) {
         xml_tmp = create_xml_node(xml_rsc, XML_TAG_ATTR_SETS);
         tmp_id = crm_concat(remote_name, XML_TAG_ATTR_SETS, '_');
         crm_xml_add(xml_tmp, XML_ATTR_ID, tmp_id);
         free(tmp_id);
 
         if (remote_server) {
             attr = create_xml_node(xml_tmp, XML_CIB_TAG_NVPAIR);
             tmp_id = crm_concat(remote_name, "instance-attributes-addr", '_');
             crm_xml_add(attr, XML_ATTR_ID, tmp_id);
             crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, "addr");
             crm_xml_add(attr, XML_NVPAIR_ATTR_VALUE, remote_server);
             free(tmp_id);
         }
         if (remote_port) {
             attr = create_xml_node(xml_tmp, XML_CIB_TAG_NVPAIR);
             tmp_id = crm_concat(remote_name, "instance-attributes-port", '_');
             crm_xml_add(attr, XML_ATTR_ID, tmp_id);
             crm_xml_add(attr, XML_NVPAIR_ATTR_NAME, "port");
             crm_xml_add(attr, XML_NVPAIR_ATTR_VALUE, remote_port);
             free(tmp_id);
         }
     }
 
     return remote_name;
 }
 
 static void
 handle_startup_fencing(pe_working_set_t *data_set, node_t *new_node)
 {
     static const char *blind_faith = NULL;
     static gboolean unseen_are_unclean = TRUE;
 
     if ((new_node->details->type == node_remote) && (new_node->details->remote_rsc == NULL)) {
         /* ignore fencing remote-nodes that don't have a conneciton resource associated
          * with them. This happens when remote-node entries get left in the nodes section
          * after the connection resource is removed */
         return;
     }
 
     blind_faith = pe_pref(data_set->config_hash, "startup-fencing");
 
     if (crm_is_true(blind_faith) == FALSE) {
         unseen_are_unclean = FALSE;
         crm_warn("Blind faith: not fencing unseen nodes");
     }
 
     if (is_set(data_set->flags, pe_flag_stonith_enabled) == FALSE
         || unseen_are_unclean == FALSE) {
         /* blind faith... */
         new_node->details->unclean = FALSE;
 
     } else {
         /* all nodes are unclean until we've seen their
          * status entry
          */
         new_node->details->unclean = TRUE;
     }
 
     /* We need to be able to determine if a node's status section
      * exists or not separate from whether the node is unclean. */
     new_node->details->unseen = TRUE;
 }
 
 gboolean
 unpack_nodes(xmlNode * xml_nodes, pe_working_set_t * data_set)
 {
     xmlNode *xml_obj = NULL;
     node_t *new_node = NULL;
     const char *id = NULL;
     const char *uname = NULL;
     const char *type = NULL;
     const char *score = NULL;
 
     for (xml_obj = __xml_first_child(xml_nodes); xml_obj != NULL; xml_obj = __xml_next(xml_obj)) {
         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_NODE, TRUE)) {
             new_node = NULL;
 
             id = crm_element_value(xml_obj, XML_ATTR_ID);
             uname = crm_element_value(xml_obj, XML_ATTR_UNAME);
             type = crm_element_value(xml_obj, XML_ATTR_TYPE);
             score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
             crm_trace("Processing node %s/%s", uname, id);
 
             if (id == NULL) {
                 crm_config_err("Must specify id tag in <node>");
                 continue;
             }
             new_node = create_node(id, uname, type, score, data_set);
 
             if (new_node == NULL) {
                 return FALSE;
             }
 
 /* 		if(data_set->have_quorum == FALSE */
 /* 		   && data_set->no_quorum_policy == no_quorum_stop) { */
 /* 			/\* start shutting resources down *\/ */
 /* 			new_node->weight = -INFINITY; */
 /* 		} */
 
             handle_startup_fencing(data_set, new_node);
 
             add_node_attrs(xml_obj, new_node, FALSE, data_set);
             unpack_instance_attributes(data_set->input, xml_obj, XML_TAG_UTILIZATION, NULL,
                                        new_node->details->utilization, NULL, FALSE, data_set->now);
 
             crm_trace("Done with node %s", crm_element_value(xml_obj, XML_ATTR_UNAME));
         }
     }
 
     if (data_set->localhost && pe_find_node(data_set->nodes, data_set->localhost) == NULL) {
         crm_info("Creating a fake local node");
         create_node(data_set->localhost, data_set->localhost, NULL, 0, data_set);
     }
 
     return TRUE;
 }
 
 static void
 setup_container(resource_t * rsc, pe_working_set_t * data_set)
 {
     const char *container_id = NULL;
 
     if (rsc->children) {
         GListPtr gIter = rsc->children;
 
         for (; gIter != NULL; gIter = gIter->next) {
             resource_t *child_rsc = (resource_t *) gIter->data;
 
             setup_container(child_rsc, data_set);
         }
         return;
     }
 
     container_id = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_CONTAINER);
     if (container_id && safe_str_neq(container_id, rsc->id)) {
         resource_t *container = pe_find_resource(data_set->resources, container_id);
 
         if (container) {
             rsc->container = container;
             container->fillers = g_list_append(container->fillers, rsc);
             pe_rsc_trace(rsc, "Resource %s's container is %s", rsc->id, container_id);
         } else {
             pe_err("Resource %s: Unknown resource container (%s)", rsc->id, container_id);
         }
     }
 }
 
 gboolean
 unpack_remote_nodes(xmlNode * xml_resources, pe_working_set_t * data_set)
 {
     xmlNode *xml_obj = NULL;
     GHashTable *rsc_name_check = NULL;
 
     /* generate remote nodes from resource config before unpacking resources */
     for (xml_obj = __xml_first_child(xml_resources); xml_obj != NULL; xml_obj = __xml_next(xml_obj)) {
         const char *new_node_id = NULL;
 
         /* remote rsc can be defined as primitive, or exist within the metadata of another rsc */
         if (xml_contains_remote_node(xml_obj)) {
             new_node_id = ID(xml_obj);
             /* This check is here to make sure we don't iterate over
              * an expanded node that has already been added to the node list. */
             if (new_node_id && pe_find_node(data_set->nodes, new_node_id) != NULL) {
                 continue;
             }
         } else {
             /* expands a metadata defined remote resource into the xml config
              * as an actual rsc primitive to be unpacked later. */
             new_node_id = expand_remote_rsc_meta(xml_obj, xml_resources, &rsc_name_check);
         }
 
         if (new_node_id) {
             crm_trace("detected remote node %s", new_node_id);
 
             /* only create the remote node entry if the node didn't already exist */
             if (pe_find_node(data_set->nodes, new_node_id) == NULL) {
                 create_node(new_node_id, new_node_id, "remote", NULL, data_set);
             }
 
         }
     }
     if (rsc_name_check) {
         g_hash_table_destroy(rsc_name_check);
     }
 
     return TRUE;
 }
 
 
 /* Call this after all the nodes and resources have been
  * unpacked, but before the status section is read.
  *
  * A remote node's online status is reflected by the state
  * of the remote node's connection resource. We need to link
  * the remote node to this connection resource so we can have
  * easy access to the connection resource during the PE calculations.
  */
 static void
 link_rsc2remotenode(pe_working_set_t *data_set, resource_t *new_rsc)
 {
     node_t *remote_node = NULL;
 
     if (new_rsc->is_remote_node == FALSE) {
         return;
     }
 
     if (is_set(data_set->flags, pe_flag_quick_location)) {
         /* remote_nodes and remote_resources are not linked in quick location calculations */
         return;
     }
 
     print_resource(LOG_DEBUG_3, "Linking remote-node connection resource, ", new_rsc, FALSE);
 
     remote_node = pe_find_node(data_set->nodes, new_rsc->id);
     CRM_CHECK(remote_node != NULL, return;);
 
     remote_node->details->remote_rsc = new_rsc;
     /* If this is a baremetal remote-node (no container resource
      * associated with it) then we need to handle startup fencing the same way
      * as cluster nodes. */
     if (new_rsc->container == NULL) {
         handle_startup_fencing(data_set, remote_node);
     } else {
         /* At this point we know if the remote node is a container or baremetal
          * remote node, update the #kind attribute if a container is involved */
         g_hash_table_replace(remote_node->details->attrs, strdup("#kind"), strdup("container"));
     }
 }
 
 static void
 destroy_tag(gpointer data)
 {
     tag_t *tag = data;
 
     if (tag) {
         free(tag->id);
         g_list_free_full(tag->refs, free);
         free(tag);
     }
 }
 
 gboolean
 unpack_resources(xmlNode * xml_resources, pe_working_set_t * data_set)
 {
     xmlNode *xml_obj = NULL;
     GListPtr gIter = NULL;
 
     data_set->template_rsc_sets =
         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str,
                               destroy_tag);
 
     for (xml_obj = __xml_first_child(xml_resources); xml_obj != NULL; xml_obj = __xml_next(xml_obj)) {
         resource_t *new_rsc = NULL;
 
         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_RSC_TEMPLATE, TRUE)) {
             const char *template_id = ID(xml_obj);
 
             if (template_id && g_hash_table_lookup_extended(data_set->template_rsc_sets,
                                                             template_id, NULL, NULL) == FALSE) {
                 /* Record the template's ID for the knowledge of its existence anyway. */
                 g_hash_table_insert(data_set->template_rsc_sets, strdup(template_id), NULL);
             }
             continue;
         }
 
         crm_trace("Beginning unpack... <%s id=%s... >", crm_element_name(xml_obj), ID(xml_obj));
         if (common_unpack(xml_obj, &new_rsc, NULL, data_set)) {
             data_set->resources = g_list_append(data_set->resources, new_rsc);
 
             if (xml_contains_remote_node(xml_obj)) {
                 new_rsc->is_remote_node = TRUE;
             }
             print_resource(LOG_DEBUG_3, "Added ", new_rsc, FALSE);
 
         } else {
             crm_config_err("Failed unpacking %s %s",
                            crm_element_name(xml_obj), crm_element_value(xml_obj, XML_ATTR_ID));
             if (new_rsc != NULL && new_rsc->fns != NULL) {
                 new_rsc->fns->free(new_rsc);
             }
         }
     }
 
     for (gIter = data_set->resources; gIter != NULL; gIter = gIter->next) {
         resource_t *rsc = (resource_t *) gIter->data;
 
         setup_container(rsc, data_set);
         link_rsc2remotenode(data_set, rsc);
     }
 
     data_set->resources = g_list_sort(data_set->resources, sort_rsc_priority);
     if (is_set(data_set->flags, pe_flag_quick_location)) {
         /* Ignore */
 
     } else if (is_set(data_set->flags, pe_flag_stonith_enabled)
                && is_set(data_set->flags, pe_flag_have_stonith_resource) == FALSE) {
 
         crm_config_err("Resource start-up disabled since no STONITH resources have been defined");
         crm_config_err("Either configure some or disable STONITH with the stonith-enabled option");
         crm_config_err("NOTE: Clusters with shared data need STONITH to ensure data integrity");
     }
 
     return TRUE;
 }
 
 gboolean
 unpack_tags(xmlNode * xml_tags, pe_working_set_t * data_set)
 {
     xmlNode *xml_tag = NULL;
 
     data_set->tags =
         g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, destroy_tag);
 
     for (xml_tag = __xml_first_child(xml_tags); xml_tag != NULL; xml_tag = __xml_next(xml_tag)) {
         xmlNode *xml_obj_ref = NULL;
         const char *tag_id = ID(xml_tag);
 
         if (crm_str_eq((const char *)xml_tag->name, XML_CIB_TAG_TAG, TRUE) == FALSE) {
             continue;
         }
 
         if (tag_id == NULL) {
             crm_config_err("Failed unpacking %s: %s should be specified",
                            crm_element_name(xml_tag), XML_ATTR_ID);
             continue;
         }
 
         for (xml_obj_ref = __xml_first_child(xml_tag); xml_obj_ref != NULL; xml_obj_ref = __xml_next(xml_obj_ref)) {
             const char *obj_ref = ID(xml_obj_ref);
 
             if (crm_str_eq((const char *)xml_obj_ref->name, XML_CIB_TAG_OBJ_REF, TRUE) == FALSE) {
                 continue;
             }
 
             if (obj_ref == NULL) {
                 crm_config_err("Failed unpacking %s for tag %s: %s should be specified",
                                crm_element_name(xml_obj_ref), tag_id, XML_ATTR_ID);
                 continue;
             }
 
             if (add_tag_ref(data_set->tags, tag_id, obj_ref) == FALSE) {
                 return FALSE;
             }
         }
     }
 
     return TRUE;
 }
 
 /* The ticket state section:
  * "/cib/status/tickets/ticket_state" */
 static gboolean
 unpack_ticket_state(xmlNode * xml_ticket, pe_working_set_t * data_set)
 {
     const char *ticket_id = NULL;
     const char *granted = NULL;
     const char *last_granted = NULL;
     const char *standby = NULL;
     xmlAttrPtr xIter = NULL;
 
     ticket_t *ticket = NULL;
 
     ticket_id = ID(xml_ticket);
     if (ticket_id == NULL || strlen(ticket_id) == 0) {
         return FALSE;
     }
 
     crm_trace("Processing ticket state for %s", ticket_id);
 
     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
     if (ticket == NULL) {
         ticket = ticket_new(ticket_id, data_set);
         if (ticket == NULL) {
             return FALSE;
         }
     }
 
     for (xIter = xml_ticket->properties; xIter; xIter = xIter->next) {
         const char *prop_name = (const char *)xIter->name;
         const char *prop_value = crm_element_value(xml_ticket, prop_name);
 
         if (crm_str_eq(prop_name, XML_ATTR_ID, TRUE)) {
             continue;
         }
         g_hash_table_replace(ticket->state, strdup(prop_name), strdup(prop_value));
     }
 
     granted = g_hash_table_lookup(ticket->state, "granted");
     if (granted && crm_is_true(granted)) {
         ticket->granted = TRUE;
         crm_info("We have ticket '%s'", ticket->id);
     } else {
         ticket->granted = FALSE;
         crm_info("We do not have ticket '%s'", ticket->id);
     }
 
     last_granted = g_hash_table_lookup(ticket->state, "last-granted");
     if (last_granted) {
         ticket->last_granted = crm_parse_int(last_granted, 0);
     }
 
     standby = g_hash_table_lookup(ticket->state, "standby");
     if (standby && crm_is_true(standby)) {
         ticket->standby = TRUE;
         if (ticket->granted) {
             crm_info("Granted ticket '%s' is in standby-mode", ticket->id);
         }
     } else {
         ticket->standby = FALSE;
     }
 
     crm_trace("Done with ticket state for %s", ticket_id);
 
     return TRUE;
 }
 
 static gboolean
 unpack_tickets_state(xmlNode * xml_tickets, pe_working_set_t * data_set)
 {
     xmlNode *xml_obj = NULL;
 
     for (xml_obj = __xml_first_child(xml_tickets); xml_obj != NULL; xml_obj = __xml_next(xml_obj)) {
         if (crm_str_eq((const char *)xml_obj->name, XML_CIB_TAG_TICKET_STATE, TRUE) == FALSE) {
             continue;
         }
         unpack_ticket_state(xml_obj, data_set);
     }
 
     return TRUE;
 }
 
 /* Compatibility with the deprecated ticket state section:
  * "/cib/status/tickets/instance_attributes" */
 static void
 get_ticket_state_legacy(gpointer key, gpointer value, gpointer user_data)
 {
     const char *long_key = key;
     char *state_key = NULL;
 
     const char *granted_prefix = "granted-ticket-";
     const char *last_granted_prefix = "last-granted-";
     static int granted_prefix_strlen = 0;
     static int last_granted_prefix_strlen = 0;
 
     const char *ticket_id = NULL;
     const char *is_granted = NULL;
     const char *last_granted = NULL;
     const char *sep = NULL;
 
     ticket_t *ticket = NULL;
     pe_working_set_t *data_set = user_data;
 
     if (granted_prefix_strlen == 0) {
         granted_prefix_strlen = strlen(granted_prefix);
     }
 
     if (last_granted_prefix_strlen == 0) {
         last_granted_prefix_strlen = strlen(last_granted_prefix);
     }
 
     if (strstr(long_key, granted_prefix) == long_key) {
         ticket_id = long_key + granted_prefix_strlen;
         if (strlen(ticket_id)) {
             state_key = strdup("granted");
             is_granted = value;
         }
     } else if (strstr(long_key, last_granted_prefix) == long_key) {
         ticket_id = long_key + last_granted_prefix_strlen;
         if (strlen(ticket_id)) {
             state_key = strdup("last-granted");
             last_granted = value;
         }
     } else if ((sep = strrchr(long_key, '-'))) {
         ticket_id = sep + 1;
         state_key = strndup(long_key, strlen(long_key) - strlen(sep));
     }
 
     if (ticket_id == NULL || strlen(ticket_id) == 0) {
         free(state_key);
         return;
     }
 
     if (state_key == NULL || strlen(state_key) == 0) {
         free(state_key);
         return;
     }
 
     ticket = g_hash_table_lookup(data_set->tickets, ticket_id);
     if (ticket == NULL) {
         ticket = ticket_new(ticket_id, data_set);
         if (ticket == NULL) {
             free(state_key);
             return;
         }
     }
 
     g_hash_table_replace(ticket->state, state_key, strdup(value));
 
     if (is_granted) {
         if (crm_is_true(is_granted)) {
             ticket->granted = TRUE;
             crm_info("We have ticket '%s'", ticket->id);
         } else {
             ticket->granted = FALSE;
             crm_info("We do not have ticket '%s'", ticket->id);
         }
 
     } else if (last_granted) {
         ticket->last_granted = crm_parse_int(last_granted, 0);
     }
 }
 
 /* remove nodes that are down, stopping */
 /* create +ve rsc_to_node constraints between resources and the nodes they are running on */
 /* anything else? */
 gboolean
 unpack_status(xmlNode * status, pe_working_set_t * data_set)
 {
     const char *id = NULL;
     const char *uname = NULL;
 
     xmlNode *state = NULL;
     xmlNode *lrm_rsc = NULL;
     node_t *this_node = NULL;
 
     crm_trace("Beginning unpack");
 
     if (data_set->tickets == NULL) {
         data_set->tickets =
             g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str, destroy_ticket);
     }
 
     for (state = __xml_first_child(status); state != NULL; state = __xml_next(state)) {
         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_TICKETS, TRUE)) {
             xmlNode *xml_tickets = state;
             GHashTable *state_hash = NULL;
 
             /* Compatibility with the deprecated ticket state section:
              * Unpack the attributes in the deprecated "/cib/status/tickets/instance_attributes" if it exists. */
             state_hash =
                 g_hash_table_new_full(crm_str_hash, g_str_equal, g_hash_destroy_str,
                                       g_hash_destroy_str);
 
             unpack_instance_attributes(data_set->input, xml_tickets, XML_TAG_ATTR_SETS, NULL,
                                        state_hash, NULL, TRUE, data_set->now);
 
             g_hash_table_foreach(state_hash, get_ticket_state_legacy, data_set);
 
             if (state_hash) {
                 g_hash_table_destroy(state_hash);
             }
 
             /* Unpack the new "/cib/status/tickets/ticket_state"s */
             unpack_tickets_state(xml_tickets, data_set);
         }
 
         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE)) {
             xmlNode *attrs = NULL;
             const char *resource_discovery_enabled = NULL;
 
             id = crm_element_value(state, XML_ATTR_ID);
             uname = crm_element_value(state, XML_ATTR_UNAME);
             this_node = pe_find_node_any(data_set->nodes, id, uname);
 
             if (uname == NULL) {
                 /* error */
                 continue;
 
             } else if (this_node == NULL) {
                 crm_config_warn("Node %s in status section no longer exists", uname);
                 continue;
 
             } else if (is_remote_node(this_node)) {
-                /* online state for remote nodes is determined by the rsc state
-                 * after all the unpacking is done. */
+                /* online state for remote nodes is determined by the
+                 * rsc state after all the unpacking is done. we do however
+                 * need to mark whether or not the node has been fenced as this plays
+                 * a role during unpacking cluster node resource state */
+                this_node->details->remote_was_fenced = crm_is_true(crm_element_value(state, XML_NODE_IS_REMOTE_FENCED));
                 continue;
             }
 
             crm_trace("Processing node id=%s, uname=%s", id, uname);
 
             /* Mark the node as provisionally clean
              * - at least we have seen it in the current cluster's lifetime
              */
             this_node->details->unclean = FALSE;
             this_node->details->unseen = FALSE;
             attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
             add_node_attrs(attrs, this_node, TRUE, data_set);
 
             if (crm_is_true(g_hash_table_lookup(this_node->details->attrs, "standby"))) {
                 crm_info("Node %s is in standby-mode", this_node->details->uname);
                 this_node->details->standby = TRUE;
             }
 
             if (crm_is_true(g_hash_table_lookup(this_node->details->attrs, "maintenance"))) {
                 crm_info("Node %s is in maintenance-mode", this_node->details->uname);
                 this_node->details->maintenance = TRUE;
             }
 
             resource_discovery_enabled = g_hash_table_lookup(this_node->details->attrs, XML_NODE_ATTR_RSC_DISCOVERY);
             if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
                 crm_warn("ignoring %s attribute on node %s, disabling resource discovery is not allowed on cluster nodes",
                     XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
             }
 
             crm_trace("determining node state");
             determine_online_status(state, this_node, data_set);
 
             if (this_node->details->online && data_set->no_quorum_policy == no_quorum_suicide) {
                 /* Everything else should flow from this automatically
                  * At least until the PE becomes able to migrate off healthy resources
                  */
                 pe_fence_node(data_set, this_node, "because the cluster does not have quorum");
             }
         }
     }
 
     /* Now that we know all node states, we can safely handle migration ops */
     for (state = __xml_first_child(status); state != NULL; state = __xml_next(state)) {
         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE) == FALSE) {
             continue;
         }
 
         id = crm_element_value(state, XML_ATTR_ID);
         uname = crm_element_value(state, XML_ATTR_UNAME);
         this_node = pe_find_node_any(data_set->nodes, id, uname);
 
         if (this_node == NULL) {
             crm_info("Node %s is unknown", id);
             continue;
 
         } else if (is_remote_node(this_node)) {
 
             /* online status of remote node can not be determined until all other
              * resource status is unpacked. */
             continue;
         } else if (this_node->details->online || is_set(data_set->flags, pe_flag_stonith_enabled)) {
             crm_trace("Processing lrm resource entries on healthy node: %s",
                       this_node->details->uname);
             lrm_rsc = find_xml_node(state, XML_CIB_TAG_LRM, FALSE);
             lrm_rsc = find_xml_node(lrm_rsc, XML_LRM_TAG_RESOURCES, FALSE);
             unpack_lrm_resources(this_node, lrm_rsc, data_set);
         }
     }
 
     /* now that the rest of the cluster's status is determined
      * calculate remote-nodes */
     unpack_remote_status(status, data_set);
 
     return TRUE;
 }
 
 gboolean
 unpack_remote_status(xmlNode * status, pe_working_set_t * data_set)
 {
     const char *id = NULL;
     const char *uname = NULL;
     GListPtr gIter = NULL;
 
     xmlNode *state = NULL;
     xmlNode *lrm_rsc = NULL;
     node_t *this_node = NULL;
 
     if (is_set(data_set->flags, pe_flag_have_remote_nodes) == FALSE) {
         crm_trace("no remote nodes to unpack");
         return TRUE;
     }
 
     /* get online status */
     for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
         this_node = gIter->data;
 
         if ((this_node == NULL) || (is_remote_node(this_node) == FALSE)) {
             continue;
         }
         determine_remote_online_status(this_node);
     }
 
     /* process attributes */
     for (state = __xml_first_child(status); state != NULL; state = __xml_next(state)) {
         const char *resource_discovery_enabled = NULL;
         xmlNode *attrs = NULL;
         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE) == FALSE) {
             continue;
         }
 
         id = crm_element_value(state, XML_ATTR_ID);
         uname = crm_element_value(state, XML_ATTR_UNAME);
         this_node = pe_find_node_any(data_set->nodes, id, uname);
 
         if ((this_node == NULL) || (is_remote_node(this_node) == FALSE)) {
             continue;
         }
         crm_trace("Processing remote node id=%s, uname=%s", id, uname);
 
         if (this_node->details->remote_requires_reset == FALSE) {
             this_node->details->unclean = FALSE;
             this_node->details->unseen = FALSE;
         }
         attrs = find_xml_node(state, XML_TAG_TRANSIENT_NODEATTRS, FALSE);
         add_node_attrs(attrs, this_node, TRUE, data_set);
 
         if (crm_is_true(g_hash_table_lookup(this_node->details->attrs, "standby"))) {
             crm_info("Node %s is in standby-mode", this_node->details->uname);
             this_node->details->standby = TRUE;
         }
 
         if (crm_is_true(g_hash_table_lookup(this_node->details->attrs, "maintenance"))) {
             crm_info("Node %s is in maintenance-mode", this_node->details->uname);
             this_node->details->maintenance = TRUE;
         }
 
         resource_discovery_enabled = g_hash_table_lookup(this_node->details->attrs, XML_NODE_ATTR_RSC_DISCOVERY);
         if (resource_discovery_enabled && !crm_is_true(resource_discovery_enabled)) {
             if (is_baremetal_remote_node(this_node) && is_not_set(data_set->flags, pe_flag_stonith_enabled)) {
                 crm_warn("ignoring %s attribute on baremetal remote node %s, disabling resource discovery requires stonith to be enabled.",
                     XML_NODE_ATTR_RSC_DISCOVERY, this_node->details->uname);
             } else {
                 /* if we're here, this is either a baremetal node and fencing is enabled,
                  * or this is a container node which we don't care if fencing is enabled 
                  * or not on. container nodes are 'fenced' by recovering the container resource
                  * regardless of whether fencing is enabled. */
                 crm_info("Node %s has resource discovery disabled", this_node->details->uname);
                 this_node->details->rsc_discovery_enabled = FALSE;
             }
         }
     }
 
     /* process node rsc status */
     for (state = __xml_first_child(status); state != NULL; state = __xml_next(state)) {
         if (crm_str_eq((const char *)state->name, XML_CIB_TAG_STATE, TRUE) == FALSE) {
             continue;
         }
 
         id = crm_element_value(state, XML_ATTR_ID);
         uname = crm_element_value(state, XML_ATTR_UNAME);
         this_node = pe_find_node_any(data_set->nodes, id, uname);
 
         if ((this_node == NULL) || (is_remote_node(this_node) == FALSE)) {
             continue;
         }
         crm_trace("Processing lrm resource entries on healthy remote node: %s",
                   this_node->details->uname);
         lrm_rsc = find_xml_node(state, XML_CIB_TAG_LRM, FALSE);
         lrm_rsc = find_xml_node(lrm_rsc, XML_LRM_TAG_RESOURCES, FALSE);
         unpack_lrm_resources(this_node, lrm_rsc, data_set);
     }
 
     return TRUE;
 }
 
 static gboolean
 determine_online_status_no_fencing(pe_working_set_t * data_set, xmlNode * node_state,
                                    node_t * this_node)
 {
     gboolean online = FALSE;
     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
 
     if (!crm_is_true(in_cluster)) {
         crm_trace("Node is down: in_cluster=%s", crm_str(in_cluster));
 
     } else if (safe_str_eq(is_peer, ONLINESTATUS)) {
         if (safe_str_eq(join, CRMD_JOINSTATE_MEMBER)) {
             online = TRUE;
         } else {
             crm_debug("Node is not ready to run resources: %s", join);
         }
 
     } else if (this_node->details->expected_up == FALSE) {
         crm_trace("CRMd is down: in_cluster=%s", crm_str(in_cluster));
         crm_trace("\tis_peer=%s, join=%s, expected=%s",
                   crm_str(is_peer), crm_str(join), crm_str(exp_state));
 
     } else {
         /* mark it unclean */
         pe_fence_node(data_set, this_node, "unexpectedly down");
         crm_info("\tin_cluster=%s, is_peer=%s, join=%s, expected=%s",
                  crm_str(in_cluster), crm_str(is_peer), crm_str(join), crm_str(exp_state));
     }
     return online;
 }
 
 static gboolean
 determine_online_status_fencing(pe_working_set_t * data_set, xmlNode * node_state,
                                 node_t * this_node)
 {
     gboolean online = FALSE;
     gboolean do_terminate = FALSE;
     const char *join = crm_element_value(node_state, XML_NODE_JOIN_STATE);
     const char *is_peer = crm_element_value(node_state, XML_NODE_IS_PEER);
     const char *in_cluster = crm_element_value(node_state, XML_NODE_IN_CLUSTER);
     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
     const char *terminate = g_hash_table_lookup(this_node->details->attrs, "terminate");
 
 /*
   - XML_NODE_IN_CLUSTER    ::= true|false
   - XML_NODE_IS_PEER       ::= true|false|online|offline
   - XML_NODE_JOIN_STATE    ::= member|down|pending|banned
   - XML_NODE_EXPECTED      ::= member|down
 */
 
     if (crm_is_true(terminate)) {
         do_terminate = TRUE;
 
     } else if (terminate != NULL && strlen(terminate) > 0) {
         /* could be a time() value */
         char t = terminate[0];
 
         if (t != '0' && isdigit(t)) {
             do_terminate = TRUE;
         }
     }
 
     crm_trace("%s: in_cluster=%s, is_peer=%s, join=%s, expected=%s, term=%d",
               this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
               crm_str(join), crm_str(exp_state), do_terminate);
 
     online = crm_is_true(in_cluster);
     if (safe_str_eq(is_peer, ONLINESTATUS)) {
         is_peer = XML_BOOLEAN_YES;
     }
     if (exp_state == NULL) {
         exp_state = CRMD_JOINSTATE_DOWN;
     }
 
     if (this_node->details->shutdown) {
         crm_debug("%s is shutting down", this_node->details->uname);
         online = crm_is_true(is_peer);  /* Slightly different criteria since we cant shut down a dead peer */
 
     } else if (in_cluster == NULL) {
         pe_fence_node(data_set, this_node, "because the peer has not been seen by the cluster");
 
     } else if (safe_str_eq(join, CRMD_JOINSTATE_NACK)) {
         pe_fence_node(data_set, this_node, "because it failed the pacemaker membership criteria");
 
     } else if (do_terminate == FALSE && safe_str_eq(exp_state, CRMD_JOINSTATE_DOWN)) {
 
         if (crm_is_true(in_cluster) || crm_is_true(is_peer)) {
             crm_info("- Node %s is not ready to run resources", this_node->details->uname);
             this_node->details->standby = TRUE;
             this_node->details->pending = TRUE;
 
         } else {
             crm_trace("%s is down or still coming up", this_node->details->uname);
         }
 
     } else if (do_terminate && safe_str_eq(join, CRMD_JOINSTATE_DOWN)
                && crm_is_true(in_cluster) == FALSE && crm_is_true(is_peer) == FALSE) {
         crm_info("Node %s was just shot", this_node->details->uname);
         online = FALSE;
 
     } else if (crm_is_true(in_cluster) == FALSE) {
         pe_fence_node(data_set, this_node, "because the node is no longer part of the cluster");
 
     } else if (crm_is_true(is_peer) == FALSE) {
         pe_fence_node(data_set, this_node, "because our peer process is no longer available");
 
         /* Everything is running at this point, now check join state */
     } else if (do_terminate) {
         pe_fence_node(data_set, this_node, "because termination was requested");
 
     } else if (safe_str_eq(join, CRMD_JOINSTATE_MEMBER)) {
         crm_info("Node %s is active", this_node->details->uname);
 
     } else if (safe_str_eq(join, CRMD_JOINSTATE_PENDING)
                || safe_str_eq(join, CRMD_JOINSTATE_DOWN)) {
         crm_info("Node %s is not ready to run resources", this_node->details->uname);
         this_node->details->standby = TRUE;
         this_node->details->pending = TRUE;
 
     } else {
         pe_fence_node(data_set, this_node, "because the peer was in an unknown state");
         crm_warn("%s: in-cluster=%s, is-peer=%s, join=%s, expected=%s, term=%d, shutdown=%d",
                  this_node->details->uname, crm_str(in_cluster), crm_str(is_peer),
                  crm_str(join), crm_str(exp_state), do_terminate, this_node->details->shutdown);
     }
 
     return online;
 }
 
 static gboolean
 determine_remote_online_status(node_t * this_node)
 {
     resource_t *rsc = this_node->details->remote_rsc;
     resource_t *container = NULL;
 
     if (rsc == NULL) {
         this_node->details->online = FALSE;
         goto remote_online_done;
     }
 
     container = rsc->container;
 
     CRM_ASSERT(rsc != NULL);
 
     /* If the resource is currently started, mark it online. */
     if (rsc->role == RSC_ROLE_STARTED) {
         crm_trace("Remote node %s is set to ONLINE. role == started", this_node->details->id);
         this_node->details->online = TRUE;
     }
 
     /* consider this node shutting down if transitioning start->stop */
     if (rsc->role == RSC_ROLE_STARTED && rsc->next_role == RSC_ROLE_STOPPED) {
         crm_trace("Remote node %s shutdown. transition from start to stop role", this_node->details->id);
         this_node->details->shutdown = TRUE;
     }
 
     /* Now check all the failure conditions. */
     if (is_set(rsc->flags, pe_rsc_failed) ||
         (rsc->role == RSC_ROLE_STOPPED) ||
         (container && is_set(container->flags, pe_rsc_failed)) ||
         (container && container->role == RSC_ROLE_STOPPED)) {
 
         crm_trace("Remote node %s is set to OFFLINE. node is stopped or rsc failed.", this_node->details->id);
         this_node->details->online = FALSE;
     }
 
 remote_online_done:
     crm_trace("Remote node %s online=%s",
         this_node->details->id, this_node->details->online ? "TRUE" : "FALSE");
     return this_node->details->online;
 }
 
 gboolean
 determine_online_status(xmlNode * node_state, node_t * this_node, pe_working_set_t * data_set)
 {
     gboolean online = FALSE;
     const char *shutdown = NULL;
     const char *exp_state = crm_element_value(node_state, XML_NODE_EXPECTED);
 
     if (this_node == NULL) {
         crm_config_err("No node to check");
         return online;
     }
 
     this_node->details->shutdown = FALSE;
     this_node->details->expected_up = FALSE;
     shutdown = g_hash_table_lookup(this_node->details->attrs, XML_CIB_ATTR_SHUTDOWN);
 
     if (shutdown != NULL && safe_str_neq("0", shutdown)) {
         this_node->details->shutdown = TRUE;
 
     } else if (safe_str_eq(exp_state, CRMD_JOINSTATE_MEMBER)) {
         this_node->details->expected_up = TRUE;
     }
 
     if (this_node->details->type == node_ping) {
         this_node->details->unclean = FALSE;
         online = FALSE;         /* As far as resource management is concerned,
                                  * the node is safely offline.
                                  * Anyone caught abusing this logic will be shot
                                  */
 
     } else if (is_set(data_set->flags, pe_flag_stonith_enabled) == FALSE) {
         online = determine_online_status_no_fencing(data_set, node_state, this_node);
 
     } else {
         online = determine_online_status_fencing(data_set, node_state, this_node);
     }
 
     if (online) {
         this_node->details->online = TRUE;
 
     } else {
         /* remove node from contention */
         this_node->fixed = TRUE;
         this_node->weight = -INFINITY;
     }
 
     if (online && this_node->details->shutdown) {
         /* dont run resources here */
         this_node->fixed = TRUE;
         this_node->weight = -INFINITY;
     }
 
     if (this_node->details->type == node_ping) {
         crm_info("Node %s is not a pacemaker node", this_node->details->uname);
 
     } else if (this_node->details->unclean) {
         pe_proc_warn("Node %s is unclean", this_node->details->uname);
 
     } else if (this_node->details->online) {
         crm_info("Node %s is %s", this_node->details->uname,
                  this_node->details->shutdown ? "shutting down" :
                  this_node->details->pending ? "pending" :
                  this_node->details->standby ? "standby" :
                  this_node->details->maintenance ? "maintenance" : "online");
 
     } else {
         crm_trace("Node %s is offline", this_node->details->uname);
     }
 
     return online;
 }
 
 char *
 clone_strip(const char *last_rsc_id)
 {
     int lpc = 0;
     char *zero = NULL;
 
     CRM_CHECK(last_rsc_id != NULL, return NULL);
     lpc = strlen(last_rsc_id);
     while (--lpc > 0) {
         switch (last_rsc_id[lpc]) {
             case 0:
                 crm_err("Empty string: %s", last_rsc_id);
                 return NULL;
                 break;
             case '0':
             case '1':
             case '2':
             case '3':
             case '4':
             case '5':
             case '6':
             case '7':
             case '8':
             case '9':
                 break;
             case ':':
                 zero = calloc(1, lpc + 1);
                 memcpy(zero, last_rsc_id, lpc);
                 zero[lpc] = 0;
                 return zero;
             default:
                 goto done;
         }
     }
   done:
     zero = strdup(last_rsc_id);
     return zero;
 }
 
 char *
 clone_zero(const char *last_rsc_id)
 {
     int lpc = 0;
     char *zero = NULL;
 
     CRM_CHECK(last_rsc_id != NULL, return NULL);
     if (last_rsc_id != NULL) {
         lpc = strlen(last_rsc_id);
     }
 
     while (--lpc > 0) {
         switch (last_rsc_id[lpc]) {
             case 0:
                 return NULL;
                 break;
             case '0':
             case '1':
             case '2':
             case '3':
             case '4':
             case '5':
             case '6':
             case '7':
             case '8':
             case '9':
                 break;
             case ':':
                 zero = calloc(1, lpc + 3);
                 memcpy(zero, last_rsc_id, lpc);
                 zero[lpc] = ':';
                 zero[lpc + 1] = '0';
                 zero[lpc + 2] = 0;
                 return zero;
             default:
                 goto done;
         }
     }
   done:
     lpc = strlen(last_rsc_id);
     zero = calloc(1, lpc + 3);
     memcpy(zero, last_rsc_id, lpc);
     zero[lpc] = ':';
     zero[lpc + 1] = '0';
     zero[lpc + 2] = 0;
     crm_trace("%s -> %s", last_rsc_id, zero);
     return zero;
 }
 
 static resource_t *
 create_fake_resource(const char *rsc_id, xmlNode * rsc_entry, pe_working_set_t * data_set)
 {
     resource_t *rsc = NULL;
     xmlNode *xml_rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
 
     copy_in_properties(xml_rsc, rsc_entry);
     crm_xml_add(xml_rsc, XML_ATTR_ID, rsc_id);
     crm_log_xml_debug(xml_rsc, "Orphan resource");
 
     if (!common_unpack(xml_rsc, &rsc, NULL, data_set)) {
         return NULL;
     }
 
     if (xml_contains_remote_node(xml_rsc)) {
         node_t *node;
 
         crm_debug("Detected orphaned remote node %s", rsc_id);
         rsc->is_remote_node = TRUE;
         node = pe_find_node(data_set->nodes, rsc_id);
         if (node == NULL) {
 	        node = create_node(rsc_id, rsc_id, "remote", NULL, data_set);
         }
         link_rsc2remotenode(data_set, rsc);
 
         if (node) {
             crm_trace("Setting node %s as shutting down due to orphaned connection resource", rsc_id);
             node->details->shutdown = TRUE;
         }
     }
 
     if (crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER)) {
         /* This orphaned rsc needs to be mapped to a container. */
         crm_trace("Detected orphaned container filler %s", rsc_id);
         set_bit(rsc->flags, pe_rsc_orphan_container_filler);
     }
     set_bit(rsc->flags, pe_rsc_orphan);
     data_set->resources = g_list_append(data_set->resources, rsc);
     return rsc;
 }
 
 extern resource_t *create_child_clone(resource_t * rsc, int sub_id, pe_working_set_t * data_set);
 
 static resource_t *
 find_anonymous_clone(pe_working_set_t * data_set, node_t * node, resource_t * parent,
                      const char *rsc_id)
 {
     GListPtr rIter = NULL;
     resource_t *rsc = NULL;
     gboolean skip_inactive = FALSE;
 
     CRM_ASSERT(parent != NULL);
     CRM_ASSERT(parent->variant == pe_clone || parent->variant == pe_master);
     CRM_ASSERT(is_not_set(parent->flags, pe_rsc_unique));
 
     /* Find an instance active (or partially active for grouped clones) on the specified node */
     pe_rsc_trace(parent, "Looking for %s on %s in %s", rsc_id, node->details->uname, parent->id);
     for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
         GListPtr nIter = NULL;
         GListPtr locations = NULL;
         resource_t *child = rIter->data;
 
         child->fns->location(child, &locations, TRUE);
         if (locations == NULL) {
             pe_rsc_trace(child, "Resource %s, skip inactive", child->id);
             continue;
         }
 
         for (nIter = locations; nIter && rsc == NULL; nIter = nIter->next) {
             node_t *childnode = nIter->data;
 
             if (childnode->details == node->details) {
                 /* ->find_rsc() because we might be a cloned group */
                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
                 if(rsc) {
                     pe_rsc_trace(rsc, "Resource %s, active", rsc->id);
                 }
             }
 
             /* Keep this block, it means we'll do the right thing if
              * anyone toggles the unique flag to 'off'
              */
             if (rsc && rsc->running_on) {
                 crm_notice("/Anonymous/ clone %s is already running on %s",
                            parent->id, node->details->uname);
                 skip_inactive = TRUE;
                 rsc = NULL;
             }
         }
 
         g_list_free(locations);
     }
 
     /* Find an inactive instance */
     if (skip_inactive == FALSE) {
         pe_rsc_trace(parent, "Looking for %s anywhere", rsc_id);
         for (rIter = parent->children; rsc == NULL && rIter; rIter = rIter->next) {
             GListPtr locations = NULL;
             resource_t *child = rIter->data;
 
             if (is_set(child->flags, pe_rsc_block)) {
                 pe_rsc_trace(child, "Skip: blocked in stopped state");
                 continue;
             }
 
             child->fns->location(child, &locations, TRUE);
             if (locations == NULL) {
                 /* ->find_rsc() because we might be a cloned group */
                 rsc = parent->fns->find_rsc(child, rsc_id, NULL, pe_find_clone);
                 pe_rsc_trace(parent, "Resource %s, empty slot", rsc->id);
             }
             g_list_free(locations);
         }
     }
 
     if (rsc == NULL) {
         /* Create an extra orphan */
         resource_t *top = create_child_clone(parent, -1, data_set);
 
         /* ->find_rsc() because we might be a cloned group */
         rsc = top->fns->find_rsc(top, rsc_id, NULL, pe_find_clone);
         CRM_ASSERT(rsc != NULL);
 
         pe_rsc_debug(parent, "Created orphan %s for %s: %s on %s", top->id, parent->id, rsc_id,
                      node->details->uname);
     }
 
     if (safe_str_neq(rsc_id, rsc->id)) {
         pe_rsc_debug(rsc, "Internally renamed %s on %s to %s%s",
                     rsc_id, node->details->uname, rsc->id,
                     is_set(rsc->flags, pe_rsc_orphan) ? " (ORPHAN)" : "");
     }
 
     return rsc;
 }
 
 static resource_t *
 unpack_find_resource(pe_working_set_t * data_set, node_t * node, const char *rsc_id,
                      xmlNode * rsc_entry)
 {
     resource_t *rsc = NULL;
     resource_t *parent = NULL;
 
     crm_trace("looking for %s", rsc_id);
     rsc = pe_find_resource(data_set->resources, rsc_id);
 
     /* no match */
     if (rsc == NULL) {
         /* Even when clone-max=0, we still create a single :0 orphan to match against */
         char *tmp = clone_zero(rsc_id);
         resource_t *clone0 = pe_find_resource(data_set->resources, tmp);
 
         if (clone0 && is_not_set(clone0->flags, pe_rsc_unique)) {
             rsc = clone0;
         } else {
             crm_trace("%s is not known as %s either", rsc_id, tmp);
         }
 
         parent = uber_parent(clone0);
         free(tmp);
 
         crm_trace("%s not found: %s", rsc_id, parent ? parent->id : "orphan");
 
     } else if (rsc->variant > pe_native) {
         crm_trace("%s is no longer a primitve resource, the lrm_resource entry is obsolete",
                   rsc_id);
         return NULL;
 
     } else {
         parent = uber_parent(rsc);
     }
 
     if (parent && parent->variant > pe_group) {
         if (is_not_set(parent->flags, pe_rsc_unique)) {
             char *base = clone_strip(rsc_id);
 
             rsc = find_anonymous_clone(data_set, node, parent, base);
             CRM_ASSERT(rsc != NULL);
             free(base);
         }
 
         if (rsc && safe_str_neq(rsc_id, rsc->id)) {
             free(rsc->clone_name);
             rsc->clone_name = strdup(rsc_id);
         }
     }
 
     return rsc;
 }
 
 static resource_t *
 process_orphan_resource(xmlNode * rsc_entry, node_t * node, pe_working_set_t * data_set)
 {
     resource_t *rsc = NULL;
     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
 
     crm_debug("Detected orphan resource %s on %s", rsc_id, node->details->uname);
     rsc = create_fake_resource(rsc_id, rsc_entry, data_set);
 
     if (is_set(data_set->flags, pe_flag_stop_rsc_orphans) == FALSE) {
         clear_bit(rsc->flags, pe_rsc_managed);
 
     } else {
         GListPtr gIter = NULL;
 
         print_resource(LOG_DEBUG_3, "Added orphan", rsc, FALSE);
 
         CRM_CHECK(rsc != NULL, return NULL);
         resource_location(rsc, NULL, -INFINITY, "__orphan_dont_run__", data_set);
 
         for (gIter = data_set->nodes; gIter != NULL; gIter = gIter->next) {
             node_t *node = (node_t *) gIter->data;
 
             if (node->details->online && get_failcount(node, rsc, NULL, data_set)) {
                 action_t *clear_op = NULL;
                 action_t *ready = NULL;
 
                 if (is_remote_node(node)) {
                     char *pseudo_op_name = crm_concat(CRM_OP_PROBED, node->details->id, '_');
                     ready = get_pseudo_op(pseudo_op_name, data_set);
                     free(pseudo_op_name);
                 } else {
                     ready = get_pseudo_op(CRM_OP_PROBED, data_set);
                 }
 
                 clear_op = custom_action(rsc, crm_concat(rsc->id, CRM_OP_CLEAR_FAILCOUNT, '_'),
                                          CRM_OP_CLEAR_FAILCOUNT, node, FALSE, TRUE, data_set);
 
                 add_hash_param(clear_op->meta, XML_ATTR_TE_NOWAIT, XML_BOOLEAN_TRUE);
                 pe_rsc_info(rsc, "Clearing failcount (%d) for orphaned resource %s on %s (%s)",
                             get_failcount(node, rsc, NULL, data_set), rsc->id, node->details->uname,
                             clear_op->uuid);
 
                 order_actions(clear_op, ready, pe_order_optional);
             }
         }
     }
     return rsc;
 }
 
 static void
 process_rsc_state(resource_t * rsc, node_t * node,
                   enum action_fail_response on_fail,
                   xmlNode * migrate_op, pe_working_set_t * data_set)
 {
     node_t *tmpnode = NULL;
     CRM_ASSERT(rsc);
     pe_rsc_trace(rsc, "Resource %s is %s on %s: on_fail=%s",
                  rsc->id, role2text(rsc->role), node->details->uname, fail2text(on_fail));
 
     /* process current state */
     if (rsc->role != RSC_ROLE_UNKNOWN) {
         resource_t *iter = rsc;
 
         while (iter) {
             if (g_hash_table_lookup(iter->known_on, node->details->id) == NULL) {
                 node_t *n = node_copy(node);
 
                 pe_rsc_trace(rsc, "%s (aka. %s) known on %s", rsc->id, rsc->clone_name,
                              n->details->uname);
                 g_hash_table_insert(iter->known_on, (gpointer) n->details->id, n);
             }
             if (is_set(iter->flags, pe_rsc_unique)) {
                 break;
             }
             iter = iter->parent;
         }
     }
 
     if (rsc->role > RSC_ROLE_STOPPED
         && node->details->online == FALSE && is_set(rsc->flags, pe_rsc_managed)) {
 
         char *reason = NULL;
         gboolean should_fence = FALSE;
 
         /* if this is a remote_node living in a container, fence the container
          * by recovering it. Mark the resource as unmanaged. Once the container
          * and remote connenction are re-established, the status section will
          * get reset in the crmd freeing up this resource to run again once we
          * are sure we know the resources state. */
         if (is_container_remote_node(node)) {
             set_bit(rsc->flags, pe_rsc_failed);
 
             should_fence = TRUE;
         } else if (is_set(data_set->flags, pe_flag_stonith_enabled)) {
             if (is_baremetal_remote_node(node) && node->details->remote_rsc && is_not_set(node->details->remote_rsc->flags, pe_rsc_failed)) {
                 /* setting unseen = true means that fencing of the remote node will
                  * only occur if the connection resource is not going to start somewhere.
                  * This allows connection resources on a failed cluster-node to move to
                  * another node without requiring the baremetal remote nodes to be fenced
                  * as well. */
                 node->details->unseen = TRUE;
                 reason = crm_strdup_printf("because %s is active there. Fencing will be revoked if remote-node connection can be re-established on another cluster-node.", rsc->id);
             }
             should_fence = TRUE;
         }
 
         if (should_fence) {
             if (reason == NULL) {
                reason = crm_strdup_printf("because %s is thought to be active there", rsc->id);
             }
             pe_fence_node(data_set, node, reason);
         }
         free(reason);
     }
 
     if (node->details->unclean) {
         /* No extra processing needed
          * Also allows resources to be started again after a node is shot
          */
         on_fail = action_fail_ignore;
     }
 
     switch (on_fail) {
         case action_fail_ignore:
             /* nothing to do */
             break;
 
         case action_fail_fence:
             /* treat it as if it is still running
              * but also mark the node as unclean
              */
             pe_fence_node(data_set, node, "because of resource failure(s)");
             break;
 
         case action_fail_standby:
             node->details->standby = TRUE;
             node->details->standby_onfail = TRUE;
             break;
 
         case action_fail_block:
             /* is_managed == FALSE will prevent any
              * actions being sent for the resource
              */
             clear_bit(rsc->flags, pe_rsc_managed);
             set_bit(rsc->flags, pe_rsc_block);
             break;
 
         case action_fail_migrate:
             /* make sure it comes up somewhere else
              * or not at all
              */
             resource_location(rsc, node, -INFINITY, "__action_migration_auto__", data_set);
             break;
 
         case action_fail_stop:
             rsc->next_role = RSC_ROLE_STOPPED;
             break;
 
         case action_fail_recover:
             if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
                 set_bit(rsc->flags, pe_rsc_failed);
                 stop_action(rsc, node, FALSE);
             }
             break;
 
         case action_fail_restart_container:
             set_bit(rsc->flags, pe_rsc_failed);
 
             if (rsc->container) {
                 stop_action(rsc->container, node, FALSE);
             } else if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
                 stop_action(rsc, node, FALSE);
             }
             break;
         case action_fail_reset_remote:
             set_bit(rsc->flags, pe_rsc_failed);
             if (is_set(data_set->flags, pe_flag_stonith_enabled)) {
                 tmpnode = NULL;
                 if (rsc->is_remote_node) {
                     tmpnode = pe_find_node(data_set->nodes, rsc->id);
                 }
-                if (tmpnode && is_baremetal_remote_node(tmpnode)) {
+                if (tmpnode &&
+                    is_baremetal_remote_node(tmpnode) &&
+                    tmpnode->details->remote_was_fenced == FALSE) {
+
                     /* connection resource to baremetal resource failed in a way that
                      * should result in fencing the remote-node. */
                     pe_fence_node(data_set, tmpnode, "because of connection failure(s)");
                 }
             }
 
             /* require the stop action regardless if fencing is occuring or not. */
             if (rsc->role > RSC_ROLE_STOPPED) {
                 stop_action(rsc, node, FALSE);
             }
 
             /* if reconnect delay is in use, prevent the connection from exiting the
              * "STOPPED" role until the failure is cleared by the delay timeout. */
             if (rsc->remote_reconnect_interval) {
                 rsc->next_role = RSC_ROLE_STOPPED;
             }
             break;
     }
 
     /* ensure a remote-node connection failure forces an unclean remote-node
      * to be fenced. By setting unseen = FALSE, the remote-node failure will
      * result in a fencing operation regardless if we're going to attempt to 
      * reconnect to the remote-node in this transition or not. */
     if (is_set(rsc->flags, pe_rsc_failed) && rsc->is_remote_node) {
         tmpnode = pe_find_node(data_set->nodes, rsc->id);
         if (tmpnode && tmpnode->details->unclean) {
             tmpnode->details->unseen = FALSE;
         }
     }
 
     if (rsc->role != RSC_ROLE_STOPPED && rsc->role != RSC_ROLE_UNKNOWN) {
         if (is_set(rsc->flags, pe_rsc_orphan)) {
             if (is_set(rsc->flags, pe_rsc_managed)) {
                 crm_config_warn("Detected active orphan %s running on %s",
                                 rsc->id, node->details->uname);
             } else {
                 crm_config_warn("Cluster configured not to stop active orphans."
                                 " %s must be stopped manually on %s",
                                 rsc->id, node->details->uname);
             }
         }
 
         native_add_running(rsc, node, data_set);
         if (on_fail != action_fail_ignore) {
             set_bit(rsc->flags, pe_rsc_failed);
         }
 
     } else if (rsc->clone_name && strchr(rsc->clone_name, ':') != NULL) {
         /* Only do this for older status sections that included instance numbers
          * Otherwise stopped instances will appear as orphans
          */
         pe_rsc_trace(rsc, "Resetting clone_name %s for %s (stopped)", rsc->clone_name, rsc->id);
         free(rsc->clone_name);
         rsc->clone_name = NULL;
 
     } else {
         char *key = stop_key(rsc);
         GListPtr possible_matches = find_actions(rsc->actions, key, node);
         GListPtr gIter = possible_matches;
 
         for (; gIter != NULL; gIter = gIter->next) {
             action_t *stop = (action_t *) gIter->data;
 
             stop->flags |= pe_action_optional;
         }
 
         g_list_free(possible_matches);
         free(key);
     }
 }
 
 /* create active recurring operations as optional */
 static void
 process_recurring(node_t * node, resource_t * rsc,
                   int start_index, int stop_index,
                   GListPtr sorted_op_list, pe_working_set_t * data_set)
 {
     int counter = -1;
     const char *task = NULL;
     const char *status = NULL;
     GListPtr gIter = sorted_op_list;
 
     CRM_ASSERT(rsc);
     pe_rsc_trace(rsc, "%s: Start index %d, stop index = %d", rsc->id, start_index, stop_index);
 
     for (; gIter != NULL; gIter = gIter->next) {
         xmlNode *rsc_op = (xmlNode *) gIter->data;
 
         int interval = 0;
         char *key = NULL;
         const char *id = ID(rsc_op);
         const char *interval_s = NULL;
 
         counter++;
 
         if (node->details->online == FALSE) {
             pe_rsc_trace(rsc, "Skipping %s/%s: node is offline", rsc->id, node->details->uname);
             break;
 
             /* Need to check if there's a monitor for role="Stopped" */
         } else if (start_index < stop_index && counter <= stop_index) {
             pe_rsc_trace(rsc, "Skipping %s/%s: resource is not active", id, node->details->uname);
             continue;
 
         } else if (counter < start_index) {
             pe_rsc_trace(rsc, "Skipping %s/%s: old %d", id, node->details->uname, counter);
             continue;
         }
 
         interval_s = crm_element_value(rsc_op, XML_LRM_ATTR_INTERVAL);
         interval = crm_parse_int(interval_s, "0");
         if (interval == 0) {
             pe_rsc_trace(rsc, "Skipping %s/%s: non-recurring", id, node->details->uname);
             continue;
         }
 
         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
         if (safe_str_eq(status, "-1")) {
             pe_rsc_trace(rsc, "Skipping %s/%s: status", id, node->details->uname);
             continue;
         }
         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
         /* create the action */
         key = generate_op_key(rsc->id, task, interval);
         pe_rsc_trace(rsc, "Creating %s/%s", key, node->details->uname);
         custom_action(rsc, key, task, node, TRUE, TRUE, data_set);
     }
 }
 
 void
 calculate_active_ops(GListPtr sorted_op_list, int *start_index, int *stop_index)
 {
     int counter = -1;
     int implied_monitor_start = -1;
     int implied_master_start = -1;
     const char *task = NULL;
     const char *status = NULL;
     GListPtr gIter = sorted_op_list;
 
     *stop_index = -1;
     *start_index = -1;
 
     for (; gIter != NULL; gIter = gIter->next) {
         xmlNode *rsc_op = (xmlNode *) gIter->data;
 
         counter++;
 
         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
         status = crm_element_value(rsc_op, XML_LRM_ATTR_OPSTATUS);
 
         if (safe_str_eq(task, CRMD_ACTION_STOP)
             && safe_str_eq(status, "0")) {
             *stop_index = counter;
 
         } else if (safe_str_eq(task, CRMD_ACTION_START) || safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
             *start_index = counter;
 
         } else if ((implied_monitor_start <= *stop_index) && safe_str_eq(task, CRMD_ACTION_STATUS)) {
             const char *rc = crm_element_value(rsc_op, XML_LRM_ATTR_RC);
 
             if (safe_str_eq(rc, "0") || safe_str_eq(rc, "8")) {
                 implied_monitor_start = counter;
             }
         } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE) || safe_str_eq(task, CRMD_ACTION_DEMOTE)) {
             implied_master_start = counter;
         }
     }
 
     if (*start_index == -1) {
         if (implied_master_start != -1) {
             *start_index = implied_master_start;
         } else if (implied_monitor_start != -1) {
             *start_index = implied_monitor_start;
         }
     }
 }
 
 static resource_t *
 unpack_lrm_rsc_state(node_t * node, xmlNode * rsc_entry, pe_working_set_t * data_set)
 {
     GListPtr gIter = NULL;
     int stop_index = -1;
     int start_index = -1;
     enum rsc_role_e req_role = RSC_ROLE_UNKNOWN;
 
     const char *task = NULL;
     const char *rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
 
     resource_t *rsc = NULL;
     GListPtr op_list = NULL;
     GListPtr sorted_op_list = NULL;
 
     xmlNode *migrate_op = NULL;
     xmlNode *rsc_op = NULL;
 
     enum action_fail_response on_fail = FALSE;
     enum rsc_role_e saved_role = RSC_ROLE_UNKNOWN;
 
     crm_trace("[%s] Processing %s on %s",
               crm_element_name(rsc_entry), rsc_id, node->details->uname);
 
     /* extract operations */
     op_list = NULL;
     sorted_op_list = NULL;
 
     for (rsc_op = __xml_first_child(rsc_entry); rsc_op != NULL; rsc_op = __xml_next(rsc_op)) {
         if (crm_str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP, TRUE)) {
             op_list = g_list_prepend(op_list, rsc_op);
         }
     }
 
     if (op_list == NULL) {
         /* if there are no operations, there is nothing to do */
         return NULL;
     }
 
     /* find the resource */
     rsc = unpack_find_resource(data_set, node, rsc_id, rsc_entry);
     if (rsc == NULL) {
         rsc = process_orphan_resource(rsc_entry, node, data_set);
     }
     CRM_ASSERT(rsc != NULL);
 
     /* process operations */
     saved_role = rsc->role;
     on_fail = action_fail_ignore;
     rsc->role = RSC_ROLE_UNKNOWN;
     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
 
     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
         xmlNode *rsc_op = (xmlNode *) gIter->data;
 
         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
         if (safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
             migrate_op = rsc_op;
         }
 
         unpack_rsc_op(rsc, node, rsc_op, &on_fail, data_set);
     }
 
     /* create active recurring operations as optional */
     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
     process_recurring(node, rsc, start_index, stop_index, sorted_op_list, data_set);
 
     /* no need to free the contents */
     g_list_free(sorted_op_list);
 
     process_rsc_state(rsc, node, on_fail, migrate_op, data_set);
 
     if (get_target_role(rsc, &req_role)) {
         if (rsc->next_role == RSC_ROLE_UNKNOWN || req_role < rsc->next_role) {
             pe_rsc_debug(rsc, "%s: Overwriting calculated next role %s"
                          " with requested next role %s",
                          rsc->id, role2text(rsc->next_role), role2text(req_role));
             rsc->next_role = req_role;
 
         } else if (req_role > rsc->next_role) {
             pe_rsc_info(rsc, "%s: Not overwriting calculated next role %s"
                         " with requested next role %s",
                         rsc->id, role2text(rsc->next_role), role2text(req_role));
         }
     }
 
     if (saved_role > rsc->role) {
         rsc->role = saved_role;
     }
 
     return rsc;
 }
 
 static void
 handle_orphaned_container_fillers(xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
 {
     xmlNode *rsc_entry = NULL;
     for (rsc_entry = __xml_first_child(lrm_rsc_list); rsc_entry != NULL;
         rsc_entry = __xml_next(rsc_entry)) {
 
         resource_t *rsc;
         resource_t *container;
         const char *rsc_id;
         const char *container_id;
 
         if (safe_str_neq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE)) {
             continue;
         }
 
         container_id = crm_element_value(rsc_entry, XML_RSC_ATTR_CONTAINER);
         rsc_id = crm_element_value(rsc_entry, XML_ATTR_ID);
         if (container_id == NULL || rsc_id == NULL) {
             continue;
         }
 
         container = pe_find_resource(data_set->resources, container_id);
         if (container == NULL) {
             continue;
         }
 
         rsc = pe_find_resource(data_set->resources, rsc_id);
         if (rsc == NULL ||
             is_set(rsc->flags, pe_rsc_orphan_container_filler) == FALSE ||
             rsc->container != NULL) {
             continue;
         }
 
         pe_rsc_trace(rsc, "Mapped orphaned rsc %s's container to  %s", rsc->id, container_id);
         rsc->container = container;
         container->fillers = g_list_append(container->fillers, rsc);
     }
 }
 
 gboolean
 unpack_lrm_resources(node_t * node, xmlNode * lrm_rsc_list, pe_working_set_t * data_set)
 {
     xmlNode *rsc_entry = NULL;
     gboolean found_orphaned_container_filler = FALSE;
     GListPtr unexpected_containers = NULL;
     GListPtr gIter = NULL;
     resource_t *remote = NULL;
 
     CRM_CHECK(node != NULL, return FALSE);
 
     crm_trace("Unpacking resources on %s", node->details->uname);
 
     for (rsc_entry = __xml_first_child(lrm_rsc_list); rsc_entry != NULL;
          rsc_entry = __xml_next(rsc_entry)) {
 
         if (crm_str_eq((const char *)rsc_entry->name, XML_LRM_TAG_RESOURCE, TRUE)) {
             resource_t *rsc;
             rsc = unpack_lrm_rsc_state(node, rsc_entry, data_set);
             if (!rsc) {
                 continue;
             }
             if (is_set(rsc->flags, pe_rsc_orphan_container_filler)) {
                 found_orphaned_container_filler = TRUE;
             }
             if (is_set(rsc->flags, pe_rsc_unexpectedly_running)) {
                 remote = rsc_contains_remote_node(data_set, rsc);
                 if (remote) {
                     unexpected_containers = g_list_append(unexpected_containers, remote);
                 }
             }
         }
     }
 
     /* If a container resource is unexpectedly up... and the remote-node
      * connection resource for that container is not up, the entire container
      * must be recovered. */
     for (gIter = unexpected_containers; gIter != NULL; gIter = gIter->next) {
         remote = (resource_t *) gIter->data;
         if (remote->role != RSC_ROLE_STARTED) {
             crm_warn("Recovering container resource %s. Resource is unexpectedly running and involves a remote-node.", remote->container->id);
             set_bit(remote->container->flags, pe_rsc_failed);
         }
     }
 
     /* now that all the resource state has been unpacked for this node
      * we have to go back and map any orphaned container fillers to their
      * container resource */
     if (found_orphaned_container_filler) {
         handle_orphaned_container_fillers(lrm_rsc_list, data_set);
     }
     g_list_free(unexpected_containers);
     return TRUE;
 }
 
 static void
 set_active(resource_t * rsc)
 {
     resource_t *top = uber_parent(rsc);
 
     if (top && top->variant == pe_master) {
         rsc->role = RSC_ROLE_SLAVE;
     } else {
         rsc->role = RSC_ROLE_STARTED;
     }
 }
 
 static void
 set_node_score(gpointer key, gpointer value, gpointer user_data)
 {
     node_t *node = value;
     int *score = user_data;
 
     node->weight = *score;
 }
 
 #define STATUS_PATH_MAX 1024
 static xmlNode *
 find_lrm_op(const char *resource, const char *op, const char *node, const char *source,
             pe_working_set_t * data_set)
 {
     int offset = 0;
     char xpath[STATUS_PATH_MAX];
 
     offset += snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//node_state[@uname='%s']", node);
     offset +=
         snprintf(xpath + offset, STATUS_PATH_MAX - offset, "//" XML_LRM_TAG_RESOURCE "[@id='%s']",
                  resource);
 
     /* Need to check against transition_magic too? */
     if (source && safe_str_eq(op, CRMD_ACTION_MIGRATE)) {
         offset +=
             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_target='%s']", op,
                      source);
     } else if (source && safe_str_eq(op, CRMD_ACTION_MIGRATED)) {
         offset +=
             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s' and @migrate_source='%s']", op,
                      source);
     } else {
         offset +=
             snprintf(xpath + offset, STATUS_PATH_MAX - offset,
                      "/" XML_LRM_TAG_RSC_OP "[@operation='%s']", op);
     }
 
     CRM_LOG_ASSERT(offset > 0);
     return get_xpath_object(xpath, data_set->input, LOG_DEBUG);
 }
 
 static void
 unpack_rsc_migration(resource_t *rsc, node_t *node, xmlNode *xml_op, pe_working_set_t * data_set) 
 {
                 
     /*
      * The normal sequence is (now): migrate_to(Src) -> migrate_from(Tgt) -> stop(Src)
      *
      * So if a migrate_to is followed by a stop, then we dont need to care what
      * happended on the target node
      *
      * Without the stop, we need to look for a successful migrate_from.
      * This would also imply we're no longer running on the source
      *
      * Without the stop, and without a migrate_from op we make sure the resource
      * gets stopped on both source and target (assuming the target is up)
      *
      */
     int stop_id = 0;
     int task_id = 0;
     xmlNode *stop_op =
         find_lrm_op(rsc->id, CRMD_ACTION_STOP, node->details->id, NULL, data_set);
 
     if (stop_op) {
         crm_element_value_int(stop_op, XML_LRM_ATTR_CALLID, &stop_id);
     }
 
     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
 
     if (stop_op == NULL || stop_id < task_id) {
         int from_rc = 0, from_status = 0;
         const char *migrate_source =
             crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
         const char *migrate_target =
             crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
 
         node_t *target = pe_find_node(data_set->nodes, migrate_target);
         node_t *source = pe_find_node(data_set->nodes, migrate_source);
         xmlNode *migrate_from =
             find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, migrate_target, migrate_source,
                         data_set);
 
         rsc->role = RSC_ROLE_STARTED;       /* can be master? */
         if (migrate_from) {
             crm_element_value_int(migrate_from, XML_LRM_ATTR_RC, &from_rc);
             crm_element_value_int(migrate_from, XML_LRM_ATTR_OPSTATUS, &from_status);
             pe_rsc_trace(rsc, "%s op on %s exited with status=%d, rc=%d",
                          ID(migrate_from), migrate_target, from_status, from_rc);
         }
 
         if (migrate_from && from_rc == PCMK_OCF_OK
             && from_status == PCMK_LRM_OP_DONE) {
             pe_rsc_trace(rsc, "Detected dangling migration op: %s on %s", ID(xml_op),
                          migrate_source);
 
             /* all good
              * just need to arrange for the stop action to get sent
              * but _without_ affecting the target somehow
              */
             rsc->role = RSC_ROLE_STOPPED;
             rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
 
         } else if (migrate_from) {  /* Failed */
             if (target && target->details->online) {
                 pe_rsc_trace(rsc, "Marking active on %s %p %d", migrate_target, target,
                              target->details->online);
                 native_add_running(rsc, target, data_set);
             }
 
         } else {    /* Pending or complete but erased */
             if (target && target->details->online) {
                 pe_rsc_trace(rsc, "Marking active on %s %p %d", migrate_target, target,
                              target->details->online);
 
                 native_add_running(rsc, target, data_set);
                 if (source && source->details->online) {
                     /* If we make it here we have a partial migration.  The migrate_to
                      * has completed but the migrate_from on the target has not. Hold on
                      * to the target and source on the resource. Later on if we detect that
                      * the resource is still going to run on that target, we may continue
                      * the migration */
                     rsc->partial_migration_target = target;
                     rsc->partial_migration_source = source;
                 }
             } else {
                 /* Consider it failed here - forces a restart, prevents migration */
                 set_bit(rsc->flags, pe_rsc_failed);
                 clear_bit(rsc->flags, pe_rsc_allow_migrate);
             }
         }
     }
 }
 
 static void
 unpack_rsc_migration_failure(resource_t *rsc, node_t *node, xmlNode *xml_op, pe_working_set_t * data_set) 
 {
     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
 
     CRM_ASSERT(rsc);
     if (safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
         int stop_id = 0;
         int migrate_id = 0;
         const char *migrate_source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
         const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
 
         xmlNode *stop_op =
             find_lrm_op(rsc->id, CRMD_ACTION_STOP, migrate_source, NULL, data_set);
         xmlNode *migrate_op =
             find_lrm_op(rsc->id, CRMD_ACTION_MIGRATE, migrate_source, migrate_target,
                         data_set);
 
         if (stop_op) {
             crm_element_value_int(stop_op, XML_LRM_ATTR_CALLID, &stop_id);
         }
         if (migrate_op) {
             crm_element_value_int(migrate_op, XML_LRM_ATTR_CALLID, &migrate_id);
         }
 
         /* Get our state right */
         rsc->role = RSC_ROLE_STARTED;   /* can be master? */
 
         if (stop_op == NULL || stop_id < migrate_id) {
             node_t *source = pe_find_node(data_set->nodes, migrate_source);
 
             if (source && source->details->online) {
                 native_add_running(rsc, source, data_set);
             }
         }
 
     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE)) {
         int stop_id = 0;
         int migrate_id = 0;
         const char *migrate_source = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_SOURCE);
         const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
 
         xmlNode *stop_op =
             find_lrm_op(rsc->id, CRMD_ACTION_STOP, migrate_target, NULL, data_set);
         xmlNode *migrate_op =
             find_lrm_op(rsc->id, CRMD_ACTION_MIGRATED, migrate_target, migrate_source,
                         data_set);
 
         if (stop_op) {
             crm_element_value_int(stop_op, XML_LRM_ATTR_CALLID, &stop_id);
         }
         if (migrate_op) {
             crm_element_value_int(migrate_op, XML_LRM_ATTR_CALLID, &migrate_id);
         }
 
         /* Get our state right */
         rsc->role = RSC_ROLE_STARTED;   /* can be master? */
 
         if (stop_op == NULL || stop_id < migrate_id) {
             node_t *target = pe_find_node(data_set->nodes, migrate_target);
 
             pe_rsc_trace(rsc, "Stop: %p %d, Migrated: %p %d", stop_op, stop_id, migrate_op,
                          migrate_id);
             if (target && target->details->online) {
                 native_add_running(rsc, target, data_set);
             }
 
         } else if (migrate_op == NULL) {
             /* Make sure it gets cleaned up, the stop may pre-date the migrate_from */
             rsc->dangling_migrations = g_list_prepend(rsc->dangling_migrations, node);
         }
     }
 }
 
 static void
 record_failed_op(xmlNode *op, node_t* node, pe_working_set_t * data_set)
 {
     xmlNode *xIter = NULL;
     const char *op_key = crm_element_value(op, XML_LRM_ATTR_TASK_KEY);
 
     if (node->details->shutdown) {
         return;
     } else if(node->details->online == FALSE) {
         return;
     }
 
     for (xIter = data_set->failed->children; xIter; xIter = xIter->next) {
         const char *key = crm_element_value(xIter, XML_LRM_ATTR_TASK_KEY);
         const char *uname = crm_element_value(xIter, XML_ATTR_UNAME);
 
         if(safe_str_eq(op_key, key) && safe_str_eq(uname, node->details->uname)) {
             crm_trace("Skipping duplicate entry %s on %s", op_key, node->details->uname);
             return;
         }
     }
 
     crm_trace("Adding entry %s on %s", op_key, node->details->uname);
     crm_xml_add(op, XML_ATTR_UNAME, node->details->uname);
     add_node_copy(data_set->failed, op);
 }
 
 static const char *get_op_key(xmlNode *xml_op)
 {
     const char *key = crm_element_value(xml_op, XML_LRM_ATTR_TASK_KEY);
     if(key == NULL) {
         key = ID(xml_op);
     }
     return key;
 }
 
 static void
 unpack_rsc_op_failure(resource_t *rsc, node_t *node, int rc, xmlNode *xml_op, enum action_fail_response *on_fail, pe_working_set_t * data_set) 
 {
     int interval = 0;
     bool is_probe = FALSE;
     action_t *action = NULL;
 
     const char *key = get_op_key(xml_op);
     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
     const char *op_version = crm_element_value(xml_op, XML_ATTR_CRM_VERSION);
 
     CRM_ASSERT(rsc);
     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
     if(interval == 0 && safe_str_eq(task, CRMD_ACTION_STATUS)) {
         is_probe = TRUE;
         pe_rsc_trace(rsc, "is a probe: %s", key);
     }
 
     if (rc != PCMK_OCF_NOT_INSTALLED || is_set(data_set->flags, pe_flag_symmetric_cluster)) {
         crm_warn("Processing failed op %s for %s on %s: %s (%d)",
                  task, rsc->id, node->details->uname, services_ocf_exitcode_str(rc),
                  rc);
 
         record_failed_op(xml_op, node, data_set);
 
     } else {
         crm_trace("Processing failed op %s for %s on %s: %s (%d)",
                  task, rsc->id, node->details->uname, services_ocf_exitcode_str(rc),
                  rc);
     }
 
     action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
     if ((action->on_fail <= action_fail_fence && *on_fail < action->on_fail) ||
         (action->on_fail == action_fail_reset_remote && *on_fail <= action_fail_recover) ||
         (action->on_fail == action_fail_restart_container && *on_fail <= action_fail_recover) ||
         (*on_fail == action_fail_restart_container && action->on_fail >= action_fail_migrate)) {
         pe_rsc_trace(rsc, "on-fail %s -> %s for %s (%s)", fail2text(*on_fail),
                      fail2text(action->on_fail), action->uuid, key);
         *on_fail = action->on_fail;
     }
 
     if (safe_str_eq(task, CRMD_ACTION_STOP)) {
         resource_location(rsc, node, -INFINITY, "__stop_fail__", data_set);
 
     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE) || safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
         unpack_rsc_migration_failure(rsc, node, xml_op, data_set);
 
     } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE)) {
         rsc->role = RSC_ROLE_MASTER;
 
     } else if (safe_str_eq(task, CRMD_ACTION_DEMOTE)) {
         /*
          * staying in role=master ends up putting the PE/TE into a loop
          * setting role=slave is not dangerous because no master will be
          * promoted until the failed resource has been fully stopped
          */
         rsc->next_role = RSC_ROLE_STOPPED;
         if (action->on_fail == action_fail_block) {
             rsc->role = RSC_ROLE_MASTER;
 
         } else {
             crm_warn("Forcing %s to stop after a failed demote action", rsc->id);
             rsc->role = RSC_ROLE_SLAVE;
         }
 
     } else if (compare_version("2.0", op_version) > 0 && safe_str_eq(task, CRMD_ACTION_START)) {
         crm_warn("Compatibility handling for failed op %s on %s", key, node->details->uname);
         resource_location(rsc, node, -INFINITY, "__legacy_start__", data_set);
     }
 
     if(is_probe && rc == PCMK_OCF_NOT_INSTALLED) {
         /* leave stopped */
         pe_rsc_trace(rsc, "Leaving %s stopped", rsc->id);
         rsc->role = RSC_ROLE_STOPPED;
 
     } else if (rsc->role < RSC_ROLE_STARTED) {
         pe_rsc_trace(rsc, "Setting %s active", rsc->id);
         set_active(rsc);
     }
 
     pe_rsc_trace(rsc, "Resource %s: role=%s, unclean=%s, on_fail=%s, fail_role=%s",
                  rsc->id, role2text(rsc->role),
                  node->details->unclean ? "true" : "false",
                  fail2text(action->on_fail), role2text(action->fail_role));
 
     if (action->fail_role != RSC_ROLE_STARTED && rsc->next_role < action->fail_role) {
         rsc->next_role = action->fail_role;
     }
 
     if (action->fail_role == RSC_ROLE_STOPPED) {
         int score = -INFINITY;
 
         resource_t *fail_rsc = rsc;
 
         if (fail_rsc->parent) {
             resource_t *parent = uber_parent(fail_rsc);
 
             if ((parent->variant == pe_clone || parent->variant == pe_master)
                 && is_not_set(parent->flags, pe_rsc_unique)) {
                 /* for clone and master resources, if a child fails on an operation
                  * with on-fail = stop, all the resources fail.  Do this by preventing
                  * the parent from coming up again. */
                 fail_rsc = parent;
             }
         }
         crm_warn("Making sure %s doesn't come up again", fail_rsc->id);
         /* make sure it doesnt come up again */
         g_hash_table_destroy(fail_rsc->allowed_nodes);
         fail_rsc->allowed_nodes = node_hash_from_list(data_set->nodes);
         g_hash_table_foreach(fail_rsc->allowed_nodes, set_node_score, &score);
     }
 
     pe_free_action(action);
 }
 
 static int
 determine_op_status(
     resource_t *rsc, int rc, int target_rc, node_t * node, xmlNode * xml_op, enum action_fail_response * on_fail, pe_working_set_t * data_set) 
 {
     int interval = 0;
     int result = PCMK_LRM_OP_DONE;
 
     const char *key = get_op_key(xml_op);
     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
 
     bool is_probe = FALSE;
 
     CRM_ASSERT(rsc);
     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
     if (interval == 0 && safe_str_eq(task, CRMD_ACTION_STATUS)) {
         is_probe = TRUE;
     }
 
     if (target_rc >= 0 && target_rc != rc) {
         result = PCMK_LRM_OP_ERROR;
         pe_rsc_debug(rsc, "%s on %s returned '%s' (%d) instead of the expected value: '%s' (%d)",
                      key, node->details->uname,
                      services_ocf_exitcode_str(rc), rc,
                      services_ocf_exitcode_str(target_rc), target_rc);
     }
 
     /* we could clean this up significantly except for old LRMs and CRMs that
      * didnt include target_rc and liked to remap status
      */
     switch (rc) {
         case PCMK_OCF_OK:
             if (is_probe && target_rc == 7) {
                 result = PCMK_LRM_OP_DONE;
                 set_bit(rsc->flags, pe_rsc_unexpectedly_running);
                 pe_rsc_info(rsc, "Operation %s found resource %s active on %s",
                             task, rsc->id, node->details->uname);
 
                 /* legacy code for pre-0.6.5 operations */
             } else if (target_rc < 0 && interval > 0 && rsc->role == RSC_ROLE_MASTER) {
                 /* catch status ops that return 0 instead of 8 while they
                  *   are supposed to be in master mode
                  */
                 result = PCMK_LRM_OP_ERROR;
             }
             break;
 
         case PCMK_OCF_NOT_RUNNING:
             if (is_probe || target_rc == rc || is_not_set(rsc->flags, pe_rsc_managed)) {
                 result = PCMK_LRM_OP_DONE;
                 rsc->role = RSC_ROLE_STOPPED;
 
                 /* clear any previous failure actions */
                 *on_fail = action_fail_ignore;
                 rsc->next_role = RSC_ROLE_UNKNOWN;
 
             } else if (safe_str_neq(task, CRMD_ACTION_STOP)) {
                 result = PCMK_LRM_OP_ERROR;
             }
             break;
 
         case PCMK_OCF_RUNNING_MASTER:
             if (is_probe) {
                 result = PCMK_LRM_OP_DONE;
                 pe_rsc_info(rsc, "Operation %s found resource %s active in master mode on %s",
                             task, rsc->id, node->details->uname);
 
             } else if (target_rc == rc) {
                 /* nothing to do */
 
             } else if (target_rc >= 0) {
                 result = PCMK_LRM_OP_ERROR;
 
                 /* legacy code for pre-0.6.5 operations */
             } else if (safe_str_neq(task, CRMD_ACTION_STATUS)
                        || rsc->role != RSC_ROLE_MASTER) {
                 result = PCMK_LRM_OP_ERROR;
                 if (rsc->role != RSC_ROLE_MASTER) {
                     crm_err("%s reported %s in master mode on %s",
                             key, rsc->id, node->details->uname);
                 }
             }
             rsc->role = RSC_ROLE_MASTER;
             break;
 
         case PCMK_OCF_DEGRADED_MASTER:
         case PCMK_OCF_FAILED_MASTER:
             rsc->role = RSC_ROLE_MASTER;
             result = PCMK_LRM_OP_ERROR;
             break;
 
         case PCMK_OCF_NOT_CONFIGURED:
             result = PCMK_LRM_OP_ERROR_FATAL;
             break;
 
         case PCMK_OCF_NOT_INSTALLED:
         case PCMK_OCF_INVALID_PARAM:
         case PCMK_OCF_INSUFFICIENT_PRIV:
         case PCMK_OCF_UNIMPLEMENT_FEATURE:
             if (rc == PCMK_OCF_UNIMPLEMENT_FEATURE && interval > 0) {
                 result = PCMK_LRM_OP_NOTSUPPORTED;
                 break;
 
             } else if(pe_can_fence(data_set, node) == FALSE
                && safe_str_eq(task, CRMD_ACTION_STOP)) {
                 /* If a stop fails and we can't fence, there's nothing else we can do */
                 pe_proc_err("No further recovery can be attempted for %s: %s action failed with '%s' (%d)",
                             rsc->id, task, services_ocf_exitcode_str(rc), rc);
                 clear_bit(rsc->flags, pe_rsc_managed);
                 set_bit(rsc->flags, pe_rsc_block);
             }
             result = PCMK_LRM_OP_ERROR_HARD;
             break;
 
         default:
             if (result == PCMK_LRM_OP_DONE) {
                 crm_info("Treating %s (rc=%d) on %s as an ERROR",
                          key, rc, node->details->uname);
                 result = PCMK_LRM_OP_ERROR;
             }
     }
 
     return result;
 }
 
 static bool check_operation_expiry(resource_t *rsc, node_t *node, int rc, xmlNode *xml_op, pe_working_set_t * data_set)
 {
     bool expired = FALSE;
     time_t last_failure = 0;
     int clear_failcount = 0;
     int interval = 0;
     const char *key = get_op_key(xml_op);
     const char *task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
 
     if (rsc->failure_timeout > 0) {
         int last_run = 0;
 
         if (crm_element_value_int(xml_op, XML_RSC_OP_LAST_CHANGE, &last_run) == 0) {
             time_t now = get_effective_time(data_set);
 
             if (now > (last_run + rsc->failure_timeout)) {
                 expired = TRUE;
             }
         }
     }
 
     if (expired) {
         if (rsc->failure_timeout > 0) {
             int fc = get_failcount_full(node, rsc, &last_failure, FALSE, xml_op, data_set);
             if(fc) {
                 if (get_failcount_full(node, rsc, &last_failure, TRUE, xml_op, data_set) == 0) {
                     clear_failcount = 1;
                     crm_notice("Clearing expired failcount for %s on %s", rsc->id, node->details->uname);
 
                 } else {
                     expired = FALSE;
                 }
             }
         }
 
     } else if (strstr(ID(xml_op), "last_failure") &&
                ((strcmp(task, "start") == 0) || (strcmp(task, "monitor") == 0))) {
 
         op_digest_cache_t *digest_data = NULL;
 
         digest_data = rsc_action_digest_cmp(rsc, xml_op, node, data_set);
 
         if (digest_data->rc == RSC_DIGEST_UNKNOWN) {
             crm_trace("rsc op %s on node %s does not have a op digest to compare against", rsc->id,
                       key, node->details->id);
         } else if (digest_data->rc != RSC_DIGEST_MATCH) {
             clear_failcount = 1;
             crm_info
                 ("Clearing failcount for %s on %s, %s failed and now resource parameters have changed.",
                  task, rsc->id, node->details->uname);
         }
     }
 
     if (clear_failcount) {
         action_t *clear_op = NULL;
 
         clear_op = custom_action(rsc, crm_concat(rsc->id, CRM_OP_CLEAR_FAILCOUNT, '_'),
                                  CRM_OP_CLEAR_FAILCOUNT, node, FALSE, TRUE, data_set);
         add_hash_param(clear_op->meta, XML_ATTR_TE_NOWAIT, XML_BOOLEAN_TRUE);
     }
 
     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
     if(expired && interval == 0 && safe_str_eq(task, CRMD_ACTION_STATUS)) {
         switch(rc) {
             case PCMK_OCF_OK:
             case PCMK_OCF_NOT_RUNNING:
             case PCMK_OCF_RUNNING_MASTER:
             case PCMK_OCF_DEGRADED:
             case PCMK_OCF_DEGRADED_MASTER:
                 /* Don't expire probes that return these values */ 
                 expired = FALSE;
                 break;
         }
     }
     
     return expired;
 }
 
 int get_target_rc(xmlNode *xml_op)
 {
     int dummy = 0;
     int target_rc = 0;
     char *dummy_string = NULL;
     const char *key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
     if (key == NULL) {
         return -1;
     }
 
     decode_transition_key(key, &dummy_string, &dummy, &dummy, &target_rc);
     free(dummy_string);
 
     return target_rc;
 }
 
 static enum action_fail_response
 get_action_on_fail(resource_t *rsc, const char *key, const char *task, pe_working_set_t * data_set) 
 {
     int result = action_fail_recover;
     action_t *action = custom_action(rsc, strdup(key), task, NULL, TRUE, FALSE, data_set);
 
     result = action->on_fail;
     pe_free_action(action);
 
     return result;
 }
 
 static void
 update_resource_state(resource_t *rsc, node_t * node, xmlNode * xml_op, const char *task, int rc,
                       enum action_fail_response *on_fail, pe_working_set_t * data_set) 
 {
     gboolean clear_past_failure = FALSE;
 
     CRM_ASSERT(rsc);
     if (rc == PCMK_OCF_NOT_RUNNING) {
         clear_past_failure = TRUE;
 
     } else if (rc == PCMK_OCF_NOT_INSTALLED) {
         rsc->role = RSC_ROLE_STOPPED;
 
     } else if (safe_str_eq(task, CRMD_ACTION_STATUS)) {
         clear_past_failure = TRUE;
         if (rsc->role < RSC_ROLE_STARTED) {
             set_active(rsc);
         }
 
     } else if (safe_str_eq(task, CRMD_ACTION_START)) {
         rsc->role = RSC_ROLE_STARTED;
         clear_past_failure = TRUE;
 
     } else if (safe_str_eq(task, CRMD_ACTION_STOP)) {
         rsc->role = RSC_ROLE_STOPPED;
         clear_past_failure = TRUE;
 
     } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE)) {
         rsc->role = RSC_ROLE_MASTER;
         clear_past_failure = TRUE;
 
     } else if (safe_str_eq(task, CRMD_ACTION_DEMOTE)) {
         /* Demote from Master does not clear an error */
         rsc->role = RSC_ROLE_SLAVE;
 
     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATED)) {
         rsc->role = RSC_ROLE_STARTED;
         clear_past_failure = TRUE;
 
     } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE)) {
         unpack_rsc_migration(rsc, node, xml_op, data_set);
 
     } else if (rsc->role < RSC_ROLE_STARTED) {
         /* migrate_to and migrate_from will land here */
         pe_rsc_trace(rsc, "%s active on %s", rsc->id, node->details->uname);
         set_active(rsc);
     }
 
     /* clear any previous failure actions */
     if (clear_past_failure) {
         switch (*on_fail) {
             case action_fail_stop:
             case action_fail_fence:
             case action_fail_migrate:
             case action_fail_standby:
                 pe_rsc_trace(rsc, "%s.%s is not cleared by a completed stop",
                              rsc->id, fail2text(*on_fail));
                 break;
 
             case action_fail_block:
             case action_fail_ignore:
             case action_fail_recover:
             case action_fail_restart_container:
                 *on_fail = action_fail_ignore;
                 rsc->next_role = RSC_ROLE_UNKNOWN;
                 break;
             case action_fail_reset_remote:
                 if (rsc->remote_reconnect_interval == 0) {
                     /* when reconnect delay is not in use, the connection is allowed
                      * to start again after the remote node is fenced and completely
                      * stopped. Otherwise, with reconnect delay we wait for the failure
                      * to be cleared entirely before reconnected can be attempted. */ 
                     *on_fail = action_fail_ignore;
                     rsc->next_role = RSC_ROLE_UNKNOWN;
                 }
                 break;
         }
     }
 }
 
 gboolean
 unpack_rsc_op(resource_t * rsc, node_t * node, xmlNode * xml_op,
               enum action_fail_response * on_fail, pe_working_set_t * data_set)
 {
     int task_id = 0;
 
     const char *key = NULL;
     const char *task = NULL;
     const char *task_key = NULL;
 
     int rc = 0;
     int status = PCMK_LRM_OP_PENDING-1;
     int target_rc = get_target_rc(xml_op);
     int interval = 0;
 
     gboolean expired = FALSE;
     resource_t *parent = rsc;
     enum action_fail_response failure_strategy = action_fail_recover;
 
     CRM_CHECK(rsc != NULL, return FALSE);
     CRM_CHECK(node != NULL, return FALSE);
     CRM_CHECK(xml_op != NULL, return FALSE);
 
     task_key = get_op_key(xml_op);
 
     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
     key = crm_element_value(xml_op, XML_ATTR_TRANSITION_KEY);
 
     crm_element_value_int(xml_op, XML_LRM_ATTR_RC, &rc);
     crm_element_value_int(xml_op, XML_LRM_ATTR_CALLID, &task_id);
     crm_element_value_int(xml_op, XML_LRM_ATTR_OPSTATUS, &status);
     crm_element_value_int(xml_op, XML_LRM_ATTR_INTERVAL, &interval);
 
     CRM_CHECK(task != NULL, return FALSE);
     CRM_CHECK(status <= PCMK_LRM_OP_NOT_INSTALLED, return FALSE);
     CRM_CHECK(status >= PCMK_LRM_OP_PENDING, return FALSE);
 
     if (safe_str_eq(task, CRMD_ACTION_NOTIFY)) {
         /* safe to ignore these */
         return TRUE;
     }
 
     if (is_not_set(rsc->flags, pe_rsc_unique)) {
         parent = uber_parent(rsc);
     }
     
     pe_rsc_trace(rsc, "Unpacking task %s/%s (call_id=%d, status=%d, rc=%d) on %s (role=%s)",
                  task_key, task, task_id, status, rc, node->details->uname, role2text(rsc->role));
 
     if (node->details->unclean) {
         pe_rsc_trace(rsc, "Node %s (where %s is running) is unclean."
                      " Further action depends on the value of the stop's on-fail attribue",
                      node->details->uname, rsc->id);
     }
 
     if (status == PCMK_LRM_OP_ERROR) {
         /* Older versions set this if rc != 0 but its up to us to decide */
         status = PCMK_LRM_OP_DONE;
     }
 
     if(status != PCMK_LRM_OP_NOT_INSTALLED) {
         expired = check_operation_expiry(rsc, node, rc, xml_op, data_set);
     }
 
     /* Degraded results are informational only, re-map them to their error-free equivalents */
     if (rc == PCMK_OCF_DEGRADED && safe_str_eq(task, CRMD_ACTION_STATUS)) {
         rc = PCMK_OCF_OK;
 
         /* Add them to the failed list to highlight them for the user */
         if ((node->details->shutdown == FALSE) || (node->details->online == TRUE)) {
             crm_trace("Remapping %d to %d", PCMK_OCF_DEGRADED, PCMK_OCF_OK);
             record_failed_op(xml_op, node, data_set);
         }
 
     } else if (rc == PCMK_OCF_DEGRADED_MASTER && safe_str_eq(task, CRMD_ACTION_STATUS)) {
         rc = PCMK_OCF_RUNNING_MASTER;
 
         /* Add them to the failed list to highlight them for the user */
         if ((node->details->shutdown == FALSE) || (node->details->online == TRUE)) {
             crm_trace("Remapping %d to %d", PCMK_OCF_DEGRADED_MASTER, PCMK_OCF_RUNNING_MASTER);
             record_failed_op(xml_op, node, data_set);
         }
     }
 
     if (expired && target_rc != rc) {
         const char *magic = crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC);
 
         pe_rsc_debug(rsc, "Expired operation '%s' on %s returned '%s' (%d) instead of the expected value: '%s' (%d)",
                      key, node->details->uname,
                      services_ocf_exitcode_str(rc), rc,
                      services_ocf_exitcode_str(target_rc), target_rc);
 
         if(interval == 0) {
             crm_notice("Ignoring expired calculated failure %s (rc=%d, magic=%s) on %s",
                        task_key, rc, magic, node->details->uname);
             goto done;
 
         } else if(node->details->online && node->details->unclean == FALSE) {
             crm_notice("Re-initiated expired calculated failure %s (rc=%d, magic=%s) on %s",
                        task_key, rc, magic, node->details->uname);
             /* This is SO horrible, but we don't have access to CancelXmlOp() yet */
             crm_xml_add(xml_op, XML_LRM_ATTR_RESTART_DIGEST, "calculated-failure-timeout");
             goto done;
         }
     }
 
     if(status == PCMK_LRM_OP_DONE || status == PCMK_LRM_OP_ERROR) {
         status = determine_op_status(rsc, rc, target_rc, node, xml_op, on_fail, data_set);
     }
 
     pe_rsc_trace(rsc, "Handling status: %d", status);
     switch (status) {
         case PCMK_LRM_OP_CANCELLED:
             /* do nothing?? */
             pe_err("Dont know what to do for cancelled ops yet");
             break;
 
         case PCMK_LRM_OP_PENDING:
             if (safe_str_eq(task, CRMD_ACTION_START)) {
                 set_bit(rsc->flags, pe_rsc_start_pending);
                 set_active(rsc);
 
             } else if (safe_str_eq(task, CRMD_ACTION_PROMOTE)) {
                 rsc->role = RSC_ROLE_MASTER;
 
             } else if (safe_str_eq(task, CRMD_ACTION_MIGRATE) && node->details->unclean) {
                 /* If a pending migrate_to action is out on a unclean node,
                  * we have to force the stop action on the target. */
                 const char *migrate_target = crm_element_value(xml_op, XML_LRM_ATTR_MIGRATE_TARGET);
                 node_t *target = pe_find_node(data_set->nodes, migrate_target);
                 if (target) {
                     stop_action(rsc, target, FALSE);
                 }
             }
 
             if (rsc->pending_task == NULL) {
                 if (safe_str_eq(task, CRMD_ACTION_STATUS) && interval == 0) {
                     /* Comment this out until someone requests it */
                     /* Comment this out until cl#5184 is fixed */
                     /*rsc->pending_task = strdup("probe");*/
 
                 } else {
                     rsc->pending_task = strdup(task);
                 }
             }
             break;
 
         case PCMK_LRM_OP_DONE:
             pe_rsc_trace(rsc, "%s/%s completed on %s", rsc->id, task, node->details->uname);
             update_resource_state(rsc, node, xml_op, task, rc, on_fail, data_set);
             break;
 
         case PCMK_LRM_OP_NOT_INSTALLED:
             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
             if (failure_strategy == action_fail_ignore) {
                 crm_warn("Cannot ignore failed %s (status=%d, rc=%d) on %s: "
                          "Resource agent doesn't exist",
                          task_key, status, rc, node->details->uname);
                 /* Also for printing it as "FAILED" by marking it as pe_rsc_failed later */
                 *on_fail = action_fail_migrate;
             }
             resource_location(parent, node, -INFINITY, "hard-error", data_set);
             unpack_rsc_op_failure(rsc, node, rc, xml_op, on_fail, data_set);
             break;
 
         case PCMK_LRM_OP_ERROR:
         case PCMK_LRM_OP_ERROR_HARD:
         case PCMK_LRM_OP_ERROR_FATAL:
         case PCMK_LRM_OP_TIMEOUT:
         case PCMK_LRM_OP_NOTSUPPORTED:
 
             failure_strategy = get_action_on_fail(rsc, task_key, task, data_set);
             if ((failure_strategy == action_fail_ignore)
                 || (failure_strategy == action_fail_restart_container
                     && safe_str_eq(task, CRMD_ACTION_STOP))) {
 
                 crm_warn("Pretending the failure of %s (rc=%d) on %s succeeded",
                          task_key, rc, node->details->uname);
 
                 update_resource_state(rsc, node, xml_op, task, target_rc, on_fail, data_set);
                 crm_xml_add(xml_op, XML_ATTR_UNAME, node->details->uname);
                 set_bit(rsc->flags, pe_rsc_failure_ignored);
 
                 record_failed_op(xml_op, node, data_set);
 
                 if (failure_strategy == action_fail_restart_container && *on_fail <= action_fail_recover) {
                     *on_fail = failure_strategy;
                 }
 
             } else {
                 unpack_rsc_op_failure(rsc, node, rc, xml_op, on_fail, data_set);
 
                 if(status == PCMK_LRM_OP_ERROR_HARD) {
                     do_crm_log(rc != PCMK_OCF_NOT_INSTALLED?LOG_ERR:LOG_NOTICE,
                                "Preventing %s from re-starting on %s: operation %s failed '%s' (%d)",
                                parent->id, node->details->uname,
                                task, services_ocf_exitcode_str(rc), rc);
 
                     resource_location(parent, node, -INFINITY, "hard-error", data_set);
 
                 } else if(status == PCMK_LRM_OP_ERROR_FATAL) {
                     crm_err("Preventing %s from re-starting anywhere: operation %s failed '%s' (%d)",
                             parent->id, task, services_ocf_exitcode_str(rc), rc);
 
                     resource_location(parent, NULL, -INFINITY, "fatal-error", data_set);
                 }
             }
             break;
     }
 
   done:
     pe_rsc_trace(rsc, "Resource %s after %s: role=%s", rsc->id, task, role2text(rsc->role));
     return TRUE;
 }
 
 gboolean
 add_node_attrs(xmlNode * xml_obj, node_t * node, gboolean overwrite, pe_working_set_t * data_set)
 {
     const char *cluster_name = NULL;
 
     g_hash_table_insert(node->details->attrs,
                         strdup("#uname"), strdup(node->details->uname));
 
     g_hash_table_insert(node->details->attrs, strdup("#" XML_ATTR_ID), strdup(node->details->id));
     if (safe_str_eq(node->details->id, data_set->dc_uuid)) {
         data_set->dc_node = node;
         node->details->is_dc = TRUE;
         g_hash_table_insert(node->details->attrs,
                             strdup("#" XML_ATTR_DC), strdup(XML_BOOLEAN_TRUE));
     } else {
         g_hash_table_insert(node->details->attrs,
                             strdup("#" XML_ATTR_DC), strdup(XML_BOOLEAN_FALSE));
     }
 
     cluster_name = g_hash_table_lookup(data_set->config_hash, "cluster-name");
     if (cluster_name) {
         g_hash_table_insert(node->details->attrs, strdup("#cluster-name"), strdup(cluster_name));
     }
 
     unpack_instance_attributes(data_set->input, xml_obj, XML_TAG_ATTR_SETS, NULL,
                                node->details->attrs, NULL, overwrite, data_set->now);
 
     if (g_hash_table_lookup(node->details->attrs, "#site-name") == NULL) {
         const char *site_name = g_hash_table_lookup(node->details->attrs, "site-name");
 
         if (site_name) {
             /* Prefix '#' to the key */
             g_hash_table_insert(node->details->attrs, strdup("#site-name"), strdup(site_name));
 
         } else if (cluster_name) {
             /* Default to cluster-name if unset */
             g_hash_table_insert(node->details->attrs, strdup("#site-name"), strdup(cluster_name));
         }
     }
     return TRUE;
 }
 
 static GListPtr
 extract_operations(const char *node, const char *rsc, xmlNode * rsc_entry, gboolean active_filter)
 {
     int counter = -1;
     int stop_index = -1;
     int start_index = -1;
 
     xmlNode *rsc_op = NULL;
 
     GListPtr gIter = NULL;
     GListPtr op_list = NULL;
     GListPtr sorted_op_list = NULL;
 
     /* extract operations */
     op_list = NULL;
     sorted_op_list = NULL;
 
     for (rsc_op = __xml_first_child(rsc_entry); rsc_op != NULL; rsc_op = __xml_next(rsc_op)) {
         if (crm_str_eq((const char *)rsc_op->name, XML_LRM_TAG_RSC_OP, TRUE)) {
             crm_xml_add(rsc_op, "resource", rsc);
             crm_xml_add(rsc_op, XML_ATTR_UNAME, node);
             op_list = g_list_prepend(op_list, rsc_op);
         }
     }
 
     if (op_list == NULL) {
         /* if there are no operations, there is nothing to do */
         return NULL;
     }
 
     sorted_op_list = g_list_sort(op_list, sort_op_by_callid);
 
     /* create active recurring operations as optional */
     if (active_filter == FALSE) {
         return sorted_op_list;
     }
 
     op_list = NULL;
 
     calculate_active_ops(sorted_op_list, &start_index, &stop_index);
 
     for (gIter = sorted_op_list; gIter != NULL; gIter = gIter->next) {
         xmlNode *rsc_op = (xmlNode *) gIter->data;
 
         counter++;
 
         if (start_index < stop_index) {
             crm_trace("Skipping %s: not active", ID(rsc_entry));
             break;
 
         } else if (counter < start_index) {
             crm_trace("Skipping %s: old", ID(rsc_op));
             continue;
         }
         op_list = g_list_append(op_list, rsc_op);
     }
 
     g_list_free(sorted_op_list);
     return op_list;
 }
 
 GListPtr
 find_operations(const char *rsc, const char *node, gboolean active_filter,
                 pe_working_set_t * data_set)
 {
     GListPtr output = NULL;
     GListPtr intermediate = NULL;
 
     xmlNode *tmp = NULL;
     xmlNode *status = find_xml_node(data_set->input, XML_CIB_TAG_STATUS, TRUE);
 
     node_t *this_node = NULL;
 
     xmlNode *node_state = NULL;
 
     for (node_state = __xml_first_child(status); node_state != NULL;
          node_state = __xml_next(node_state)) {
 
         if (crm_str_eq((const char *)node_state->name, XML_CIB_TAG_STATE, TRUE)) {
             const char *uname = crm_element_value(node_state, XML_ATTR_UNAME);
 
             if (node != NULL && safe_str_neq(uname, node)) {
                 continue;
             }
 
             this_node = pe_find_node(data_set->nodes, uname);
             if(this_node == NULL) {
                 CRM_LOG_ASSERT(this_node != NULL);
                 continue;
 
             } else if (is_remote_node(this_node)) {
                 determine_remote_online_status(this_node);
             } else {
                 determine_online_status(node_state, this_node, data_set);
             }
 
             if (this_node->details->online || is_set(data_set->flags, pe_flag_stonith_enabled)) {
                 /* offline nodes run no resources...
                  * unless stonith is enabled in which case we need to
                  *   make sure rsc start events happen after the stonith
                  */
                 xmlNode *lrm_rsc = NULL;
 
                 tmp = find_xml_node(node_state, XML_CIB_TAG_LRM, FALSE);
                 tmp = find_xml_node(tmp, XML_LRM_TAG_RESOURCES, FALSE);
 
                 for (lrm_rsc = __xml_first_child(tmp); lrm_rsc != NULL;
                      lrm_rsc = __xml_next(lrm_rsc)) {
                     if (crm_str_eq((const char *)lrm_rsc->name, XML_LRM_TAG_RESOURCE, TRUE)) {
 
                         const char *rsc_id = crm_element_value(lrm_rsc, XML_ATTR_ID);
 
                         if (rsc != NULL && safe_str_neq(rsc_id, rsc)) {
                             continue;
                         }
 
                         intermediate = extract_operations(uname, rsc_id, lrm_rsc, active_filter);
                         output = g_list_concat(output, intermediate);
                     }
                 }
             }
         }
     }
 
     return output;
 }