diff --git a/lib/pacemaker/pcmk_sched_actions.c b/lib/pacemaker/pcmk_sched_actions.c
index f5359a0095..28e7a96b37 100644
--- a/lib/pacemaker/pcmk_sched_actions.c
+++ b/lib/pacemaker/pcmk_sched_actions.c
@@ -1,1930 +1,1930 @@
 /*
- * Copyright 2004-2022 the Pacemaker project contributors
+ * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <stdio.h>
 #include <sys/param.h>
 #include <glib.h>
 
 #include <crm/lrmd_internal.h>
 #include <pacemaker-internal.h>
 #include "libpacemaker_private.h"
 
 /*!
  * \internal
  * \brief Get the action flags relevant to ordering constraints
  *
  * \param[in] action  Action to check
  * \param[in] node    Node that *other* action in the ordering is on
  *                    (used only for clone resource actions)
  *
  * \return Action flags that should be used for orderings
  */
 static enum pe_action_flags
 action_flags_for_ordering(pe_action_t *action, pe_node_t *node)
 {
     bool runnable = false;
     enum pe_action_flags flags;
 
     // For non-resource actions, return the action flags
     if (action->rsc == NULL) {
         return action->flags;
     }
 
     /* For non-clone resources, or a clone action not assigned to a node,
      * return the flags as determined by the resource method without a node
      * specified.
      */
     flags = action->rsc->cmds->action_flags(action, NULL);
     if ((node == NULL) || !pe_rsc_is_clone(action->rsc)) {
         return flags;
     }
 
     /* Otherwise (i.e., for clone resource actions on a specific node), first
      * remember whether the non-node-specific action is runnable.
      */
     runnable = pcmk_is_set(flags, pe_action_runnable);
 
     // Then recheck the resource method with the node
     flags = action->rsc->cmds->action_flags(action, node);
 
     /* For clones in ordering constraints, the node-specific "runnable" doesn't
      * matter, just the non-node-specific setting (i.e., is the action runnable
      * anywhere).
      *
      * This applies only to runnable, and only for ordering constraints. This
      * function shouldn't be used for other types of constraints without
      * changes. Not very satisfying, but it's logical and appears to work well.
      */
     if (runnable && !pcmk_is_set(flags, pe_action_runnable)) {
         pe__set_raw_action_flags(flags, action->rsc->id,
                                  pe_action_runnable);
     }
     return flags;
 }
 
 /*!
  * \internal
  * \brief Get action UUID that should be used with a resource ordering
  *
  * When an action is ordered relative to an action for a collective resource
  * (clone, group, or bundle), it actually needs to be ordered after all
  * instances of the collective have completed the relevant action (for example,
  * given "start CLONE then start RSC", RSC must wait until all instances of
  * CLONE have started). Given the UUID and resource of the first action in an
  * ordering, this returns the UUID of the action that should actually be used
  * for ordering (for example, "CLONE_running_0" instead of "CLONE_start_0").
  *
  * \param[in] first_uuid    UUID of first action in ordering
  * \param[in] first_rsc     Resource of first action in ordering
  *
  * \return Newly allocated copy of UUID to use with ordering
  * \note It is the caller's responsibility to free the return value.
  */
 static char *
 action_uuid_for_ordering(const char *first_uuid, pe_resource_t *first_rsc)
 {
     guint interval_ms = 0;
     char *uuid = NULL;
     char *rid = NULL;
     char *first_task_str = NULL;
     enum action_tasks first_task = no_action;
     enum action_tasks remapped_task = no_action;
 
     // Only non-notify actions for collective resources need remapping
     if ((strstr(first_uuid, "notify") != NULL)
         || (first_rsc->variant < pe_group)) {
         goto done;
     }
 
     // Only non-recurring actions need remapping
     CRM_ASSERT(parse_op_key(first_uuid, &rid, &first_task_str, &interval_ms));
     if (interval_ms > 0) {
         goto done;
     }
 
     first_task = text2task(first_task_str);
     switch (first_task) {
         case stop_rsc:
         case start_rsc:
         case action_notify:
         case action_promote:
         case action_demote:
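              /* In enum action_tasks, each completion task directly follows
               * its base task (e.g. stopped_rsc == stop_rsc + 1)
               */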
             remapped_task = first_task + 1;
             break;
         case stopped_rsc:
         case started_rsc:
         case action_notified:
         case action_promoted:
         case action_demoted:
             remapped_task = first_task;
             break;
         case monitor_rsc:
         case shutdown_crm:
         case stonith_node:
             break;
         default:
             crm_err("Unknown action '%s' in ordering", first_task_str);
             break;
     }
 
     if (remapped_task != no_action) {
         /* If a (clone) resource has notifications enabled, we want to order
          * relative to when all notifications have been sent for the remapped
          * task. Only outermost resources or those in bundles have
          * notifications.
          */
         if (pcmk_is_set(first_rsc->flags, pe_rsc_notify)
             && ((first_rsc->parent == NULL)
                 || (pe_rsc_is_clone(first_rsc)
                     && (first_rsc->parent->variant == pe_container)))) {
             uuid = pcmk__notify_key(rid, "confirmed-post",
                                     task2text(remapped_task));
         } else {
             uuid = pcmk__op_key(rid, task2text(remapped_task), 0);
         }
         pe_rsc_trace(first_rsc,
                      "Remapped action UUID %s to %s for ordering purposes",
                      first_uuid, uuid);
     }
 
 done:
     if (uuid == NULL) {
         uuid = strdup(first_uuid);
         CRM_ASSERT(uuid != NULL);
     }
     free(first_task_str);
     free(rid);
     return uuid;
 }
 
 /*!
  * \internal
  * \brief Get actual action that should be used with an ordering
  *
  * When an action is ordered relative to an action for a collective resource
  * (clone, group, or bundle), it actually needs to be ordered after all
  * instances of the collective have completed the relevant action (for example,
  * given "start CLONE then start RSC", RSC must wait until all instances of
  * CLONE have started). Given the first action in an ordering, this returns
  * the action that should actually be used for ordering (for example, the
  * started action instead of the start action).
  *
  * \param[in] action  First action in an ordering
  *
  * \return Actual action that should be used for the ordering
  */
 static pe_action_t *
 action_for_ordering(pe_action_t *action)
 {
     pe_action_t *result = action;
     pe_resource_t *rsc = action->rsc;
 
     if ((rsc != NULL) && (rsc->variant >= pe_group) && (action->uuid != NULL)) {
         char *uuid = action_uuid_for_ordering(action->uuid, rsc);
 
         result = find_first_action(rsc->actions, uuid, NULL, NULL);
         if (result == NULL) {
              crm_warn("Not remapping %s to %s because %s does not have a "
                       "remapped action", action->uuid, uuid, rsc->id);
             result = action;
         }
         free(uuid);
     }
     return result;
 }
 
 /*!
  * \internal
  * \brief Update actions' flags as appropriate for an ordering's flags
  *
  * \param[in] first        First action in an ordering
  * \param[in] then         Then action in an ordering
  * \param[in] first_flags  Action flags for \p first for ordering purposes
  * \param[in] then_flags   Action flags for \p then for ordering purposes
  * \param[in] order        Action wrapper for \p first in ordering
  * \param[in] data_set     Cluster working set
  *
  * \return Group of enum pcmk__updated flags
  */
 static uint32_t
 update_action_for_ordering_flags(pe_action_t *first, pe_action_t *then,
                                  enum pe_action_flags first_flags,
                                  enum pe_action_flags then_flags,
                                  pe_action_wrapper_t *order,
                                  pe_working_set_t *data_set)
 {
     uint32_t changed = pcmk__updated_none;
 
     /* The node will only be used for clones. If interleaved, node will be NULL,
      * otherwise the ordering scope will be limited to the node. Normally, the
      * whole 'then' clone should restart if 'first' is restarted, so then->node
      * is needed.
      */
     pe_node_t *node = then->node;
 
     if (pcmk_is_set(order->type, pe_order_implies_then_on_node)) {
         /* For unfencing, only instances of 'then' on the same node as 'first'
          * (the unfencing operation) should restart, so reset node to
          * first->node, at which point this case is handled like a normal
          * pe_order_implies_then.
          */
         pe__clear_order_flags(order->type, pe_order_implies_then_on_node);
         pe__set_order_flags(order->type, pe_order_implies_then);
         node = first->node;
         pe_rsc_trace(then->rsc,
                      "%s then %s: mapped pe_order_implies_then_on_node to "
                      "pe_order_implies_then on %s",
                      first->uuid, then->uuid, pe__node_name(node));
     }
 
     if (pcmk_is_set(order->type, pe_order_implies_then)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags & pe_action_optional,
                                                                pe_action_optional,
                                                                pe_order_implies_then,
                                                                data_set);
         } else if (!pcmk_is_set(first_flags, pe_action_optional)
                    && pcmk_is_set(then->flags, pe_action_optional)) {
             pe__clear_action_flags(then, pe_action_optional);
             pcmk__set_updated_flags(changed, first, pcmk__updated_then);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_implies_then",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_restart) && (then->rsc != NULL)) {
         enum pe_action_flags restart = pe_action_optional|pe_action_runnable;
 
         changed |= then->rsc->cmds->update_ordered_actions(first, then, node,
                                                            first_flags, restart,
                                                            pe_order_restart,
                                                            data_set);
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_restart",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_implies_first)) {
         if (first->rsc != NULL) {
             changed |= first->rsc->cmds->update_ordered_actions(first, then,
                                                                 node,
                                                                 first_flags,
                                                                 pe_action_optional,
                                                                 pe_order_implies_first,
                                                                 data_set);
         } else if (!pcmk_is_set(first_flags, pe_action_optional)
                    && pcmk_is_set(first->flags, pe_action_runnable)) {
             pe__clear_action_flags(first, pe_action_runnable);
             pcmk__set_updated_flags(changed, first, pcmk__updated_first);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_implies_first",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_promoted_implies_first)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags & pe_action_optional,
                                                                pe_action_optional,
                                                                pe_order_promoted_implies_first,
                                                                data_set);
         }
         pe_rsc_trace(then->rsc,
                      "%s then %s: %s after pe_order_promoted_implies_first",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_one_or_more)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_runnable,
                                                                pe_order_one_or_more,
                                                                data_set);
 
         } else if (pcmk_is_set(first_flags, pe_action_runnable)) {
             // We have another runnable instance of "first"
             then->runnable_before++;
 
             /* Mark "then" as runnable if it requires a certain number of
              * "before" instances to be runnable, and they now are.
              */
             if ((then->runnable_before >= then->required_runnable_before)
                 && !pcmk_is_set(then->flags, pe_action_runnable)) {
 
                 pe__set_action_flags(then, pe_action_runnable);
                 pcmk__set_updated_flags(changed, first, pcmk__updated_then);
             }
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_one_or_more",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_probe) && (then->rsc != NULL)) {
         if (!pcmk_is_set(first_flags, pe_action_runnable)
             && (first->rsc->running_on != NULL)) {
 
             pe_rsc_trace(then->rsc,
                          "%s then %s: ignoring because first is stopping",
                          first->uuid, then->uuid);
             order->type = pe_order_none;
         } else {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_runnable,
                                                                pe_order_runnable_left,
                                                                data_set);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_probe",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_runnable_left)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_runnable,
                                                                pe_order_runnable_left,
                                                                data_set);
 
         } else if (!pcmk_is_set(first_flags, pe_action_runnable)
                    && pcmk_is_set(then->flags, pe_action_runnable)) {
 
             pe__clear_action_flags(then, pe_action_runnable);
             pcmk__set_updated_flags(changed, first, pcmk__updated_then);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_runnable_left",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_implies_first_migratable)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_optional,
                                                                pe_order_implies_first_migratable,
                                                                data_set);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after "
                      "pe_order_implies_first_migratable",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_pseudo_left)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_optional,
                                                                pe_order_pseudo_left,
                                                                data_set);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_pseudo_left",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_optional)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_runnable,
                                                                pe_order_optional,
                                                                data_set);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_optional",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(order->type, pe_order_asymmetrical)) {
         if (then->rsc != NULL) {
             changed |= then->rsc->cmds->update_ordered_actions(first, then,
                                                                node,
                                                                first_flags,
                                                                pe_action_runnable,
                                                                pe_order_asymmetrical,
                                                                data_set);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after pe_order_asymmetrical",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     if (pcmk_is_set(first->flags, pe_action_runnable)
         && pcmk_is_set(order->type, pe_order_implies_then_printed)
         && !pcmk_is_set(first_flags, pe_action_optional)) {
 
         pe_rsc_trace(then->rsc, "%s will be in graph because %s is required",
                      then->uuid, first->uuid);
         pe__set_action_flags(then, pe_action_print_always);
         // Don't bother marking 'then' as changed just for this
     }
 
     if (pcmk_is_set(order->type, pe_order_implies_first_printed)
         && !pcmk_is_set(then_flags, pe_action_optional)) {
 
         pe_rsc_trace(then->rsc, "%s will be in graph because %s is required",
                      first->uuid, then->uuid);
         pe__set_action_flags(first, pe_action_print_always);
         // Don't bother marking 'first' as changed just for this
     }
 
     if (pcmk_any_flags_set(order->type, pe_order_implies_then
                                         |pe_order_implies_first
                                         |pe_order_restart)
         && (first->rsc != NULL)
         && !pcmk_is_set(first->rsc->flags, pe_rsc_managed)
         && pcmk_is_set(first->rsc->flags, pe_rsc_block)
         && !pcmk_is_set(first->flags, pe_action_runnable)
         && pcmk__str_eq(first->task, RSC_STOP, pcmk__str_casei)) {
 
         if (pcmk_is_set(then->flags, pe_action_runnable)) {
             pe__clear_action_flags(then, pe_action_runnable);
             pcmk__set_updated_flags(changed, first, pcmk__updated_then);
         }
         pe_rsc_trace(then->rsc, "%s then %s: %s after checking whether first "
                      "is blocked, unmanaged, unrunnable stop",
                      first->uuid, then->uuid,
                      (changed? "changed" : "unchanged"));
     }
 
     return changed;
 }
 
 // Convenience macros for logging action properties
 
 #define action_type_str(flags) \
     (pcmk_is_set((flags), pe_action_pseudo)? "pseudo-action" : "action")
 
 #define action_optional_str(flags) \
     (pcmk_is_set((flags), pe_action_optional)? "optional" : "required")
 
 #define action_runnable_str(flags) \
     (pcmk_is_set((flags), pe_action_runnable)? "runnable" : "unrunnable")
 
 #define action_node_str(a) \
     (((a)->node == NULL)? "no node" : (a)->node->details->uname)
 
 /*!
  * \internal
  * \brief Update an action's flags for all orderings where it is "then"
  *
  * \param[in] then      Action to update
  * \param[in] data_set  Cluster working set
  */
 void
 pcmk__update_action_for_orderings(pe_action_t *then, pe_working_set_t *data_set)
 {
     GList *lpc = NULL;
     uint32_t changed = pcmk__updated_none;
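      // Save the original flags so we can detect whether this update changed them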
     int last_flags = then->flags;
 
     pe_rsc_trace(then->rsc, "Updating %s %s (%s %s) on %s",
                  action_type_str(then->flags), then->uuid,
                  action_optional_str(then->flags),
                  action_runnable_str(then->flags), action_node_str(then));
 
     if (pcmk_is_set(then->flags, pe_action_requires_any)) {
          /* Initialize the count of known runnable "before" actions. As
          * update_action_for_ordering_flags() is called for each of then's
          * before actions, this number will increment as runnable 'first'
          * actions are encountered.
          */
         then->runnable_before = 0;
 
         if (then->required_runnable_before == 0) {
             /* @COMPAT This ordering constraint uses the deprecated
              * "require-all=false" attribute. Treat it like "clone-min=1".
              */
             then->required_runnable_before = 1;
         }
 
         /* The pe_order_one_or_more clause of update_action_for_ordering_flags()
          * (called below) will reset runnable if appropriate.
          */
         pe__clear_action_flags(then, pe_action_runnable);
     }
 
     for (lpc = then->actions_before; lpc != NULL; lpc = lpc->next) {
         pe_action_wrapper_t *other = (pe_action_wrapper_t *) lpc->data;
         pe_action_t *first = other->action;
 
         pe_node_t *then_node = then->node;
         pe_node_t *first_node = first->node;
 
         if ((first->rsc != NULL)
             && (first->rsc->variant == pe_group)
             && pcmk__str_eq(first->task, RSC_START, pcmk__str_casei)) {
 
             first_node = first->rsc->fns->location(first->rsc, NULL, FALSE);
             if (first_node != NULL) {
                 pe_rsc_trace(first->rsc, "Found %s for 'first' %s",
                              pe__node_name(first_node), first->uuid);
             }
         }
 
         if ((then->rsc != NULL)
             && (then->rsc->variant == pe_group)
             && pcmk__str_eq(then->task, RSC_START, pcmk__str_casei)) {
 
             then_node = then->rsc->fns->location(then->rsc, NULL, FALSE);
             if (then_node != NULL) {
                 pe_rsc_trace(then->rsc, "Found %s for 'then' %s",
                              pe__node_name(then_node), then->uuid);
             }
         }
 
          /* Disable the constraint if it applies only when both actions are
           * on the same node, but they are not
           */
         if (pcmk_is_set(other->type, pe_order_same_node)
             && (first_node != NULL) && (then_node != NULL)
             && (first_node->details != then_node->details)) {
 
             pe_rsc_trace(then->rsc,
                          "Disabled ordering %s on %s then %s on %s: not same node",
                          other->action->uuid, pe__node_name(first_node),
                          then->uuid, pe__node_name(then_node));
             other->type = pe_order_none;
             continue;
         }
 
         pcmk__clear_updated_flags(changed, then, pcmk__updated_first);
 
         if ((first->rsc != NULL)
             && pcmk_is_set(other->type, pe_order_then_cancels_first)
             && !pcmk_is_set(then->flags, pe_action_optional)) {
 
             /* 'then' is required, so we must abandon 'first'
              * (e.g. a required stop cancels any agent reload).
              */
             pe__set_action_flags(other->action, pe_action_optional);
             if (!strcmp(first->task, CRMD_ACTION_RELOAD_AGENT)) {
                 pe__clear_resource_flags(first->rsc, pe_rsc_reload);
             }
         }
 
         if ((first->rsc != NULL) && (then->rsc != NULL)
             && (first->rsc != then->rsc) && !is_parent(then->rsc, first->rsc)) {
             first = action_for_ordering(first);
         }
         if (first != other->action) {
             pe_rsc_trace(then->rsc, "Ordering %s after %s instead of %s",
                          then->uuid, first->uuid, other->action->uuid);
         }
 
         pe_rsc_trace(then->rsc,
                      "%s (%#.6x) then %s (%#.6x): type=%#.6x node=%s",
                      first->uuid, first->flags, then->uuid, then->flags,
                      other->type, action_node_str(first));
 
         if (first == other->action) {
             /* 'first' was not remapped (e.g. from 'start' to 'running'), which
              * could mean it is a non-resource action, a primitive resource
              * action, or already expanded.
              */
             enum pe_action_flags first_flags, then_flags;
 
             first_flags = action_flags_for_ordering(first, then_node);
             then_flags = action_flags_for_ordering(then, first_node);
 
             changed |= update_action_for_ordering_flags(first, then,
                                                         first_flags, then_flags,
                                                         other, data_set);
 
             /* Otherwise, 'first' was remapped because it is for a collective
              * resource (clone, group, etc.), so create a new dependency if
              * necessary
              */
         } else if (order_actions(first, then, other->type)) {
             /* This was the first time 'first' and 'then' were associated,
              * start again to get the new actions_before list
              */
             pcmk__set_updated_flags(changed, then, pcmk__updated_then);
             pe_rsc_trace(then->rsc,
                          "Disabled ordering %s then %s in favor of %s then %s",
                          other->action->uuid, then->uuid, first->uuid,
                          then->uuid);
             other->type = pe_order_none;
          }
 
          if (pcmk_is_set(changed, pcmk__updated_first)) {
             crm_trace("Re-processing %s and its 'after' actions "
                       "because it changed", first->uuid);
             for (GList *lpc2 = first->actions_after; lpc2 != NULL;
                  lpc2 = lpc2->next) {
                 pe_action_wrapper_t *other = (pe_action_wrapper_t *) lpc2->data;
 
                 pcmk__update_action_for_orderings(other->action, data_set);
             }
             pcmk__update_action_for_orderings(first, data_set);
         }
     }
 
     if (pcmk_is_set(then->flags, pe_action_requires_any)) {
         if (last_flags == then->flags) {
             pcmk__clear_updated_flags(changed, then, pcmk__updated_then);
         } else {
             pcmk__set_updated_flags(changed, then, pcmk__updated_then);
         }
     }
 
     if (pcmk_is_set(changed, pcmk__updated_then)) {
         crm_trace("Re-processing %s and its 'after' actions because it changed",
                   then->uuid);
         if (pcmk_is_set(last_flags, pe_action_runnable)
             && !pcmk_is_set(then->flags, pe_action_runnable)) {
             pcmk__block_colocation_dependents(then, data_set);
         }
         pcmk__update_action_for_orderings(then, data_set);
         for (lpc = then->actions_after; lpc != NULL; lpc = lpc->next) {
             pe_action_wrapper_t *other = (pe_action_wrapper_t *) lpc->data;
 
             pcmk__update_action_for_orderings(other->action, data_set);
         }
     }
 }
 
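 // Check whether an action is for a primitive (native) resource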
 static inline bool
 is_primitive_action(pe_action_t *action)
 {
     return action && action->rsc && (action->rsc->variant == pe_native);
 }
 
 /*!
  * \internal
  * \brief Clear a single action flag and set reason text
  *
  * \param[in] action  Action whose flag should be cleared
  * \param[in] flag    Action flag that should be cleared
  * \param[in] reason  Action that is the reason why flag is being cleared
  */
 #define clear_action_flag_because(action, flag, reason) do {                \
         if (pcmk_is_set((action)->flags, (flag))) {                         \
             pe__clear_action_flags(action, flag);                           \
             if ((action)->rsc != (reason)->rsc) {                           \
                 char *reason_text = pe__action2reason((reason), (flag));    \
                 pe_action_set_reason((action), reason_text,                 \
                                    ((flag) == pe_action_migrate_runnable)); \
                 free(reason_text);                                          \
             }                                                               \
         }                                                                   \
     } while (0)
 
 /*!
  * \internal
  * \brief Update actions in an asymmetric ordering
  *
  * \param[in] first  'First' action in an asymmetric ordering
  * \param[in] then   'Then' action in an asymmetric ordering
  */
 static void
 handle_asymmetric_ordering(pe_action_t *first, pe_action_t *then)
 {
     enum rsc_role_e then_rsc_role = RSC_ROLE_UNKNOWN;
     GList *then_on = NULL;
 
     if (then->rsc == NULL) {
         // Asymmetric orderings only matter if there's a resource involved
         return;
     }
 
     then_rsc_role = then->rsc->fns->state(then->rsc, TRUE);
     then_on = then->rsc->running_on;
 
     if ((then_rsc_role == RSC_ROLE_STOPPED)
         && pcmk__str_eq(then->task, RSC_STOP, pcmk__str_none)) {
         /* Nothing needs to be done for asymmetric ordering if 'then' is
          * supposed to be stopped after 'first' but is already stopped.
          */
         return;
     }
 
     if ((then_rsc_role >= RSC_ROLE_STARTED)
         && pcmk_is_set(then->flags, pe_action_optional)
         && (then->node != NULL)
         && pcmk__list_of_1(then_on)
         && (then->node->details == ((pe_node_t *) then_on->data)->details)
         && pcmk__str_eq(then->task, RSC_START, pcmk__str_none)) {
         /* Nothing needs to be done for asymmetric ordering if 'then' is
          * supposed to be started after 'first' but is already started --
          * unless the start is mandatory, which indicates the resource is
          * restarting and the ordering is still needed.
          */
         return;
     }
 
     if (!pcmk_is_set(first->flags, pe_action_runnable)) {
         // 'First' can't run, so 'then' can't either
         clear_action_flag_because(then, pe_action_optional, first);
         clear_action_flag_because(then, pe_action_runnable, first);
     }
 }
 
 /*!
  * \internal
  * \brief Set action bits appropriately when pe_restart_order is used
  *
  * \param[in] first   'First' action in an ordering with pe_restart_order
  * \param[in] then    'Then' action in an ordering with pe_restart_order
  * \param[in] filter  What action flags to care about
  *
  * \note pe_restart_order is set for "stop resource before starting it" and
  *       "stop later group member before stopping earlier group member"
  */
 static void
 handle_restart_ordering(pe_action_t *first, pe_action_t *then, uint32_t filter)
 {
     const char *reason = NULL;
 
     CRM_ASSERT(is_primitive_action(first));
     CRM_ASSERT(is_primitive_action(then));
 
     // We need to update the action in two cases:
 
     // ... if 'then' is required
     if (pcmk_is_set(filter, pe_action_optional)
         && !pcmk_is_set(then->flags, pe_action_optional)) {
         reason = "restart";
     }
 
      /* ... if 'then' is an unrunnable action on the same resource (if a
       * resource should restart but can't start, we still want to stop)
      */
     if (pcmk_is_set(filter, pe_action_runnable)
         && !pcmk_is_set(then->flags, pe_action_runnable)
         && pcmk_is_set(then->rsc->flags, pe_rsc_managed)
         && (first->rsc == then->rsc)) {
         reason = "stop";
     }
 
     if (reason == NULL) {
         return;
     }
 
     pe_rsc_trace(first->rsc, "Handling %s -> %s for %s",
                  first->uuid, then->uuid, reason);
 
     // Make 'first' required if it is runnable
     if (pcmk_is_set(first->flags, pe_action_runnable)) {
         clear_action_flag_because(first, pe_action_optional, then);
     }
 
     // Make 'first' required if 'then' is required
     if (!pcmk_is_set(then->flags, pe_action_optional)) {
         clear_action_flag_because(first, pe_action_optional, then);
     }
 
     // Make 'first' unmigratable if 'then' is unmigratable
     if (!pcmk_is_set(then->flags, pe_action_migrate_runnable)) {
         clear_action_flag_because(first, pe_action_migrate_runnable, then);
     }
 
     // Make 'then' unrunnable if 'first' is required but unrunnable
     if (!pcmk_is_set(first->flags, pe_action_optional)
         && !pcmk_is_set(first->flags, pe_action_runnable)) {
         clear_action_flag_because(then, pe_action_runnable, first);
     }
 }
 
 /*!
  * \internal
  * \brief Update two actions according to an ordering between them
  *
  * Given information about an ordering of two actions, update the actions'
  * flags (and runnable_before members if appropriate) as appropriate for the
  * ordering. In some cases, the ordering could be disabled as well.
  *
  * \param[in,out] first     'First' action in an ordering
  * \param[in,out] then      'Then' action in an ordering
  * \param[in]     node      If not NULL, limit scope of ordering to this node
  *                          (ignored by this implementation)
  * \param[in]     flags     Action flags for \p first for ordering purposes
  * \param[in]     filter    Action flags to limit scope of certain updates (may
  *                          include pe_action_optional to affect only mandatory
  *                          actions, and pe_action_runnable to affect only
  *                          runnable actions)
  * \param[in]     type      Group of enum pe_ordering flags to apply
  * \param[in,out] data_set  Cluster working set
  *
  * \return Group of enum pcmk__updated flags indicating what was updated
  */
 uint32_t
 pcmk__update_ordered_actions(pe_action_t *first, pe_action_t *then,
                              const pe_node_t *node, uint32_t flags,
                              uint32_t filter, uint32_t type,
                              pe_working_set_t *data_set)
 {
     uint32_t changed = pcmk__updated_none;
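      // Snapshot both actions' flags to detect changes made by this update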
     uint32_t then_flags = then->flags;
     uint32_t first_flags = first->flags;
 
     if (pcmk_is_set(type, pe_order_asymmetrical)) {
         handle_asymmetric_ordering(first, then);
     }
 
     if (pcmk_is_set(type, pe_order_implies_first)
         && !pcmk_is_set(then_flags, pe_action_optional)) {
         // Then is required, and implies first should be, too
 
         if (pcmk_is_set(filter, pe_action_optional)
             && !pcmk_is_set(flags, pe_action_optional)
             && pcmk_is_set(first_flags, pe_action_optional)) {
             clear_action_flag_because(first, pe_action_optional, then);
         }
 
         if (pcmk_is_set(flags, pe_action_migrate_runnable)
             && !pcmk_is_set(then->flags, pe_action_migrate_runnable)) {
             clear_action_flag_because(first, pe_action_migrate_runnable, then);
         }
     }
 
     if (pcmk_is_set(type, pe_order_promoted_implies_first)
         && (then->rsc != NULL) && (then->rsc->role == RSC_ROLE_PROMOTED)
         && pcmk_is_set(filter, pe_action_optional)
         && !pcmk_is_set(then->flags, pe_action_optional)) {
 
         clear_action_flag_because(first, pe_action_optional, then);
 
         if (pcmk_is_set(first->flags, pe_action_migrate_runnable)
             && !pcmk_is_set(then->flags, pe_action_migrate_runnable)) {
             clear_action_flag_because(first, pe_action_migrate_runnable,
                                       then);
         }
     }
 
     if (pcmk_is_set(type, pe_order_implies_first_migratable)
         && pcmk_is_set(filter, pe_action_optional)) {
 
         if (!pcmk_all_flags_set(then->flags,
                                 pe_action_migrate_runnable|pe_action_runnable)) {
             clear_action_flag_because(first, pe_action_runnable, then);
         }
 
         if (!pcmk_is_set(then->flags, pe_action_optional)) {
             clear_action_flag_because(first, pe_action_optional, then);
         }
     }
 
     if (pcmk_is_set(type, pe_order_pseudo_left)
         && pcmk_is_set(filter, pe_action_optional)
         && !pcmk_is_set(first->flags, pe_action_runnable)) {
 
         clear_action_flag_because(then, pe_action_migrate_runnable, first);
         pe__clear_action_flags(then, pe_action_pseudo);
     }
 
     if (pcmk_is_set(type, pe_order_runnable_left)
         && pcmk_is_set(filter, pe_action_runnable)
         && pcmk_is_set(then->flags, pe_action_runnable)
         && !pcmk_is_set(flags, pe_action_runnable)) {
 
         clear_action_flag_because(then, pe_action_runnable, first);
         clear_action_flag_because(then, pe_action_migrate_runnable, first);
     }
 
     if (pcmk_is_set(type, pe_order_implies_then)
         && pcmk_is_set(filter, pe_action_optional)
         && pcmk_is_set(then->flags, pe_action_optional)
         && !pcmk_is_set(flags, pe_action_optional)
         && !pcmk_is_set(first->flags, pe_action_migrate_runnable)) {
 
         clear_action_flag_because(then, pe_action_optional, first);
     }
 
     if (pcmk_is_set(type, pe_order_restart)) {
         handle_restart_ordering(first, then, filter);
     }
 
     if (then_flags != then->flags) {
         pcmk__set_updated_flags(changed, first, pcmk__updated_then);
         pe_rsc_trace(then->rsc,
                      "%s on %s: flags are now %#.6x (was %#.6x) "
                      "because of 'first' %s (%#.6x)",
                      then->uuid, pe__node_name(then->node),
                      then->flags, then_flags, first->uuid, first->flags);
 
         if ((then->rsc != NULL) && (then->rsc->parent != NULL)) {
             // Required to handle "X_stop then X_start" for cloned groups
             pcmk__update_action_for_orderings(then, data_set);
         }
     }
 
     if (first_flags != first->flags) {
         pcmk__set_updated_flags(changed, first, pcmk__updated_first);
         pe_rsc_trace(first->rsc,
                      "%s on %s: flags are now %#.6x (was %#.6x) "
                      "because of 'then' %s (%#.6x)",
                      first->uuid, pe__node_name(first->node),
                      first->flags, first_flags, then->uuid, then->flags);
     }
 
     return changed;
 }
 
 /*!
  * \internal
  * \brief Trace-log an action (optionally with its dependent actions)
  *
  * \param[in] pre_text  If not NULL, prefix the log with this plus ": "
  * \param[in] action    Action to log
  * \param[in] details   If true, recursively log dependent actions
  */
 void
 pcmk__log_action(const char *pre_text, pe_action_t *action, bool details)
 {
     const char *node_uname = NULL;
     const char *node_uuid = NULL;
     const char *desc = NULL;
 
     CRM_CHECK(action != NULL, return);
 
     if (!pcmk_is_set(action->flags, pe_action_pseudo)) {
         if (action->node != NULL) {
             node_uname = action->node->details->uname;
             node_uuid = action->node->details->id;
         } else {
             node_uname = "<none>";
         }
     }
 
     switch (text2task(action->task)) {
         case stonith_node:
         case shutdown_crm:
             if (pcmk_is_set(action->flags, pe_action_pseudo)) {
                 desc = "Pseudo ";
             } else if (pcmk_is_set(action->flags, pe_action_optional)) {
                 desc = "Optional ";
             } else if (!pcmk_is_set(action->flags, pe_action_runnable)) {
                 desc = "!!Non-Startable!! ";
             } else if (pcmk_is_set(action->flags, pe_action_processed)) {
                 desc = "";
             } else {
                 desc = "(Provisional) ";
             }
             crm_trace("%s%s%sAction %d: %s%s%s%s%s%s",
                       ((pre_text == NULL)? "" : pre_text),
                       ((pre_text == NULL)? "" : ": "),
                       desc, action->id, action->uuid,
                       (node_uname? "\ton " : ""), (node_uname? node_uname : ""),
                       (node_uuid? "\t\t(" : ""), (node_uuid? node_uuid : ""),
                       (node_uuid? ")" : ""));
             break;
         default:
             if (pcmk_is_set(action->flags, pe_action_optional)) {
                 desc = "Optional ";
             } else if (pcmk_is_set(action->flags, pe_action_pseudo)) {
                 desc = "Pseudo ";
             } else if (!pcmk_is_set(action->flags, pe_action_runnable)) {
                 desc = "!!Non-Startable!! ";
             } else if (pcmk_is_set(action->flags, pe_action_processed)) {
                 desc = "";
             } else {
                 desc = "(Provisional) ";
             }
             crm_trace("%s%s%sAction %d: %s %s%s%s%s%s%s",
                       ((pre_text == NULL)? "" : pre_text),
                       ((pre_text == NULL)? "" : ": "),
                       desc, action->id, action->uuid,
                       (action->rsc? action->rsc->id : "<none>"),
                       (node_uname? "\ton " : ""), (node_uname? node_uname : ""),
                       (node_uuid? "\t\t(" : ""), (node_uuid? node_uuid : ""),
                       (node_uuid? ")" : ""));
             break;
     }
 
     if (details) {
         GList *iter = NULL;
 
         crm_trace("\t\t====== Preceding Actions");
         for (iter = action->actions_before; iter != NULL; iter = iter->next) {
             pe_action_wrapper_t *other = (pe_action_wrapper_t *) iter->data;
 
             pcmk__log_action("\t\t", other->action, false);
         }
         crm_trace("\t\t====== Subsequent Actions");
         for (iter = action->actions_after; iter != NULL; iter = iter->next) {
             pe_action_wrapper_t *other = (pe_action_wrapper_t *) iter->data;
 
             pcmk__log_action("\t\t", other->action, false);
         }
         crm_trace("\t\t====== End");
 
     } else {
         crm_trace("\t\t(before=%d, after=%d)",
                   g_list_length(action->actions_before),
                   g_list_length(action->actions_after));
     }
 }
 
 /*!
  * \internal
  * \brief Create a new shutdown action for a node
  *
  * \param[in] node         Node being shut down
  *
  * \return Newly created shutdown action for \p node
  */
 pe_action_t *
 pcmk__new_shutdown_action(pe_node_t *node)
 {
     char *shutdown_id = NULL;
     pe_action_t *shutdown_op = NULL;
 
     CRM_ASSERT(node != NULL);
 
     shutdown_id = crm_strdup_printf("%s-%s", CRM_OP_SHUTDOWN,
                                     node->details->uname);
 
     shutdown_op = custom_action(NULL, shutdown_id, CRM_OP_SHUTDOWN, node, FALSE,
                                 TRUE, node->details->data_set);
 
     pcmk__order_stops_before_shutdown(node, shutdown_op);
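      // Tell the transition engine not to wait for the shutdown's result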
     add_hash_param(shutdown_op->meta, XML_ATTR_TE_NOWAIT, XML_BOOLEAN_TRUE);
     return shutdown_op;
 }
 
 /*!
  * \internal
  * \brief Calculate and add an operation digest to XML
  *
  * Calculate an operation digest, which enables us to later determine when a
  * restart is needed due to the resource's parameters being changed, and add it
  * to given XML.
  *
  * \param[in] op       Operation result from executor
  * \param[in] update   XML to add digest to
  */
 static void
 add_op_digest_to_xml(lrmd_event_data_t *op, xmlNode *update)
 {
     char *digest = NULL;
     xmlNode *args_xml = NULL;
 
     if (op->params == NULL) {
         return;
     }
     args_xml = create_xml_node(NULL, XML_TAG_PARAMS);
     g_hash_table_foreach(op->params, hash2field, args_xml);
     pcmk__filter_op_for_digest(args_xml);
     digest = calculate_operation_digest(args_xml, NULL);
     crm_xml_add(update, XML_LRM_ATTR_OP_DIGEST, digest);
     free_xml(args_xml);
     free(digest);
 }
 
 #define FAKE_TE_ID     "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
 
 /*!
  * \internal
  * \brief Create XML for resource operation history update
  *
  * \param[in,out] parent          Parent XML node to add to
  * \param[in,out] op              Operation event data
  * \param[in]     caller_version  DC feature set
  * \param[in]     target_rc       Expected result of operation
  * \param[in]     node            Name of node on which operation was performed
  * \param[in]     origin          Arbitrary description of update source
  *
  * \return Newly created XML node for history update
  */
 xmlNode *
 pcmk__create_history_xml(xmlNode *parent, lrmd_event_data_t *op,
                          const char *caller_version, int target_rc,
                          const char *node, const char *origin)
 {
     char *key = NULL;
     char *magic = NULL;
     char *op_id = NULL;
     char *op_id_additional = NULL;
     char *local_user_data = NULL;
     const char *exit_reason = NULL;
 
     xmlNode *xml_op = NULL;
     const char *task = NULL;
 
     CRM_CHECK(op != NULL, return NULL);
     crm_trace("Creating history XML for %s-interval %s action for %s on %s "
               "(DC version: %s, origin: %s)",
               pcmk__readable_interval(op->interval_ms), op->op_type, op->rsc_id,
               ((node == NULL)? "no node" : node), caller_version, origin);
 
     task = op->op_type;
 
     /* Record a successful agent reload as a start, and a failed one as a
      * monitor, to make life easier for the scheduler when determining the
      * current state.
      *
      * @COMPAT We should check "reload" here only if the operation was for a
      * pre-OCF-1.1 resource agent, but we don't know that here, and we should
      * only ever get results for actions scheduled by us, so we can reasonably
      * assume any "reload" is actually a pre-1.1 agent reload.
      */
     if (pcmk__str_any_of(task, CRMD_ACTION_RELOAD, CRMD_ACTION_RELOAD_AGENT,
                          NULL)) {
         if (op->op_status == PCMK_EXEC_DONE) {
             task = CRMD_ACTION_START;
         } else {
             task = CRMD_ACTION_STATUS;
         }
     }
 
     key = pcmk__op_key(op->rsc_id, task, op->interval_ms);
     if (pcmk__str_eq(task, CRMD_ACTION_NOTIFY, pcmk__str_none)) {
         const char *n_type = crm_meta_value(op->params, "notify_type");
         const char *n_task = crm_meta_value(op->params, "notify_operation");
 
         CRM_LOG_ASSERT(n_type != NULL);
         CRM_LOG_ASSERT(n_task != NULL);
         op_id = pcmk__notify_key(op->rsc_id, n_type, n_task);
 
         if (op->op_status != PCMK_EXEC_PENDING) {
             /* Ignore notify errors.
              *
              * @TODO It might be better to keep the correct result here, and
              * ignore it in process_graph_event().
              */
             lrmd__set_result(op, PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
         }
 
     /* Migration history is preserved separately, which usually matters for
      * multiple nodes and is important for future cluster transitions.
      */
     } else if (pcmk__str_any_of(op->op_type, CRMD_ACTION_MIGRATE,
                                 CRMD_ACTION_MIGRATED, NULL)) {
         op_id = strdup(key);
 
     } else if (did_rsc_op_fail(op, target_rc)) {
         op_id = pcmk__op_key(op->rsc_id, "last_failure", 0);
         if (op->interval_ms == 0) {
             // Ensure 'last' gets updated, in case record-pending is true
             op_id_additional = pcmk__op_key(op->rsc_id, "last", 0);
 
         } else {
             // Ensure any pending recurring monitor gets updated if it fails
             op_id_additional = strdup(key);
         }
         exit_reason = op->exit_reason;
 
     } else if (op->interval_ms > 0) {
         op_id = strdup(key);
 
     } else {
         op_id = pcmk__op_key(op->rsc_id, "last", 0);
     }
 
   again:
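      /* For a failed op, this may execute twice: op_id_additional (set above)
       * triggers a second pass via the goto at the end of this function
       */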
     xml_op = pcmk__xe_match(parent, XML_LRM_TAG_RSC_OP, XML_ATTR_ID, op_id);
     if (xml_op == NULL) {
         xml_op = create_xml_node(parent, XML_LRM_TAG_RSC_OP);
     }
 
     if (op->user_data == NULL) {
         crm_debug("Generating fake transition key for: " PCMK__OP_FMT
                   " %d from %s", op->rsc_id, op->op_type, op->interval_ms,
                   op->call_id, origin);
         local_user_data = pcmk__transition_key(-1, op->call_id, target_rc,
                                                FAKE_TE_ID);
         op->user_data = local_user_data;
     }
 
     if (magic == NULL) {
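          // Encode the result as "op-status:rc;transition-key"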
         magic = crm_strdup_printf("%d:%d;%s", op->op_status, op->rc,
                                   (const char *) op->user_data);
     }
 
     crm_xml_add(xml_op, XML_ATTR_ID, op_id);
     crm_xml_add(xml_op, XML_LRM_ATTR_TASK_KEY, key);
     crm_xml_add(xml_op, XML_LRM_ATTR_TASK, task);
     crm_xml_add(xml_op, XML_ATTR_ORIGIN, origin);
     crm_xml_add(xml_op, XML_ATTR_CRM_VERSION, caller_version);
     crm_xml_add(xml_op, XML_ATTR_TRANSITION_KEY, op->user_data);
     crm_xml_add(xml_op, XML_ATTR_TRANSITION_MAGIC, magic);
     crm_xml_add(xml_op, XML_LRM_ATTR_EXIT_REASON, exit_reason == NULL ? "" : exit_reason);
     crm_xml_add(xml_op, XML_LRM_ATTR_TARGET, node); /* For context during triage */
 
     crm_xml_add_int(xml_op, XML_LRM_ATTR_CALLID, op->call_id);
     crm_xml_add_int(xml_op, XML_LRM_ATTR_RC, op->rc);
     crm_xml_add_int(xml_op, XML_LRM_ATTR_OPSTATUS, op->op_status);
     crm_xml_add_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, op->interval_ms);
 
     if (compare_version("2.1", caller_version) <= 0) {
         if (op->t_run || op->t_rcchange || op->exec_time || op->queue_time) {
             crm_trace("Timing data (" PCMK__OP_FMT
                       "): last=%u change=%u exec=%u queue=%u",
                       op->rsc_id, op->op_type, op->interval_ms,
                       op->t_run, op->t_rcchange, op->exec_time, op->queue_time);
 
             if ((op->interval_ms != 0) && (op->t_rcchange != 0)) {
                 // Recurring ops may have changed rc after initial run
                 crm_xml_add_ll(xml_op, XML_RSC_OP_LAST_CHANGE,
                                (long long) op->t_rcchange);
             } else {
                 crm_xml_add_ll(xml_op, XML_RSC_OP_LAST_CHANGE,
                                (long long) op->t_run);
             }
 
             crm_xml_add_int(xml_op, XML_RSC_OP_T_EXEC, op->exec_time);
             crm_xml_add_int(xml_op, XML_RSC_OP_T_QUEUE, op->queue_time);
         }
     }
 
     if (pcmk__str_any_of(op->op_type, CRMD_ACTION_MIGRATE, CRMD_ACTION_MIGRATED, NULL)) {
          // Always record migrate_source and migrate_target for migrate ops
         const char *name = XML_LRM_ATTR_MIGRATE_SOURCE;
 
         crm_xml_add(xml_op, name, crm_meta_value(op->params, name));
 
         name = XML_LRM_ATTR_MIGRATE_TARGET;
         crm_xml_add(xml_op, name, crm_meta_value(op->params, name));
     }
 
     add_op_digest_to_xml(op, xml_op);
 
     if (op_id_additional) {
         free(op_id);
         op_id = op_id_additional;
         op_id_additional = NULL;
         goto again;
     }
 
     if (local_user_data) {
         free(local_user_data);
         op->user_data = NULL;
     }
     free(magic);
     free(op_id);
     free(key);
     return xml_op;
 }
 
 /*!
  * \internal
  * \brief Check whether an action shutdown-locks a resource to a node
  *
  * If the shutdown-lock cluster property is set, resources will not be recovered
  * on a different node if cleanly stopped, and may start only on that same node.
  * This function checks whether that applies to a given action, so that the
  * transition graph can be marked appropriately.
  *
  * \param[in] action  Action to check
  *
  * \return true if \p action locks its resource to the action's node,
  *         otherwise false
  */
 bool
 pcmk__action_locks_rsc_to_node(const pe_action_t *action)
 {
     // Only resource actions taking place on resource's lock node are locked
     if ((action == NULL) || (action->rsc == NULL)
         || (action->rsc->lock_node == NULL) || (action->node == NULL)
         || (action->node->details != action->rsc->lock_node->details)) {
         return false;
     }
 
     /* During shutdown, only stops are locked (otherwise, another action such as
      * a demote would cause the controller to clear the lock)
      */
     if (action->node->details->shutdown && (action->task != NULL)
         && (strcmp(action->task, RSC_STOP) != 0)) {
         return false;
     }
 
     return true;
 }
 
 /* lowest to highest */
 static gint
 sort_action_id(gconstpointer a, gconstpointer b)
 {
     const pe_action_wrapper_t *action_wrapper2 = (const pe_action_wrapper_t *)a;
     const pe_action_wrapper_t *action_wrapper1 = (const pe_action_wrapper_t *)b;
 
     if (a == NULL) {
         return 1;
     }
     if (b == NULL) {
         return -1;
     }
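      /* Note: "action_wrapper1" is b and "action_wrapper2" is a, so these
       * comparisons sort ascending by action ID
       */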
     if (action_wrapper1->action->id < action_wrapper2->action->id) {
         return 1;
     }
     if (action_wrapper1->action->id > action_wrapper2->action->id) {
         return -1;
     }
     return 0;
 }
 
 /*!
  * \internal
  * \brief Remove any duplicate action inputs, merging action flags
  *
  * \param[in] action  Action whose inputs should be checked
  */
 void
 pcmk__deduplicate_action_inputs(pe_action_t *action)
 {
     GList *item = NULL;
     GList *next = NULL;
     pe_action_wrapper_t *last_input = NULL;
 
     action->actions_before = g_list_sort(action->actions_before,
                                          sort_action_id);
     for (item = action->actions_before; item != NULL; item = next) {
         pe_action_wrapper_t *input = (pe_action_wrapper_t *) item->data;
 
         next = item->next;
         if ((last_input != NULL)
             && (input->action->id == last_input->action->id)) {
             crm_trace("Input %s (%d) duplicate skipped for action %s (%d)",
                       input->action->uuid, input->action->id,
                       action->uuid, action->id);
 
             /* For the purposes of scheduling, the ordering flags no longer
              * matter, but crm_simulate looks at certain ones when creating a
              * dot graph. Combining the flags is sufficient for that purpose.
              */
             last_input->type |= input->type;
             if (input->state == pe_link_dumped) {
                 last_input->state = pe_link_dumped;
             }
 
             free(item->data);
             action->actions_before = g_list_delete_link(action->actions_before,
                                                         item);
         } else {
             last_input = input;
             input->state = pe_link_not_dumped;
         }
     }
 }
 
 /*!
  * \internal
  * \brief Output all scheduled actions
  *
  * \param[in] data_set  Cluster working set
  */
 void
 pcmk__output_actions(pe_working_set_t *data_set)
 {
     pcmk__output_t *out = data_set->priv;
 
     // Output node (non-resource) actions
     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
         char *node_name = NULL;
         char *task = NULL;
         pe_action_t *action = (pe_action_t *) iter->data;
 
         if (action->rsc != NULL) {
             continue; // Resource actions will be output later
 
         } else if (pcmk_is_set(action->flags, pe_action_optional)) {
             continue; // This action was not scheduled
         }
 
         if (pcmk__str_eq(action->task, CRM_OP_SHUTDOWN, pcmk__str_casei)) {
             task = strdup("Shutdown");
 
         } else if (pcmk__str_eq(action->task, CRM_OP_FENCE, pcmk__str_casei)) {
             const char *op = g_hash_table_lookup(action->meta, "stonith_action");
 
             task = crm_strdup_printf("Fence (%s)", op);
 
         } else {
             continue; // Don't display other node action types
         }
 
         if (pe__is_guest_node(action->node)) {
             node_name = crm_strdup_printf("%s (resource: %s)",
                                           pe__node_name(action->node),
                                           action->node->details->remote_rsc->container->id);
         } else if (action->node != NULL) {
             node_name = crm_strdup_printf("%s", pe__node_name(action->node));
         }
 
         out->message(out, "node-action", task, node_name, action->reason);
 
         free(node_name);
         free(task);
     }
 
     // Output resource actions
     for (GList *iter = data_set->resources; iter != NULL; iter = iter->next) {
         pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
         rsc->cmds->output_actions(rsc);
     }
 }
 
 /*!
  * \internal
  * \brief Check whether action from resource history is still in configuration
  *
  * \param[in] rsc          Resource that action is for
  * \param[in] task         Action's name
  * \param[in] interval_ms  Action's interval (in milliseconds)
  *
  * \return true if action is still in resource configuration, otherwise false
  */
 static bool
 action_in_config(pe_resource_t *rsc, const char *task, guint interval_ms)
 {
     char *key = pcmk__op_key(rsc->id, task, interval_ms);
     bool config = (find_rsc_op_entry(rsc, key) != NULL);
 
     free(key);
     return config;
 }
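 
 /* Example sketch (assuming the usual "<rsc-id>_<task>_<interval-ms>" key
  * format produced by pcmk__op_key()): action_in_config(rsc, "monitor", 10000)
  * looks for an operation entry matching the key "<rsc-id>_monitor_10000".
  */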
 
 /*!
  * \internal
  * \brief Get action name needed to compare digest for configuration changes
  *
  * \param[in] task         Action name from history
  * \param[in] interval_ms  Action interval (in milliseconds)
  *
  * \return Action name whose digest should be compared
  */
 static const char *
 task_for_digest(const char *task, guint interval_ms)
 {
     /* Certain actions need to be compared against the parameters used to start
      * the resource.
      */
     if ((interval_ms == 0)
         && pcmk__str_any_of(task, RSC_STATUS, RSC_MIGRATED, RSC_PROMOTE, NULL)) {
         task = RSC_START;
     }
     return task;
 }
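 
 /* For example, a non-recurring RSC_STATUS (probe), RSC_MIGRATED, or
  * RSC_PROMOTE from the history is digest-compared as RSC_START, since those
  * actions use the parameters the resource was started with; recurring
  * actions keep their own name.
  */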
 
 /*!
  * \internal
  * \brief Check whether only sanitized parameters to an action changed
  *
  * When collecting CIB files for troubleshooting, crm_report will mask
  * sensitive resource parameters. If simulations were run using that, affected
  * resources would appear to need a restart, which would complicate
  * troubleshooting. To avoid that, we save a "secure digest" of non-sensitive
  * parameters. This function uses that digest to check whether only masked
  * parameters are different.
  *
  * \param[in] xml_op       Resource history entry with secure digest
  * \param[in] digest_data  Operation digest information being compared
  * \param[in] data_set     Cluster working set
  *
  * \return true if only sanitized parameters changed, otherwise false
  */
 static bool
 only_sanitized_changed(const xmlNode *xml_op,
                        const op_digest_cache_t *digest_data,
                        const pe_working_set_t *data_set)
 {
     const char *digest_secure = NULL;
 
     if (!pcmk_is_set(data_set->flags, pe_flag_sanitized)) {
         // The scheduler is not being run as a simulation
         return false;
     }
 
     digest_secure = crm_element_value(xml_op, XML_LRM_ATTR_SECURE_DIGEST);
 
     return (digest_data->rc != RSC_DIGEST_MATCH) && (digest_secure != NULL)
            && (digest_data->digest_secure_calc != NULL)
            && (strcmp(digest_data->digest_secure_calc, digest_secure) == 0);
 }
 
 /*!
  * \internal
  * \brief Force a restart due to a configuration change
  *
  * \param[in]     rsc          Resource that action is for
  * \param[in]     task         Name of action whose configuration changed
  * \param[in]     interval_ms  Action interval (in milliseconds)
  * \param[in,out] node         Node where resource should be restarted
  */
 static void
 force_restart(pe_resource_t *rsc, const char *task, guint interval_ms,
               pe_node_t *node)
 {
     char *key = pcmk__op_key(rsc->id, task, interval_ms);
     pe_action_t *required = custom_action(rsc, key, task, NULL, FALSE, TRUE,
                                           rsc->cluster);
 
     pe_action_set_reason(required, "resource definition change", true);
     trigger_unfencing(rsc, node, "Device parameters changed", NULL,
                       rsc->cluster);
 }
 
 /*!
  * \internal
  * \brief Schedule a reload of a resource on a node
  *
  * \param[in] rsc   Resource to reload
  * \param[in] node  Where resource should be reloaded
  */
 static void
 schedule_reload(pe_resource_t *rsc, const pe_node_t *node)
 {
     pe_action_t *reload = NULL;
 
     // For collective resources, just call recursively for children
     if (rsc->variant > pe_native) {
         g_list_foreach(rsc->children, (GFunc) schedule_reload, (gpointer) node);
         return;
     }
 
     // Skip the reload in certain situations
     if ((node == NULL)
         || !pcmk_is_set(rsc->flags, pe_rsc_managed)
         || pcmk_is_set(rsc->flags, pe_rsc_failed)) {
         pe_rsc_trace(rsc, "Skip reload of %s:%s%s %s",
                      rsc->id,
                      pcmk_is_set(rsc->flags, pe_rsc_managed)? "" : " unmanaged",
                      pcmk_is_set(rsc->flags, pe_rsc_failed)? " failed" : "",
                      (node == NULL)? "inactive" : node->details->uname);
         return;
     }
 
     /* If a resource's configuration changed while a start was pending,
      * force a full restart instead of a reload.
      */
     if (pcmk_is_set(rsc->flags, pe_rsc_start_pending)) {
         pe_rsc_trace(rsc, "%s: preventing agent reload because start pending",
                      rsc->id);
         custom_action(rsc, stop_key(rsc), CRMD_ACTION_STOP, node, FALSE, TRUE,
                       rsc->cluster);
         return;
     }
 
     // Schedule the reload
     pe__set_resource_flags(rsc, pe_rsc_reload);
     reload = custom_action(rsc, reload_key(rsc), CRMD_ACTION_RELOAD_AGENT, node,
                            FALSE, TRUE, rsc->cluster);
     pe_action_set_reason(reload, "resource definition change", FALSE);
 
     // Set orderings so that a required stop or demote cancels the reload
     pcmk__new_ordering(NULL, NULL, reload, rsc, stop_key(rsc), NULL,
                        pe_order_optional|pe_order_then_cancels_first,
                        rsc->cluster);
     pcmk__new_ordering(NULL, NULL, reload, rsc, demote_key(rsc), NULL,
                        pe_order_optional|pe_order_then_cancels_first,
                        rsc->cluster);
 }
 
 /*!
  * \internal
  * \brief Handle any configuration change for an action
  *
  * Given an action from resource history, if the resource's configuration
  * changed since the action was done, schedule any actions needed (restart,
  * reload, unfencing, rescheduling recurring actions, etc.).
  *
  * \param[in,out] rsc     Resource that action is for
  * \param[in,out] node    Node that action was on
  * \param[in]     xml_op  Action XML from resource history
  *
  * \return true if action configuration changed, otherwise false
  */
 bool
 pcmk__check_action_config(pe_resource_t *rsc, pe_node_t *node,
                           const xmlNode *xml_op)
 {
     guint interval_ms = 0;
     const char *task = NULL;
     const op_digest_cache_t *digest_data = NULL;
 
     CRM_CHECK((rsc != NULL) && (node != NULL) && (xml_op != NULL),
               return false);
 
     task = crm_element_value(xml_op, XML_LRM_ATTR_TASK);
     CRM_CHECK(task != NULL, return false);
 
     crm_element_value_ms(xml_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
 
     // If this is a recurring action, check whether it has been orphaned
     if (interval_ms > 0) {
         if (action_in_config(rsc, task, interval_ms)) {
             pe_rsc_trace(rsc, "%s-interval %s for %s on %s is in configuration",
                          pcmk__readable_interval(interval_ms), task, rsc->id,
                          pe__node_name(node));
         } else if (pcmk_is_set(rsc->cluster->flags,
                                pe_flag_stop_action_orphans)) {
             pcmk__schedule_cancel(rsc,
                                   crm_element_value(xml_op, XML_LRM_ATTR_CALLID),
                                   task, interval_ms, node, "orphan");
             return true;
         } else {
             pe_rsc_debug(rsc, "%s-interval %s for %s on %s is orphaned",
                          pcmk__readable_interval(interval_ms), task, rsc->id,
                          pe__node_name(node));
             return true;
         }
     }
 
     crm_trace("Checking %s-interval %s for %s on %s for configuration changes",
               pcmk__readable_interval(interval_ms), task, rsc->id,
               pe__node_name(node));
     task = task_for_digest(task, interval_ms);
     digest_data = rsc_action_digest_cmp(rsc, xml_op, node, rsc->cluster);
 
     if (only_sanitized_changed(xml_op, digest_data, rsc->cluster)) {
         if (!pcmk__is_daemon && (rsc->cluster->priv != NULL)) {
             pcmk__output_t *out = rsc->cluster->priv;
 
             out->info(out,
                       "Only 'private' parameters to %s-interval %s for %s "
                       "on %s changed: %s",
                       pcmk__readable_interval(interval_ms), task, rsc->id,
                       pe__node_name(node),
                       crm_element_value(xml_op, XML_ATTR_TRANSITION_MAGIC));
         }
         return false;
     }
 
     switch (digest_data->rc) {
         case RSC_DIGEST_RESTART:
             crm_log_xml_debug(digest_data->params_restart, "params:restart");
             force_restart(rsc, task, interval_ms, node);
             return true;
 
         case RSC_DIGEST_ALL:
         case RSC_DIGEST_UNKNOWN:
             // Changes that can potentially be handled by an agent reload
 
             if (interval_ms > 0) {
                 /* Recurring actions aren't reloaded per se; they are simply
                  * rescheduled so that the next run uses the new parameters.
                  * The old instance will be cancelled automatically.
                  */
                 crm_log_xml_debug(digest_data->params_all, "params:reschedule");
                 pcmk__reschedule_recurring(rsc, task, interval_ms, node);
 
             } else if (crm_element_value(xml_op,
                                          XML_LRM_ATTR_RESTART_DIGEST) != NULL) {
                 // Agent supports reload, so use it
                 trigger_unfencing(rsc, node,
                                   "Device parameters changed (reload)", NULL,
                                   rsc->cluster);
                 crm_log_xml_debug(digest_data->params_all, "params:reload");
                 schedule_reload(rsc, node);
 
             } else {
                 pe_rsc_trace(rsc,
                              "Restarting %s because agent doesn't support reload",
                              rsc->id);
                 crm_log_xml_debug(digest_data->params_restart,
                                   "params:restart");
                 force_restart(rsc, task, interval_ms, node);
             }
             return true;
 
         default:
             break;
     }
     return false;
 }
 
 /*!
  * \internal
  * \brief Create a list of a resource's action history entries, sorted by call ID
  *
  * \param[in]  rsc          Resource whose history should be checked
  * \param[in]  rsc_entry    Resource's <lrm_resource> status XML
  * \param[out] start_index  Where to store index of start-like action, if any
  * \param[out] stop_index   Where to store index of stop action, if any
  */
 static GList *
 rsc_history_as_list(pe_resource_t *rsc, xmlNode *rsc_entry,
                     int *start_index, int *stop_index)
 {
     GList *ops = NULL;
 
     for (xmlNode *rsc_op = first_named_child(rsc_entry, XML_LRM_TAG_RSC_OP);
          rsc_op != NULL; rsc_op = crm_next_same_xml(rsc_op)) {
         ops = g_list_prepend(ops, rsc_op);
     }
     ops = g_list_sort(ops, sort_op_by_callid);
     calculate_active_ops(ops, start_index, stop_index);
     return ops;
 }
 
 /*!
  * \internal
  * \brief Process a resource's action history from the CIB status
  *
  * Given a resource's action history, if the resource's configuration
  * changed since the actions were done, schedule any actions needed (restart,
  * reload, unfencing, rescheduling recurring actions, clean-up, etc.).
  * (This also cancels recurring actions for maintenance mode, which is not
  * entirely related but convenient to do here.)
  *
  * \param[in] rsc_entry  Resource's <lrm_resource> status XML
  * \param[in] rsc        Resource whose history is being processed
  * \param[in] node       Node whose history is being processed
  */
 static void
 process_rsc_history(xmlNode *rsc_entry, pe_resource_t *rsc, pe_node_t *node)
 {
     int offset = -1;
     int stop_index = 0;
     int start_index = 0;
     GList *sorted_op_list = NULL;
 
     if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
-        if (pe_rsc_is_anon_clone(uber_parent(rsc))) {
+        if (pe_rsc_is_anon_clone(pe__const_top_resource(rsc, false))) {
             pe_rsc_trace(rsc,
                          "Skipping configuration check "
                          "for orphaned clone instance %s",
                          rsc->id);
         } else {
             pe_rsc_trace(rsc,
                          "Skipping configuration check and scheduling clean-up "
                          "for orphaned resource %s", rsc->id);
             pcmk__schedule_cleanup(rsc, node, false);
         }
         return;
     }
 
     if (pe_find_node_id(rsc->running_on, node->details->id) == NULL) {
         if (pcmk__rsc_agent_changed(rsc, node, rsc_entry, false)) {
             pcmk__schedule_cleanup(rsc, node, false);
         }
         pe_rsc_trace(rsc,
                      "Skipping configuration check for %s "
                      "because no longer active on %s",
                      rsc->id, pe__node_name(node));
         return;
     }
 
     pe_rsc_trace(rsc, "Checking for configuration changes for %s on %s",
                  rsc->id, pe__node_name(node));
 
     if (pcmk__rsc_agent_changed(rsc, node, rsc_entry, true)) {
         pcmk__schedule_cleanup(rsc, node, false);
     }
 
     sorted_op_list = rsc_history_as_list(rsc, rsc_entry, &start_index,
                                          &stop_index);
     if (start_index < stop_index) {
         return; // Resource is stopped
     }
 
     for (GList *iter = sorted_op_list; iter != NULL; iter = iter->next) {
         xmlNode *rsc_op = (xmlNode *) iter->data;
         const char *task = NULL;
         guint interval_ms = 0;
 
         if (++offset < start_index) {
             // Skip actions that happened before a start
             continue;
         }
 
         task = crm_element_value(rsc_op, XML_LRM_ATTR_TASK);
         crm_element_value_ms(rsc_op, XML_LRM_ATTR_INTERVAL_MS, &interval_ms);
 
         if ((interval_ms > 0)
             && (pcmk_is_set(rsc->flags, pe_rsc_maintenance)
                 || node->details->maintenance)) {
             // Maintenance mode cancels recurring operations
             pcmk__schedule_cancel(rsc,
                                   crm_element_value(rsc_op, XML_LRM_ATTR_CALLID),
                                   task, interval_ms, node, "maintenance mode");
 
         } else if ((interval_ms > 0)
                    || pcmk__strcase_any_of(task, RSC_STATUS, RSC_START,
                                            RSC_PROMOTE, RSC_MIGRATED, NULL)) {
             /* If a resource operation failed, and the operation's definition
              * has changed, clear any fail count so the operation can be retried fresh.
              */
 
             if (pe__bundle_needs_remote_name(rsc)) {
                 /* We haven't allocated resources to nodes yet, so if the
                  * REMOTE_CONTAINER_HACK is used, we may calculate the digest
                  * based on the literal "#uname" value rather than the properly
                  * substituted value. That would mistakenly make the action
                  * definition appear to have been changed. Defer the check until
                  * later in this case.
                  */
                 pe__add_param_check(rsc_op, rsc, node, pe_check_active,
                                     rsc->cluster);
 
             } else if (pcmk__check_action_config(rsc, node, rsc_op)
                        && (pe_get_failcount(node, rsc, NULL, pe_fc_effective,
                                             NULL) != 0)) {
                 pe__clear_failcount(rsc, node, "action definition changed",
                                     rsc->cluster);
             }
         }
     }
     g_list_free(sorted_op_list);
 }
 
 /*!
  * \internal
  * \brief Process a node's action history from the CIB status
  *
  * Given a node's resource history, if the resource's configuration changed
  * since the actions were done, schedule any actions needed (restart,
  * reload, unfencing, rescheduling recurring actions, clean-up, etc.).
  * (This also cancels recurring actions for maintenance mode, which is not
  * entirely related but convenient to do here.)
  *
  * \param[in] node      Node whose history is being processed
  * \param[in] lrm_rscs  Node's <lrm_resources> from CIB status XML
  * \param[in] data_set  Cluster working set
  */
 static void
 process_node_history(pe_node_t *node, xmlNode *lrm_rscs, pe_working_set_t *data_set)
 {
     crm_trace("Processing node history for %s", pe__node_name(node));
     for (xmlNode *rsc_entry = first_named_child(lrm_rscs, XML_LRM_TAG_RESOURCE);
          rsc_entry != NULL; rsc_entry = crm_next_same_xml(rsc_entry)) {
 
         if (xml_has_children(rsc_entry)) {
             GList *result = pcmk__rscs_matching_id(ID(rsc_entry), data_set);
 
             for (GList *iter = result; iter != NULL; iter = iter->next) {
                 pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
                 if (rsc->variant == pe_native) {
                     process_rsc_history(rsc_entry, rsc, node);
                 }
             }
             g_list_free(result);
         }
     }
 }
 
 // XPath to find a node's resource history
 #define XPATH_NODE_HISTORY "/" XML_TAG_CIB "/" XML_CIB_TAG_STATUS             \
                            "/" XML_CIB_TAG_STATE "[@" XML_ATTR_UNAME "='%s']" \
                            "/" XML_CIB_TAG_LRM "/" XML_LRM_TAG_RESOURCES
 
 /*!
  * \internal
  * \brief Process any resource configuration changes in the CIB status
  *
  * Go through all nodes' resource history, and if a resource's configuration
  * changed since its actions were done, schedule any actions needed (restart,
  * reload, unfencing, rescheduling recurring actions, clean-up, etc.).
  * (This also cancels recurring actions for maintenance mode, which is not
  * entirely related but convenient to do here.)
  *
  * \param[in] data_set  Cluster working set
  */
 void
 pcmk__handle_rsc_config_changes(pe_working_set_t *data_set)
 {
     crm_trace("Check resource and action configuration for changes");
 
     /* Rather than iterate through the status section, iterate through the nodes
      * and search for the appropriate status subsection for each. This skips
      * orphaned nodes and lets us eliminate some cases before searching the XML.
      */
     for (GList *iter = data_set->nodes; iter != NULL; iter = iter->next) {
         pe_node_t *node = (pe_node_t *) iter->data;
 
         /* Don't bother checking actions for a node that can't run actions ...
          * unless it's in maintenance mode, in which case we still need to
          * cancel any existing recurring monitors.
          */
         if (node->details->maintenance
             || pcmk__node_available(node, false, false)) {
 
             char *xpath = NULL;
             xmlNode *history = NULL;
 
             xpath = crm_strdup_printf(XPATH_NODE_HISTORY, node->details->uname);
             history = get_xpath_object(xpath, data_set->input, LOG_NEVER);
             free(xpath);
 
             process_node_history(node, history, data_set);
         }
     }
 }
diff --git a/lib/pacemaker/pcmk_sched_allocate.c b/lib/pacemaker/pcmk_sched_allocate.c
index 1fe8251d48..fc426f2e18 100644
--- a/lib/pacemaker/pcmk_sched_allocate.c
+++ b/lib/pacemaker/pcmk_sched_allocate.c
@@ -1,804 +1,805 @@
 /*
- * Copyright 2004-2022 the Pacemaker project contributors
+ * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <crm/crm.h>
 #include <crm/cib.h>
 #include <crm/msg_xml.h>
 #include <crm/common/xml.h>
 #include <crm/common/xml_internal.h>
 
 #include <glib.h>
 
 #include <crm/pengine/status.h>
 #include <pacemaker-internal.h>
 #include "libpacemaker_private.h"
 
 CRM_TRACE_INIT_DATA(pacemaker);
 
 /*!
  * \internal
  * \brief Do deferred action checks after allocation
  *
  * When unpacking the resource history, the scheduler checks for resource
  * configurations that have changed since an action was run. However, at that
  * time, bundles using the REMOTE_CONTAINER_HACK don't have their final
  * parameter information, so instead they add a deferred check to a list. This
  * function processes one entry in that list.
  *
  * \param[in,out] rsc     Resource that action history is for
  * \param[in,out] node    Node that action history is for
  * \param[in]     rsc_op  Action history entry
  * \param[in]     check   Type of deferred check to do
  */
 static void
 check_params(pe_resource_t *rsc, pe_node_t *node, const xmlNode *rsc_op,
              enum pe_check_parameters check)
 {
     const char *reason = NULL;
     op_digest_cache_t *digest_data = NULL;
 
     switch (check) {
         case pe_check_active:
             if (pcmk__check_action_config(rsc, node, rsc_op)
                 && pe_get_failcount(node, rsc, NULL, pe_fc_effective, NULL)) {
                 reason = "action definition changed";
             }
             break;
 
         case pe_check_last_failure:
             digest_data = rsc_action_digest_cmp(rsc, rsc_op, node,
                                                 rsc->cluster);
             switch (digest_data->rc) {
                 case RSC_DIGEST_UNKNOWN:
                     crm_trace("Resource %s history entry %s on %s has "
                               "no digest to compare",
                               rsc->id, ID(rsc_op), node->details->id);
                     break;
                 case RSC_DIGEST_MATCH:
                     break;
                 default:
                     reason = "resource parameters have changed";
                     break;
             }
             break;
     }
     if (reason != NULL) {
         pe__clear_failcount(rsc, node, reason, rsc->cluster);
     }
 }
 
 /*!
  * \internal
  * \brief Check whether a resource has failcount clearing scheduled on a node
  *
  * \param[in] node  Node to check
  * \param[in] rsc   Resource to check
  *
  * \return true if \p rsc has failcount clearing scheduled on \p node,
  *         otherwise false
  */
 static bool
 failcount_clear_action_exists(pe_node_t *node, pe_resource_t *rsc)
 {
     GList *list = pe__resource_actions(rsc, node, CRM_OP_CLEAR_FAILCOUNT, TRUE);
 
     if (list != NULL) {
         g_list_free(list);
         return true;
     }
     return false;
 }
 
 /*!
  * \internal
  * \brief Ban a resource from a node if it reached its failure threshold there
  *
  * \param[in] rsc       Resource to check failure threshold for
  * \param[in] node      Node to check \p rsc on
  */
 static void
 check_failure_threshold(pe_resource_t *rsc, pe_node_t *node)
 {
     // If this is a collective resource, apply recursively to children instead
     if (rsc->children != NULL) {
         g_list_foreach(rsc->children, (GFunc) check_failure_threshold,
                        node);
         return;
 
     } else if (failcount_clear_action_exists(node, rsc)) {
         /* Don't force the resource away from this node due to a failcount
          * that's going to be cleared.
          *
          * @TODO Failcount clearing can be scheduled in
          * pcmk__handle_rsc_config_changes() via process_rsc_history(), or in
          * schedule_resource_actions() via check_params(). This runs well before
          * then, so it cannot detect those, meaning we might check the migration
          * threshold when we shouldn't. Worst case, we stop or move the
          * resource, then move it back in the next transition.
          */
         return;
 
     } else {
         pe_resource_t *failed = NULL;
 
         if (pcmk__threshold_reached(rsc, node, &failed)) {
             resource_location(failed, node, -INFINITY, "__fail_limit__",
                               rsc->cluster);
         }
     }
 }
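 
 /* For example, with migration-threshold=3, a resource that has failed three
  * times on node1, with no failcount clearing scheduled there, is banned from
  * node1 via a -INFINITY location score.
  */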
 
 /*!
  * \internal
  * \brief If resource has exclusive discovery, ban node if not allowed
  *
  * Location constraints have a resource-discovery option that allows users to
  * specify where probes are done for the affected resource. If this is set to
  * exclusive, probes will only be done on nodes listed in exclusive constraints.
  * This function bans the resource from the node if the node is not listed.
  *
  * \param[in] rsc   Resource to check
  * \param[in] node  Node to check \p rsc on
  */
 static void
 apply_exclusive_discovery(pe_resource_t *rsc, pe_node_t *node)
 {
-    if (rsc->exclusive_discover || uber_parent(rsc)->exclusive_discover) {
+    if (rsc->exclusive_discover
+        || pe__const_top_resource(rsc, false)->exclusive_discover) {
         pe_node_t *match = NULL;
 
         // If this is a collective resource, apply recursively to children
         g_list_foreach(rsc->children, (GFunc) apply_exclusive_discovery, node);
 
         match = g_hash_table_lookup(rsc->allowed_nodes, node->details->id);
         if ((match != NULL)
             && (match->rsc_discover_mode != pe_discover_exclusive)) {
             match->weight = -INFINITY;
         }
     }
 }
 
 /*!
  * \internal
  * \brief Apply stickiness to a resource if appropriate
  *
  * \param[in] rsc       Resource to check for stickiness
  * \param[in] data_set  Cluster working set
  */
 static void
 apply_stickiness(pe_resource_t *rsc, pe_working_set_t *data_set)
 {
     pe_node_t *node = NULL;
 
     // If this is a collective resource, apply recursively to children instead
     if (rsc->children != NULL) {
         g_list_foreach(rsc->children, (GFunc) apply_stickiness, data_set);
         return;
     }
 
     /* A resource is sticky if it is managed, has stickiness configured, and is
      * active on a single node.
      */
     if (!pcmk_is_set(rsc->flags, pe_rsc_managed)
         || (rsc->stickiness < 1) || !pcmk__list_of_1(rsc->running_on)) {
         return;
     }
 
     node = rsc->running_on->data;
 
     /* In a symmetric cluster, stickiness can always be used. In an
      * asymmetric cluster, we have to check whether the resource is still
      * allowed on the node, so we don't keep the resource somewhere it is no
      * longer explicitly enabled.
      */
     if (!pcmk_is_set(rsc->cluster->flags, pe_flag_symmetric_cluster)
         && (pe_hash_table_lookup(rsc->allowed_nodes,
                                  node->details->id) == NULL)) {
         pe_rsc_debug(rsc,
                      "Ignoring %s stickiness because the cluster is "
                      "asymmetric and %s is not explicitly allowed",
                      rsc->id, pe__node_name(node));
         return;
     }
 
     pe_rsc_debug(rsc, "Resource %s has %d stickiness on %s",
                  rsc->id, rsc->stickiness, pe__node_name(node));
     resource_location(rsc, node, rsc->stickiness, "stickiness",
                       rsc->cluster);
 }
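 
 /* For example, a managed resource with stickiness=100 that is active only on
  * node2 (and allowed there) gets a +100 location preference for node2,
  * biasing assignment toward leaving it where it is.
  */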
 
 /*!
  * \internal
  * \brief Apply shutdown locks for all resources as appropriate
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 apply_shutdown_locks(pe_working_set_t *data_set)
 {
     if (!pcmk_is_set(data_set->flags, pe_flag_shutdown_lock)) {
         return;
     }
     for (GList *iter = data_set->resources; iter != NULL; iter = iter->next) {
         pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
         rsc->cmds->shutdown_lock(rsc);
     }
 }
 
 /*!
  * \internal
  * \brief Calculate the number of available nodes in the cluster
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 count_available_nodes(pe_working_set_t *data_set)
 {
     if (pcmk_is_set(data_set->flags, pe_flag_no_compat)) {
         return;
     }
 
     // @COMPAT for API backward compatibility only (cluster does not use value)
     for (GList *iter = data_set->nodes; iter != NULL; iter = iter->next) {
         pe_node_t *node = (pe_node_t *) iter->data;
 
         if ((node != NULL) && (node->weight >= 0) && node->details->online
             && (node->details->type != node_ping)) {
             data_set->max_valid_nodes++;
         }
     }
     crm_trace("Online node count: %d", data_set->max_valid_nodes);
 }
 
 /*!
  * \internal
  * \brief Apply node-specific scheduling criteria
  *
  * After the CIB has been unpacked, process node-specific scheduling criteria
  * including shutdown locks, location constraints, resource stickiness,
  * migration thresholds, and exclusive resource discovery.
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 apply_node_criteria(pe_working_set_t *data_set)
 {
     crm_trace("Applying node-specific scheduling criteria");
     apply_shutdown_locks(data_set);
     count_available_nodes(data_set);
     pcmk__apply_locations(data_set);
     g_list_foreach(data_set->resources, (GFunc) apply_stickiness, data_set);
 
     for (GList *node_iter = data_set->nodes; node_iter != NULL;
          node_iter = node_iter->next) {
         for (GList *rsc_iter = data_set->resources; rsc_iter != NULL;
              rsc_iter = rsc_iter->next) {
             pe_node_t *node = (pe_node_t *) node_iter->data;
             pe_resource_t *rsc = (pe_resource_t *) rsc_iter->data;
 
             check_failure_threshold(rsc, node);
             apply_exclusive_discovery(rsc, node);
         }
     }
 }
 
 /*!
  * \internal
  * \brief Allocate resources to nodes
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 allocate_resources(pe_working_set_t *data_set)
 {
     GList *iter = NULL;
 
     crm_trace("Allocating resources to nodes");
 
     if (!pcmk__str_eq(data_set->placement_strategy, "default", pcmk__str_casei)) {
         pcmk__sort_resources(data_set);
     }
     pcmk__show_node_capacities("Original", data_set);
 
     if (pcmk_is_set(data_set->flags, pe_flag_have_remote_nodes)) {
         /* Allocate remote connection resources first (which will also allocate
          * any colocation dependencies). If the connection is migrating, always
          * prefer the partial migration target.
          */
         for (iter = data_set->resources; iter != NULL; iter = iter->next) {
             pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
             if (rsc->is_remote_node) {
                 pe_rsc_trace(rsc, "Allocating remote connection resource '%s'",
                              rsc->id);
                 rsc->cmds->assign(rsc, rsc->partial_migration_target);
             }
         }
     }
 
     // Now allocate the rest of the resources
     for (iter = data_set->resources; iter != NULL; iter = iter->next) {
         pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
         if (!rsc->is_remote_node) {
             pe_rsc_trace(rsc, "Allocating %s resource '%s'",
                          crm_element_name(rsc->xml), rsc->id);
             rsc->cmds->assign(rsc, NULL);
         }
     }
 
     pcmk__show_node_capacities("Remaining", data_set);
 }
 
 /*!
  * \internal
  * \brief Schedule fail count clearing on online nodes if resource is orphaned
  *
  * \param[in] rsc       Resource to check
  * \param[in] data_set  Cluster working set
  */
 static void
 clear_failcounts_if_orphaned(pe_resource_t *rsc, pe_working_set_t *data_set)
 {
     if (!pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
         return;
     }
     crm_trace("Clear fail counts for orphaned resource %s", rsc->id);
 
     /* There's no need to recurse into rsc->children because those
      * should just be unallocated clone instances.
      */
 
     for (GList *iter = data_set->nodes; iter != NULL; iter = iter->next) {
         pe_node_t *node = (pe_node_t *) iter->data;
         pe_action_t *clear_op = NULL;
 
         if (!node->details->online) {
             continue;
         }
         if (pe_get_failcount(node, rsc, NULL, pe_fc_effective, NULL) == 0) {
             continue;
         }
 
         clear_op = pe__clear_failcount(rsc, node, "it is orphaned", data_set);
 
         /* We can't use order_action_then_stop() here because its
          * pe_order_preserve breaks things
          */
         pcmk__new_ordering(clear_op->rsc, NULL, clear_op, rsc, stop_key(rsc),
                            NULL, pe_order_optional, data_set);
     }
 }
 
 /*!
  * \internal
  * \brief Schedule any resource actions needed
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 schedule_resource_actions(pe_working_set_t *data_set)
 {
     // Process deferred action checks
     pe__foreach_param_check(data_set, check_params);
     pe__free_param_checks(data_set);
 
     if (pcmk_is_set(data_set->flags, pe_flag_startup_probes)) {
         crm_trace("Scheduling probes");
         pcmk__schedule_probes(data_set);
     }
 
     if (pcmk_is_set(data_set->flags, pe_flag_stop_rsc_orphans)) {
         g_list_foreach(data_set->resources,
                        (GFunc) clear_failcounts_if_orphaned, data_set);
     }
 
     crm_trace("Scheduling resource actions");
     for (GList *iter = data_set->resources; iter != NULL; iter = iter->next) {
         pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
         rsc->cmds->create_actions(rsc);
     }
 }
 
 /*!
  * \internal
  * \brief Check whether a resource or any of its descendants are managed
  *
  * \param[in] rsc  Resource to check
  *
  * \return true if resource or any descendent is managed, otherwise false
  */
 static bool
 is_managed(const pe_resource_t *rsc)
 {
     if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
         return true;
     }
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         if (is_managed((pe_resource_t *) iter->data)) {
             return true;
         }
     }
     return false;
 }
 
 /*!
  * \internal
  * \brief Check whether any resources in the cluster are managed
  *
  * \param[in] data_set  Cluster working set
  *
  * \return true if any resource is managed, otherwise false
  */
 static bool
 any_managed_resources(pe_working_set_t *data_set)
 {
     for (GList *iter = data_set->resources; iter != NULL; iter = iter->next) {
         if (is_managed((pe_resource_t *) iter->data)) {
             return true;
         }
     }
     return false;
 }
 
 /*!
  * \internal
  * \brief Check whether a node requires fencing
  *
  * \param[in] node          Node to check
  * \param[in] have_managed  Whether any resource in cluster is managed
  * \param[in] data_set      Cluster working set
  *
  * \return true if \p node should be fenced, otherwise false
  */
 static bool
 needs_fencing(pe_node_t *node, bool have_managed, pe_working_set_t *data_set)
 {
     return have_managed && node->details->unclean
            && pe_can_fence(data_set, node);
 }
 
 /*!
  * \internal
  * \brief Check whether a node requires shutdown
  *
  * \param[in] node          Node to check
  *
  * \return true if \p node should be shut down, otherwise false
  */
 static bool
 needs_shutdown(pe_node_t *node)
 {
     if (pe__is_guest_or_remote_node(node)) {
         /* Do not send shutdown actions for Pacemaker Remote nodes.
          * @TODO We might come up with a good use for this in the future.
          */
         return false;
     }
     return node->details->online && node->details->shutdown;
 }
 
 /*!
  * \internal
  * \brief Track and order non-DC fencing
  *
  * \param[in] list      List of existing non-DC fencing actions
  * \param[in] action    Fencing action to prepend to \p list
  * \param[in] data_set  Cluster working set
  *
  * \return (Possibly new) head of \p list
  */
 static GList *
 add_nondc_fencing(GList *list, pe_action_t *action, pe_working_set_t *data_set)
 {
     if (!pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)
         && (list != NULL)) {
         /* Concurrent fencing is disabled, so order each non-DC
          * fencing in a chain. If there is any DC fencing or
          * shutdown, it will be ordered after the last action in the
          * chain later.
          */
         order_actions((pe_action_t *) list->data, action, pe_order_optional);
     }
     return g_list_prepend(list, action);
 }
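 
 /* For example, with concurrent fencing disabled, if unclean non-DC nodes n1,
  * n2, and n3 are processed in that order, the fencing actions are chained
  * n1 -> n2 -> n3 and the returned list is {n3, n2, n1}.
  */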
 
 /*!
  * \internal
  * \brief Schedule a node for fencing
  *
  * \param[in] node      Node that requires fencing
  * \param[in] data_set  Cluster working set
  *
  * \return Newly scheduled fencing action
  */
 static pe_action_t *
 schedule_fencing(pe_node_t *node, pe_working_set_t *data_set)
 {
     pe_action_t *fencing = pe_fence_op(node, NULL, FALSE, "node is unclean",
                                        FALSE, data_set);
 
     pe_warn("Scheduling node %s for fencing", pe__node_name(node));
     pcmk__order_vs_fence(fencing, data_set);
     return fencing;
 }
 
 /*!
  * \internal
  * \brief Create and order node fencing and shutdown actions
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 schedule_fencing_and_shutdowns(pe_working_set_t *data_set)
 {
     pe_action_t *dc_down = NULL;
     bool integrity_lost = false;
     bool have_managed = any_managed_resources(data_set);
     GList *fencing_ops = NULL;
     GList *shutdown_ops = NULL;
 
     crm_trace("Scheduling fencing and shutdowns as needed");
     if (!have_managed) {
         crm_notice("No fencing will be done until there are resources to manage");
     }
 
     // Check each node for whether it needs fencing or shutdown
     for (GList *iter = data_set->nodes; iter != NULL; iter = iter->next) {
         pe_node_t *node = (pe_node_t *) iter->data;
         pe_action_t *fencing = NULL;
 
         /* Guest nodes are "fenced" by recovering their container resource,
          * so handle them separately.
          */
         if (pe__is_guest_node(node)) {
             if (node->details->remote_requires_reset && have_managed
                 && pe_can_fence(data_set, node)) {
                 pcmk__fence_guest(node);
             }
             continue;
         }
 
         if (needs_fencing(node, have_managed, data_set)) {
             fencing = schedule_fencing(node, data_set);
 
             // Track DC and non-DC fence actions separately
             if (node->details->is_dc) {
                 dc_down = fencing;
             } else {
                 fencing_ops = add_nondc_fencing(fencing_ops, fencing, data_set);
             }
 
         } else if (needs_shutdown(node)) {
             pe_action_t *down_op = pcmk__new_shutdown_action(node);
 
             // Track DC and non-DC shutdown actions separately
             if (node->details->is_dc) {
                 dc_down = down_op;
             } else {
                 shutdown_ops = g_list_prepend(shutdown_ops, down_op);
             }
         }
 
         if ((fencing == NULL) && node->details->unclean) {
             integrity_lost = true;
             pe_warn("Node %s is unclean but cannot be fenced",
                     pe__node_name(node));
         }
     }
 
     if (integrity_lost) {
         if (!pcmk_is_set(data_set->flags, pe_flag_stonith_enabled)) {
             pe_warn("Resource functionality and data integrity cannot be "
                     "guaranteed (configure, enable, and test fencing to "
                     "correct this)");
 
         } else if (!pcmk_is_set(data_set->flags, pe_flag_have_quorum)) {
             crm_notice("Unclean nodes will not be fenced until quorum is "
                        "attained or no-quorum-policy is set to ignore");
         }
     }
 
     if (dc_down != NULL) {
         /* Order any non-DC shutdowns before any DC shutdown, to avoid repeated
          * DC elections. However, we don't want to order non-DC shutdowns before
          * a DC *fencing*, because even though we don't want a node that's
          * shutting down to become DC, the DC fencing could be ordered before a
          * clone stop that's also ordered before the shutdowns, thus leading to
          * a graph loop.
          */
         if (pcmk__str_eq(dc_down->task, CRM_OP_SHUTDOWN, pcmk__str_none)) {
             pcmk__order_after_each(dc_down, shutdown_ops);
         }
 
         // Order any non-DC fencing before any DC fencing or shutdown
 
         if (pcmk_is_set(data_set->flags, pe_flag_concurrent_fencing)) {
             /* With concurrent fencing, order each non-DC fencing action
              * separately before any DC fencing or shutdown.
              */
             pcmk__order_after_each(dc_down, fencing_ops);
         } else if (fencing_ops != NULL) {
             /* Without concurrent fencing, the non-DC fencing actions are
              * already ordered relative to each other, so we just need to order
              * the DC fencing after the last action in the chain (which is the
              * first item in the list).
              */
             order_actions((pe_action_t *) fencing_ops->data, dc_down,
                           pe_order_optional);
         }
     }
     g_list_free(fencing_ops);
     g_list_free(shutdown_ops);
 }
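 
 /* For example, if the DC must be fenced and non-DC nodes n1 and n2 must also
  * be fenced with concurrent fencing disabled, n1 and n2 are chained
  * n1 -> n2, and the DC fencing is ordered after n2, the last action in the
  * chain (and the first item in fencing_ops).
  */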
 
 static void
 log_resource_details(pe_working_set_t *data_set)
 {
     pcmk__output_t *out = data_set->priv;
     GList *all = NULL;
 
     /* We need a list of nodes that we are allowed to output information for.
      * This is necessary because out->message for all the resource-related
      * messages expects such a list, due to the `crm_mon --node=` feature.  Here,
      * we just make it a list of all the nodes.
      */
     all = g_list_prepend(all, (gpointer) "*");
 
     for (GList *item = data_set->resources; item != NULL; item = item->next) {
         pe_resource_t *rsc = (pe_resource_t *) item->data;
 
         // Log all resources except inactive orphans
         if (!pcmk_is_set(rsc->flags, pe_rsc_orphan)
             || (rsc->role != RSC_ROLE_STOPPED)) {
             out->message(out, crm_map_element_name(rsc->xml), 0, rsc, all, all);
         }
     }
 
     g_list_free(all);
 }
 
 static void
 log_all_actions(pe_working_set_t *data_set)
 {
     /* This only ever outputs to the log, so ignore whatever output object was
      * previously set and just log instead.
      */
     pcmk__output_t *prev_out = data_set->priv;
     pcmk__output_t *out = NULL;
 
     if (pcmk__log_output_new(&out) != pcmk_rc_ok) {
         return;
     }
 
     pe__register_messages(out);
     pcmk__register_lib_messages(out);
     pcmk__output_set_log_level(out, LOG_NOTICE);
     data_set->priv = out;
 
     out->begin_list(out, NULL, NULL, "Actions");
     pcmk__output_actions(data_set);
     out->end_list(out);
     out->finish(out, CRM_EX_OK, true, NULL);
     pcmk__output_free(out);
 
     data_set->priv = prev_out;
 }
 
 /*!
  * \internal
  * \brief Log all required but unrunnable actions at trace level
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 log_unrunnable_actions(pe_working_set_t *data_set)
 {
     const uint64_t flags = pe_action_optional|pe_action_runnable|pe_action_pseudo;
 
     crm_trace("Required but unrunnable actions:");
     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
         pe_action_t *action = (pe_action_t *) iter->data;
 
         if (!pcmk_any_flags_set(action->flags, flags)) {
             pcmk__log_action("\t", action, true);
         }
     }
 }
 
 /*!
  * \internal
  * \brief Unpack the CIB for scheduling
  *
  * \param[in] cib       CIB XML to unpack (may be NULL if previously unpacked)
  * \param[in] flags     Working set flags to set in addition to defaults
  * \param[in] data_set  Cluster working set
  */
 static void
 unpack_cib(xmlNode *cib, unsigned long long flags, pe_working_set_t *data_set)
 {
     const char *localhost_save = NULL;
 
     if (pcmk_is_set(data_set->flags, pe_flag_have_status)) {
         crm_trace("Reusing previously calculated cluster status");
         pe__set_working_set_flags(data_set, flags);
         return;
     }
 
     if (data_set->localhost) {
         localhost_save = data_set->localhost;
     }
 
     CRM_ASSERT(cib != NULL);
     crm_trace("Calculating cluster status");
 
     /* This will zero the entire struct without freeing anything first, so
      * callers should never call pcmk__schedule_actions() with a populated data
      * set unless pe_flag_have_status is set (i.e. cluster_status() was
      * previously called, whether directly or via pcmk__schedule_actions()).
      */
     set_working_set_defaults(data_set);
 
     if (localhost_save) {
         data_set->localhost = localhost_save;
     }
 
     pe__set_working_set_flags(data_set, flags);
     data_set->input = cib;
     cluster_status(data_set); // Sets pe_flag_have_status
 }
 
 /*!
  * \internal
  * \brief Run the scheduler for a given CIB
  *
  * \param[in]     cib       CIB XML to use as scheduler input
  * \param[in]     flags     Working set flags to set in addition to defaults
  * \param[in,out] data_set  Cluster working set
  */
 void
 pcmk__schedule_actions(xmlNode *cib, unsigned long long flags,
                        pe_working_set_t *data_set)
 {
     unpack_cib(cib, flags, data_set);
     pcmk__set_allocation_methods(data_set);
     pcmk__apply_node_health(data_set);
     pcmk__unpack_constraints(data_set);
     if (pcmk_is_set(data_set->flags, pe_flag_check_config)) {
         return;
     }
 
     if (!pcmk_is_set(data_set->flags, pe_flag_quick_location) &&
          pcmk__is_daemon) {
         log_resource_details(data_set);
     }
 
     apply_node_criteria(data_set);
 
     if (pcmk_is_set(data_set->flags, pe_flag_quick_location)) {
         return;
     }
 
     pcmk__create_internal_constraints(data_set);
     pcmk__handle_rsc_config_changes(data_set);
     allocate_resources(data_set);
     schedule_resource_actions(data_set);
 
     /* Remote ordering constraints need to happen prior to calculating fencing
      * because it is one more place we can mark nodes as needing fencing.
      */
     pcmk__order_remote_connection_actions(data_set);
 
     schedule_fencing_and_shutdowns(data_set);
     pcmk__apply_orderings(data_set);
     log_all_actions(data_set);
     pcmk__create_graph(data_set);
 
     if (get_crm_log_level() == LOG_TRACE) {
         log_unrunnable_actions(data_set);
     }
 }
diff --git a/lib/pacemaker/pcmk_sched_group.c b/lib/pacemaker/pcmk_sched_group.c
index 668fef76a5..51e866b512 100644
--- a/lib/pacemaker/pcmk_sched_group.c
+++ b/lib/pacemaker/pcmk_sched_group.c
@@ -1,763 +1,764 @@
 /*
- * Copyright 2004-2022 the Pacemaker project contributors
+ * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <stdbool.h>
 
 #include <crm/msg_xml.h>
 
 #include <pacemaker-internal.h>
 #include "libpacemaker_private.h"
 
 /*!
  * \internal
  * \brief Expand a group's colocations to its members
  *
  * \param[in,out] rsc  Group resource
  */
 static void
 expand_group_colocations(pe_resource_t *rsc)
 {
     pe_resource_t *member = NULL;
     bool any_unmanaged = false;
     GList *item = NULL;
 
     if (rsc->children == NULL) {
         return;
     }
 
     // Treat "group with R" colocations as "first member with R"
     member = (pe_resource_t *) rsc->children->data;
     for (item = rsc->rsc_cons; item != NULL; item = item->next) {
         pcmk__add_this_with(member, (pcmk__colocation_t *) (item->data));
     }
 
     /* The above works for the whole group because each group member is
      * colocated with the previous one.
      *
      * However, there is a special case when a group has a mandatory colocation
      * with a resource that can't start. In that case,
      * pcmk__block_colocation_dependents() will ensure that dependent resources
      * in mandatory colocations (i.e. the first member for groups) can't start
      * either. But if any group member is unmanaged and already started, the
      * internal group colocations are no longer sufficient to make that apply to
      * later members.
      *
      * To handle that case, add mandatory colocations to each member after the
      * first.
      */
     any_unmanaged = !pcmk_is_set(member->flags, pe_rsc_managed);
     for (item = rsc->children->next; item != NULL; item = item->next) {
         member = item->data;
         if (any_unmanaged) {
             for (GList *cons_iter = rsc->rsc_cons; cons_iter != NULL;
                  cons_iter = cons_iter->next) {
 
                 pcmk__colocation_t *constraint = (pcmk__colocation_t *) cons_iter->data;
 
                 if (constraint->score == INFINITY) {
                     pcmk__add_this_with(member, constraint);
                 }
             }
         } else if (!pcmk_is_set(member->flags, pe_rsc_managed)) {
             any_unmanaged = true;
         }
     }
 
     g_list_free(rsc->rsc_cons);
     rsc->rsc_cons = NULL;
 
     // Treat "R with group" colocations as "R with last member"
     member = pe__last_group_member(rsc);
     for (item = rsc->rsc_cons_lhs; item != NULL; item = item->next) {
         pcmk__add_with_this(member, (pcmk__colocation_t *) (item->data));
     }
     g_list_free(rsc->rsc_cons_lhs);
     rsc->rsc_cons_lhs = NULL;
 }
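 
 /* For example, for a colocated group G with members m1 and m2, "G with R"
  * colocations are applied as "m1 with R" (the rest of the group follows via
  * the internal member colocations), and "R with G" colocations become
  * "R with m2", the last member.
  */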
 
 /*!
  * \internal
  * \brief Assign a group resource to a node
  *
  * \param[in,out] rsc     Group resource to assign to a node
  * \param[in]     prefer  Node to prefer, if all else is equal
  *
  * \return Node that \p rsc is assigned to, if assigned entirely to one node
  */
 pe_node_t *
 pcmk__group_assign(pe_resource_t *rsc, const pe_node_t *prefer)
 {
     pe_node_t *first_assigned_node = NULL;
     pe_resource_t *first_member = NULL;
 
     CRM_ASSERT(rsc != NULL);
 
     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
         return rsc->allocated_to; // Assignment already done
     }
     if (pcmk_is_set(rsc->flags, pe_rsc_allocating)) {
         pe_rsc_debug(rsc, "Assignment dependency loop detected involving %s",
                      rsc->id);
         return NULL;
     }
 
     if (rsc->children == NULL) {
         // No members to assign
         pe__clear_resource_flags(rsc, pe_rsc_provisional);
         return NULL;
     }
 
     pe__set_resource_flags(rsc, pe_rsc_allocating);
     first_member = (pe_resource_t *) rsc->children->data;
     rsc->role = first_member->role;
 
     expand_group_colocations(rsc);
 
     pe__show_node_weights(!pcmk_is_set(rsc->cluster->flags, pe_flag_show_scores),
                           rsc, __func__, rsc->allowed_nodes, rsc->cluster);
 
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *member = (pe_resource_t *) iter->data;
         pe_node_t *node = NULL;
 
         pe_rsc_trace(rsc, "Assigning group %s member %s",
                      rsc->id, member->id);
         node = member->cmds->assign(member, prefer);
         if (first_assigned_node == NULL) {
             first_assigned_node = node;
         }
     }
 
     pe__set_next_role(rsc, first_member->next_role, "first group member");
     pe__clear_resource_flags(rsc, pe_rsc_allocating|pe_rsc_provisional);
 
     if (!pe__group_flag_is_set(rsc, pe__group_colocated)) {
         return NULL;
     }
     return first_assigned_node;
 }
 
 /*!
  * \internal
  * \brief Create a pseudo-operation for a group as an ordering point
  *
  * \param[in,out] group   Group resource to create action for
  * \param[in]     action  Action name
  *
  * \return Newly created pseudo-operation
  */
 static pe_action_t *
 create_group_pseudo_op(pe_resource_t *group, const char *action)
 {
     pe_action_t *op = custom_action(group, pcmk__op_key(group->id, action, 0),
                                     action, NULL, TRUE, TRUE, group->cluster);
     pe__set_action_flags(op, pe_action_pseudo|pe_action_runnable);
     return op;
 }
 
 /*!
  * \internal
  * \brief Create all actions needed for a given group resource
  *
  * \param[in,out] rsc  Group resource to create actions for
  */
 void
 pcmk__group_create_actions(pe_resource_t *rsc)
 {
     CRM_ASSERT(rsc != NULL);
 
     pe_rsc_trace(rsc, "Creating actions for group %s", rsc->id);
 
     // Create actions for individual group members
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *member = (pe_resource_t *) iter->data;
 
         member->cmds->create_actions(member);
     }
 
     // Create pseudo-actions for group itself to serve as ordering points
     create_group_pseudo_op(rsc, RSC_START);
     create_group_pseudo_op(rsc, RSC_STARTED);
     create_group_pseudo_op(rsc, RSC_STOP);
     create_group_pseudo_op(rsc, RSC_STOPPED);
     if (crm_is_true(g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_PROMOTABLE))) {
         create_group_pseudo_op(rsc, RSC_DEMOTE);
         create_group_pseudo_op(rsc, RSC_DEMOTED);
         create_group_pseudo_op(rsc, RSC_PROMOTE);
         create_group_pseudo_op(rsc, RSC_PROMOTED);
     }
 }
 
 // User data for member_internal_constraints()
 struct member_data {
     // These could be derived from member but this avoids some function calls
     bool ordered;
     bool colocated;
     bool promotable;
 
     pe_resource_t *last_active;
     pe_resource_t *previous_member;
 };
 
 /*!
  * \internal
  * \brief Create implicit constraints needed for a group member
  *
  * \param[in,out] data       Group member to create implicit constraints for
  * \param[in,out] user_data  Shared data for the group's members (struct member_data)
  */
 static void
 member_internal_constraints(gpointer data, gpointer user_data)
 {
     pe_resource_t *member = (pe_resource_t *) data;
     struct member_data *member_data = (struct member_data *) user_data;
 
     // For ordering demote vs demote or stop vs stop
     uint32_t down_flags = pe_order_implies_first_printed;
 
     // For ordering demote vs demoted or stop vs stopped
     uint32_t post_down_flags = pe_order_implies_then_printed;
 
     // Create the individual member's implicit constraints
     member->cmds->internal_constraints(member);
 
     if (member_data->previous_member == NULL) {
         // This is first member
         if (member_data->ordered) {
             pe__set_order_flags(down_flags, pe_order_optional);
             post_down_flags = pe_order_implies_then;
         }
 
     } else if (member_data->colocated) {
         // Colocate this member with the previous one
         pcmk__new_colocation("group:internal_colocation", NULL, INFINITY,
                              member, member_data->previous_member, NULL, NULL,
                              pcmk_is_set(member->flags, pe_rsc_critical),
                              member->cluster);
     }
 
     if (member_data->promotable) {
         // Demote group -> demote member -> group is demoted
         pcmk__order_resource_actions(member->parent, RSC_DEMOTE,
                                      member, RSC_DEMOTE, down_flags);
         pcmk__order_resource_actions(member, RSC_DEMOTE,
                                      member->parent, RSC_DEMOTED,
                                      post_down_flags);
 
         // Promote group -> promote member -> group is promoted
         pcmk__order_resource_actions(member, RSC_PROMOTE,
                                      member->parent, RSC_PROMOTED,
                                      pe_order_runnable_left
                                      |pe_order_implies_then
                                      |pe_order_implies_then_printed);
         pcmk__order_resource_actions(member->parent, RSC_PROMOTE,
                                      member, RSC_PROMOTE,
                                      pe_order_implies_first_printed);
     }
 
     // Stop group -> stop member -> group is stopped
     pcmk__order_stops(member->parent, member, down_flags);
     pcmk__order_resource_actions(member, RSC_STOP, member->parent, RSC_STOPPED,
                                  post_down_flags);
 
     // Start group -> start member -> group is started
     pcmk__order_starts(member->parent, member, pe_order_implies_first_printed);
     pcmk__order_resource_actions(member, RSC_START, member->parent, RSC_STARTED,
                                  pe_order_runnable_left
                                  |pe_order_implies_then
                                  |pe_order_implies_then_printed);
 
     if (!member_data->ordered) {
         pcmk__order_starts(member->parent, member,
                            pe_order_implies_then
                            |pe_order_runnable_left
                            |pe_order_implies_first_printed);
         if (member_data->promotable) {
             pcmk__order_resource_actions(member->parent, RSC_PROMOTE, member,
                                          RSC_PROMOTE,
                                          pe_order_implies_then
                                          |pe_order_runnable_left
                                          |pe_order_implies_first_printed);
         }
 
     } else if (member_data->previous_member == NULL) {
         pcmk__order_starts(member->parent, member, pe_order_none);
         if (member_data->promotable) {
             pcmk__order_resource_actions(member->parent, RSC_PROMOTE, member,
                                          RSC_PROMOTE, pe_order_none);
         }
 
     } else {
         // Order this member relative to the previous one
         pcmk__order_starts(member_data->previous_member, member,
                            pe_order_implies_then|pe_order_runnable_left);
         pcmk__order_stops(member, member_data->previous_member,
                           pe_order_optional|pe_order_restart);
         if (member_data->promotable) {
             pcmk__order_resource_actions(member_data->previous_member,
                                          RSC_PROMOTE, member, RSC_PROMOTE,
                                          pe_order_implies_then
                                          |pe_order_runnable_left);
             pcmk__order_resource_actions(member, RSC_DEMOTE,
                                          member_data->previous_member,
                                          RSC_DEMOTE, pe_order_optional);
         }
     }
 
     // Make sure partially active groups shut down in sequence
     if (member->running_on != NULL) {
         if (member_data->ordered && (member_data->previous_member != NULL)
             && (member_data->previous_member->running_on == NULL)
             && (member_data->last_active != NULL)
             && (member_data->last_active->running_on != NULL)) {
             pcmk__order_stops(member, member_data->last_active, pe_order_optional);
         }
         member_data->last_active = member;
     }
 
     member_data->previous_member = member;
 }
 
 /*!
  * \internal
  * \brief Create implicit constraints needed for a group resource
  *
  * \param[in,out] rsc  Group resource to create implicit constraints for
  */
 void
 pcmk__group_internal_constraints(pe_resource_t *rsc)
 {
     struct member_data member_data = { false, };
 
     CRM_ASSERT(rsc != NULL);
 
     /* Order group pseudo-actions relative to each other for restarting:
      * stop group -> group is stopped -> start group -> group is started
      */
     pcmk__order_resource_actions(rsc, RSC_STOP, rsc, RSC_STOPPED,
                                  pe_order_runnable_left);
     pcmk__order_resource_actions(rsc, RSC_STOPPED, rsc, RSC_START,
                                  pe_order_optional);
     pcmk__order_resource_actions(rsc, RSC_START, rsc, RSC_STARTED,
                                  pe_order_runnable_left);
 
     member_data.ordered = pe__group_flag_is_set(rsc, pe__group_ordered);
     member_data.colocated = pe__group_flag_is_set(rsc, pe__group_colocated);
-    member_data.promotable = pcmk_is_set(uber_parent(rsc)->flags, pe_rsc_promotable);
+    member_data.promotable = pcmk_is_set(pe__const_top_resource(rsc, false)->flags,
+                                         pe_rsc_promotable);
     g_list_foreach(rsc->children, member_internal_constraints, &member_data);
 }
 
 /*!
  * \internal
  * \brief Apply a colocation's score to node weights or resource priority
  *
  * Given a colocation constraint for a group with some other resource, apply the
  * score to the dependent's allowed node weights (if we are still placing
  * resources) or priority (if we are choosing promotable clone instance roles).
  *
  * \param[in,out] dependent      Dependent group resource in colocation
  * \param[in]     primary        Primary resource in colocation
  * \param[in]     colocation     Colocation constraint to apply
  */
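 /* For a colocated group, applying the constraint to the first member is
  * enough: the internal member-with-previous-member colocations pull the
  * remaining members to the same node.
  */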
 static void
 colocate_group_with(pe_resource_t *dependent, const pe_resource_t *primary,
                     const pcmk__colocation_t *colocation)
 {
     pe_resource_t *member = NULL;
 
     if (dependent->children == NULL) {
         return;
     }
 
     pe_rsc_trace(primary, "Processing %s (group %s with %s) for dependent",
                  colocation->id, dependent->id, primary->id);
 
     if (pe__group_flag_is_set(dependent, pe__group_colocated)) {
         // Colocate first member (internal colocations will handle the rest)
         member = (pe_resource_t *) dependent->children->data;
         member->cmds->apply_coloc_score(member, primary, colocation, true);
         return;
     }
 
     if (colocation->score >= INFINITY) {
         pcmk__config_err("%s: Cannot perform mandatory colocation between "
                          "non-colocated group and %s",
                          dependent->id, primary->id);
         return;
     }
 
     // Colocate each member individually
     for (GList *iter = dependent->children; iter != NULL; iter = iter->next) {
         member = (pe_resource_t *) iter->data;
         member->cmds->apply_coloc_score(member, primary, colocation, true);
     }
 }
 
 /*!
  * \internal
  * \brief Apply a colocation's score to node weights or resource priority
  *
  * Given a colocation constraint for some other resource with a group, apply the
  * score to the dependent's allowed node weights (if we are still placing
  * resources) or priority (if we are choosing promotable clone instance roles).
  *
  * \param[in,out] dependent      Dependent resource in colocation
  * \param[in]     primary        Primary group resource in colocation
  * \param[in]     colocation     Colocation constraint to apply
  */
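 /* Illustration (hypothetical group g1 with members A then B): a mandatory
  * "rsc with g1" colocation is applied against B, the last member, since the
  * whole group is active only where B is; an optional one is applied against
  * A, the first member.
  */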
 static void
 colocate_with_group(pe_resource_t *dependent, const pe_resource_t *primary,
                     const pcmk__colocation_t *colocation)
 {
     pe_resource_t *member = NULL;
 
     pe_rsc_trace(primary,
                  "Processing colocation %s (%s with group %s) for primary",
                  colocation->id, dependent->id, primary->id);
 
     if (pcmk_is_set(primary->flags, pe_rsc_provisional)) {
         return;
     }
 
     if (pe__group_flag_is_set(primary, pe__group_colocated)) {
 
         if (colocation->score >= INFINITY) {
             /* For mandatory colocations, the entire group must be assignable
              * (and in the specified role if any), so apply the colocation based
              * on the last member.
              */
             member = pe__last_group_member(primary);
         } else if (primary->children != NULL) {
             /* For optional colocations, whether the group is partially or fully
              * up doesn't matter, so apply the colocation based on the first
              * member.
              */
             member = (pe_resource_t *) primary->children->data;
         }
         if (member == NULL) {
             return; // Nothing to colocate with
         }
 
         member->cmds->apply_coloc_score(dependent, member, colocation, false);
         return;
     }
 
     if (colocation->score >= INFINITY) {
         pcmk__config_err("%s: Cannot perform mandatory colocation with"
                          " non-colocated group %s",
                          dependent->id, primary->id);
         return;
     }
 
     // Colocate dependent with each member individually
     for (GList *iter = primary->children; iter != NULL; iter = iter->next) {
         member = (pe_resource_t *) iter->data;
         member->cmds->apply_coloc_score(dependent, member, colocation, false);
     }
 }
 
 /*!
  * \internal
  * \brief Apply a colocation's score to node weights or resource priority
  *
  * Given a colocation constraint, apply its score to the dependent's
  * allowed node weights (if we are still placing resources) or priority (if
  * we are choosing promotable clone instance roles).
  *
  * \param[in,out] dependent      Dependent resource in colocation
  * \param[in]     primary        Primary resource in colocation
  * \param[in]     colocation     Colocation constraint to apply
  * \param[in]     for_dependent  true if called on behalf of dependent
  */
 void
 pcmk__group_apply_coloc_score(pe_resource_t *dependent,
                               const pe_resource_t *primary,
                               const pcmk__colocation_t *colocation,
                               bool for_dependent)
 {
     CRM_ASSERT((dependent != NULL) && (primary != NULL)
                && (colocation != NULL));
 
     if (for_dependent) {
         colocate_group_with(dependent, primary, colocation);
 
     } else {
         // Method should only be called for primitive dependents
         CRM_ASSERT(dependent->variant == pe_native);
 
         colocate_with_group(dependent, primary, colocation);
     }
 }
 
 /*!
  * \internal
  * \brief Return action flags for a given group resource action
  *
  * \param[in,out] action  Group action to get flags for
  * \param[in]     node    If not NULL, limit effects to this node
  *
  * \return Flags appropriate to \p action on \p node
  */
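 /* Roughly: a group action is a pseudo-action that stays optional only while
  * every member's action is optional, and (stop and demote aside) stays
  * runnable only while every member can still run it.
  */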
 enum pe_action_flags
 pcmk__group_action_flags(pe_action_t *action, const pe_node_t *node)
 {
     // Default flags for a group action
     enum pe_action_flags flags = pe_action_optional
                                  |pe_action_runnable
                                  |pe_action_pseudo;
 
     CRM_ASSERT(action != NULL);
 
     // Update flags considering each member's own flags for same action
     for (GList *iter = action->rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *member = (pe_resource_t *) iter->data;
 
         // Check whether member has the same action
         enum action_tasks task = get_complex_task(member, action->task, TRUE);
         const char *task_s = task2text(task);
         pe_action_t *member_action = find_first_action(member->actions, NULL,
                                                        task_s, node);
 
         if (member_action != NULL) {
             enum pe_action_flags member_flags;
 
             member_flags = member->cmds->action_flags(member_action, node);
 
             // Group action is mandatory if any member action is
             if (pcmk_is_set(flags, pe_action_optional)
                 && !pcmk_is_set(member_flags, pe_action_optional)) {
                 pe_rsc_trace(action->rsc, "%s is mandatory because %s is",
                              action->uuid, member_action->uuid);
                 pe__clear_raw_action_flags(flags, "group action",
                                            pe_action_optional);
                 pe__clear_action_flags(action, pe_action_optional);
             }
 
             // Group action is unrunnable if any member action is
             if (!pcmk__str_eq(task_s, action->task, pcmk__str_none)
                 && pcmk_is_set(flags, pe_action_runnable)
                 && !pcmk_is_set(member_flags, pe_action_runnable)) {
 
                 pe_rsc_trace(action->rsc, "%s is unrunnable because %s is",
                              action->uuid, member_action->uuid);
                 pe__clear_raw_action_flags(flags, "group action",
                                            pe_action_runnable);
                 pe__clear_action_flags(action, pe_action_runnable);
             }
 
         /* Group (pseudo-)actions other than stop or demote are unrunnable
          * unless every member will do it.
          */
         } else if ((task != stop_rsc) && (task != action_demote)) {
             pe_rsc_trace(action->rsc,
                          "%s is not runnable because %s will not %s",
                          action->uuid, member->id, task_s);
             pe__clear_raw_action_flags(flags, "group action",
                                        pe_action_runnable);
         }
     }
 
     return flags;
 }
 
 /*!
  * \internal
  * \brief Update two actions according to an ordering between them
  *
  * Given information about an ordering of two actions, update the actions'
  * flags (and runnable_before members if appropriate) as appropriate for the
  * ordering. In some cases, the ordering could be disabled as well.
  *
  * \param[in,out] first     'First' action in an ordering
  * \param[in,out] then      'Then' action in an ordering
  * \param[in]     node      If not NULL, limit scope of ordering to this node
  *                          (only used when interleaving instances)
  * \param[in]     flags     Action flags for \p first for ordering purposes
  * \param[in]     filter    Action flags to limit scope of certain updates (may
  *                          include pe_action_optional to affect only mandatory
  *                          actions, and pe_action_runnable to affect only
  *                          runnable actions)
  * \param[in]     type      Group of enum pe_ordering flags to apply
  * \param[in,out] data_set  Cluster working set
  *
  * \return Group of enum pcmk__updated flags indicating what was updated
  */
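 /* The group's own action is updated first; the update then cascades to each
  * member's action of the same name (if any) on the given node.
  */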
 uint32_t
 pcmk__group_update_ordered_actions(pe_action_t *first, pe_action_t *then,
                                    const pe_node_t *node, uint32_t flags,
                                    uint32_t filter, uint32_t type,
                                    pe_working_set_t *data_set)
 {
     uint32_t changed = pcmk__updated_none;
 
     CRM_ASSERT((first != NULL) && (then != NULL) && (data_set != NULL));
 
     // This group method is called only when a group action is the "then" action
     CRM_ASSERT(then->rsc != NULL);
 
     // Update the actions for the group itself
     changed |= pcmk__update_ordered_actions(first, then, node, flags, filter,
                                             type, data_set);
 
     // Update the actions for each group member
     for (GList *iter = then->rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *member = (pe_resource_t *) iter->data;
 
         pe_action_t *member_action = find_first_action(member->actions, NULL,
                                                        then->task, node);
 
         if (member_action != NULL) {
             changed |= member->cmds->update_ordered_actions(first,
                                                             member_action, node,
                                                             flags, filter, type,
                                                             data_set);
         }
     }
     return changed;
 }
 
 /*!
  * \internal
  * \brief Apply a location constraint to a group's allowed node scores
  *
  * \param[in,out] rsc       Group resource to apply constraint to
  * \param[in,out] location  Location constraint to apply
  */
 void
 pcmk__group_apply_location(pe_resource_t *rsc, pe__location_t *location)
 {
     GList *node_list_orig = NULL;
     GList *node_list_copy = NULL;
     bool reset_scores = true;
 
     CRM_ASSERT((rsc != NULL) && (location != NULL));
 
     node_list_orig = location->node_list_rh;
     node_list_copy = pcmk__copy_node_list(node_list_orig, true);
     reset_scores = pe__group_flag_is_set(rsc, pe__group_colocated);
 
     // Apply the constraint for the group itself (updates node scores)
     pcmk__apply_location(rsc, location);
 
     // Apply the constraint for each member
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *member = (pe_resource_t *) iter->data;
 
         member->cmds->apply_location(member, location);
 
         if (reset_scores) {
             /* The first member of colocated groups needs to use the original
              * node scores, but subsequent members should work on a copy, since
              * the first member's scores already incorporate theirs.
              */
             reset_scores = false;
             location->node_list_rh = node_list_copy;
         }
     }
 
     location->node_list_rh = node_list_orig;
     g_list_free_full(node_list_copy, free);
 }
 
 // Group implementation of resource_alloc_functions_t:colocated_resources()
 GList *
 pcmk__group_colocated_resources(pe_resource_t *rsc, pe_resource_t *orig_rsc,
                                 GList *colocated_rscs)
 {
     pe_resource_t *member = NULL;
 
     CRM_ASSERT(rsc != NULL);
 
     if (orig_rsc == NULL) {
         orig_rsc = rsc;
     }
 
     if (pe__group_flag_is_set(rsc, pe__group_colocated)
         || pe_rsc_is_clone(rsc->parent)) {
         /* This group has colocated members and/or is cloned -- either way,
          * add every child's colocated resources to the list.
          */
         for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
             member = (pe_resource_t *) iter->data;
             colocated_rscs = member->cmds->colocated_resources(member, orig_rsc,
                                                                colocated_rscs);
         }
 
     } else if (rsc->children != NULL) {
         /* This group's members are not colocated, and the group is not cloned,
          * so just add the first child's colocations to the list.
          */
         member = (pe_resource_t *) rsc->children->data;
         colocated_rscs = member->cmds->colocated_resources(member, orig_rsc,
                                                            colocated_rscs);
     }
 
     // Now consider colocations where the group itself is specified
     colocated_rscs = pcmk__colocated_resources(rsc, orig_rsc, colocated_rscs);
 
     return colocated_rscs;
 }
 
 // Group implementation of resource_alloc_functions_t:add_utilization()
 void
 pcmk__group_add_utilization(const pe_resource_t *rsc,
                             const pe_resource_t *orig_rsc, GList *all_rscs,
                             GHashTable *utilization)
 {
     pe_resource_t *member = NULL;
 
     CRM_ASSERT((rsc != NULL) && (orig_rsc != NULL) && (utilization != NULL));
 
     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
         return;
     }
 
     pe_rsc_trace(orig_rsc, "%s: Adding group %s as colocated utilization",
                  orig_rsc->id, rsc->id);
     if (pe__group_flag_is_set(rsc, pe__group_colocated)
         || pe_rsc_is_clone(rsc->parent)) {
         // Every group member will be on same node, so sum all members
         for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
             member = (pe_resource_t *) iter->data;
 
             if (pcmk_is_set(member->flags, pe_rsc_provisional)
                 && (g_list_find(all_rscs, member) == NULL)) {
                 member->cmds->add_utilization(member, orig_rsc, all_rscs,
                                               utilization);
             }
         }
 
     } else if (rsc->children != NULL) {
         // Just add first member's utilization
         member = (pe_resource_t *) rsc->children->data;
         if ((member != NULL)
             && pcmk_is_set(member->flags, pe_rsc_provisional)
             && (g_list_find(all_rscs, member) == NULL)) {
 
             member->cmds->add_utilization(member, orig_rsc, all_rscs,
                                           utilization);
         }
     }
 }
 
 // Group implementation of resource_alloc_functions_t:shutdown_lock()
 void
 pcmk__group_shutdown_lock(pe_resource_t *rsc)
 {
     CRM_ASSERT(rsc != NULL);
 
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *member = (pe_resource_t *) iter->data;
 
         member->cmds->shutdown_lock(member);
     }
 }
diff --git a/lib/pacemaker/pcmk_sched_primitive.c b/lib/pacemaker/pcmk_sched_primitive.c
index 0b822b0891..33fb3c0720 100644
--- a/lib/pacemaker/pcmk_sched_primitive.c
+++ b/lib/pacemaker/pcmk_sched_primitive.c
@@ -1,1521 +1,1519 @@
 /*
- * Copyright 2004-2022 the Pacemaker project contributors
+ * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <stdbool.h>
 
 #include <crm/msg_xml.h>
 #include <pacemaker-internal.h>
 
 #include "libpacemaker_private.h"
 
 static void stop_resource(pe_resource_t *rsc, pe_node_t *node, bool optional);
 static void start_resource(pe_resource_t *rsc, pe_node_t *node, bool optional);
 static void demote_resource(pe_resource_t *rsc, pe_node_t *node, bool optional);
 static void promote_resource(pe_resource_t *rsc, pe_node_t *node,
                              bool optional);
 static void assert_role_error(pe_resource_t *rsc, pe_node_t *node,
                               bool optional);
 
 static enum rsc_role_e rsc_state_matrix[RSC_ROLE_MAX][RSC_ROLE_MAX] = {
     /* This array lists the immediate next role when transitioning from one role
      * to a target role. For example, when going from Stopped to Promoted, the
      * next role is Unpromoted, because the resource must be started before it
      * can be promoted. The current state then becomes Started, which is fed
      * into this array again, giving a next role of Promoted.
      *
      * Current role       Immediate next role   Final target role
      * ------------       -------------------   -----------------
      */
     /* Unknown */       { RSC_ROLE_UNKNOWN,     /* Unknown */
                           RSC_ROLE_STOPPED,     /* Stopped */
                           RSC_ROLE_STOPPED,     /* Started */
                           RSC_ROLE_STOPPED,     /* Unpromoted */
                           RSC_ROLE_STOPPED,     /* Promoted */
                         },
     /* Stopped */       { RSC_ROLE_STOPPED,     /* Unknown */
                           RSC_ROLE_STOPPED,     /* Stopped */
                           RSC_ROLE_STARTED,     /* Started */
                           RSC_ROLE_UNPROMOTED,  /* Unpromoted */
                           RSC_ROLE_UNPROMOTED,  /* Promoted */
                         },
     /* Started */       { RSC_ROLE_STOPPED,     /* Unknown */
                           RSC_ROLE_STOPPED,     /* Stopped */
                           RSC_ROLE_STARTED,     /* Started */
                           RSC_ROLE_UNPROMOTED,  /* Unpromoted */
                           RSC_ROLE_PROMOTED,    /* Promoted */
                         },
     /* Unpromoted */    { RSC_ROLE_STOPPED,     /* Unknown */
                           RSC_ROLE_STOPPED,     /* Stopped */
                           RSC_ROLE_STOPPED,     /* Started */
                           RSC_ROLE_UNPROMOTED,  /* Unpromoted */
                           RSC_ROLE_PROMOTED,    /* Promoted */
                         },
     /* Promoted  */     { RSC_ROLE_STOPPED,     /* Unknown */
                           RSC_ROLE_UNPROMOTED,  /* Stopped */
                           RSC_ROLE_UNPROMOTED,  /* Started */
                           RSC_ROLE_UNPROMOTED,  /* Unpromoted */
                           RSC_ROLE_PROMOTED,    /* Promoted */
                         },
 };
 
 /*!
  * \internal
  * \brief Function to schedule actions needed for a role change
  *
  * \param[in,out] rsc       Resource whose role is changing
  * \param[in]     node      Node where resource will be in its next role
  * \param[in]     optional  Whether scheduled actions should be optional
  */
 typedef void (*rsc_transition_fn)(pe_resource_t *rsc, pe_node_t *node,
                                   bool optional);
 
 static rsc_transition_fn rsc_action_matrix[RSC_ROLE_MAX][RSC_ROLE_MAX] = {
     /* This array lists the function needed to transition directly from one role
      * to another. NULL indicates that nothing is needed.
      *
      * Current role         Transition function             Next role
      * ------------         -------------------             ----------
      */
     /* Unknown */       {   assert_role_error,              /* Unknown */
                             stop_resource,                  /* Stopped */
                             assert_role_error,              /* Started */
                             assert_role_error,              /* Unpromoted */
                             assert_role_error,              /* Promoted */
                         },
     /* Stopped */       {   assert_role_error,              /* Unknown */
                             NULL,                           /* Stopped */
                             start_resource,                 /* Started */
                             start_resource,                 /* Unpromoted */
                             assert_role_error,              /* Promoted */
                         },
     /* Started */       {   assert_role_error,              /* Unknown */
                             stop_resource,                  /* Stopped */
                             NULL,                           /* Started */
                             NULL,                           /* Unpromoted */
                             promote_resource,               /* Promoted */
                         },
     /* Unpromoted */    {   assert_role_error,              /* Unknown */
                             stop_resource,                  /* Stopped */
                             stop_resource,                  /* Started */
                             NULL,                           /* Unpromoted */
                             promote_resource,               /* Promoted */
                         },
     /* Promoted  */     {   assert_role_error,              /* Unknown */
                             demote_resource,                /* Stopped */
                             demote_resource,                /* Started */
                             demote_resource,                /* Unpromoted */
                             NULL,                           /* Promoted */
                         },
 };
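 
 /* For example, recovering a promoted instance steps Promoted -> Unpromoted ->
  * Stopped, calling demote_resource() and then stop_resource();
  * rsc_state_matrix above supplies each intermediate role.
  */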
 
 /*!
  * \internal
  * \brief Get a list of a resource's allowed nodes sorted by node weight
  *
  * \param[in] rsc  Resource to check
  *
  * \return List of allowed nodes sorted by node weight
  */
 static GList *
 sorted_allowed_nodes(const pe_resource_t *rsc)
 {
     if (rsc->allowed_nodes != NULL) {
         GList *nodes = g_hash_table_get_values(rsc->allowed_nodes);
 
         if (nodes != NULL) {
             return pcmk__sort_nodes(nodes, pe__current_node(rsc));
         }
     }
     return NULL;
 }
 
 /*!
  * \internal
  * \brief Assign a resource to its best allowed node, if possible
  *
  * \param[in,out] rsc     Resource to choose a node for
  * \param[in]     prefer  If not NULL, prefer this node when all else equal
  *
  * \return true if \p rsc could be assigned to a node, otherwise false
  */
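 /* Selection outline: ban nodes without sufficient capacity, sort the rest by
  * score, honor the preferred node when it scores at least as well as the best
  * node, and break ties in favor of the node currently running the resource.
  */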
 static bool
 assign_best_node(pe_resource_t *rsc, const pe_node_t *prefer)
 {
     GList *nodes = NULL;
     pe_node_t *chosen = NULL;
     pe_node_t *best = NULL;
     bool result = false;
     const pe_node_t *most_free_node = pcmk__ban_insufficient_capacity(rsc);
 
     if (prefer == NULL) {
         prefer = most_free_node;
     }
 
     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
         // We've already finished assignment of resources to nodes
         return rsc->allocated_to != NULL;
     }
 
     // Sort allowed nodes by weight
     nodes = sorted_allowed_nodes(rsc);
     if (nodes != NULL) {
         best = (pe_node_t *) nodes->data; // First node has best score
     }
 
     if ((prefer != NULL) && (nodes != NULL)) {
         // Get the allowed node version of prefer
         chosen = g_hash_table_lookup(rsc->allowed_nodes, prefer->details->id);
 
         if (chosen == NULL) {
             pe_rsc_trace(rsc, "Preferred node %s for %s was unknown",
                          pe__node_name(prefer), rsc->id);
 
         /* Favor the preferred node as long as its weight is at least as good as
          * the best allowed node's.
          *
          * An alternative would be to favor the preferred node even if the best
          * node is better, when the best node's weight is less than INFINITY.
          */
         } else if (chosen->weight < best->weight) {
             pe_rsc_trace(rsc, "Preferred node %s for %s was unsuitable",
                          pe__node_name(chosen), rsc->id);
             chosen = NULL;
 
         } else if (!pcmk__node_available(chosen, true, false)) {
             pe_rsc_trace(rsc, "Preferred node %s for %s was unavailable",
                          pe__node_name(chosen), rsc->id);
             chosen = NULL;
 
         } else {
             pe_rsc_trace(rsc,
                          "Chose preferred node %s for %s (ignoring %d candidates)",
                          pe__node_name(chosen), rsc->id, g_list_length(nodes));
         }
     }
 
     if ((chosen == NULL) && (best != NULL)) {
         /* Either there is no preferred node, or the preferred node is not
          * suitable, but another node is allowed to run the resource.
          */
 
         chosen = best;
 
         if (!pe_rsc_is_unique_clone(rsc->parent)
             && (chosen->weight > 0) // Zero not acceptable
             && pcmk__node_available(chosen, false, false)) {
             /* If the resource is already running on a node, prefer that node if
              * it is just as good as the chosen node.
              *
              * We don't do this for unique clone instances, because
              * distribute_children() has already assigned instances to their
              * running nodes when appropriate, and if we get here, we don't want
              * remaining unassigned instances to prefer a node that's already
              * running another instance.
              */
             pe_node_t *running = pe__current_node(rsc);
 
             if (running == NULL) {
                 // Nothing to do
 
             } else if (!pcmk__node_available(running, true, false)) {
                 pe_rsc_trace(rsc, "Current node for %s (%s) can't run resources",
                              rsc->id, pe__node_name(running));
 
             } else {
                 int nodes_with_best_score = 1;
 
                 for (GList *iter = nodes->next; iter; iter = iter->next) {
                     pe_node_t *allowed = (pe_node_t *) iter->data;
 
                     if (allowed->weight != chosen->weight) {
                         // The nodes are sorted by weight, so no more are equal
                         break;
                     }
                     if (pe__same_node(allowed, running)) {
                         // Scores are equal, so prefer the current node
                         chosen = allowed;
                     }
                     nodes_with_best_score++;
                 }
 
                 if (nodes_with_best_score > 1) {
                     do_crm_log(((chosen->weight >= INFINITY)? LOG_WARNING : LOG_INFO),
                                "Chose %s for %s from %d nodes with score %s",
                                pe__node_name(chosen), rsc->id,
                                nodes_with_best_score,
                                pcmk_readable_score(chosen->weight));
                 }
             }
         }
 
         pe_rsc_trace(rsc, "Chose %s for %s from %d candidates",
                      pe__node_name(chosen), rsc->id, g_list_length(nodes));
     }
 
     result = pcmk__finalize_assignment(rsc, chosen, false);
     g_list_free(nodes);
     return result;
 }
 
 /*!
  * \internal
  * \brief Apply a "this with" colocation to a node's allowed node scores
  *
  * \param[in,out] data       Colocation to apply
  * \param[in,out] user_data  Resource being assigned
  */
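 /* In a "this with" colocation, the resource being assigned is the dependent,
  * so the primary is assigned first and the colocation score is then applied
  * to this resource's allowed node scores.
  */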
 static void
 apply_this_with(gpointer data, gpointer user_data)
 {
     pcmk__colocation_t *colocation = (pcmk__colocation_t *) data;
     pe_resource_t *rsc = (pe_resource_t *) user_data;
 
     GHashTable *archive = NULL;
     pe_resource_t *other = colocation->primary;
 
     /* In certain cases (a promotion-role dependency, or a finite negative
      * score), apply the scores tentatively so they can be reverted if the
      * result leaves the dependent with no available nodes.
      */
     if ((colocation->dependent_role >= RSC_ROLE_PROMOTED)
         || ((colocation->score < 0) && (colocation->score > -INFINITY))) {
         archive = pcmk__copy_node_table(rsc->allowed_nodes);
     }
 
     pe_rsc_trace(rsc,
                  "%s: Assigning colocation %s primary %s first "
                  "(score=%d role=%s)",
                  rsc->id, colocation->id, other->id,
                  colocation->score, role2text(colocation->dependent_role));
     other->cmds->assign(other, NULL);
 
     // Apply the colocation score to this resource's allowed node scores
     rsc->cmds->apply_coloc_score(rsc, other, colocation, true);
     if ((archive != NULL)
         && !pcmk__any_node_available(rsc->allowed_nodes)) {
         pe_rsc_info(rsc,
                     "%s: Reverting scores from colocation with %s "
                     "because no nodes allowed",
                     rsc->id, other->id);
         g_hash_table_destroy(rsc->allowed_nodes);
         rsc->allowed_nodes = archive;
         archive = NULL;
     }
     if (archive != NULL) {
         g_hash_table_destroy(archive);
     }
 }
 
 /*!
  * \internal
  * \brief Apply a "with this" colocation to a node's allowed node scores
  *
  * \param[in,out] data       Colocation to apply
  * \param[in,out] user_data  Resource being assigned
  */
 static void
 apply_with_this(void *data, void *user_data)
 {
     pcmk__colocation_t *colocation = (pcmk__colocation_t *) data;
     pe_resource_t *rsc = (pe_resource_t *) user_data;
 
     pe_resource_t *other = colocation->dependent;
     const float factor = colocation->score / (float) INFINITY;
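     // Illustrative math: with INFINITY defined as 1000000, a colocation
     // score of 500000 yields an attenuation factor of 0.5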
 
     if (!pcmk__colocation_has_influence(colocation, NULL)) {
         return;
     }
     pe_rsc_trace(rsc,
                  "%s: Incorporating attenuated %s assignment scores due "
                  "to colocation %s", rsc->id, other->id, colocation->id);
     pcmk__add_colocated_node_scores(other, rsc->id, &rsc->allowed_nodes,
                                     colocation->node_attribute, factor,
                                     pcmk__coloc_select_active);
 }
 
 /*!
  * \internal
  * \brief Update a Pacemaker Remote node once its connection has been assigned
  *
  * \param[in] connection  Connection resource that has been assigned
  */
 static void
 remote_connection_assigned(const pe_resource_t *connection)
 {
     pe_node_t *remote_node = pe_find_node(connection->cluster->nodes,
                                           connection->id);
 
     CRM_CHECK(remote_node != NULL, return);
 
     if ((connection->allocated_to != NULL)
         && (connection->next_role != RSC_ROLE_STOPPED)) {
 
         crm_trace("Pacemaker Remote node %s will be online",
                   remote_node->details->id);
         remote_node->details->online = TRUE;
         if (remote_node->details->unseen) {
             // Avoid unnecessary fence, since we will attempt connection
             remote_node->details->unclean = FALSE;
         }
 
     } else {
         crm_trace("Pacemaker Remote node %s will be shut down "
                   "(%sassigned connection's next role is %s)",
                   remote_node->details->id,
                   ((connection->allocated_to == NULL)? "un" : ""),
                   role2text(connection->next_role));
         remote_node->details->shutdown = TRUE;
     }
 }
 
 /*!
  * \internal
  * \brief Assign a primitive resource to a node
  *
  * \param[in,out] rsc     Resource to assign to a node
  * \param[in]     prefer  Node to prefer, if all else is equal
  *
  * \return Node that \p rsc is assigned to, if assigned entirely to one node
  */
 pe_node_t *
 pcmk__primitive_assign(pe_resource_t *rsc, const pe_node_t *prefer)
 {
     CRM_ASSERT(rsc != NULL);
 
     // Never assign a child without parent being assigned first
     if ((rsc->parent != NULL)
         && !pcmk_is_set(rsc->parent->flags, pe_rsc_allocating)) {
         pe_rsc_debug(rsc, "%s: Assigning parent %s first",
                      rsc->id, rsc->parent->id);
         rsc->parent->cmds->assign(rsc->parent, prefer);
     }
 
     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
         return rsc->allocated_to; // Assignment has already been done
     }
 
     // Ensure we detect assignment loops
     if (pcmk_is_set(rsc->flags, pe_rsc_allocating)) {
         pe_rsc_debug(rsc, "Breaking assignment loop involving %s", rsc->id);
         return NULL;
     }
     pe__set_resource_flags(rsc, pe_rsc_allocating);
 
     pe__show_node_weights(true, rsc, "Pre-assignment", rsc->allowed_nodes,
                           rsc->cluster);
 
     g_list_foreach(rsc->rsc_cons, apply_this_with, rsc);
     pe__show_node_weights(true, rsc, "Post-this-with", rsc->allowed_nodes,
                           rsc->cluster);
 
     g_list_foreach(rsc->rsc_cons_lhs, apply_with_this, rsc);
 
     if (rsc->next_role == RSC_ROLE_STOPPED) {
         pe_rsc_trace(rsc,
                      "Banning %s from all nodes because it will be stopped",
                      rsc->id);
         resource_location(rsc, NULL, -INFINITY, XML_RSC_ATTR_TARGET_ROLE,
                           rsc->cluster);
 
     } else if ((rsc->next_role > rsc->role)
                && !pcmk_is_set(rsc->cluster->flags, pe_flag_have_quorum)
                && (rsc->cluster->no_quorum_policy == no_quorum_freeze)) {
         crm_notice("Resource %s cannot be elevated from %s to %s due to "
                    "no-quorum-policy=freeze",
                    rsc->id, role2text(rsc->role), role2text(rsc->next_role));
         pe__set_next_role(rsc, rsc->role, "no-quorum-policy=freeze");
     }
 
     pe__show_node_weights(!pcmk_is_set(rsc->cluster->flags, pe_flag_show_scores),
                           rsc, __func__, rsc->allowed_nodes, rsc->cluster);
 
     // Unmanage resource if fencing is enabled but no device is configured
     if (pcmk_is_set(rsc->cluster->flags, pe_flag_stonith_enabled)
         && !pcmk_is_set(rsc->cluster->flags, pe_flag_have_stonith_resource)) {
         pe__clear_resource_flags(rsc, pe_rsc_managed);
     }
 
     if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
         // Unmanaged resources stay on their current node
         const char *reason = NULL;
         pe_node_t *assign_to = NULL;
 
         pe__set_next_role(rsc, rsc->role, "unmanaged");
         assign_to = pe__current_node(rsc);
         if (assign_to == NULL) {
             reason = "inactive";
         } else if (rsc->role == RSC_ROLE_PROMOTED) {
             reason = "promoted";
         } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
             reason = "failed";
         } else {
             reason = "active";
         }
         pe_rsc_info(rsc, "Unmanaged resource %s assigned to %s: %s", rsc->id,
                     (assign_to? assign_to->details->uname : "no node"), reason);
         pcmk__finalize_assignment(rsc, assign_to, true);
 
     } else if (pcmk_is_set(rsc->cluster->flags, pe_flag_stop_everything)) {
         pe_rsc_debug(rsc, "Forcing %s to stop: stop-all-resources", rsc->id);
         pcmk__finalize_assignment(rsc, NULL, true);
 
     } else if (pcmk_is_set(rsc->flags, pe_rsc_provisional)
                && assign_best_node(rsc, prefer)) {
         // Assignment successful
 
     } else if (rsc->allocated_to == NULL) {
         if (!pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
             pe_rsc_info(rsc, "Resource %s cannot run anywhere", rsc->id);
         } else if (rsc->running_on != NULL) {
             pe_rsc_info(rsc, "Stopping orphan resource %s", rsc->id);
         }
 
     } else {
         pe_rsc_debug(rsc, "%s: pre-assigned to %s", rsc->id,
                      pe__node_name(rsc->allocated_to));
     }
 
     pe__clear_resource_flags(rsc, pe_rsc_allocating);
 
     if (rsc->is_remote_node) {
         remote_connection_assigned(rsc);
     }
 
     return rsc->allocated_to;
 }
 
 /*!
  * \internal
  * \brief Schedule actions to bring resource down and back to current role
  *
  * \param[in,out] rsc           Resource to restart
  * \param[in]     current       Node that resource should be brought down on
  * \param[in]     need_stop     Whether the resource must be stopped
  * \param[in]     need_promote  Whether the resource must be promoted
  */
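 /* Illustration: restarting a promoted resource schedules demote and stop on
  * the current node, then start and promote on the assigned node, stepping
  * through rsc_state_matrix/rsc_action_matrix one role at a time.
  */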
 static void
 schedule_restart_actions(pe_resource_t *rsc, pe_node_t *current,
                          bool need_stop, bool need_promote)
 {
     enum rsc_role_e role = rsc->role;
     enum rsc_role_e next_role;
     rsc_transition_fn fn = NULL;
 
     pe__set_resource_flags(rsc, pe_rsc_restarting);
 
     // Bring resource down to a stop on its current node
     while (role != RSC_ROLE_STOPPED) {
         next_role = rsc_state_matrix[role][RSC_ROLE_STOPPED];
         pe_rsc_trace(rsc, "Creating %s action to take %s down from %s to %s",
                      (need_stop? "required" : "optional"), rsc->id,
                      role2text(role), role2text(next_role));
         fn = rsc_action_matrix[role][next_role];
         if (fn == NULL) {
             break;
         }
         fn(rsc, current, !need_stop);
         role = next_role;
     }
 
     // Bring resource up to its next role on its next node
     while ((rsc->role <= rsc->next_role) && (role != rsc->role)
            && !pcmk_is_set(rsc->flags, pe_rsc_block)) {
         bool required = need_stop;
 
         next_role = rsc_state_matrix[role][rsc->role];
         if ((next_role == RSC_ROLE_PROMOTED) && need_promote) {
             required = true;
         }
         pe_rsc_trace(rsc, "Creating %s action to take %s up from %s to %s",
                      (required? "required" : "optional"), rsc->id,
                      role2text(role), role2text(next_role));
         fn = rsc_action_matrix[role][next_role];
         if (fn == NULL) {
             break;
         }
         fn(rsc, rsc->allocated_to, !required);
         role = next_role;
     }
 
     pe__clear_resource_flags(rsc, pe_rsc_restarting);
 }
 
 /*!
  * \internal
  * \brief If a resource's next role is not explicitly specified, set a default
  *
  * \param[in,out] rsc  Resource to set next role for
  *
  * \return "explicit" if next role was explicitly set, otherwise "implicit"
  */
 static const char *
 set_default_next_role(pe_resource_t *rsc)
 {
     if (rsc->next_role != RSC_ROLE_UNKNOWN) {
         return "explicit";
     }
 
     if (rsc->allocated_to == NULL) {
         pe__set_next_role(rsc, RSC_ROLE_STOPPED, "assignment");
     } else {
         pe__set_next_role(rsc, RSC_ROLE_STARTED, "assignment");
     }
     return "implicit";
 }
 
 /*!
  * \internal
  * \brief Create an action to represent an already pending start
  *
  * \param[in,out] rsc  Resource to create start action for
  */
 static void
 create_pending_start(pe_resource_t *rsc)
 {
     pe_action_t *start = NULL;
 
     pe_rsc_trace(rsc,
                  "Creating action for %s to represent already pending start",
                  rsc->id);
     start = start_action(rsc, rsc->allocated_to, TRUE);
     pe__set_action_flags(start, pe_action_print_always);
 }
 
 /*!
  * \internal
  * \brief Schedule actions needed to take a resource to its next role
  *
  * \param[in,out] rsc  Resource to schedule actions for
  */
 static void
 schedule_role_transition_actions(pe_resource_t *rsc)
 {
     enum rsc_role_e role = rsc->role;
 
     while (role != rsc->next_role) {
         enum rsc_role_e next_role = rsc_state_matrix[role][rsc->next_role];
         rsc_transition_fn fn = NULL;
 
         pe_rsc_trace(rsc,
                      "Creating action to take %s from %s to %s (ending at %s)",
                      rsc->id, role2text(role), role2text(next_role),
                      role2text(rsc->next_role));
         fn = rsc_action_matrix[role][next_role];
         if (fn == NULL) {
             break;
         }
         fn(rsc, rsc->allocated_to, false);
         role = next_role;
     }
 }
 
 /*!
  * \internal
  * \brief Create all actions needed for a given primitive resource
  *
  * \param[in,out] rsc  Primitive resource to create actions for
  */
 void
 pcmk__primitive_create_actions(pe_resource_t *rsc)
 {
     bool need_stop = false;
     bool need_promote = false;
     bool is_moving = false;
     bool allow_migrate = false;
     bool multiply_active = false;
 
     pe_node_t *current = NULL;
     unsigned int num_all_active = 0;
     unsigned int num_clean_active = 0;
     const char *next_role_source = NULL;
 
     CRM_ASSERT(rsc != NULL);
 
     next_role_source = set_default_next_role(rsc);
     pe_rsc_trace(rsc,
                  "Creating all actions for %s transition from %s to %s "
                  "(%s) on %s",
                  rsc->id, role2text(rsc->role), role2text(rsc->next_role),
                  next_role_source, pe__node_name(rsc->allocated_to));
 
     current = pe__find_active_on(rsc, &num_all_active, &num_clean_active);
 
     g_list_foreach(rsc->dangling_migrations, pcmk__abort_dangling_migration,
                    rsc);
 
     if ((current != NULL) && (rsc->allocated_to != NULL)
         && (current->details != rsc->allocated_to->details)
         && (rsc->next_role >= RSC_ROLE_STARTED)) {
 
         pe_rsc_trace(rsc, "Moving %s from %s to %s",
                      rsc->id, pe__node_name(current),
                      pe__node_name(rsc->allocated_to));
         is_moving = true;
         allow_migrate = pcmk__rsc_can_migrate(rsc, current);
 
         // This is needed even if migrating (though I'm not sure why ...)
         need_stop = true;
     }
 
     // Check whether resource is partially migrated and/or multiply active
     if ((rsc->partial_migration_source != NULL)
         && (rsc->partial_migration_target != NULL)
         && allow_migrate && (num_all_active == 2)
         && pe__same_node(current, rsc->partial_migration_source)
         && pe__same_node(rsc->allocated_to, rsc->partial_migration_target)) {
         /* A partial migration is in progress, and the migration target remains
          * the same as when the migration began.
          */
         pe_rsc_trace(rsc, "Partial migration of %s from %s to %s will continue",
                      rsc->id, pe__node_name(rsc->partial_migration_source),
                      pe__node_name(rsc->partial_migration_target));
 
     } else if ((rsc->partial_migration_source != NULL)
                || (rsc->partial_migration_target != NULL)) {
         // A partial migration is in progress but can't be continued
 
         if (num_all_active > 2) {
             // The resource is migrating *and* multiply active!
             crm_notice("Forcing recovery of %s because it is migrating "
                        "from %s to %s and possibly active elsewhere",
                        rsc->id, pe__node_name(rsc->partial_migration_source),
                        pe__node_name(rsc->partial_migration_target));
         } else {
             // The migration source or target isn't available
             crm_notice("Forcing recovery of %s because it can no longer "
                        "migrate from %s to %s",
                        rsc->id, pe__node_name(rsc->partial_migration_source),
                        pe__node_name(rsc->partial_migration_target));
         }
         need_stop = true;
         rsc->partial_migration_source = rsc->partial_migration_target = NULL;
         allow_migrate = false;
 
     } else if (pcmk_is_set(rsc->flags, pe_rsc_needs_fencing)) {
         multiply_active = (num_all_active > 1);
     } else {
         /* If a resource has "requires" set to nothing or quorum, don't consider
          * it active on unclean nodes (similar to how all resources behave when
          * stonith-enabled is false). We can start such resources elsewhere
          * before fencing completes, and if we considered the resource active on
          * the failed node, we would attempt recovery for being active on
          * multiple nodes.
          */
         multiply_active = (num_clean_active > 1);
     }
 
     if (multiply_active) {
         const char *class = crm_element_value(rsc->xml, XML_AGENT_ATTR_CLASS);
 
         // Resource was (possibly) incorrectly multiply active
         pe_proc_err("%s resource %s might be active on %u nodes (%s)",
                     pcmk__s(class, "Untyped"), rsc->id, num_all_active,
                     recovery2text(rsc->recovery_type));
         crm_notice("See https://wiki.clusterlabs.org/wiki/FAQ"
                    "#Resource_is_Too_Active for more information");
 
         switch (rsc->recovery_type) {
             case recovery_stop_start:
                 need_stop = true;
                 break;
             case recovery_stop_unexpected:
                 need_stop = true; // stop_resource() will skip expected node
                 pe__set_resource_flags(rsc, pe_rsc_stop_unexpected);
                 break;
             default:
                 break;
         }
 
     } else {
         pe__clear_resource_flags(rsc, pe_rsc_stop_unexpected);
     }
 
     if (pcmk_is_set(rsc->flags, pe_rsc_start_pending)) {
         create_pending_start(rsc);
     }
 
     if (is_moving) {
         // Remaining tests are only for resources staying where they are
 
     } else if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
         if (pcmk_is_set(rsc->flags, pe_rsc_stop)) {
             need_stop = true;
             pe_rsc_trace(rsc, "Recovering %s", rsc->id);
         } else {
             pe_rsc_trace(rsc, "Recovering %s by demotion", rsc->id);
             if (rsc->next_role == RSC_ROLE_PROMOTED) {
                 need_promote = true;
             }
         }
 
     } else if (pcmk_is_set(rsc->flags, pe_rsc_block)) {
         pe_rsc_trace(rsc, "Blocking further actions on %s", rsc->id);
         need_stop = true;
 
     } else if ((rsc->role > RSC_ROLE_STARTED) && (current != NULL)
                && (rsc->allocated_to != NULL)) {
         pe_action_t *start = NULL;
 
         pe_rsc_trace(rsc, "Creating start action for promoted resource %s",
                      rsc->id);
         start = start_action(rsc, rsc->allocated_to, TRUE);
         if (!pcmk_is_set(start->flags, pe_action_optional)) {
             // Recovery of a promoted resource
             pe_rsc_trace(rsc, "%s restart is required for recovery", rsc->id);
             need_stop = true;
         }
     }
 
     // Create any actions needed to bring resource down and back up to same role
     schedule_restart_actions(rsc, current, need_stop, need_promote);
 
     // Create any actions needed to take resource from this role to the next
     schedule_role_transition_actions(rsc);
 
     pcmk__create_recurring_actions(rsc);
 
     if (allow_migrate) {
         pcmk__create_migration_actions(rsc, current);
     }
 }
 
 /*!
  * \internal
  * \brief Ban a resource from any allowed nodes that are Pacemaker Remote nodes
  *
  * \param[in] rsc  Resource to check
  */
 static void
 rsc_avoids_remote_nodes(const pe_resource_t *rsc)
 {
     GHashTableIter iter;
     pe_node_t *node = NULL;
 
     g_hash_table_iter_init(&iter, rsc->allowed_nodes);
     while (g_hash_table_iter_next(&iter, NULL, (void **) &node)) {
         if (node->details->remote_rsc != NULL) {
             node->weight = -INFINITY;
         }
     }
 }
 
 /*!
  * \internal
  * \brief Return allowed nodes as (possibly sorted) list
  *
  * Convert a resource's hash table of allowed nodes to a list. If printing to
  * stdout, sort the list, to keep action ID numbers consistent for regression
  * test output (while avoiding the performance hit on a live cluster).
  *
  * \param[in] rsc       Resource to check for allowed nodes
  *
  * \return List of resource's allowed nodes
  * \note Callers should take care not to rely on the list being sorted.
  */
 static GList *
 allowed_nodes_as_list(const pe_resource_t *rsc)
 {
     GList *allowed_nodes = NULL;
 
     if (rsc->allowed_nodes) {
         allowed_nodes = g_hash_table_get_values(rsc->allowed_nodes);
     }
 
     if (!pcmk__is_daemon) {
         allowed_nodes = g_list_sort(allowed_nodes, pe__cmp_node_name);
     }
 
     return allowed_nodes;
 }
 
 /*!
  * \internal
  * \brief Create implicit constraints needed for a primitive resource
  *
  * \param[in,out] rsc  Primitive resource to create implicit constraints for
  */
 void
 pcmk__primitive_internal_constraints(pe_resource_t *rsc)
 {
-    pe_resource_t *top = NULL;
     GList *allowed_nodes = NULL;
     bool check_unfencing = false;
     bool check_utilization = false;
 
     CRM_ASSERT(rsc != NULL);
 
     if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
         pe_rsc_trace(rsc,
                      "Skipping implicit constraints for unmanaged resource %s",
                      rsc->id);
         return;
     }
 
-    top = uber_parent(rsc);
-
     // Whether resource requires unfencing
     check_unfencing = !pcmk_is_set(rsc->flags, pe_rsc_fence_device)
                       && pcmk_is_set(rsc->cluster->flags, pe_flag_enable_unfencing)
                       && pcmk_is_set(rsc->flags, pe_rsc_needs_unfencing);
 
     // Whether a non-default placement strategy is used
     check_utilization = (g_hash_table_size(rsc->utilization) > 0)
                          && !pcmk__str_eq(rsc->cluster->placement_strategy,
                                           "default", pcmk__str_casei);
 
     // Order stops before starts (i.e. restart)
     pcmk__new_ordering(rsc, pcmk__op_key(rsc->id, RSC_STOP, 0), NULL,
                        rsc, pcmk__op_key(rsc->id, RSC_START, 0), NULL,
                        pe_order_optional|pe_order_implies_then|pe_order_restart,
                        rsc->cluster);
 
     // Promotable ordering: demote before stop, start before promote
-    if (pcmk_is_set(top->flags, pe_rsc_promotable)
+    if (pcmk_is_set(pe__const_top_resource(rsc, false)->flags,
+                    pe_rsc_promotable)
         || (rsc->role > RSC_ROLE_UNPROMOTED)) {
 
         pcmk__new_ordering(rsc, pcmk__op_key(rsc->id, RSC_DEMOTE, 0), NULL,
                            rsc, pcmk__op_key(rsc->id, RSC_STOP, 0), NULL,
                            pe_order_promoted_implies_first, rsc->cluster);
 
         pcmk__new_ordering(rsc, pcmk__op_key(rsc->id, RSC_START, 0), NULL,
                            rsc, pcmk__op_key(rsc->id, RSC_PROMOTE, 0), NULL,
                            pe_order_runnable_left, rsc->cluster);
     }
 
     // Don't clear resource history if probing on same node
     pcmk__new_ordering(rsc, pcmk__op_key(rsc->id, CRM_OP_LRM_DELETE, 0),
                        NULL, rsc, pcmk__op_key(rsc->id, RSC_STATUS, 0),
                        NULL, pe_order_same_node|pe_order_then_cancels_first,
                        rsc->cluster);
 
     // Certain checks need allowed nodes
     if (check_unfencing || check_utilization || (rsc->container != NULL)) {
         allowed_nodes = allowed_nodes_as_list(rsc);
     }
 
     if (check_unfencing) {
         g_list_foreach(allowed_nodes, pcmk__order_restart_vs_unfence, rsc);
     }
 
     if (check_utilization) {
         pcmk__create_utilization_constraints(rsc, allowed_nodes);
     }
 
     if (rsc->container != NULL) {
         pe_resource_t *remote_rsc = NULL;
 
         if (rsc->is_remote_node) {
             // rsc is the implicit remote connection for a guest or bundle node
 
             /* Guest resources are not allowed to run on Pacemaker Remote nodes,
              * to avoid nesting remotes. However, bundles are allowed.
              */
             if (!pcmk_is_set(rsc->flags, pe_rsc_allow_remote_remotes)) {
                 rsc_avoids_remote_nodes(rsc->container);
             }
 
             /* If someone cleans up a guest or bundle node's container, we will
              * likely schedule a (re-)probe of the container and recovery of the
              * connection. Order the connection stop after the container probe,
              * so that if we detect the container running, we will trigger a new
              * transition and avoid the unnecessary recovery.
              */
             pcmk__order_resource_actions(rsc->container, RSC_STATUS, rsc,
                                          RSC_STOP, pe_order_optional);
 
         /* A user can specify that a resource must start on a Pacemaker Remote
          * node by explicitly configuring it with the container=NODENAME
          * meta-attribute. This is of questionable merit, since location
          * constraints can accomplish the same thing. But we support it, so here
          * we check whether a resource (that is not itself a remote connection)
          * has container set to a remote node or guest node resource.
          */
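         /* For example (hypothetical resource and node names), a primitive
          * "dummy" could be pinned to a remote node resource "remote1" with:
          *
          *   <meta_attributes id="dummy-meta_attributes">
          *     <nvpair id="dummy-container" name="container" value="remote1"/>
          *   </meta_attributes>
          */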
         } else if (rsc->container->is_remote_node) {
             remote_rsc = rsc->container;
         } else {
             remote_rsc = pe__resource_contains_guest_node(rsc->cluster,
                                                           rsc->container);
         }
 
         if (remote_rsc != NULL) {
             /* Force the resource on the Pacemaker Remote node instead of
              * colocating the resource with the container resource.
              */
             for (GList *item = allowed_nodes; item; item = item->next) {
                 pe_node_t *node = item->data;
 
                 if (node->details->remote_rsc != remote_rsc) {
                     node->weight = -INFINITY;
                 }
             }
 
         } else {
             /* This resource is either a filler for a container that does NOT
              * represent a Pacemaker Remote node, or a Pacemaker Remote
              * connection resource for a guest node or bundle.
              */
             int score;
 
             crm_trace("Order and colocate %s relative to its container %s",
                       rsc->id, rsc->container->id);
 
             pcmk__new_ordering(rsc->container,
                                pcmk__op_key(rsc->container->id, RSC_START, 0),
                                NULL, rsc, pcmk__op_key(rsc->id, RSC_START, 0),
                                NULL,
                                pe_order_implies_then|pe_order_runnable_left,
                                rsc->cluster);
 
             pcmk__new_ordering(rsc, pcmk__op_key(rsc->id, RSC_STOP, 0), NULL,
                                rsc->container,
                                pcmk__op_key(rsc->container->id, RSC_STOP, 0),
                                NULL, pe_order_implies_first, rsc->cluster);
 
             if (pcmk_is_set(rsc->flags, pe_rsc_allow_remote_remotes)) {
                 score = 10000;    /* Highly preferred but not essential */
             } else {
                 score = INFINITY; /* Force them to run on the same host */
             }
             pcmk__new_colocation("resource-with-container", NULL, score, rsc,
                                  rsc->container, NULL, NULL, true,
                                  rsc->cluster);
         }
     }
 
     if (rsc->is_remote_node || pcmk_is_set(rsc->flags, pe_rsc_fence_device)) {
         /* Remote connections and fencing devices are not allowed to run on
          * Pacemaker Remote nodes
          */
         rsc_avoids_remote_nodes(rsc);
     }
     g_list_free(allowed_nodes);
 }
 
 /*!
  * \internal
  * \brief Apply a colocation's score to node weights or resource priority
  *
  * Given a colocation constraint, apply its score to the dependent's
  * allowed node weights (if we are still placing resources) or priority (if
  * we are choosing promotable clone instance roles).
  *
  * \param[in,out] dependent      Dependent resource in colocation
  * \param[in]     primary        Primary resource in colocation
  * \param[in]     colocation     Colocation constraint to apply
  * \param[in]     for_dependent  true if called on behalf of dependent
  */
 void
 pcmk__primitive_apply_coloc_score(pe_resource_t *dependent,
                                   const pe_resource_t *primary,
                                   const pcmk__colocation_t *colocation,
                                   bool for_dependent)
 {
     enum pcmk__coloc_affects filter_results;
 
     CRM_CHECK((colocation != NULL) && (dependent != NULL) && (primary != NULL),
               return);
 
     if (for_dependent) {
         // Always process on behalf of primary resource
         primary->cmds->apply_coloc_score(dependent, primary, colocation, false);
         return;
     }
 
     filter_results = pcmk__colocation_affects(dependent, primary, colocation,
                                               false);
     pe_rsc_trace(dependent, "%s %s with %s (%s, score=%d, filter=%d)",
                  ((colocation->score > 0)? "Colocating" : "Anti-colocating"),
                  dependent->id, primary->id, colocation->id, colocation->score,
                  filter_results);
 
     switch (filter_results) {
         case pcmk__coloc_affects_role:
             pcmk__apply_coloc_to_priority(dependent, primary, colocation);
             break;
         case pcmk__coloc_affects_location:
             pcmk__apply_coloc_to_weights(dependent, primary, colocation);
             break;
         default: // pcmk__coloc_affects_nothing
             return;
     }
 }
 
 /*!
  * \internal
  * \brief Return action flags for a given primitive resource action
  *
  * \param[in,out] action  Action to get flags for
  * \param[in]     node    Node to limit effects to (ignored for primitives)
  *
  * \return Flags appropriate to \p action on \p node
  */
 enum pe_action_flags
 pcmk__primitive_action_flags(pe_action_t *action, const pe_node_t *node)
 {
     CRM_ASSERT(action != NULL);
     return action->flags;
 }
 
 /*!
  * \internal
  * \brief Check whether a node is a multiply active resource's expected node
  *
  * \param[in] rsc   Resource to check
  * \param[in] node  Node to check
  *
  * \return true if \p rsc is multiply active with multiple-active set to
  *         stop_unexpected, and \p node is the node where it will remain active
  * \note This assumes that the resource's next role cannot be changed to stopped
  *       after this is called, which should be reasonable if status has already
  *       been unpacked and resources have been assigned to nodes.
  */
 static bool
 is_expected_node(const pe_resource_t *rsc, const pe_node_t *node)
 {
     return pcmk_all_flags_set(rsc->flags,
                               pe_rsc_stop_unexpected|pe_rsc_restarting)
            && (rsc->next_role > RSC_ROLE_STOPPED)
            && pe__same_node(rsc->allocated_to, node);
 }
 
 /*!
  * \internal
  * \brief Schedule actions needed to stop a resource wherever it is active
  *
  * \param[in,out] rsc       Resource being stopped
  * \param[in]     node      Node where resource is being stopped (ignored)
  * \param[in]     optional  Whether actions should be optional
  */
 static void
 stop_resource(pe_resource_t *rsc, pe_node_t *node, bool optional)
 {
     for (GList *iter = rsc->running_on; iter != NULL; iter = iter->next) {
         pe_node_t *current = (pe_node_t *) iter->data;
         pe_action_t *stop = NULL;
 
         if (is_expected_node(rsc, current)) {
             /* We are scheduling restart actions for a multiply active resource
              * with multiple-active=stop_unexpected, and this is where it should
              * not be stopped.
              */
             pe_rsc_trace(rsc,
                          "Skipping stop of multiply active resource %s "
                          "on expected node %s",
                          rsc->id, pe__node_name(current));
             continue;
         }
 
         if (rsc->partial_migration_target != NULL) {
             // Continue migration if node originally was and remains target
             if (pe__same_node(current, rsc->partial_migration_target)
                 && pe__same_node(current, rsc->allocated_to)) {
                 pe_rsc_trace(rsc,
                              "Skipping stop of %s on %s "
                              "because partial migration there will continue",
                              rsc->id, pe__node_name(current));
                 continue;
             } else {
                 pe_rsc_trace(rsc,
                              "Forcing stop of %s on %s "
                              "because migration target changed",
                              rsc->id, pe__node_name(current));
                 optional = false;
             }
         }
 
         pe_rsc_trace(rsc, "Scheduling stop of %s on %s",
                      rsc->id, pe__node_name(current));
         stop = stop_action(rsc, current, optional);
 
         if (rsc->allocated_to == NULL) {
             pe_action_set_reason(stop, "node availability", true);
         } else if (pcmk_all_flags_set(rsc->flags, pe_rsc_restarting
                                                   |pe_rsc_stop_unexpected)) {
             /* We are stopping a multiply active resource on a node that is
              * not its expected node, and we are still scheduling restart
              * actions, so the stop is for being multiply active.
              */
             pe_action_set_reason(stop, "being multiply active", true);
         }
 
         if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
             pe__clear_action_flags(stop, pe_action_runnable);
         }
 
         if (pcmk_is_set(rsc->cluster->flags, pe_flag_remove_after_stop)) {
             pcmk__schedule_cleanup(rsc, current, optional);
         }
 
         if (pcmk_is_set(rsc->flags, pe_rsc_needs_unfencing)) {
             pe_action_t *unfence = pe_fence_op(current, "on", true, NULL, false,
                                                rsc->cluster);
 
             order_actions(stop, unfence, pe_order_implies_first);
             if (!pcmk__node_unfenced(current)) {
                 pe_proc_err("Stopping %s until %s can be unfenced",
                             rsc->id, pe__node_name(current));
             }
         }
     }
 }
 
 /*!
  * \internal
  * \brief Schedule actions needed to start a resource on a node
  *
  * \param[in,out] rsc       Resource being started
  * \param[in]     node      Node where resource should be started
  * \param[in]     optional  Whether actions should be optional
  */
 static void
 start_resource(pe_resource_t *rsc, pe_node_t *node, bool optional)
 {
     pe_action_t *start = NULL;
 
     CRM_ASSERT(node != NULL);
 
     pe_rsc_trace(rsc, "Scheduling %s start of %s on %s (score %d)",
                  (optional? "optional" : "required"), rsc->id,
                  pe__node_name(node), node->weight);
     start = start_action(rsc, node, TRUE);
 
     pcmk__order_vs_unfence(rsc, node, start, pe_order_implies_then);
 
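      // The start above was created as optional; make it required if needed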
     if (pcmk_is_set(start->flags, pe_action_runnable) && !optional) {
         pe__clear_action_flags(start, pe_action_optional);
     }
 
     if (is_expected_node(rsc, node)) {
         /* This could be a problem if the start becomes necessary for other
          * reasons later.
          */
         pe_rsc_trace(rsc,
                      "Start of multiply active resouce %s "
                      "on expected node %s will be a pseudo-action",
                      rsc->id, pe__node_name(node));
         pe__set_action_flags(start, pe_action_pseudo);
     }
 }
 
 /*!
  * \internal
  * \brief Schedule actions needed to promote a resource on a node
  *
  * \param[in,out] rsc       Resource being promoted
  * \param[in]     node      Node where resource should be promoted
  * \param[in]     optional  Whether actions should be optional
  */
 static void
 promote_resource(pe_resource_t *rsc, pe_node_t *node, bool optional)
 {
     GList *iter = NULL;
     GList *action_list = NULL;
     bool runnable = true;
 
     CRM_ASSERT(node != NULL);
 
     // Any start must be runnable for promotion to be runnable
     action_list = pe__resource_actions(rsc, node, RSC_START, true);
     for (iter = action_list; iter != NULL; iter = iter->next) {
         pe_action_t *start = (pe_action_t *) iter->data;
 
         if (!pcmk_is_set(start->flags, pe_action_runnable)) {
             runnable = false;
         }
     }
     g_list_free(action_list);
 
     if (runnable) {
         pe_action_t *promote = promote_action(rsc, node, optional);
 
         pe_rsc_trace(rsc, "Scheduling %s promotion of %s on %s",
                      (optional? "optional" : "required"), rsc->id,
                      pe__node_name(node));
 
         if (is_expected_node(rsc, node)) {
             /* This could be a problem if the promote becomes necessary for
              * other reasons later.
              */
             pe_rsc_trace(rsc,
                          "Promotion of multiply active resouce %s "
                          "on expected node %s will be a pseudo-action",
                          rsc->id, pe__node_name(node));
             pe__set_action_flags(promote, pe_action_pseudo);
         }
     } else {
         pe_rsc_trace(rsc, "Not promoting %s on %s: start unrunnable",
                      rsc->id, pe__node_name(node));
         action_list = pe__resource_actions(rsc, node, RSC_PROMOTE, true);
         for (iter = action_list; iter != NULL; iter = iter->next) {
             pe_action_t *promote = (pe_action_t *) iter->data;
 
             pe__clear_action_flags(promote, pe_action_runnable);
         }
         g_list_free(action_list);
     }
 }
 
 /*!
  * \internal
  * \brief Schedule actions needed to demote a resource wherever it is active
  *
  * \param[in,out] rsc       Resource being demoted
  * \param[in]     node      Node where resource should be demoted (ignored)
  * \param[in]     optional  Whether actions should be optional
  */
 static void
 demote_resource(pe_resource_t *rsc, pe_node_t *node, bool optional)
 {
     /* Since this will only be called for a primitive (possibly as an instance
      * of a collective resource), the resource is multiply active if it is
      * running on more than one node, so we want to demote on all of them as
      * part of recovery, regardless of which one is the desired node.
      */
     for (GList *iter = rsc->running_on; iter != NULL; iter = iter->next) {
         pe_node_t *current = (pe_node_t *) iter->data;
 
         if (is_expected_node(rsc, current)) {
             pe_rsc_trace(rsc,
                          "Skipping demote of multiply active resource %s "
                          "on expected node %s",
                          rsc->id, pe__node_name(current));
         } else {
             pe_rsc_trace(rsc, "Scheduling %s demotion of %s on %s",
                          (optional? "optional" : "required"), rsc->id,
                          pe__node_name(current));
             demote_action(rsc, current, optional);
         }
     }
 }
 
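 /*!
  * \internal
  * \brief Placeholder for an invalid role transition (asserts)
  *
  * This is never expected to be called; it exists to catch scheduling of a
  * role transition that should be impossible.
  *
  * \param[in,out] rsc       Resource (unused)
  * \param[in]     node      Node (unused)
  * \param[in]     optional  Whether actions should be optional (unused)
  */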
 static void
 assert_role_error(pe_resource_t *rsc, pe_node_t *node, bool optional)
 {
     CRM_ASSERT(false);
 }
 
 /*!
  * \internal
  * \brief Schedule cleanup of a resource
  *
  * \param[in,out] rsc       Resource to clean up
  * \param[in]     node      Node to clean up on
  * \param[in]     optional  Whether clean-up should be optional
  */
 void
 pcmk__schedule_cleanup(pe_resource_t *rsc, const pe_node_t *node, bool optional)
 {
     /* If the cleanup is required, its orderings are optional, because they're
      * relevant only if both actions are required. Conversely, if the cleanup is
      * optional, the orderings make the then action required if the first action
      * becomes required.
      */
     uint32_t flag = optional? pe_order_implies_then : pe_order_optional;
 
     CRM_CHECK((rsc != NULL) && (node != NULL), return);
 
     if (pcmk_is_set(rsc->flags, pe_rsc_failed)) {
         pe_rsc_trace(rsc, "Skipping clean-up of %s on %s: resource failed",
                      rsc->id, pe__node_name(node));
         return;
     }
 
     if (node->details->unclean || !node->details->online) {
         pe_rsc_trace(rsc, "Skipping clean-up of %s on %s: node unavailable",
                      rsc->id, pe__node_name(node));
         return;
     }
 
     crm_notice("Scheduling clean-up of %s on %s", rsc->id, pe__node_name(node));
     delete_action(rsc, node, optional);
 
     // stop -> clean-up -> start
     pcmk__order_resource_actions(rsc, RSC_STOP, rsc, RSC_DELETE, flag);
     pcmk__order_resource_actions(rsc, RSC_DELETE, rsc, RSC_START, flag);
 }
 
 /*!
  * \internal
  * \brief Add primitive meta-attributes relevant to graph actions to XML
  *
  * \param[in]     rsc  Primitive resource whose meta-attributes should be added
  * \param[in,out] xml  Transition graph action attributes XML to add to
  */
 void
 pcmk__primitive_add_graph_meta(pe_resource_t *rsc, xmlNode *xml)
 {
     char *name = NULL;
     char *value = NULL;
     const pe_resource_t *parent = NULL;
 
     CRM_ASSERT((rsc != NULL) && (xml != NULL));
 
     /* Clone instance numbers get set internally as meta-attributes, and are
      * needed in the transition graph (for example, to tell unique clone
      * instances apart).
      */
     value = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_INCARNATION);
     if (value != NULL) {
         name = crm_meta_name(XML_RSC_ATTR_INCARNATION);
         crm_xml_add(xml, name, value);
         free(name);
     }
 
     // Not sure if this one is really needed ...
     value = g_hash_table_lookup(rsc->meta, XML_RSC_ATTR_REMOTE_NODE);
     if (value != NULL) {
         name = crm_meta_name(XML_RSC_ATTR_REMOTE_NODE);
         crm_xml_add(xml, name, value);
         free(name);
     }
 
     /* The container meta-attribute can be set on the primitive itself or one of
      * its parents (for example, a group inside a container resource), so check
      * them all, and keep the highest one found.
      */
     for (parent = rsc; parent != NULL; parent = parent->parent) {
         if (parent->container != NULL) {
             crm_xml_add(xml, CRM_META "_" XML_RSC_ATTR_CONTAINER,
                         parent->container->id);
         }
     }
 
     /* Bundle replica children will get their external-ip set internally as a
      * meta-attribute. The graph action needs it, but under a different naming
      * convention than other meta-attributes.
      */
     value = g_hash_table_lookup(rsc->meta, "external-ip");
     if (value != NULL) {
         crm_xml_add(xml, "pcmk_external_ip", value);
     }
 }
 
 // Primitive implementation of resource_alloc_functions_t:add_utilization()
 void
 pcmk__primitive_add_utilization(const pe_resource_t *rsc,
                                 const pe_resource_t *orig_rsc, GList *all_rscs,
                                 GHashTable *utilization)
 {
     if (!pcmk_is_set(rsc->flags, pe_rsc_provisional)) {
         return;
     }
 
     pe_rsc_trace(orig_rsc, "%s: Adding primitive %s as colocated utilization",
                  orig_rsc->id, rsc->id);
     pcmk__release_node_capacity(utilization, rsc);
 }
 
 /*!
  * \internal
  * \brief Get epoch time of node's shutdown attribute (or now if none)
  *
  * \param[in] node  Node to check
  *
  * \return Epoch time corresponding to shutdown attribute if set or now if not
  */
 static time_t
 shutdown_time(const pe_node_t *node)
 {
     const char *shutdown = pe_node_attribute_raw(node, XML_CIB_ATTR_SHUTDOWN);
     time_t result = 0;
 
     if (shutdown != NULL) {
         long long result_ll;
 
         if (pcmk__scan_ll(shutdown, &result_ll, 0LL) == pcmk_rc_ok) {
             result = (time_t) result_ll;
         }
     }
     return (result == 0)? get_effective_time(node->details->data_set) : result;
 }
 
 /*!
  * \internal
  * \brief Ban a resource from a node if it's not locked to the node
  *
  * \param[in] data       Node to check
  * \param[in] user_data  Resource to check
  */
 static void
 ban_if_not_locked(gpointer data, gpointer user_data)
 {
     pe_node_t *node = (pe_node_t *) data;
     pe_resource_t *rsc = (pe_resource_t *) user_data;
 
     if (strcmp(node->details->uname, rsc->lock_node->details->uname) != 0) {
         resource_location(rsc, node, -CRM_SCORE_INFINITY,
                           XML_CONFIG_ATTR_SHUTDOWN_LOCK, rsc->cluster);
     }
 }
 
 // Primitive implementation of resource_alloc_functions_t:shutdown_lock()
 void
 pcmk__primitive_shutdown_lock(pe_resource_t *rsc)
 {
     const char *class = crm_element_value(rsc->xml, XML_AGENT_ATTR_CLASS);
 
     // Fence devices and remote connections can't be locked
     if (pcmk__str_eq(class, PCMK_RESOURCE_CLASS_STONITH, pcmk__str_null_matches)
         || pe__resource_is_remote_conn(rsc, rsc->cluster)) {
         return;
     }
 
     if (rsc->lock_node != NULL) {
         // The lock was obtained from resource history
 
         if (rsc->running_on != NULL) {
             /* The resource was started elsewhere even though it is now
              * considered locked. This shouldn't be possible, but as a
              * failsafe, we don't want to disturb the resource now.
              */
             pe_rsc_info(rsc,
                         "Cancelling shutdown lock because %s is already active",
                         rsc->id);
             pe__clear_resource_history(rsc, rsc->lock_node, rsc->cluster);
             rsc->lock_node = NULL;
             rsc->lock_time = 0;
         }
 
     // Only a resource active on exactly one node can be locked
     } else if (pcmk__list_of_1(rsc->running_on)) {
         pe_node_t *node = rsc->running_on->data;
 
         if (node->details->shutdown) {
             if (node->details->unclean) {
                 pe_rsc_debug(rsc, "Not locking %s to unclean %s for shutdown",
                              rsc->id, pe__node_name(node));
             } else {
                 rsc->lock_node = node;
                 rsc->lock_time = shutdown_time(node);
             }
         }
     }
 
     if (rsc->lock_node == NULL) {
         // No lock needed
         return;
     }
 
     if (rsc->cluster->shutdown_lock > 0) {
         time_t lock_expiration = rsc->lock_time + rsc->cluster->shutdown_lock;
 
         pe_rsc_info(rsc, "Locking %s to %s due to shutdown (expires @%lld)",
                     rsc->id, pe__node_name(rsc->lock_node),
                     (long long) lock_expiration);
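          // Schedule a cluster recheck for just after the lock expires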
         pe__update_recheck_time(++lock_expiration, rsc->cluster);
     } else {
         pe_rsc_info(rsc, "Locking %s to %s due to shutdown",
                     rsc->id, pe__node_name(rsc->lock_node));
     }
 
     // If resource is locked to one node, ban it from all other nodes
     g_list_foreach(rsc->cluster->nodes, ban_if_not_locked, rsc);
 }
diff --git a/lib/pacemaker/pcmk_sched_probes.c b/lib/pacemaker/pcmk_sched_probes.c
index 7a66e4c674..7e10a74292 100644
--- a/lib/pacemaker/pcmk_sched_probes.c
+++ b/lib/pacemaker/pcmk_sched_probes.c
@@ -1,892 +1,895 @@
 /*
- * Copyright 2004-2022 the Pacemaker project contributors
+ * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <glib.h>
 
 #include <crm/crm.h>
 #include <crm/pengine/status.h>
 #include <pacemaker-internal.h>
 #include "libpacemaker_private.h"
 
 /*!
  * \internal
  * \brief Add the expected result to a newly created probe
  *
  * \param[in] probe  Probe action to add expected result to
  * \param[in] rsc    Resource that probe is for
  * \param[in] node   Node that probe will run on
  */
 static void
 add_expected_result(pe_action_t *probe, pe_resource_t *rsc, pe_node_t *node)
 {
     // Check whether resource is currently active on node
     pe_node_t *running = pe_find_node_id(rsc->running_on, node->details->id);
 
     // The expected result is what we think the resource's current state is
     if (running == NULL) {
         pe__add_action_expected_result(probe, CRM_EX_NOT_RUNNING);
 
     } else if (rsc->role == RSC_ROLE_PROMOTED) {
         pe__add_action_expected_result(probe, CRM_EX_PROMOTED);
     }
 }
 
 /*!
  * \internal
  * \brief Create any needed probes on a node for a list of resources
  *
  * \param[in] rscs  List of resources to create probes for
  * \param[in] node  Node to create probes on
  *
  * \return true if any probe was created, otherwise false
  */
 bool
 pcmk__probe_resource_list(GList *rscs, pe_node_t *node)
 {
     bool any_created = false;
 
     for (GList *iter = rscs; iter != NULL; iter = iter->next) {
         pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
         if (rsc->cmds->create_probe(rsc, node)) {
             any_created = true;
         }
     }
     return any_created;
 }
 
 /*!
  * \internal
  * \brief Order one resource's start after another's start-up probe
  *
  * \param[in] rsc1  Resource that might get start-up probe
  * \param[in] rsc2  Resource that might be started
  */
 static void
 probe_then_start(pe_resource_t *rsc1, pe_resource_t *rsc2)
 {
     if ((rsc1->allocated_to != NULL)
         && (g_hash_table_lookup(rsc1->known_on,
                                 rsc1->allocated_to->details->id) == NULL)) {
 
         pcmk__new_ordering(rsc1, pcmk__op_key(rsc1->id, RSC_STATUS, 0), NULL,
                            rsc2, pcmk__op_key(rsc2->id, RSC_START, 0), NULL,
                            pe_order_optional, rsc1->cluster);
     }
 }
 
 /*!
  * \internal
  * \brief Check whether a guest resource will stop
  *
  * \param[in] node  Guest node to check
  *
  * \return true if guest resource will likely stop, otherwise false
  */
 static bool
 guest_resource_will_stop(pe_node_t *node)
 {
     pe_resource_t *guest_rsc = node->details->remote_rsc->container;
 
     /* Ideally, we'd check whether the guest has a required stop, but that
      * information doesn't exist yet, so approximate it ...
      */
     return node->details->remote_requires_reset
            || node->details->unclean
            || pcmk_is_set(guest_rsc->flags, pe_rsc_failed)
            || (guest_rsc->next_role == RSC_ROLE_STOPPED)
 
            // Guest is moving
            || ((guest_rsc->role > RSC_ROLE_STOPPED)
                && (guest_rsc->allocated_to != NULL)
                && (pe_find_node(guest_rsc->running_on,
                    guest_rsc->allocated_to->details->uname) == NULL));
 }
 
 /*!
  * \internal
  * \brief Create a probe action for a resource on a node
  *
  * \param[in] rsc   Resource to create probe for
  * \param[in] node  Node to create probe on
  *
  * \return Newly created probe action
  */
 static pe_action_t *
 probe_action(pe_resource_t *rsc, pe_node_t *node)
 {
     pe_action_t *probe = NULL;
     char *key = pcmk__op_key(rsc->id, RSC_STATUS, 0);
 
     crm_debug("Scheduling probe of %s %s on %s",
               role2text(rsc->role), rsc->id, pe__node_name(node));
 
     probe = custom_action(rsc, key, RSC_STATUS, node, FALSE, TRUE,
                           rsc->cluster);
     pe__clear_action_flags(probe, pe_action_optional);
 
     pcmk__order_vs_unfence(rsc, node, probe, pe_order_optional);
     add_expected_result(probe, rsc, node);
     return probe;
 }
 
 /*!
  * \internal
  * \brief Schedule any probes needed for a resource on a node
  *
  * \param[in] rsc   Resource to create probe for
  * \param[in] node  Node to create probe on
  *
  * \return true if any probe was created, otherwise false
  */
 bool
 pcmk__probe_rsc_on_node(pe_resource_t *rsc, pe_node_t *node)
 {
     uint32_t flags = pe_order_optional;
     pe_action_t *probe = NULL;
     pe_node_t *allowed = NULL;
     pe_resource_t *top = uber_parent(rsc);
     const char *reason = NULL;
 
     CRM_CHECK((rsc != NULL) && (node != NULL), return false);
 
     if (!pcmk_is_set(rsc->cluster->flags, pe_flag_startup_probes)) {
         reason = "start-up probes are disabled";
         goto no_probe;
     }
 
     if (pe__is_guest_or_remote_node(node)) {
         const char *class = crm_element_value(rsc->xml, XML_AGENT_ATTR_CLASS);
 
         if (pcmk__str_eq(class, PCMK_RESOURCE_CLASS_STONITH, pcmk__str_none)) {
             reason = "Pacemaker Remote nodes cannot run stonith agents";
             goto no_probe;
 
         } else if (pe__is_guest_node(node)
                    && pe__resource_contains_guest_node(rsc->cluster, rsc)) {
             reason = "guest nodes cannot run resources containing guest nodes";
             goto no_probe;
 
         } else if (rsc->is_remote_node) {
             reason = "Pacemaker Remote nodes cannot host remote connections";
             goto no_probe;
         }
     }
 
     // If this is a collective resource, probes are created for its children
     if (rsc->children != NULL) {
         return pcmk__probe_resource_list(rsc->children, node);
     }
 
     if ((rsc->container != NULL) && !rsc->is_remote_node) {
         reason = "resource is inside a container";
         goto no_probe;
 
     } else if (pcmk_is_set(rsc->flags, pe_rsc_orphan)) {
         reason = "resource is orphaned";
         goto no_probe;
 
     } else if (g_hash_table_lookup(rsc->known_on, node->details->id) != NULL) {
         reason = "resource state is already known";
         goto no_probe;
     }
 
     allowed = g_hash_table_lookup(rsc->allowed_nodes, node->details->id);
 
     if (rsc->exclusive_discover || top->exclusive_discover) {
         // Exclusive discovery is enabled ...
 
         if (allowed == NULL) {
             // ... but this node is not allowed to run the resource
             reason = "resource has exclusive discovery but is not allowed "
                      "on node";
             goto no_probe;
 
         } else if (allowed->rsc_discover_mode != pe_discover_exclusive) {
             // ... but no constraint marks this node for discovery of resource
             reason = "resource has exclusive discovery but is not enabled "
                      "on node";
             goto no_probe;
         }
     }
 
     if (allowed == NULL) {
         allowed = node;
     }
     if (allowed->rsc_discover_mode == pe_discover_never) {
         reason = "node has discovery disabled";
         goto no_probe;
     }
 
     if (pe__is_guest_node(node)) {
         pe_resource_t *guest = node->details->remote_rsc->container;
 
         if (guest->role == RSC_ROLE_STOPPED) {
             // The guest is stopped, so we know no resource is active there
             reason = "node's guest is stopped";
             probe_then_start(guest, top);
             goto no_probe;
 
         } else if (guest_resource_will_stop(node)) {
             reason = "node's guest will stop";
 
             // Order resource start after guest stop (in case it's restarting)
             pcmk__new_ordering(guest, pcmk__op_key(guest->id, RSC_STOP, 0),
                                NULL, top, pcmk__op_key(top->id, RSC_START, 0),
                                NULL, pe_order_optional, rsc->cluster);
             goto no_probe;
         }
     }
 
     // We've eliminated all cases where a probe is not needed, so now it is
     probe = probe_action(rsc, node);
 
     /* Below, we will order the probe relative to start or reload. If this is a
      * clone instance, the start or reload is for the entire clone rather than
      * just the instance. Otherwise, the start or reload is for the resource
      * itself.
      */
     if (!pe_rsc_is_clone(top)) {
         top = rsc;
     }
 
     /* Prevent a start if the resource can't be probed, but don't cause the
      * resource or entire clone to stop if already active.
      */
     if (!pcmk_is_set(probe->flags, pe_action_runnable)
         && (top->running_on == NULL)) {
         pe__set_order_flags(flags, pe_order_runnable_left);
     }
 
     // Start or reload after probing the resource
     pcmk__new_ordering(rsc, NULL, probe,
                        top, pcmk__op_key(top->id, RSC_START, 0), NULL,
                        flags, rsc->cluster);
     pcmk__new_ordering(rsc, NULL, probe, top, reload_key(rsc), NULL,
                        pe_order_optional, rsc->cluster);
 
     return true;
 
 no_probe:
     pe_rsc_trace(rsc,
                  "Skipping probe for %s on %s because %s",
                  rsc->id, node->details->id, reason);
     return false;
 }
 
 /*!
  * \internal
  * \brief Check whether a probe should be ordered before another action
  *
  * \param[in] probe  Probe action to check
  * \param[in] then   Other action to check
  *
  * \return true if \p probe should be ordered before \p then, otherwise false
  */
 static bool
 probe_needed_before_action(pe_action_t *probe, pe_action_t *then)
 {
     // Probes on a node are performed after unfencing it, not before
     if (pcmk__str_eq(then->task, CRM_OP_FENCE, pcmk__str_casei)
          && (probe->node != NULL) && (then->node != NULL)
          && (probe->node->details == then->node->details)) {
         const char *op = g_hash_table_lookup(then->meta, "stonith_action");
 
         if (pcmk__str_eq(op, "on", pcmk__str_casei)) {
             return false;
         }
     }
 
     // Probes should be done on a node before shutting it down
     if (pcmk__str_eq(then->task, CRM_OP_SHUTDOWN, pcmk__str_none)
         && (probe->node != NULL) && (then->node != NULL)
         && (probe->node->details != then->node->details)) {
         return false;
     }
 
     // Otherwise probes should always be done before any other action
     return true;
 }
 
 /*!
  * \internal
  * \brief Add implicit "probe then X" orderings for "stop then X" orderings
  *
  * If the state of a resource is not known yet, a probe will be scheduled,
  * expecting a "not running" result. If the probe fails, a stop will not be
  * scheduled until the next transition. Thus, if there are ordering constraints
  * like "stop this resource then do something else that's not for the same
  * resource", add implicit "probe this resource then do something" equivalents
  * so the relation is upheld until we know whether a stop is needed.
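  *
  * For example, given "stop rscA then start rscB" (with hypothetical
  * resources rscA and rscB), an implicit "probe rscA then start rscB"
  * ordering is added.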
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 add_probe_orderings_for_stops(pe_working_set_t *data_set)
 {
     for (GList *iter = data_set->ordering_constraints; iter != NULL;
          iter = iter->next) {
 
         pe__ordering_t *order = iter->data;
         uint32_t order_flags = pe_order_optional;
         GList *probes = NULL;
         GList *then_actions = NULL;
 
         // Skip disabled orderings
         if (order->flags == pe_order_none) {
             continue;
         }
 
         // Skip non-resource orderings, and orderings for the same resource
         if ((order->lh_rsc == NULL) || (order->lh_rsc == order->rh_rsc)) {
             continue;
         }
 
         // Skip invalid orderings (shouldn't be possible)
         if (((order->lh_action == NULL) && (order->lh_action_task == NULL)) ||
             ((order->rh_action == NULL) && (order->rh_action_task == NULL))) {
             continue;
         }
 
         // Skip orderings for first actions other than stop
         if ((order->lh_action != NULL)
             && !pcmk__str_eq(order->lh_action->task, RSC_STOP, pcmk__str_none)) {
             continue;
         } else if ((order->lh_action == NULL)
                    && !pcmk__ends_with(order->lh_action_task, "_" RSC_STOP "_0")) {
             continue;
         }
 
         /* Do not imply a probe ordering for a resource inside of a stopping
          * container. Otherwise, it might introduce a transition loop, since a
          * probe could be scheduled after the container starts again.
          */
         if ((order->rh_rsc != NULL)
             && (order->lh_rsc->container == order->rh_rsc)) {
 
             if ((order->rh_action != NULL)
                 && pcmk__str_eq(order->rh_action->task, RSC_STOP,
                                 pcmk__str_none)) {
                 continue;
             } else if ((order->rh_action == NULL)
                        && pcmk__ends_with(order->rh_action_task,
                                           "_" RSC_STOP "_0")) {
                 continue;
             }
         }
 
         // Preserve certain order options for future filtering
         if (pcmk_is_set(order->flags, pe_order_apply_first_non_migratable)) {
             pe__set_order_flags(order_flags,
                                 pe_order_apply_first_non_migratable);
         }
         if (pcmk_is_set(order->flags, pe_order_same_node)) {
             pe__set_order_flags(order_flags, pe_order_same_node);
         }
 
         // Preserve certain order types for future filtering
         if ((order->flags == pe_order_anti_colocation)
             || (order->flags == pe_order_load)) {
             order_flags = order->flags;
         }
 
         // List all scheduled probes for the first resource
         probes = pe__resource_actions(order->lh_rsc, NULL, RSC_STATUS, FALSE);
         if (probes == NULL) { // There aren't any
             continue;
         }
 
         // List all relevant "then" actions
         if (order->rh_action != NULL) {
             then_actions = g_list_prepend(NULL, order->rh_action);
 
         } else if (order->rh_rsc != NULL) {
             then_actions = find_actions(order->rh_rsc->actions,
                                         order->rh_action_task, NULL);
             if (then_actions == NULL) { // There aren't any
                 g_list_free(probes);
                 continue;
             }
         }
 
         crm_trace("Implying 'probe then' orderings for '%s then %s' "
                   "(id=%d, type=%.6x)",
                   order->lh_action? order->lh_action->uuid : order->lh_action_task,
                   order->rh_action? order->rh_action->uuid : order->rh_action_task,
                   order->id, order->flags);
 
         for (GList *probe_iter = probes; probe_iter != NULL;
              probe_iter = probe_iter->next) {
 
             pe_action_t *probe = (pe_action_t *) probe_iter->data;
 
             for (GList *then_iter = then_actions; then_iter != NULL;
                  then_iter = then_iter->next) {
 
                 pe_action_t *then = (pe_action_t *) then_iter->data;
 
                 if (probe_needed_before_action(probe, then)) {
                     order_actions(probe, then, order_flags);
                 }
             }
         }
 
         g_list_free(then_actions);
         g_list_free(probes);
     }
 }
 
 /*!
  * \internal
  * \brief Add necessary orderings between probe and starts of clone instances
  *
  * These are in addition to the ordering with the parent resource added when
  * the probe was created.
  *
  * \param[in,out] probe     Probe as 'first' action in an ordering
  * \param[in,out] after     'then' action wrapper in the ordering
  */
 static void
 add_start_orderings_for_probe(pe_action_t *probe, pe_action_wrapper_t *after)
 {
     uint32_t flags = pe_order_optional|pe_order_runnable_left;
 
     /* Although the ordering between the probe of the clone instance and the
      * start of its parent has been added in pcmk__probe_rsc_on_node(), we
  * avoided enforcing the `pe_order_runnable_left` order type there as long as
  * any of the clone instances are running, to prevent them from being
  * unexpectedly stopped.
      *
      * On the other hand, we still need to prevent any inactive instances from
      * starting unless the probe is runnable so that we don't risk starting too
      * many instances before we know the state on all nodes.
      */
     if (after->action->rsc->variant <= pe_group
         || pcmk_is_set(probe->flags, pe_action_runnable)
         // The order type is already enforced for its parent.
         || pcmk_is_set(after->type, pe_order_runnable_left)
-        || uber_parent(probe->rsc) != after->action->rsc
+        || (pe__const_top_resource(probe->rsc, false) != after->action->rsc)
         || !pcmk__str_eq(after->action->task, RSC_START, pcmk__str_none)) {
         return;
     }
 
     crm_trace("Adding probe start orderings for '%s@%s (%s) "
               "then instances of %s@%s'",
               probe->uuid, pe__node_name(probe->node),
               pcmk_is_set(probe->flags, pe_action_runnable)? "runnable" : "unrunnable",
               after->action->uuid, pe__node_name(after->action->node));
 
     for (GList *then_iter = after->action->actions_after; then_iter != NULL;
          then_iter = then_iter->next) {
 
         pe_action_wrapper_t *then = (pe_action_wrapper_t *) then_iter->data;
 
         if (then->action->rsc->running_on
-            || uber_parent(then->action->rsc) != after->action->rsc
+            || (pe__const_top_resource(then->action->rsc, false)
+                != after->action->rsc)
             || !pcmk__str_eq(then->action->task, RSC_START, pcmk__str_none)) {
             continue;
         }
 
         crm_trace("Adding probe start ordering for '%s@%s (%s) "
                   "then %s@%s' (type=%#.6x)",
                   probe->uuid, pe__node_name(probe->node),
                   pcmk_is_set(probe->flags, pe_action_runnable)? "runnable" : "unrunnable",
                   then->action->uuid, pe__node_name(then->action->node),
                   flags);
 
         /* Prevent the instance from starting if its probe is unrunnable, but
          * don't cause any other instances to stop if already active.
          */
         order_actions(probe, then->action, flags);
     }
 
     return;
 }
 
 /*!
  * \internal
  * \brief Order probes before restarts and re-promotes
  *
  * If a given ordering is a "probe then start" or "probe then promote" ordering,
  * add an implicit "probe then stop/demote" ordering in case the action is part
  * of a restart/re-promote, and do the same recursively for all actions ordered
  * after the "then" action.
  *
  * \param[in] probe     Probe as 'first' action in an ordering
  * \param[in] after     'then' action in the ordering
  * \param[in] data_set  Cluster working set
  */
 static void
 add_restart_orderings_for_probe(pe_action_t *probe, pe_action_t *after,
                                 pe_working_set_t *data_set)
 {
     GList *iter = NULL;
     bool interleave = false;
     pe_resource_t *compatible_rsc = NULL;
 
     // Validate that this is a resource probe followed by some action
     if ((after == NULL) || (probe == NULL) || (probe->rsc == NULL)
         || (probe->rsc->variant != pe_native)
         || !pcmk__str_eq(probe->task, RSC_STATUS, pcmk__str_casei)) {
         return;
     }
 
     // Avoid running into any possible loop
     if (pcmk_is_set(after->flags, pe_action_tracking)) {
         return;
     }
     pe__set_action_flags(after, pe_action_tracking);
 
     crm_trace("Adding probe restart orderings for '%s@%s then %s@%s'",
               probe->uuid, pe__node_name(probe->node),
               after->uuid, pe__node_name(after->node));
 
     /* Add restart orderings if "then" is for a different primitive.
      * Orderings for collective resources will be added later.
      */
     if ((after->rsc != NULL) && (after->rsc->variant == pe_native)
         && (probe->rsc != after->rsc)) {
 
             GList *then_actions = NULL;
 
             if (pcmk__str_eq(after->task, RSC_START, pcmk__str_casei)) {
                 then_actions = pe__resource_actions(after->rsc, NULL, RSC_STOP,
                                                     FALSE);
 
             } else if (pcmk__str_eq(after->task, RSC_PROMOTE, pcmk__str_casei)) {
                 then_actions = pe__resource_actions(after->rsc, NULL,
                                                     RSC_DEMOTE, FALSE);
             }
 
             for (iter = then_actions; iter != NULL; iter = iter->next) {
                 pe_action_t *then = (pe_action_t *) iter->data;
 
                 // Skip pseudo-actions (for example, those implied by fencing)
                 if (!pcmk_is_set(then->flags, pe_action_pseudo)) {
                     order_actions(probe, then, pe_order_optional);
                 }
             }
             g_list_free(then_actions);
     }
 
     /* Detect whether "then" is an interleaved clone action. For these, we want
      * to add orderings only for the relevant instance.
      */
     if ((after->rsc != NULL)
         && (after->rsc->variant > pe_group)) {
         const char *interleave_s = g_hash_table_lookup(after->rsc->meta,
                                                        XML_RSC_ATTR_INTERLEAVE);
 
         interleave = crm_is_true(interleave_s);
         if (interleave) {
             compatible_rsc = find_compatible_child(probe->rsc,
                                                    after->rsc,
                                                    RSC_ROLE_UNKNOWN,
                                                    FALSE);
         }
     }
 
     /* Now recursively do the same for all actions ordered after "then". This
      * also handles collective resources since the collective action will be
      * ordered before its individual instances' actions.
      */
     for (iter = after->actions_after; iter != NULL; iter = iter->next) {
         pe_action_wrapper_t *after_wrapper = (pe_action_wrapper_t *) iter->data;
 
         /* pe_order_implies_then is what makes a required A.start enforce
          * B.start as required too, which is what causes B to restart or be
          * re-promoted.
          *
          * It's not clear whether pe_order_implies_then_on_node should be
          * handled here as well. It's currently used only for the unfencing
          * case, which tends to introduce transition loops...
          */
         if (!pcmk_is_set(after_wrapper->type, pe_order_implies_then)) {
             /* The order type between a group/clone and its child, such as
              * B.start -> B_child.start, is:
              * pe_order_implies_first_printed | pe_order_runnable_left
              *
              * Proceed through the ordering chain and build dependencies with
              * its children.
              */
             if ((after->rsc == NULL)
                 || (after->rsc->variant < pe_group)
                 || (probe->rsc->parent == after->rsc)
                 || (after_wrapper->action->rsc == NULL)
                 || (after_wrapper->action->rsc->variant > pe_group)
                 || (after->rsc != after_wrapper->action->rsc->parent)) {
                 continue;
             }
 
             /* Proceed to the children of a group or a non-interleaved clone.
              * For an interleaved clone, proceed only to the relevant child.
              */
             if ((after->rsc->variant > pe_group) && interleave
                 && ((compatible_rsc == NULL)
                     || (compatible_rsc != after_wrapper->action->rsc))) {
                 continue;
             }
         }
 
         crm_trace("Recursively adding probe restart orderings for "
                   "'%s@%s then %s@%s' (type=%#.6x)",
                   after->uuid, pe__node_name(after->node),
                   after_wrapper->action->uuid,
                   pe__node_name(after_wrapper->action->node),
                   after_wrapper->type);
 
         add_restart_orderings_for_probe(probe, after_wrapper->action, data_set);
     }
 }
 
 /*!
  * \internal
  * \brief Clear the tracking flag on all scheduled actions
  *
  * \param[in] data_set  Cluster working set
  */
 static void
 clear_actions_tracking_flag(pe_working_set_t *data_set)
 {
     GList *gIter = NULL;
 
     for (gIter = data_set->actions; gIter != NULL; gIter = gIter->next) {
         pe_action_t *action = (pe_action_t *) gIter->data;
 
         pe__clear_action_flags(action, pe_action_tracking);
     }
 }
 
 /*!
  * \internal
  * \brief Add start and restart orderings for any scheduled probes for a given resource
  *
  * \param[in] rsc       Resource whose probes should be ordered
  * \param[in] data_set  Cluster working set
  */
 static void
 add_start_restart_orderings_for_rsc(pe_resource_t *rsc,
                                     pe_working_set_t *data_set)
 {
     GList *probes = NULL;
 
     // For collective resources, order each instance recursively
     if (rsc->variant != pe_native) {
         g_list_foreach(rsc->children,
                        (GFunc) add_start_restart_orderings_for_rsc, data_set);
         return;
     }
 
     // Find all probes for given resource
     probes = pe__resource_actions(rsc, NULL, RSC_STATUS, FALSE);
 
     // Add probe restart orderings for each probe found
     for (GList *iter = probes; iter != NULL; iter = iter->next) {
         pe_action_t *probe = (pe_action_t *) iter->data;
 
         for (GList *then_iter = probe->actions_after; then_iter != NULL;
              then_iter = then_iter->next) {
 
             pe_action_wrapper_t *then = (pe_action_wrapper_t *) then_iter->data;
 
             add_start_orderings_for_probe(probe, then);
             add_restart_orderings_for_probe(probe, then->action, data_set);
             clear_actions_tracking_flag(data_set);
         }
     }
 
     g_list_free(probes);
 }
 
 /*!
  * \internal
  * \brief Add "A then probe B" orderings for "A then B" orderings
  *
  * \param[in] data_set  Cluster working set
  *
  * \note This function is currently disabled (see next comment).
  */
 static void
 order_then_probes(pe_working_set_t *data_set)
 {
 #if 0
     /* Given an ordering "A then B", we would prefer to wait for A to be started
      * before probing B.
      *
      * For example, if A is a filesystem which B can't even run without, it
      * would be helpful if the author of B's agent could assume that A is
      * running before B.monitor will be called.
      *
      * However, we can't _only_ probe after A is running, otherwise we wouldn't
      * detect the state of B if A could not be started. We can't even do an
      * opportunistic version of this, because B may be moving:
      *
      *   A.stop -> A.start -> B.probe -> B.stop -> B.start
      *
      * and if we add B.stop -> A.stop here, we get a loop:
      *
      *   A.stop -> A.start -> B.probe -> B.stop -> A.stop
      *
      * We could kill the "B.probe -> B.stop" dependency, but that could mean
      * stopping B "too" soon, because B.start must wait for the probe, and
      * we don't want to stop B if we can't start it.
      *
      * We could add the ordering only if A is an anonymous clone with
      * clone-max == node-max (since we'll never be moving it). However, we could
      * still be stopping one instance at the same time as starting another.
      *
      * The complexity of checking for allowed conditions combined with the ever
      * narrowing use case suggests that this code should remain disabled until
      * someone gets smarter.
      */
     for (GList *iter = data_set->resources; iter != NULL; iter = iter->next) {
         pe_resource_t *rsc = (pe_resource_t *) iter->data;
 
         pe_action_t *start = NULL;
         GList *actions = NULL;
         GList *probes = NULL;
 
         actions = pe__resource_actions(rsc, NULL, RSC_START, FALSE);
 
         if (actions) {
             start = actions->data;
             g_list_free(actions);
         }
 
         if (start == NULL) {
             crm_err("No start action for %s", rsc->id);
             continue;
         }
 
         probes = pe__resource_actions(rsc, NULL, RSC_STATUS, FALSE);
 
         for (actions = start->actions_before; actions != NULL;
              actions = actions->next) {
 
             pe_action_wrapper_t *before = (pe_action_wrapper_t *) actions->data;
 
             pe_action_t *first = before->action;
             pe_resource_t *first_rsc = first->rsc;
 
             if (first->required_runnable_before) {
                 for (GList *clone_actions = first->actions_before;
                      clone_actions != NULL;
                      clone_actions = clone_actions->next) {
 
                     before = (pe_action_wrapper_t *) clone_actions->data;
 
                     crm_trace("Testing '%s then %s' for %s",
                               first->uuid, before->action->uuid, start->uuid);
 
                     CRM_ASSERT(before->action->rsc != NULL);
                     first_rsc = before->action->rsc;
                     break;
                 }
 
             } else if (!pcmk__str_eq(first->task, RSC_START, pcmk__str_none)) {
                 crm_trace("Not a start op %s for %s", first->uuid, start->uuid);
             }
 
             if (first_rsc == NULL) {
                 continue;
 
-            } else if (uber_parent(first_rsc) == uber_parent(start->rsc)) {
+            } else if (pe__const_top_resource(first_rsc, false)
+                       == pe__const_top_resource(start->rsc, false)) {
                 crm_trace("Same parent %s for %s", first_rsc->id, start->uuid);
                 continue;
 
-            } else if (!pe_rsc_is_clone(uber_parent(first_rsc))) {
+            } else if (!pe_rsc_is_clone(pe__const_top_resource(first_rsc,
+                                                               false))) {
                 crm_trace("Not a clone %s for %s", first_rsc->id, start->uuid);
                 continue;
             }
 
             crm_err("Applying %s before %s %d", first->uuid, start->uuid,
-                    uber_parent(first_rsc)->variant);
+                    pe__const_top_resource(first_rsc, false)->variant);
 
             for (GList *probe_iter = probes; probe_iter != NULL;
                  probe_iter = probe_iter->next) {
 
                 pe_action_t *probe = (pe_action_t *) probe_iter->data;
 
                 crm_err("Ordering %s before %s", first->uuid, probe->uuid);
                 order_actions(first, probe, pe_order_optional);
             }
         }
     }
 #endif
 }
 
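 /*!
  * \internal
  * \brief Add necessary orderings for all scheduled probes
  *
  * \param[in] data_set  Cluster working set
  */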
 void
 pcmk__order_probes(pe_working_set_t *data_set)
 {
     // Add orderings for "probe then X"
     g_list_foreach(data_set->resources,
                    (GFunc) add_start_restart_orderings_for_rsc, data_set);
     add_probe_orderings_for_stops(data_set);
 
     order_then_probes(data_set);
 }
 
 /*!
  * \internal
  * \brief Schedule any probes needed
  *
  * \param[in] data_set  Cluster working set
  *
  * \note This may also schedule fencing of failed remote nodes.
  */
 void
 pcmk__schedule_probes(pe_working_set_t *data_set)
 {
     // Schedule probes on each node in the cluster as needed
     for (GList *iter = data_set->nodes; iter != NULL; iter = iter->next) {
         pe_node_t *node = (pe_node_t *) iter->data;
         const char *probed = NULL;
 
         if (!node->details->online) { // Don't probe offline nodes
             if (pcmk__is_failed_remote_node(node)) {
                 pe_fence_node(data_set, node,
                               "the connection is unrecoverable", FALSE);
             }
             continue;
 
         } else if (node->details->unclean) { // ... or nodes that need fencing
             continue;
 
         } else if (!node->details->rsc_discovery_enabled) {
             // The user requested that probes not be done on this node
             continue;
         }
 
         /* This is no longer needed for live clusters, since the probe_complete
          * node attribute will never be in the CIB. However, this is still useful
          * for processing old saved CIBs (< 1.1.14), including the
          * reprobe-target_rc regression test.
          */
         probed = pe_node_attribute_raw(node, CRM_OP_PROBED);
         if (probed != NULL && crm_is_true(probed) == FALSE) {
             pe_action_t *probe_op = NULL;
 
             probe_op = custom_action(NULL,
                                      crm_strdup_printf("%s-%s", CRM_OP_REPROBE,
                                                        node->details->uname),
                                      CRM_OP_REPROBE, node, FALSE, TRUE,
                                      data_set);
             add_hash_param(probe_op->meta, XML_ATTR_TE_NOWAIT,
                            XML_BOOLEAN_TRUE);
             continue;
         }
 
         // Probe each resource in the cluster on this node, as needed
         pcmk__probe_resource_list(data_set->resources, node);
     }
 }
diff --git a/lib/pacemaker/pcmk_sched_promotable.c b/lib/pacemaker/pcmk_sched_promotable.c
index 5186a2be79..7c06ba9139 100644
--- a/lib/pacemaker/pcmk_sched_promotable.c
+++ b/lib/pacemaker/pcmk_sched_promotable.c
@@ -1,1264 +1,1265 @@
 /*
- * Copyright 2004-2022 the Pacemaker project contributors
+ * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU General Public License version 2
  * or later (GPLv2+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <crm/msg_xml.h>
 #include <pacemaker-internal.h>
 
 #include "libpacemaker_private.h"
 
 /*!
  * \internal
  * \brief Add implicit promotion ordering for a promotable instance
  *
  * \param[in] clone  Clone resource
  * \param[in] child  Instance of \p clone being ordered
  * \param[in] last   Previous instance ordered (NULL if \p child is first)
  */
 static void
 order_instance_promotion(pe_resource_t *clone, pe_resource_t *child,
                          pe_resource_t *last)
 {
     // "Promote clone" -> promote instance -> "clone promoted"
     pcmk__order_resource_actions(clone, RSC_PROMOTE, child, RSC_PROMOTE,
                                  pe_order_optional);
     pcmk__order_resource_actions(child, RSC_PROMOTE, clone, RSC_PROMOTED,
                                  pe_order_optional);
 
     // If clone is ordered, order this instance relative to last
     if ((last != NULL) && pe__clone_is_ordered(clone)) {
         pcmk__order_resource_actions(last, RSC_PROMOTE, child, RSC_PROMOTE,
                                      pe_order_optional);
     }
 }
 
 /*!
  * \internal
  * \brief Add implicit demotion ordering for a promotable instance
  *
  * \param[in] clone  Clone resource
  * \param[in] child  Instance of \p clone being ordered
  * \param[in] last   Previous instance ordered (NULL if \p child is first)
  */
 static void
 order_instance_demotion(pe_resource_t *clone, pe_resource_t *child,
                         pe_resource_t *last)
 {
     // "Demote clone" -> demote instance -> "clone demoted"
     pcmk__order_resource_actions(clone, RSC_DEMOTE, child, RSC_DEMOTE,
                                  pe_order_implies_first_printed);
     pcmk__order_resource_actions(child, RSC_DEMOTE, clone, RSC_DEMOTED,
                                  pe_order_implies_then_printed);
 
     // If clone is ordered, order this instance relative to last
     if ((last != NULL) && pe__clone_is_ordered(clone)) {
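         // Note the reversed arguments: for ordered clones, instances demote
         // in the opposite order of their promotion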
         pcmk__order_resource_actions(child, RSC_DEMOTE, last, RSC_DEMOTE,
                                      pe_order_optional);
     }
 }
 
 /*!
  * \internal
  * \brief Check whether an instance will be promoted or demoted
  *
  * \param[in]  rsc        Instance to check
  * \param[out] demoting   If \p rsc will be demoted, this will be set to true
  * \param[out] promoting  If \p rsc will be promoted, this will be set to true
  */
 static void
 check_for_role_change(pe_resource_t *rsc, bool *demoting, bool *promoting)
 {
     GList *iter = NULL;
 
     // If this is a cloned group, check group members recursively
     if (rsc->children != NULL) {
         for (iter = rsc->children; iter != NULL; iter = iter->next) {
             check_for_role_change((pe_resource_t *) iter->data,
                                   demoting, promoting);
         }
         return;
     }
 
     for (iter = rsc->actions; iter != NULL; iter = iter->next) {
         pe_action_t *action = (pe_action_t *) iter->data;
 
         if (*promoting && *demoting) {
             return;
 
         } else if (pcmk_is_set(action->flags, pe_action_optional)) {
             continue;
 
         } else if (pcmk__str_eq(RSC_DEMOTE, action->task, pcmk__str_none)) {
             *demoting = true;
 
         } else if (pcmk__str_eq(RSC_PROMOTE, action->task, pcmk__str_none)) {
             *promoting = true;
         }
     }
 }
 
 /*!
  * \internal
  * \brief Add promoted-role location constraint scores to an instance's priority
  *
  * Adjust a promotable clone instance's promotion priority by the score of
  * each location constraint in a given list that is limited to the promoted
  * role and applies to the node where the instance will be placed.
  *
  * \param[in,out] child                 Promotable clone instance
  * \param[in]     location_constraints  List of location constraints to apply
  * \param[in]     chosen                Node where \p child will be placed
  */
 static void
 apply_promoted_locations(pe_resource_t *child, GList *location_constraints,
                          pe_node_t *chosen)
 {
     for (GList *iter = location_constraints; iter; iter = iter->next) {
         pe__location_t *location = iter->data;
         pe_node_t *weighted_node = NULL;
 
         if (location->role_filter == RSC_ROLE_PROMOTED) {
             weighted_node = pe_find_node_id(location->node_list_rh,
                                             chosen->details->id);
         }
         if (weighted_node != NULL) {
             int new_priority = pcmk__add_scores(child->priority,
                                                 weighted_node->weight);
 
             pe_rsc_trace(child,
                          "Applying location %s to %s promotion priority on %s: "
                          "%d + %d = %d",
                          location->id, child->id, pe__node_name(weighted_node),
                          child->priority, weighted_node->weight, new_priority);
             child->priority = new_priority;
         }
     }
 }
 
 /*!
  * \internal
  * \brief Get the node that an instance will be promoted on
  *
  * \param[in] rsc  Promotable clone instance to check
  *
  * \return Node that \p rsc will be promoted on, or NULL if none
  */
 static pe_node_t *
 node_to_be_promoted_on(pe_resource_t *rsc)
 {
     pe_node_t *node = NULL;
     pe_node_t *local_node = NULL;
-    pe_resource_t *parent = uber_parent(rsc);
+    const pe_resource_t *parent = NULL;
 
     // If this is a cloned group, bail if any group member can't be promoted
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *child = (pe_resource_t *) iter->data;
 
         if (node_to_be_promoted_on(child) == NULL) {
             pe_rsc_trace(rsc,
                          "%s can't be promoted because member %s can't",
                          rsc->id, child->id);
             return NULL;
         }
     }
 
     node = rsc->fns->location(rsc, NULL, FALSE);
     if (node == NULL) {
         pe_rsc_trace(rsc, "%s can't be promoted because it won't be active",
                      rsc->id);
         return NULL;
 
     } else if (!pcmk_is_set(rsc->flags, pe_rsc_managed)) {
         if (rsc->fns->state(rsc, TRUE) == RSC_ROLE_PROMOTED) {
             crm_notice("Unmanaged instance %s will be left promoted on %s",
                        rsc->id, pe__node_name(node));
         } else {
             pe_rsc_trace(rsc, "%s can't be promoted because it is unmanaged",
                          rsc->id);
             return NULL;
         }
 
     } else if (rsc->priority < 0) {
         pe_rsc_trace(rsc,
                      "%s can't be promoted because its promotion priority %d "
                      "is negative",
                      rsc->id, rsc->priority);
         return NULL;
 
     } else if (!pcmk__node_available(node, false, true)) {
         pe_rsc_trace(rsc, "%s can't be promoted because %s can't run resources",
                      rsc->id, pe__node_name(node));
         return NULL;
     }
 
+    parent = pe__const_top_resource(rsc, false);
     local_node = pe_hash_table_lookup(parent->allowed_nodes, node->details->id);
 
     if (local_node == NULL) {
         /* It should not be possible for the scheduler to have allocated the
          * instance to a node where its parent is not allowed, but it's good to
          * have a fail-safe.
          */
         if (pcmk_is_set(rsc->flags, pe_rsc_managed)) {
             crm_warn("%s can't be promoted because %s is not allowed on %s "
                      "(scheduler bug?)",
                      rsc->id, parent->id, pe__node_name(node));
         } // else the instance is unmanaged and already promoted
         return NULL;
 
     } else if ((local_node->count >= pe__clone_promoted_node_max(parent))
                && pcmk_is_set(rsc->flags, pe_rsc_managed)) {
         pe_rsc_trace(rsc,
                      "%s can't be promoted because %s has "
                      "maximum promoted instances already",
                      rsc->id, pe__node_name(node));
         return NULL;
     }
 
     return local_node;
 }
 
 /*!
  * \internal
  * \brief Compare two promotable clone instances by promotion priority
  *
  * \param[in] a  First instance to compare
  * \param[in] b  Second instance to compare
  *
  * \return A negative number if \p a has higher promotion priority,
  *         a positive number if \p b has higher promotion priority,
  *         or 0 if promotion priorities are equal
  */
 static gint
 cmp_promotable_instance(gconstpointer a, gconstpointer b)
 {
     const pe_resource_t *rsc1 = (const pe_resource_t *) a;
     const pe_resource_t *rsc2 = (const pe_resource_t *) b;
 
     enum rsc_role_e role1 = RSC_ROLE_UNKNOWN;
     enum rsc_role_e role2 = RSC_ROLE_UNKNOWN;
 
     CRM_ASSERT((rsc1 != NULL) && (rsc2 != NULL));
 
     // Check sort index set by pcmk__set_instance_roles()
     if (rsc1->sort_index > rsc2->sort_index) {
         pe_rsc_trace(rsc1,
                      "%s has higher promotion priority than %s "
                      "(sort index %d > %d)",
                      rsc1->id, rsc2->id, rsc1->sort_index, rsc2->sort_index);
         return -1;
     } else if (rsc1->sort_index < rsc2->sort_index) {
         pe_rsc_trace(rsc1,
                      "%s has lower promotion priority than %s "
                      "(sort index %d < %d)",
                      rsc1->id, rsc2->id, rsc1->sort_index, rsc2->sort_index);
         return 1;
     }
 
     // If those are the same, prefer instance whose current role is higher
     role1 = rsc1->fns->state(rsc1, TRUE);
     role2 = rsc2->fns->state(rsc2, TRUE);
     if (role1 > role2) {
         pe_rsc_trace(rsc1,
                      "%s has higher promotion priority than %s "
                      "(higher current role)",
                      rsc1->id, rsc2->id);
         return -1;
     } else if (role1 < role2) {
         pe_rsc_trace(rsc1,
                      "%s has lower promotion priority than %s "
                      "(lower current role)",
                      rsc1->id, rsc2->id);
         return 1;
     }
 
     // Finally, do normal clone instance sorting
     return pcmk__cmp_instance(a, b);
 }
 
 /*!
  * \internal
  * \brief Add a promotable clone instance's sort index to its node's weight
  *
  * Add a promotable clone instance's sort index (which sums its promotion
  * preferences and scores of relevant location constraints for the promoted
  * role) to the node weight of the instance's allocated node.
  *
  * \param[in]     data       Promotable clone instance
  * \param[in,out] user_data  Clone parent of \p data
  */
 static void
 add_sort_index_to_node_weight(gpointer data, gpointer user_data)
 {
     pe_resource_t *child = (pe_resource_t *) data;
     pe_resource_t *clone = (pe_resource_t *) user_data;
 
     pe_node_t *node = NULL;
     pe_node_t *chosen = NULL;
 
     if (child->sort_index < 0) {
         pe_rsc_trace(clone, "Not adding sort index of %s: negative", child->id);
         return;
     }
 
     chosen = child->fns->location(child, NULL, FALSE);
     if (chosen == NULL) {
         pe_rsc_trace(clone, "Not adding sort index of %s: inactive", child->id);
         return;
     }
 
     node = (pe_node_t *) pe_hash_table_lookup(clone->allowed_nodes,
                                               chosen->details->id);
     CRM_ASSERT(node != NULL);
 
     pe_rsc_trace(clone, "Adding sort index %s of %s to weight for %s",
                  pcmk_readable_score(child->sort_index), child->id,
                  pe__node_name(node));
     node->weight = pcmk__add_scores(child->sort_index, node->weight);
 }
 
 /*!
  * \internal
  * \brief Apply colocation to dependent's node weights if for promoted role
  *
  * \param[in]     data       Colocation constraint to apply
  * \param[in,out] user_data  Promotable clone that is constraint's dependent
  */
 static void
 apply_coloc_to_dependent(gpointer data, gpointer user_data)
 {
     pcmk__colocation_t *constraint = (pcmk__colocation_t *) data;
     pe_resource_t *clone = (pe_resource_t *) user_data;
     pe_resource_t *primary = constraint->primary;
     uint32_t flags = pcmk__coloc_select_default;
     float factor = constraint->score / (float) INFINITY;
 
     if (constraint->dependent_role != RSC_ROLE_PROMOTED) {
         return;
     }
     if (constraint->score < INFINITY) {
         flags = pcmk__coloc_select_active;
     }
     pe_rsc_trace(clone, "Applying colocation %s (promoted %s with %s) @%s",
                  constraint->id, constraint->dependent->id,
                  constraint->primary->id,
                  pcmk_readable_score(constraint->score));
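     /* Merge the primary's node weights into the clone's allowed nodes,
      * scaled by score/INFINITY (the factor computed above)
      */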
     pcmk__add_colocated_node_scores(primary, clone->id, &clone->allowed_nodes,
                                     constraint->node_attribute, factor, flags);
 }
 
 /*!
  * \internal
  * \brief Apply colocation to primary's node weights if for promoted role
  *
  * \param[in]     data       Colocation constraint to apply
  * \param[in,out] user_data  Promotable clone that is constraint's primary
  */
 static void
 apply_coloc_to_primary(gpointer data, gpointer user_data)
 {
     pcmk__colocation_t *constraint = (pcmk__colocation_t *) data;
     pe_resource_t *clone = (pe_resource_t *) user_data;
     pe_resource_t *dependent = constraint->dependent;
     const float factor = constraint->score / (float) INFINITY;
     const uint32_t flags = pcmk__coloc_select_active
                            |pcmk__coloc_select_nonnegative;
 
     if ((constraint->primary_role != RSC_ROLE_PROMOTED)
          || !pcmk__colocation_has_influence(constraint, NULL)) {
         return;
     }
 
     pe_rsc_trace(clone, "Applying colocation %s (%s with promoted %s) @%s",
                  constraint->id, constraint->dependent->id,
                  constraint->primary->id,
                  pcmk_readable_score(constraint->score));
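     /* Merge the dependent's node weights into the clone's allowed nodes,
      * scaled by score/INFINITY, restricted per the selection flags above
      */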
     pcmk__add_colocated_node_scores(dependent, clone->id, &clone->allowed_nodes,
                                     constraint->node_attribute, factor, flags);
 }
 
 /*!
  * \internal
  * \brief Set clone instance's sort index to its node's weight
  *
  * \param[in,out] data       Promotable clone instance
  * \param[in]     user_data  Parent clone of \p data
  */
 static void
 set_sort_index_to_node_weight(gpointer data, gpointer user_data)
 {
     pe_resource_t *child = (pe_resource_t *) data;
     pe_resource_t *clone = (pe_resource_t *) user_data;
 
     pe_node_t *chosen = child->fns->location(child, NULL, FALSE);
 
     if (!pcmk_is_set(child->flags, pe_rsc_managed)
         && (child->next_role == RSC_ROLE_PROMOTED)) {
         child->sort_index = INFINITY;
         pe_rsc_trace(clone,
                      "Final sort index for %s is INFINITY (unmanaged promoted)",
                      child->id);
 
     } else if ((chosen == NULL) || (child->sort_index < 0)) {
         pe_rsc_trace(clone,
                      "Final sort index for %s is %d (ignoring node weight)",
                      child->id, child->sort_index);
 
     } else {
         pe_node_t *node = NULL;
 
         node = (pe_node_t *) pe_hash_table_lookup(clone->allowed_nodes,
                                                   chosen->details->id);
         CRM_ASSERT(node != NULL);
 
         child->sort_index = node->weight;
         pe_rsc_trace(clone,
                      "Merging weights for %s: final sort index for %s is %d",
                      clone->id, child->id, child->sort_index);
     }
 }
 
 /*!
  * \internal
  * \brief Sort a promotable clone's instances by descending promotion priority
  *
  * \param[in,out] clone  Promotable clone to sort
  */
 static void
 sort_promotable_instances(pe_resource_t *clone)
 {
     if (pe__set_clone_flag(clone, pe__clone_promotion_constrained)
             == pcmk_rc_already) {
         return;
     }
     pe__set_resource_flags(clone, pe_rsc_merging);
 
     for (GList *iter = clone->children; iter != NULL; iter = iter->next) {
         pe_resource_t *child = (pe_resource_t *) iter->data;
 
         pe_rsc_trace(clone,
                      "Merging weights for %s: initial sort index for %s is %d",
                      clone->id, child->id, child->sort_index);
     }
     pe__show_node_weights(true, clone, "Before", clone->allowed_nodes,
                           clone->cluster);
 
     g_list_foreach(clone->children, add_sort_index_to_node_weight, clone);
     g_list_foreach(clone->rsc_cons, apply_coloc_to_dependent, clone);
     g_list_foreach(clone->rsc_cons_lhs, apply_coloc_to_primary, clone);
 
     // Ban resource from all nodes if it needs a ticket but doesn't have it
     pcmk__require_promotion_tickets(clone);
 
     pe__show_node_weights(true, clone, "After", clone->allowed_nodes,
                           clone->cluster);
 
     // Reset sort indexes to final node weights
     g_list_foreach(clone->children, set_sort_index_to_node_weight, clone);
 
     // Finally, sort instances in descending order of promotion priority
     clone->children = g_list_sort(clone->children, cmp_promotable_instance);
     pe__clear_resource_flags(clone, pe_rsc_merging);
 }
 
 /*!
  * \internal
  * \brief Find the active instance (if any) of an anonymous clone on a node
  *
  * \param[in] clone  Anonymous clone to check
  * \param[in] id     Instance ID (without instance number) to check
  * \param[in] node   Node to check
  *
  * \return Instance of \p clone that is active on \p node, or NULL if none
  */
 static pe_resource_t *
 find_active_anon_instance(pe_resource_t *clone, const char *id,
                           const pe_node_t *node)
 {
     for (GList *iter = clone->children; iter; iter = iter->next) {
         pe_resource_t *child = iter->data;
         pe_resource_t *active = NULL;
 
         // Use ->find_rsc() in case this is a cloned group
         active = clone->fns->find_rsc(child, id, node,
                                       pe_find_clone|pe_find_current);
         if (active != NULL) {
             return active;
         }
     }
     return NULL;
 }
 
 /*!
  * \internal
  * \brief Check whether an anonymous clone instance is known on a node
  *
  * \param[in] clone  Anonymous clone to check
  * \param[in] id     Instance ID (without instance number) to check
  * \param[in] node   Node to check
  *
  * \return true if \p id instance of \p clone is known on \p node,
  *         otherwise false
  */
 static bool
 anonymous_known_on(const pe_resource_t *clone, const char *id,
                    const pe_node_t *node)
 {
     for (GList *iter = clone->children; iter; iter = iter->next) {
         pe_resource_t *child = iter->data;
 
         /* Use ->find_rsc() because this might be a cloned group, and knowing
          * that other members of the group are known here implies nothing.
          */
         child = clone->fns->find_rsc(child, id, NULL, pe_find_clone);
         CRM_LOG_ASSERT(child != NULL);
         if (child != NULL) {
             if (g_hash_table_lookup(child->known_on, node->details->id)) {
                 return true;
             }
         }
     }
     return false;
 }
 
 /*!
  * \internal
  * \brief Check whether a node is allowed to run a resource
  *
  * \param[in] rsc   Resource to check
  * \param[in] node  Node to check
  *
  * \return true if \p node is allowed to run \p rsc, otherwise false
  */
 static bool
 is_allowed(const pe_resource_t *rsc, const pe_node_t *node)
 {
     pe_node_t *allowed = pe_hash_table_lookup(rsc->allowed_nodes,
                                               node->details->id);
 
     return (allowed != NULL) && (allowed->weight >= 0);
 }
 
 /*!
  * \internal
  * \brief Check whether a clone instance's promotion score should be considered
  *
  * \param[in] rsc   Promotable clone instance to check
  * \param[in] node  Node where score would be applied
  *
  * \return true if \p rsc's promotion score should be considered on \p node,
  *         otherwise false
  */
 static bool
 promotion_score_applies(pe_resource_t *rsc, const pe_node_t *node)
 {
     char *id = clone_strip(rsc->id);
     pe_resource_t *parent = uber_parent(rsc);
     pe_resource_t *active = NULL;
     const char *reason = "allowed";
 
     // Some checks apply only to anonymous clone instances
     if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
 
         // If instance is active on the node, its score definitely applies
         active = find_active_anon_instance(parent, id, node);
         if (active == rsc) {
             reason = "active";
             goto check_allowed;
         }
 
         /* If *no* instance is active on this node, this instance's score will
          * count if it has been probed on this node.
          */
         if ((active == NULL) && anonymous_known_on(parent, id, node)) {
             reason = "probed";
             goto check_allowed;
         }
     }
 
     /* If this clone's status is unknown on *all* nodes (e.g. cluster startup),
      * take all instances' scores into account, to make sure we use any
      * permanent promotion scores.
      */
     if ((rsc->running_on == NULL) && (g_hash_table_size(rsc->known_on) == 0)) {
         reason = "none probed";
         goto check_allowed;
     }
 
     /* Otherwise, we've probed and/or started the resource *somewhere*, so
      * consider promotion scores on nodes where we know the status.
      */
     if ((pe_hash_table_lookup(rsc->known_on, node->details->id) != NULL)
         || (pe_find_node_id(rsc->running_on, node->details->id) != NULL)) {
         reason = "known";
     } else {
         pe_rsc_trace(rsc,
                      "Ignoring %s promotion score (for %s) on %s: not probed",
                      rsc->id, id, pe__node_name(node));
         free(id);
         return false;
     }
 
 check_allowed:
     if (is_allowed(rsc, node)) {
         pe_rsc_trace(rsc, "Counting %s promotion score (for %s) on %s: %s",
                      rsc->id, id, pe__node_name(node), reason);
         free(id);
         return true;
     }
 
     pe_rsc_trace(rsc, "Ignoring %s promotion score (for %s) on %s: not allowed",
                  rsc->id, id, pe__node_name(node));
     free(id);
     return false;
 }
 
 /*!
  * \internal
  * \brief Get the value of a promotion score node attribute
  *
  * \param[in] rsc   Promotable clone instance to get promotion score for
  * \param[in] node  Node to get promotion score for
  * \param[in] name  Resource name to use in promotion score attribute name
  *
  * \return Value of promotion score node attribute for \p rsc on \p node
  */
 static const char *
 promotion_attr_value(pe_resource_t *rsc, const pe_node_t *node,
                      const char *name)
 {
     char *attr_name = NULL;
     const char *attr_value = NULL;
 
     CRM_CHECK((rsc != NULL) && (node != NULL) && (name != NULL), return NULL);
 
     attr_name = pcmk_promotion_score_name(name);
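     // The attribute name takes the form "master-<name>", kept for compatibility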
     attr_value = pe_node_attribute_calculated(node, attr_name, rsc);
     free(attr_name);
     return attr_value;
 }
 
 /*!
  * \internal
  * \brief Get the promotion score for a clone instance on a node
  *
  * \param[in]  rsc         Promotable clone instance to get score for
  * \param[in]  node        Node to get score for
  * \param[out] is_default  If non-NULL, will be set true if no score available
  *
  * \return Promotion score for \p rsc on \p node (or 0 if none)
  */
 static int
 promotion_score(pe_resource_t *rsc, const pe_node_t *node, bool *is_default)
 {
     char *name = NULL;
     const char *attr_value = NULL;
 
     if (is_default != NULL) {
         *is_default = true;
     }
 
     CRM_CHECK((rsc != NULL) && (node != NULL), return 0);
 
     /* If this is an instance of a cloned group, the promotion score is the sum
      * of all members' promotion scores.
      */
     if (rsc->children != NULL) {
         int score = 0;
 
         for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
             pe_resource_t *child = (pe_resource_t *) iter->data;
             bool child_default = false;
             int child_score = promotion_score(child, node, &child_default);
 
             if (!child_default && (is_default != NULL)) {
                 *is_default = false;
             }
             score += child_score;
         }
         return score;
     }
 
     if (!promotion_score_applies(rsc, node)) {
         return 0;
     }
 
     /* For the promotion score attribute name, use the name the resource is
      * known as in resource history, since that's what crm_attribute --promotion
      * would have used.
      */
     name = (rsc->clone_name == NULL)? rsc->id : rsc->clone_name;
 
     attr_value = promotion_attr_value(rsc, node, name);
     if (attr_value != NULL) {
         pe_rsc_trace(rsc, "Promotion score for %s on %s = %s",
                      name, pe__node_name(node), pcmk__s(attr_value, "(unset)"));
     } else if (!pcmk_is_set(rsc->flags, pe_rsc_unique)) {
         /* If we don't have any resource history yet, we won't have clone_name.
          * In that case, for anonymous clones, try the resource name without
          * any instance number.
          */
         name = clone_strip(rsc->id);
         if (strcmp(rsc->id, name) != 0) {
             attr_value = promotion_attr_value(rsc, node, name);
             pe_rsc_trace(rsc, "Promotion score for %s on %s (for %s) = %s",
                          name, pe__node_name(node), rsc->id,
                          pcmk__s(attr_value, "(unset)"));
         }
         free(name);
     }
 
     if (attr_value == NULL) {
         return 0;
     }
 
     if (is_default != NULL) {
         *is_default = false;
     }
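     // char2score() accepts numeric strings as well as "INFINITY"/"-INFINITY"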
     return char2score(attr_value);
 }
 
 /*!
  * \internal
  * \brief Include promotion scores in instances' node weights and priorities
  *
  * \param[in,out] rsc  Promotable clone resource to update
  */
 void
 pcmk__add_promotion_scores(pe_resource_t *rsc)
 {
     if (pe__set_clone_flag(rsc, pe__clone_promotion_added) == pcmk_rc_already) {
         return;
     }
 
     for (GList *iter = rsc->children; iter != NULL; iter = iter->next) {
         pe_resource_t *child_rsc = (pe_resource_t *) iter->data;
 
         GHashTableIter iter;
         pe_node_t *node = NULL;
         int score, new_score;
 
         g_hash_table_iter_init(&iter, child_rsc->allowed_nodes);
         while (g_hash_table_iter_next(&iter, NULL, (void **) &node)) {
             if (!pcmk__node_available(node, false, false)) {
                 /* This node will never be promoted, so don't apply the
                  * promotion score, as that may lead to clone shuffling.
                  */
                 continue;
             }
 
             score = promotion_score(child_rsc, node, NULL);
             if (score > 0) {
                 new_score = pcmk__add_scores(node->weight, score);
                 if (new_score != node->weight) {
                     pe_rsc_trace(rsc,
                                  "Adding promotion score to preference "
                                  "for %s on %s (%d->%d)",
                                  child_rsc->id, pe__node_name(node),
                                  node->weight, new_score);
                     node->weight = new_score;
                 }
             }
 
             if (score > child_rsc->priority) {
                 pe_rsc_trace(rsc,
                              "Updating %s priority to promotion score (%d->%d)",
                              child_rsc->id, child_rsc->priority, score);
                 child_rsc->priority = score;
             }
         }
     }
 }
 
 /*!
  * \internal
  * \brief If a resource's current role is started, change it to unpromoted
  *
  * \param[in,out] data       Resource to update
  * \param[in]     user_data  Ignored
  */
 static void
 set_current_role_unpromoted(void *data, void *user_data)
 {
     pe_resource_t *rsc = (pe_resource_t *) data;
 
     if (rsc->role == RSC_ROLE_STARTED) {
         // Promotable clones should use unpromoted role instead of started
         rsc->role = RSC_ROLE_UNPROMOTED;
     }
     g_list_foreach(rsc->children, set_current_role_unpromoted, NULL);
 }
 
 /*!
  * \internal
  * \brief Set a resource's next role to unpromoted (or stopped if unassigned)
  *
  * \param[in,out] data       Resource to update
  * \param[in]     user_data  Ignored
  */
 static void
 set_next_role_unpromoted(void *data, void *user_data)
 {
     pe_resource_t *rsc = (pe_resource_t *) data;
     GList *assigned = NULL;
 
     rsc->fns->location(rsc, &assigned, FALSE);
     if (assigned == NULL) {
         pe__set_next_role(rsc, RSC_ROLE_STOPPED, "stopped instance");
     } else {
         pe__set_next_role(rsc, RSC_ROLE_UNPROMOTED, "unpromoted instance");
         g_list_free(assigned);
     }
     g_list_foreach(rsc->children, set_next_role_unpromoted, NULL);
 }
 
 /*!
  * \internal
  * \brief Set a resource's next role to promoted if not already set
  *
  * \param[in,out] data       Resource to update
  * \param[in]     user_data  Ignored
  */
 static void
 set_next_role_promoted(void *data, gpointer user_data)
 {
     pe_resource_t *rsc = (pe_resource_t *) data;
 
     if (rsc->next_role == RSC_ROLE_UNKNOWN) {
         pe__set_next_role(rsc, RSC_ROLE_PROMOTED, "promoted instance");
     }
     g_list_foreach(rsc->children, set_next_role_promoted, NULL);
 }
 
 /*!
  * \internal
  * \brief Show instance's promotion score on node where it will be active
  *
  * \param[in,out] instance  Promotable clone instance to show
  */
 static void
 show_promotion_score(pe_resource_t *instance)
 {
     pe_node_t *chosen = instance->fns->location(instance, NULL, FALSE);
 
     if (pcmk_is_set(instance->cluster->flags, pe_flag_show_scores)
         && !pcmk__is_daemon && (instance->cluster->priv != NULL)) {
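         // Show scores as formatted output (e.g. for crm_simulate) if possible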
 
         pcmk__output_t *out = instance->cluster->priv;
 
         out->message(out, "promotion-score", instance, chosen,
                      pcmk_readable_score(instance->sort_index));
     } else {
-        pe_rsc_debug(uber_parent(instance),
+        pe_rsc_debug(pe__const_top_resource(instance, false),
                      "%s promotion score on %s: sort=%s priority=%s",
                      instance->id,
                      ((chosen == NULL)? "none" : pe__node_name(chosen)),
                      pcmk_readable_score(instance->sort_index),
                      pcmk_readable_score(instance->priority));
     }
 }
 
 /*!
  * \internal
  * \brief Set a clone instance's promotion priority
  *
  * \param[in,out] data       Promotable clone instance to update
  * \param[in]     user_data  Instance's parent clone
  */
 static void
 set_instance_priority(gpointer data, gpointer user_data)
 {
     pe_resource_t *instance = (pe_resource_t *) data;
     pe_resource_t *clone = (pe_resource_t *) user_data;
     pe_node_t *chosen = NULL;
     enum rsc_role_e next_role = RSC_ROLE_UNKNOWN;
     GList *list = NULL;
 
     pe_rsc_trace(clone, "Assigning priority for %s: %s", instance->id,
                  role2text(instance->next_role));
 
     if (instance->fns->state(instance, TRUE) == RSC_ROLE_STARTED) {
         set_current_role_unpromoted(instance, NULL);
     }
 
     // Only an instance that will be active can be promoted
     chosen = instance->fns->location(instance, &list, FALSE);
     if (pcmk__list_of_multiple(list)) {
         pcmk__config_err("Cannot promote non-colocated child %s",
                          instance->id);
     }
     g_list_free(list);
     if (chosen == NULL) {
         return;
     }
 
     next_role = instance->fns->state(instance, FALSE);
     switch (next_role) {
         case RSC_ROLE_STARTED:
         case RSC_ROLE_UNKNOWN:
             // Set instance priority to its promotion score (or -1 if none)
             {
                 bool is_default = false;
 
                 instance->priority = promotion_score(instance, chosen,
                                                       &is_default);
                 if (is_default) {
                  /* Default to -1 if no value is set. This allows instances
                   * eligible for promotion to be specified based solely on
                   * rsc_location constraints, but prevents any instance from
                   * being promoted if neither a constraint nor a promotion
                   * score is present.
                   */
                     instance->priority = -1;
                 }
             }
             break;
 
         case RSC_ROLE_UNPROMOTED:
         case RSC_ROLE_STOPPED:
             // Instance can't be promoted
             instance->priority = -INFINITY;
             break;
 
         case RSC_ROLE_PROMOTED:
             // Nothing needed (re-creating actions after scheduling fencing)
             break;
 
         default:
             CRM_CHECK(FALSE, crm_err("Unknown resource role %d for %s",
                                      next_role, instance->id));
     }
 
     // Add relevant location constraint scores for promoted role
     apply_promoted_locations(instance, instance->rsc_location, chosen);
     apply_promoted_locations(instance, clone->rsc_location, chosen);
 
     // Apply relevant colocations with promoted role
     for (GList *iter = instance->rsc_cons; iter != NULL; iter = iter->next) {
         pcmk__colocation_t *cons = (pcmk__colocation_t *) iter->data;
 
         instance->cmds->apply_coloc_score(instance, cons->primary, cons, true);
     }
 
     instance->sort_index = instance->priority;
     if (next_role == RSC_ROLE_PROMOTED) {
         instance->sort_index = INFINITY;
     }
     pe_rsc_trace(clone, "Assigning %s priority = %d",
                  instance->id, instance->priority);
 }
 
 /*!
  * \internal
  * \brief Set a promotable clone instance's role
  *
  * \param[in,out] data       Promotable clone instance to update
  * \param[in,out] user_data  Pointer to count of instances chosen for promotion
  */
 static void
 set_instance_role(gpointer data, gpointer user_data)
 {
     pe_resource_t *instance = (pe_resource_t *) data;
     int *count = (int *) user_data;
 
-    pe_resource_t *clone = uber_parent(instance);
+    const pe_resource_t *clone = pe__const_top_resource(instance, false);
     pe_node_t *chosen = NULL;
 
     show_promotion_score(instance);
 
     if (instance->sort_index < 0) {
         pe_rsc_trace(clone, "Not supposed to promote instance %s",
                      instance->id);
 
     } else if ((*count < pe__clone_promoted_max(instance))
                || !pcmk_is_set(clone->flags, pe_rsc_managed)) {
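         // (the promoted-max limit is deliberately bypassed for unmanaged clones)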
         chosen = node_to_be_promoted_on(instance);
     }
 
     if (chosen == NULL) {
         set_next_role_unpromoted(instance, NULL);
         return;
     }
 
     if ((instance->role < RSC_ROLE_PROMOTED)
         && !pcmk_is_set(instance->cluster->flags, pe_flag_have_quorum)
         && (instance->cluster->no_quorum_policy == no_quorum_freeze)) {
         crm_notice("Clone instance %s cannot be promoted without quorum",
                    instance->id);
         set_next_role_unpromoted(instance, NULL);
         return;
     }
 
     chosen->count++;
     pe_rsc_info(clone, "Choosing %s (%s) on %s for promotion",
                 instance->id, role2text(instance->role),
                 pe__node_name(chosen));
     set_next_role_promoted(instance, NULL);
     (*count)++;
 }
 
 /*!
  * \internal
  * \brief Set roles for all instances of a promotable clone
  *
  * \param[in,out] rsc  Promotable clone resource to update
  */
 void
 pcmk__set_instance_roles(pe_resource_t *rsc)
 {
     int promoted = 0;
     GHashTableIter iter;
     pe_node_t *node = NULL;
 
     // Repurpose count to track the number of promoted instances allocated
     g_hash_table_iter_init(&iter, rsc->allowed_nodes);
     while (g_hash_table_iter_next(&iter, NULL, (void **)&node)) {
         node->count = 0;
     }
 
     // Set instances' promotion priorities and sort by highest priority first
     g_list_foreach(rsc->children, set_instance_priority, rsc);
     sort_promotable_instances(rsc);
 
     // Choose the first N eligible instances to be promoted
     g_list_foreach(rsc->children, set_instance_role, &promoted);
     pe_rsc_info(rsc, "%s: Promoted %d instances of a possible %d",
                 rsc->id, promoted, pe__clone_promoted_max(rsc));
 }
 
 /*!
  * \internal
  * \brief Create actions for promotable clone instances
  *
  * \param[in,out] clone          Promotable clone to create actions for
  * \param[out]    any_promoting  Will be set true if any instance is promoting
  * \param[out]    any_demoting   Will be set true if any instance is demoting
  */
 static void
 create_promotable_instance_actions(pe_resource_t *clone,
                                    bool *any_promoting, bool *any_demoting)
 {
     for (GList *iter = clone->children; iter != NULL; iter = iter->next) {
         pe_resource_t *instance = (pe_resource_t *) iter->data;
 
         instance->cmds->create_actions(instance);
         check_for_role_change(instance, any_demoting, any_promoting);
     }
 }
 
 /*!
  * \internal
  * \brief Reset each promotable instance's resource priority
  *
  * Reset the priority of each instance of a promotable clone to the clone's
  * priority (after promotion actions are scheduled, when instance priorities
  * were repurposed as promotion scores).
  *
  * \param[in,out] clone  Promotable clone to reset
  */
 static void
 reset_instance_priorities(pe_resource_t *clone)
 {
     for (GList *iter = clone->children; iter != NULL; iter = iter->next) {
         pe_resource_t *instance = (pe_resource_t *) iter->data;
 
         instance->priority = clone->priority;
     }
 }
 
 /*!
  * \internal
  * \brief Create actions specific to promotable clones
  *
  * \param[in,out] clone  Promotable clone to create actions for
  */
 void
 pcmk__create_promotable_actions(pe_resource_t *clone)
 {
     bool any_promoting = false;
     bool any_demoting = false;
 
     // Create actions for each clone instance individually
     create_promotable_instance_actions(clone, &any_promoting, &any_demoting);
 
     // Create pseudo-actions for clone as a whole
     pe__create_promotable_pseudo_ops(clone, any_promoting, any_demoting);
 
     // Undo our temporary repurposing of resource priority for instances
     reset_instance_priorities(clone);
 }
 
 /*!
  * \internal
  * \brief Create internal orderings for a promotable clone's instances
  *
  * \param[in,out] clone  Promotable clone whose instances should be ordered
  */
 void
 pcmk__order_promotable_instances(pe_resource_t *clone)
 {
     pe_resource_t *previous = NULL; // Needed for ordered clones
 
     pcmk__promotable_restart_ordering(clone);
 
     for (GList *iter = clone->children; iter != NULL; iter = iter->next) {
         pe_resource_t *instance = (pe_resource_t *) iter->data;
 
         // Demote before promote
         pcmk__order_resource_actions(instance, RSC_DEMOTE,
                                      instance, RSC_PROMOTE,
                                      pe_order_optional);
 
         order_instance_promotion(clone, instance, previous);
         order_instance_demotion(clone, instance, previous);
         previous = instance;
     }
 }
 
 /*!
  * \internal
  * \brief Update dependent's allowed nodes for colocation with promotable
  *
  * \param[in,out] dependent     Dependent resource to update
  * \param[in]     primary_node  Node where an instance of the primary will be
  * \param[in]     colocation    Colocation constraint to apply
  */
 static void
 update_dependent_allowed_nodes(pe_resource_t *dependent,
                                const pe_node_t *primary_node,
                                const pcmk__colocation_t *colocation)
 {
     GHashTableIter iter;
     pe_node_t *node = NULL;
     const char *primary_value = NULL;
     const char *attr = NULL;
 
     if (colocation->score >= INFINITY) {
         return; // Colocation is mandatory, so allowed node scores don't matter
     }
 
     // Get value of primary's colocation node attribute
     attr = colocation->node_attribute;
     if (attr == NULL) {
         attr = CRM_ATTR_UNAME;
     }
     primary_value = pe_node_attribute_raw(primary_node, attr);
 
     pe_rsc_trace(colocation->primary,
                  "Applying %s (%s with %s on %s by %s @%d) to %s",
                  colocation->id, colocation->dependent->id,
                  colocation->primary->id, pe__node_name(primary_node), attr,
                  colocation->score, dependent->id);
 
     g_hash_table_iter_init(&iter, dependent->allowed_nodes);
     while (g_hash_table_iter_next(&iter, NULL, (void **) &node)) {
         const char *dependent_value = pe_node_attribute_raw(node, attr);
 
         if (pcmk__str_eq(primary_value, dependent_value, pcmk__str_casei)) {
             pe_rsc_trace(colocation->primary, "%s: %d + %d",
                          pe__node_name(node), node->weight, colocation->score);
             node->weight = pcmk__add_scores(node->weight, colocation->score);
         }
     }
 }
 
 /*!
  * \internal
  * \brief Update dependent for a colocation with a promotable clone
  *
  * \param[in]     primary     Primary resource in the colocation
  * \param[in,out] dependent   Dependent resource in the colocation
  * \param[in]     colocation  Colocation constraint to apply
  */
 void
 pcmk__update_dependent_with_promotable(const pe_resource_t *primary,
                                        pe_resource_t *dependent,
                                        const pcmk__colocation_t *colocation)
 {
     GList *affected_nodes = NULL;
 
     /* Build a list of all nodes where an instance of the primary will be, and
      * (for optional colocations) update the dependent's allowed node scores for
      * each one.
      */
     for (GList *iter = primary->children; iter != NULL; iter = iter->next) {
         pe_resource_t *instance = (pe_resource_t *) iter->data;
         pe_node_t *node = instance->fns->location(instance, NULL, FALSE);
 
         if (node == NULL) {
             continue;
         }
         if (instance->fns->state(instance, FALSE) == colocation->primary_role) {
             update_dependent_allowed_nodes(dependent, node, colocation);
             affected_nodes = g_list_prepend(affected_nodes, node);
         }
     }
 
     /* For mandatory colocations, add the primary's node weight to the
      * dependent's node weight for each affected node, and ban the dependent
      * from all other nodes.
      *
      * However, skip this for promoted-with-promoted colocations, otherwise
      * inactive dependent instances can't start (in the unpromoted role).
      */
     if ((colocation->score >= INFINITY)
         && ((colocation->dependent_role != RSC_ROLE_PROMOTED)
             || (colocation->primary_role != RSC_ROLE_PROMOTED))) {
 
         pe_rsc_trace(colocation->primary,
                      "Applying %s (mandatory %s with %s) to %s",
                      colocation->id, colocation->dependent->id,
                      colocation->primary->id, dependent->id);
         node_list_exclude(dependent->allowed_nodes, affected_nodes,
                           TRUE);
     }
     g_list_free(affected_nodes);
 }
 
 /*!
  * \internal
  * \brief Update dependent priority for colocation with promotable
  *
  * \param[in]     primary     Primary resource in the colocation
  * \param[in,out] dependent   Dependent resource in the colocation
  * \param[in]     colocation  Colocation constraint to apply
  */
 void
 pcmk__update_promotable_dependent_priority(const pe_resource_t *primary,
                                            pe_resource_t *dependent,
                                            const pcmk__colocation_t *colocation)
 {
     pe_resource_t *primary_instance = NULL;
 
     // Look for a primary instance where dependent will be
     primary_instance = find_compatible_child(dependent, primary,
                                              colocation->primary_role, FALSE);
 
     if (primary_instance != NULL) {
         // Add primary instance's priority to dependent's
         int new_priority = pcmk__add_scores(dependent->priority,
                                             colocation->score);
 
         pe_rsc_trace(colocation->primary,
                      "Applying %s (%s with %s) to %s priority (%s + %s = %s)",
                      colocation->id, colocation->dependent->id,
                      colocation->primary->id, dependent->id,
                      pcmk_readable_score(dependent->priority),
                      pcmk_readable_score(colocation->score),
                      pcmk_readable_score(new_priority));
         dependent->priority = new_priority;
 
     } else if (colocation->score >= INFINITY) {
         // Mandatory colocation, but primary won't be here
         pe_rsc_trace(colocation->primary,
                      "Applying %s (%s with %s) to %s: can't be promoted",
                      colocation->id, colocation->dependent->id,
                      colocation->primary->id, dependent->id);
         dependent->priority = -INFINITY;
     }
 }