diff --git a/daemons/execd/pacemaker-execd.c b/daemons/execd/pacemaker-execd.c
index 1731ceedaa..e7e30eb009 100644
--- a/daemons/execd/pacemaker-execd.c
+++ b/daemons/execd/pacemaker-execd.c
@@ -1,584 +1,584 @@
 /*
  * Copyright 2012-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <glib.h>
 #include <signal.h>
 #include <sys/types.h>
 
 #include <crm/crm.h>
 #include <crm/msg_xml.h>
 #include <crm/services.h>
 #include <crm/common/cmdline_internal.h>
 #include <crm/common/ipc.h>
 #include <crm/common/ipc_internal.h>
 #include <crm/common/mainloop.h>
 #include <crm/common/output_internal.h>
 #include <crm/common/remote_internal.h>
 #include <crm/lrmd_internal.h>
 
 #include "pacemaker-execd.h"
 
 #ifdef PCMK__COMPILE_REMOTE
 #  define EXECD_TYPE "remote"
 #  define EXECD_NAME "pacemaker-remoted"
 #  define SUMMARY "resource agent executor daemon for Pacemaker Remote nodes"
 #else
 #  define EXECD_TYPE "local"
 #  define EXECD_NAME "pacemaker-execd"
 #  define SUMMARY "resource agent executor daemon for Pacemaker cluster nodes"
 #endif
 
 static GMainLoop *mainloop = NULL;
 static qb_ipcs_service_t *ipcs = NULL;
 static stonith_t *stonith_api = NULL;
 int lrmd_call_id = 0;
 time_t start_time;
 
 static struct {
     gchar **log_files;
 #ifdef PCMK__COMPILE_REMOTE
     gchar *port;
 #endif  // PCMK__COMPILE_REMOTE
 } options;
 
 #ifdef PCMK__COMPILE_REMOTE
 /* whether shutdown request has been sent */
 static gboolean shutting_down = FALSE;
 
 /* timer for waiting for acknowledgment of shutdown request */
 static guint shutdown_ack_timer = 0;
 
 static gboolean lrmd_exit(gpointer data);
 #endif
 
 static void
 stonith_connection_destroy_cb(stonith_t * st, stonith_event_t * e)
 {
     stonith_api->state = stonith_disconnected;
     stonith_connection_failed();
 }
 
 stonith_t *
 get_stonith_connection(void)
 {
     if (stonith_api && stonith_api->state == stonith_disconnected) {
         stonith_api_delete(stonith_api);
         stonith_api = NULL;
     }
 
     if (stonith_api == NULL) {
         int rc = pcmk_ok;
 
         stonith_api = stonith_api_new();
         if (stonith_api == NULL) {
             crm_err("Could not connect to fencer: API memory allocation failed");
             return NULL;
         }
         rc = stonith_api_connect_retry(stonith_api, crm_system_name, 10);
         if (rc != pcmk_ok) {
             crm_err("Could not connect to fencer in 10 attempts: %s "
                     CRM_XS " rc=%d", pcmk_strerror(rc), rc);
             stonith_api_delete(stonith_api);
             stonith_api = NULL;
         } else {
             stonith_api->cmds->register_notification(stonith_api,
                                                      T_STONITH_NOTIFY_DISCONNECT,
                                                      stonith_connection_destroy_cb);
         }
     }
     return stonith_api;
 }
 
 static int32_t
 lrmd_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid)
 {
     crm_trace("Connection %p", c);
     if (pcmk__new_client(c, uid, gid) == NULL) {
         return -EIO;
     }
     return 0;
 }
 
 static void
 lrmd_ipc_created(qb_ipcs_connection_t * c)
 {
     pcmk__client_t *new_client = pcmk__find_client(c);
 
     crm_trace("Connection %p", c);
     CRM_ASSERT(new_client != NULL);
     /* Now that the connection is offically established, alert
      * the other clients a new connection exists. */
 
     notify_of_new_client(new_client);
 }
 
 static int32_t
 lrmd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size)
 {
     uint32_t id = 0;
     uint32_t flags = 0;
     pcmk__client_t *client = pcmk__find_client(c);
     xmlNode *request = pcmk__client_data2xml(client, data, &id, &flags);
 
     CRM_CHECK(client != NULL, crm_err("Invalid client");
               return FALSE);
     CRM_CHECK(client->id != NULL, crm_err("Invalid client: %p", client);
               return FALSE);
 
     CRM_CHECK(flags & crm_ipc_client_response, crm_err("Invalid client request: %p", client);
               return FALSE);
 
     if (!request) {
         return 0;
     }
 
     if (!client->name) {
         const char *value = crm_element_value(request, F_LRMD_CLIENTNAME);
 
         if (value == NULL) {
             client->name = pcmk__itoa(pcmk__client_pid(c));
         } else {
             client->name = strdup(value);
         }
     }
 
     lrmd_call_id++;
     if (lrmd_call_id < 1) {
         lrmd_call_id = 1;
     }
 
     crm_xml_add(request, F_LRMD_CLIENTID, client->id);
     crm_xml_add(request, F_LRMD_CLIENTNAME, client->name);
     crm_xml_add_int(request, F_LRMD_CALLID, lrmd_call_id);
 
     process_lrmd_message(client, id, request);
 
     free_xml(request);
     return 0;
 }
 
 /*!
  * \internal
  * \brief Free a client connection, and exit if appropriate
  *
  * \param[in,out] client  Client connection to free
  */
 void
 lrmd_client_destroy(pcmk__client_t *client)
 {
     pcmk__free_client(client);
 
 #ifdef PCMK__COMPILE_REMOTE
     /* If we were waiting to shut down, we can now safely do so
      * if there are no more proxied IPC providers
      */
     if (shutting_down && (ipc_proxy_get_provider() == NULL)) {
         lrmd_exit(NULL);
     }
 #endif
 }
 
 static int32_t
 lrmd_ipc_closed(qb_ipcs_connection_t * c)
 {
     pcmk__client_t *client = pcmk__find_client(c);
 
     if (client == NULL) {
         return 0;
     }
 
     crm_trace("Connection %p", c);
     client_disconnect_cleanup(client->id);
 #ifdef PCMK__COMPILE_REMOTE
     ipc_proxy_remove_provider(client);
 #endif
     lrmd_client_destroy(client);
     return 0;
 }
 
 static void
 lrmd_ipc_destroy(qb_ipcs_connection_t * c)
 {
     lrmd_ipc_closed(c);
     crm_trace("Connection %p", c);
 }
 
 static struct qb_ipcs_service_handlers lrmd_ipc_callbacks = {
     .connection_accept = lrmd_ipc_accept,
     .connection_created = lrmd_ipc_created,
     .msg_process = lrmd_ipc_dispatch,
     .connection_closed = lrmd_ipc_closed,
     .connection_destroyed = lrmd_ipc_destroy
 };
 
 // \return Standard Pacemaker return code
 int
 lrmd_server_send_reply(pcmk__client_t *client, uint32_t id, xmlNode *reply)
 {
     crm_trace("Sending reply (%d) to client (%s)", id, client->id);
     switch (PCMK__CLIENT_TYPE(client)) {
         case pcmk__client_ipc:
             return pcmk__ipc_send_xml(client, id, reply, FALSE);
 #ifdef PCMK__COMPILE_REMOTE
         case pcmk__client_tls:
             return lrmd__remote_send_xml(client->remote, reply, id, "reply");
 #endif
         default:
             crm_err("Could not send reply: unknown type for client %s "
                     CRM_XS " flags=%#llx",
                     pcmk__client_name(client), client->flags);
     }
     return ENOTCONN;
 }
 
 // \return Standard Pacemaker return code
 int
 lrmd_server_send_notify(pcmk__client_t *client, xmlNode *msg)
 {
     crm_trace("Sending notification to client (%s)", client->id);
     switch (PCMK__CLIENT_TYPE(client)) {
         case pcmk__client_ipc:
             if (client->ipcs == NULL) {
                 crm_trace("Could not notify local client: disconnected");
                 return ENOTCONN;
             }
             return pcmk__ipc_send_xml(client, 0, msg, crm_ipc_server_event);
 #ifdef PCMK__COMPILE_REMOTE
         case pcmk__client_tls:
             if (client->remote == NULL) {
                 crm_trace("Could not notify remote client: disconnected");
                 return ENOTCONN;
             } else {
                 return lrmd__remote_send_xml(client->remote, msg, 0, "notify");
             }
 #endif
         default:
             crm_err("Could not notify client %s with unknown transport "
                     CRM_XS " flags=%#llx",
                     pcmk__client_name(client), client->flags);
     }
     return ENOTCONN;
 }
 
 /*!
  * \internal
  * \brief Clean up and exit immediately
  *
  * \param[in] data  Ignored
  *
  * \return Doesn't return
  * \note   This can be used as a timer callback.
  */
 static gboolean
 lrmd_exit(gpointer data)
 {
     crm_info("Terminating with %d clients", pcmk__ipc_client_count());
     if (stonith_api) {
         stonith_api->cmds->remove_notification(stonith_api, T_STONITH_NOTIFY_DISCONNECT);
         stonith_api->cmds->disconnect(stonith_api);
         stonith_api_delete(stonith_api);
     }
     if (ipcs) {
         mainloop_del_ipc_server(ipcs);
     }
 
 #ifdef PCMK__COMPILE_REMOTE
     execd_stop_tls_server();
     ipc_proxy_cleanup();
 #endif
 
     pcmk__client_cleanup();
     g_hash_table_destroy(rsc_list);
 
     if (mainloop) {
         lrmd_drain_alerts(mainloop);
     }
 
     crm_exit(CRM_EX_OK);
     return FALSE;
 }
 
 /*!
  * \internal
  * \brief Request cluster shutdown if appropriate, otherwise exit immediately
  *
  * \param[in] nsig  Signal that caused invocation (ignored)
  */
 static void
 lrmd_shutdown(int nsig)
 {
 #ifdef PCMK__COMPILE_REMOTE
     pcmk__client_t *ipc_proxy = ipc_proxy_get_provider();
 
     /* If there are active proxied IPC providers, then we may be running
      * resources, so notify the cluster that we wish to shut down.
      */
     if (ipc_proxy) {
         if (shutting_down) {
             crm_notice("Waiting for cluster to stop resources before exiting");
             return;
         }
 
         crm_info("Sending shutdown request to cluster");
         if (ipc_proxy_shutdown_req(ipc_proxy) < 0) {
             crm_crit("Shutdown request failed, exiting immediately");
 
         } else {
             /* We requested a shutdown. Now, we need to wait for an
              * acknowledgement from the proxy host (which ensures the proxy host
              * supports shutdown requests), then wait for all proxy hosts to
              * disconnect (which ensures that all resources have been stopped).
              */
             shutting_down = TRUE;
 
             /* Stop accepting new proxy connections */
             execd_stop_tls_server();
 
             /* Older controller versions will never acknowledge our request, so
              * set a fairly short timeout to exit quickly in that case. If we
              * get the ack, we'll defuse this timer.
              */
             shutdown_ack_timer = g_timeout_add_seconds(20, lrmd_exit, NULL);
 
             /* Currently, we let the OS kill us if the clients don't disconnect
              * in a reasonable time. We could instead set a long timer here
              * (shorter than what the OS is likely to use) and exit immediately
              * if it pops.
              */
             return;
         }
     }
 #endif
     lrmd_exit(NULL);
 }
 
 /*!
  * \internal
  * \brief Defuse short exit timer if shutting down
  */
 void
 handle_shutdown_ack(void)
 {
 #ifdef PCMK__COMPILE_REMOTE
     if (shutting_down) {
         crm_info("Received shutdown ack");
         if (shutdown_ack_timer > 0) {
             g_source_remove(shutdown_ack_timer);
             shutdown_ack_timer = 0;
         }
         return;
     }
 #endif
     crm_debug("Ignoring unexpected shutdown ack");
 }
 
 /*!
  * \internal
  * \brief Make short exit timer fire immediately
  */
 void
 handle_shutdown_nack(void)
 {
 #ifdef PCMK__COMPILE_REMOTE
     if (shutting_down) {
         crm_info("Received shutdown nack");
         if (shutdown_ack_timer > 0) {
             g_source_remove(shutdown_ack_timer);
             shutdown_ack_timer = g_timeout_add(0, lrmd_exit, NULL);
         }
         return;
     }
 #endif
     crm_debug("Ignoring unexpected shutdown nack");
 }
 
 static GOptionEntry entries[] = {
     { "logfile", 'l', G_OPTION_FLAG_NONE, G_OPTION_ARG_FILENAME_ARRAY,
       &options.log_files, "Send logs to the additional named logfile", NULL },
 
 #ifdef PCMK__COMPILE_REMOTE
 
     { "port", 'p', G_OPTION_FLAG_NONE, G_OPTION_ARG_STRING, &options.port,
       "Port to listen on (defaults to " G_STRINGIFY(DEFAULT_REMOTE_PORT) ")", NULL },
 #endif  // PCMK__COMPILE_REMOTE
 
     { NULL }
 };
 
 static pcmk__supported_format_t formats[] = {
     PCMK__SUPPORTED_FORMAT_NONE,
     PCMK__SUPPORTED_FORMAT_TEXT,
     PCMK__SUPPORTED_FORMAT_XML,
     { NULL, NULL, NULL }
 };
 
 static GOptionContext *
 build_arg_context(pcmk__common_args_t *args, GOptionGroup **group)
 {
     GOptionContext *context = NULL;
 
     context = pcmk__build_arg_context(args, "text (default), xml", group, NULL);
     pcmk__add_main_args(context, entries);
     return context;
 }
 
 int
 main(int argc, char **argv, char **envp)
 {
     int rc = pcmk_rc_ok;
     crm_exit_t exit_code = CRM_EX_OK;
 
     const char *option = NULL;
 
     pcmk__output_t *out = NULL;
 
     GError *error = NULL;
 
     GOptionGroup *output_group = NULL;
     pcmk__common_args_t *args = pcmk__new_common_args(SUMMARY);
 #ifdef PCMK__COMPILE_REMOTE
     gchar **processed_args = pcmk__cmdline_preproc(argv, "lp");
 #else
     gchar **processed_args = pcmk__cmdline_preproc(argv, "l");
 #endif  // PCMK__COMPILE_REMOTE
     GOptionContext *context = build_arg_context(args, &output_group);
 
 #ifdef PCMK__COMPILE_REMOTE
     // If necessary, create PID 1 now before any file descriptors are opened
     remoted_spawn_pidone(argc, argv, envp);
 #endif
 
     crm_log_preinit(EXECD_NAME, argc, argv);
 
     pcmk__register_formats(output_group, formats);
     if (!g_option_context_parse_strv(context, &processed_args, &error)) {
         exit_code = CRM_EX_USAGE;
         goto done;
     }
 
     rc = pcmk__output_new(&out, args->output_ty, args->output_dest, argv);
     if (rc != pcmk_rc_ok) {
         exit_code = CRM_EX_ERROR;
         g_set_error(&error, PCMK__EXITC_ERROR, exit_code,
                     "Error creating output format %s: %s",
                     args->output_ty, pcmk_rc_str(rc));
         goto done;
     }
 
     if (args->version) {
         out->version(out, false);
         goto done;
     }
 
     // Open additional log files
     if (options.log_files != NULL) {
         for (gchar **fname = options.log_files; *fname != NULL; fname++) {
             rc = pcmk__add_logfile(*fname);
 
             if (rc != pcmk_rc_ok) {
                 out->err(out, "Logging to %s is disabled: %s",
                          *fname, pcmk_rc_str(rc));
             }
         }
     }
 
     pcmk__cli_init_logging(EXECD_NAME, args->verbosity);
     crm_log_init(NULL, LOG_INFO, TRUE, FALSE, argc, argv, FALSE);
 
     // ocf_log() (in resource-agents) uses the capitalized env options below
     option = pcmk__env_option(PCMK__ENV_LOGFACILITY);
     if (!pcmk__str_eq(option, PCMK__VALUE_NONE,
                       pcmk__str_casei|pcmk__str_null_matches)
         && !pcmk__str_eq(option, "/dev/null", pcmk__str_none)) {
 
         pcmk__set_env_option("LOGFACILITY", option, true);
     }
 
     option = pcmk__env_option(PCMK__ENV_LOGFILE);
     if (!pcmk__str_eq(option, PCMK__VALUE_NONE,
                       pcmk__str_casei|pcmk__str_null_matches)) {
         pcmk__set_env_option("LOGFILE", option, true);
 
         if (pcmk__env_option_enabled(crm_system_name, PCMK__ENV_DEBUG)) {
             pcmk__set_env_option("DEBUGLOG", option, true);
         }
     }
 
 #ifdef PCMK__COMPILE_REMOTE
     if (options.port != NULL) {
-        setenv("PCMK_remote_port", options.port, 1);
+        pcmk__set_env_option(PCMK__ENV_REMOTE_PORT, options.port, false);
     }
 #endif  // PCMK__COMPILE_REMOTE
 
     start_time = time(NULL);
 
     crm_notice("Starting Pacemaker " EXECD_TYPE " executor");
 
     /* The presence of this variable allegedly controls whether child
      * processes like httpd will try and use Systemd's sd_notify
      * API
      */
     unsetenv("NOTIFY_SOCKET");
 
     {
         // Temporary directory for resource agent use (leave owned by root)
         int rc = pcmk__build_path(CRM_RSCTMP_DIR, 0755);
 
         if (rc != pcmk_rc_ok) {
             crm_warn("Could not create resource agent temporary directory "
                      CRM_RSCTMP_DIR ": %s", pcmk_rc_str(rc));
         }
     }
 
     rsc_list = pcmk__strkey_table(NULL, free_rsc);
     ipcs = mainloop_add_ipc_server(CRM_SYSTEM_LRMD, QB_IPC_SHM, &lrmd_ipc_callbacks);
     if (ipcs == NULL) {
         crm_err("Failed to create IPC server: shutting down and inhibiting respawn");
         exit_code = CRM_EX_FATAL;
         goto done;
     }
 
 #ifdef PCMK__COMPILE_REMOTE
     if (lrmd_init_remote_tls_server() < 0) {
         crm_err("Failed to create TLS listener: shutting down and staying down");
         exit_code = CRM_EX_FATAL;
         goto done;
     }
     ipc_proxy_init();
 #endif
 
     mainloop_add_signal(SIGTERM, lrmd_shutdown);
     mainloop = g_main_loop_new(NULL, FALSE);
     crm_notice("Pacemaker " EXECD_TYPE " executor successfully started and accepting connections");
     crm_notice("OCF resource agent search path is %s", OCF_RA_PATH);
     g_main_loop_run(mainloop);
 
     /* should never get here */
     lrmd_exit(NULL);
 
 done:
     g_strfreev(options.log_files);
 #ifdef PCMK__COMPILE_REMOTE
     g_free(options.port);
 #endif  // PCMK__COMPILE_REMOTE
 
     g_strfreev(processed_args);
     pcmk__free_arg_context(context);
 
     pcmk__output_and_clear_error(&error, out);
 
     if (out != NULL) {
         out->finish(out, exit_code, true, NULL);
         pcmk__output_free(out);
     }
     pcmk__unregister_formats();
     crm_exit(exit_code);
 }
diff --git a/include/crm/common/options_internal.h b/include/crm/common/options_internal.h
index 884c8edd99..c59ec9506d 100644
--- a/include/crm/common/options_internal.h
+++ b/include/crm/common/options_internal.h
@@ -1,122 +1,123 @@
 /*
  * Copyright 2006-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #ifndef PCMK__OPTIONS_INTERNAL__H
 #  define PCMK__OPTIONS_INTERNAL__H
 
 #  ifndef PCMK__CONFIG_H
 #    define PCMK__CONFIG_H
 #    include <config.h>   // _Noreturn
 #  endif
 
 #  include <glib.h>     // GHashTable
 #  include <stdbool.h>  // bool
 
 _Noreturn void pcmk__cli_help(char cmd);
 
 
 /*
  * Environment variable option handling
  */
 
 const char *pcmk__env_option(const char *option);
 void pcmk__set_env_option(const char *option, const char *value, bool compat);
 bool pcmk__env_option_enabled(const char *daemon, const char *option);
 
 
 /*
  * Cluster option handling
  */
 
 typedef struct pcmk__cluster_option_s {
     const char *name;
     const char *alt_name;
     const char *type;
     const char *values;
     const char *default_value;
 
     bool (*is_valid)(const char *);
 
     const char *description_short;
     const char *description_long;
 
 } pcmk__cluster_option_t;
 
 const char *pcmk__cluster_option(GHashTable *options,
                                  const pcmk__cluster_option_t *option_list,
                                  int len, const char *name);
 
 gchar *pcmk__format_option_metadata(const char *name, const char *desc_short,
                                     const char *desc_long,
                                     pcmk__cluster_option_t *option_list,
                                     int len);
 
 void pcmk__validate_cluster_options(GHashTable *options,
                                     pcmk__cluster_option_t *option_list,
                                     int len);
 
 bool pcmk__valid_interval_spec(const char *value);
 bool pcmk__valid_boolean(const char *value);
 bool pcmk__valid_number(const char *value);
 bool pcmk__valid_positive_number(const char *value);
 bool pcmk__valid_quorum(const char *value);
 bool pcmk__valid_script(const char *value);
 bool pcmk__valid_percentage(const char *value);
 
 // from watchdog.c
 long pcmk__get_sbd_timeout(void);
 bool pcmk__get_sbd_sync_resource_startup(void);
 long pcmk__auto_watchdog_timeout(void);
 bool pcmk__valid_sbd_timeout(const char *value);
 
 // Constants for environment variable names
 #define PCMK__ENV_BLACKBOX                  "blackbox"
 #define PCMK__ENV_CLUSTER_TYPE              "cluster_type"
 #define PCMK__ENV_DEBUG                     "debug"
 #define PCMK__ENV_LOGFACILITY               "logfacility"
 #define PCMK__ENV_LOGFILE                   "logfile"
 #define PCMK__ENV_LOGPRIORITY               "logpriority"
 #define PCMK__ENV_NODE_START_STATE          "node_start_state"
 #define PCMK__ENV_PHYSICAL_HOST             "physical_host"
+#define PCMK__ENV_REMOTE_PORT               "remote_port"
 #define PCMK__ENV_SHUTDOWN_DELAY            "shutdown_delay"
 #define PCMK__ENV_STDERR                    "stderr"
 
 // @COMPAT Drop at 3.0.0; likely last used in 1.1.24
 #define PCMK__ENV_MCP                       "mcp"
 
 // @COMPAT Drop at 3.0.0; added unused in 1.1.9
 #define PCMK__ENV_QUORUM_TYPE               "quorum_type"
 
 // Constants for cluster option names
 #define PCMK__OPT_NODE_HEALTH_BASE          "node-health-base"
 #define PCMK__OPT_NODE_HEALTH_GREEN         "node-health-green"
 #define PCMK__OPT_NODE_HEALTH_RED           "node-health-red"
 #define PCMK__OPT_NODE_HEALTH_STRATEGY      "node-health-strategy"
 #define PCMK__OPT_NODE_HEALTH_YELLOW        "node-health-yellow"
 
 // Constants for meta-attribute names
 #define PCMK__META_ALLOW_UNHEALTHY_NODES    "allow-unhealthy-nodes"
 
 // Constants for enumerated values for various options
 #define PCMK__VALUE_CLUSTER                 "cluster"
 #define PCMK__VALUE_CUSTOM                  "custom"
 #define PCMK__VALUE_FENCING                 "fencing"
 #define PCMK__VALUE_GREEN                   "green"
 #define PCMK__VALUE_LOCAL                   "local"
 #define PCMK__VALUE_MIGRATE_ON_RED          "migrate-on-red"
 #define PCMK__VALUE_NONE                    "none"
 #define PCMK__VALUE_NOTHING                 "nothing"
 #define PCMK__VALUE_ONLY_GREEN              "only-green"
 #define PCMK__VALUE_PROGRESSIVE             "progressive"
 #define PCMK__VALUE_QUORUM                  "quorum"
 #define PCMK__VALUE_RED                     "red"
 #define PCMK__VALUE_UNFENCING               "unfencing"
 #define PCMK__VALUE_YELLOW                  "yellow"
 
 #endif // PCMK__OPTIONS_INTERNAL__H
diff --git a/lib/common/remote.c b/lib/common/remote.c
index 17ae646994..ec6ef7cb7f 100644
--- a/lib/common/remote.c
+++ b/lib/common/remote.c
@@ -1,1274 +1,1275 @@
 /*
  * Copyright 2008-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 #include <crm/crm.h>
 
 #include <sys/param.h>
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <netinet/ip.h>
 #include <netinet/tcp.h>
 #include <netdb.h>
 #include <stdlib.h>
 #include <errno.h>
 #include <inttypes.h>   // PRIx32
 
 #include <glib.h>
 #include <bzlib.h>
 
 #include <crm/common/ipc_internal.h>
 #include <crm/common/xml.h>
 #include <crm/common/mainloop.h>
 #include <crm/common/remote_internal.h>
 
 #ifdef HAVE_GNUTLS_GNUTLS_H
 #  include <gnutls/gnutls.h>
 #endif
 
 /* Swab macros from linux/swab.h */
 #ifdef HAVE_LINUX_SWAB_H
 #  include <linux/swab.h>
 #else
 /*
  * casts are necessary for constants, because we never know how for sure
  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
  */
 #define __swab16(x) ((uint16_t)(                                      \
         (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) |                  \
         (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
 
 #define __swab32(x) ((uint32_t)(                                      \
         (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) |            \
         (((uint32_t)(x) & (uint32_t)0x0000ff00UL) <<  8) |            \
         (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >>  8) |            \
         (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
 
 #define __swab64(x) ((uint64_t)(                                      \
         (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) |   \
         (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) |   \
         (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) |   \
         (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) <<  8) |   \
         (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >>  8) |   \
         (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) |   \
         (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) |   \
         (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
 #endif
 
 #define REMOTE_MSG_VERSION 1
 #define ENDIAN_LOCAL 0xBADADBBD
 
 struct remote_header_v0 {
     uint32_t endian;    /* Detect messages from hosts with different endian-ness */
     uint32_t version;
     uint64_t id;
     uint64_t flags;
     uint32_t size_total;
     uint32_t payload_offset;
     uint32_t payload_compressed;
     uint32_t payload_uncompressed;
 
         /* New fields get added here */
 
 } __attribute__ ((packed));
 
 /*!
  * \internal
  * \brief Retrieve remote message header, in local endianness
  *
  * Return a pointer to the header portion of a remote connection's message
  * buffer, converting the header to local endianness if needed.
  *
  * \param[in,out] remote  Remote connection with new message
  *
  * \return Pointer to message header, localized if necessary
  */
 static struct remote_header_v0 *
 localized_remote_header(pcmk__remote_t *remote)
 {
     struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
     if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
         return NULL;
 
     } else if(header->endian != ENDIAN_LOCAL) {
         uint32_t endian = __swab32(header->endian);
 
         CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
         if(endian != ENDIAN_LOCAL) {
             crm_err("Invalid message detected, endian mismatch: %" PRIx32
                     " is neither %" PRIx32 " nor the swab'd %" PRIx32,
                     ENDIAN_LOCAL, header->endian, endian);
             return NULL;
         }
 
         header->id = __swab64(header->id);
         header->flags = __swab64(header->flags);
         header->endian = __swab32(header->endian);
 
         header->version = __swab32(header->version);
         header->size_total = __swab32(header->size_total);
         header->payload_offset = __swab32(header->payload_offset);
         header->payload_compressed = __swab32(header->payload_compressed);
         header->payload_uncompressed = __swab32(header->payload_uncompressed);
     }
 
     return header;
 }
 
 #ifdef HAVE_GNUTLS_GNUTLS_H
 
 int
 pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
 {
     int rc = 0;
     int pollrc = 0;
     time_t time_limit = time(NULL) + timeout_ms / 1000;
 
     do {
         rc = gnutls_handshake(*remote->tls_session);
         if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
             pollrc = pcmk__remote_ready(remote, 1000);
             if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
                 /* poll returned error, there is no hope */
                 crm_trace("TLS handshake poll failed: %s (%d)",
                           pcmk_strerror(pollrc), pollrc);
                 return pcmk_legacy2rc(pollrc);
             }
         } else if (rc < 0) {
             crm_trace("TLS handshake failed: %s (%d)",
                       gnutls_strerror(rc), rc);
             return EPROTO;
         } else {
             return pcmk_rc_ok;
         }
     } while (time(NULL) < time_limit);
     return ETIME;
 }
 
 /*!
  * \internal
  * \brief Set minimum prime size required by TLS client
  *
  * \param[in] session  TLS session to affect
  */
 static void
 set_minimum_dh_bits(const gnutls_session_t *session)
 {
     int dh_min_bits;
 
     pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &dh_min_bits, 0);
 
     /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
      * the priority string imply the DH requirements, but this is the only
      * way to give the user control over compatibility with older servers.
      */
     if (dh_min_bits > 0) {
         crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
                  dh_min_bits);
         gnutls_dh_set_prime_bits(*session, dh_min_bits);
     }
 }
 
 static unsigned int
 get_bound_dh_bits(unsigned int dh_bits)
 {
     int dh_min_bits;
     int dh_max_bits;
 
     pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &dh_min_bits, 0);
     pcmk__scan_min_int(getenv("PCMK_dh_max_bits"), &dh_max_bits, 0);
     if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) {
         crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
         dh_max_bits = 0;
     }
     if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
         return dh_min_bits;
     }
     if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
         return dh_max_bits;
     }
     return dh_bits;
 }
 
 /*!
  * \internal
  * \brief Initialize a new TLS session
  *
  * \param[in] csock       Connected socket for TLS session
  * \param[in] conn_type   GNUTLS_SERVER or GNUTLS_CLIENT
  * \param[in] cred_type   GNUTLS_CRD_ANON or GNUTLS_CRD_PSK
  * \param[in] credentials TLS session credentials
  *
  * \return Pointer to newly created session object, or NULL on error
  */
 gnutls_session_t *
 pcmk__new_tls_session(int csock, unsigned int conn_type,
                       gnutls_credentials_type_t cred_type, void *credentials)
 {
     int rc = GNUTLS_E_SUCCESS;
     const char *prio_base = NULL;
     char *prio = NULL;
     gnutls_session_t *session = NULL;
 
     /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
      * values required for its functionality.
      *
      * For an example of anonymous authentication, see:
      * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
      */
 
     prio_base = getenv("PCMK_tls_priorities");
     if (prio_base == NULL) {
         prio_base = PCMK_GNUTLS_PRIORITIES;
     }
     prio = crm_strdup_printf("%s:%s", prio_base,
                              (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
 
     session = gnutls_malloc(sizeof(gnutls_session_t));
     if (session == NULL) {
         rc = GNUTLS_E_MEMORY_ERROR;
         goto error;
     }
 
     rc = gnutls_init(session, conn_type);
     if (rc != GNUTLS_E_SUCCESS) {
         goto error;
     }
 
     /* @TODO On the server side, it would be more efficient to cache the
      * priority with gnutls_priority_init2() and set it with
      * gnutls_priority_set() for all sessions.
      */
     rc = gnutls_priority_set_direct(*session, prio, NULL);
     if (rc != GNUTLS_E_SUCCESS) {
         goto error;
     }
     if (conn_type == GNUTLS_CLIENT) {
         set_minimum_dh_bits(session);
     }
 
     gnutls_transport_set_ptr(*session,
                              (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
 
     rc = gnutls_credentials_set(*session, cred_type, credentials);
     if (rc != GNUTLS_E_SUCCESS) {
         goto error;
     }
     free(prio);
     return session;
 
 error:
     crm_err("Could not initialize %s TLS %s session: %s "
             CRM_XS " rc=%d priority='%s'",
             (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
             (conn_type == GNUTLS_SERVER)? "server" : "client",
             gnutls_strerror(rc), rc, prio);
     free(prio);
     if (session != NULL) {
         gnutls_free(session);
     }
     return NULL;
 }
 
 /*!
  * \internal
  * \brief Initialize Diffie-Hellman parameters for a TLS server
  *
  * \param[out] dh_params  Parameter object to initialize
  *
  * \return Standard Pacemaker return code
  * \todo The current best practice is to allow the client and server to
  *       negotiate the Diffie-Hellman parameters via a TLS extension (RFC 7919).
  *       However, we have to support both older versions of GnuTLS (<3.6) that
  *       don't support the extension on our side, and older Pacemaker versions
  *       that don't support the extension on the other side. The next best
  *       practice would be to use a known good prime (see RFC 5114 section 2.2),
  *       possibly stored in a file distributed with Pacemaker.
  */
 int
 pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
 {
     int rc = GNUTLS_E_SUCCESS;
     unsigned int dh_bits = 0;
 
     rc = gnutls_dh_params_init(dh_params);
     if (rc != GNUTLS_E_SUCCESS) {
         goto error;
     }
 
     dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
                                           GNUTLS_SEC_PARAM_NORMAL);
     if (dh_bits == 0) {
         rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
         goto error;
     }
     dh_bits = get_bound_dh_bits(dh_bits);
 
     crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
              dh_bits);
     rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
     if (rc != GNUTLS_E_SUCCESS) {
         goto error;
     }
 
     return pcmk_rc_ok;
 
 error:
     crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
             CRM_XS " rc=%d", gnutls_strerror(rc), rc);
     return EPROTO;
 }
 
 /*!
  * \internal
  * \brief Process handshake data from TLS client
  *
  * Read as much TLS handshake data as is available.
  *
  * \param[in] client  Client connection
  *
  * \return Standard Pacemaker return code (of particular interest, EAGAIN
  *         if some data was successfully read but more data is needed)
  */
 int
 pcmk__read_handshake_data(const pcmk__client_t *client)
 {
     int rc = 0;
 
     CRM_ASSERT(client && client->remote && client->remote->tls_session);
 
     do {
         rc = gnutls_handshake(*client->remote->tls_session);
     } while (rc == GNUTLS_E_INTERRUPTED);
 
     if (rc == GNUTLS_E_AGAIN) {
         /* No more data is available at the moment. This function should be
          * invoked again once the client sends more.
          */
         return EAGAIN;
     } else if (rc != GNUTLS_E_SUCCESS) {
         crm_err("TLS handshake with remote client failed: %s "
                 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
         return EPROTO;
     }
     return pcmk_rc_ok;
 }
 
 // \return Standard Pacemaker return code
 static int
 send_tls(gnutls_session_t *session, struct iovec *iov)
 {
     const char *unsent = iov->iov_base;
     size_t unsent_len = iov->iov_len;
     ssize_t gnutls_rc;
 
     if (unsent == NULL) {
         return EINVAL;
     }
 
     crm_trace("Sending TLS message of %llu bytes",
               (unsigned long long) unsent_len);
     while (true) {
         gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
 
         if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
             crm_trace("Retrying to send %llu bytes remaining",
                       (unsigned long long) unsent_len);
 
         } else if (gnutls_rc < 0) {
             // Caller can log as error if necessary
             crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
                      gnutls_strerror((int) gnutls_rc),
                      (long long) gnutls_rc);
             return ECONNABORTED;
 
         } else if (gnutls_rc < unsent_len) {
             crm_trace("Sent %lld of %llu bytes remaining",
                       (long long) gnutls_rc, (unsigned long long) unsent_len);
             unsent_len -= gnutls_rc;
             unsent += gnutls_rc;
         } else {
             crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
             break;
         }
     }
     return pcmk_rc_ok;
 }
 #endif
 
 // \return Standard Pacemaker return code
 static int
 send_plaintext(int sock, struct iovec *iov)
 {
     const char *unsent = iov->iov_base;
     size_t unsent_len = iov->iov_len;
     ssize_t write_rc;
 
     if (unsent == NULL) {
         return EINVAL;
     }
 
     crm_debug("Sending plaintext message of %llu bytes to socket %d",
               (unsigned long long) unsent_len, sock);
     while (true) {
         write_rc = write(sock, unsent, unsent_len);
         if (write_rc < 0) {
             int rc = errno;
 
             if ((errno == EINTR) || (errno == EAGAIN)) {
                 crm_trace("Retrying to send %llu bytes remaining to socket %d",
                           (unsigned long long) unsent_len, sock);
                 continue;
             }
 
             // Caller can log as error if necessary
             crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
                      pcmk_rc_str(rc), rc, sock);
             return rc;
 
         } else if (write_rc < unsent_len) {
             crm_trace("Sent %lld of %llu bytes remaining",
                       (long long) write_rc, (unsigned long long) unsent_len);
             unsent += write_rc;
             unsent_len -= write_rc;
             continue;
 
         } else {
             crm_trace("Sent all %lld bytes remaining: %.100s",
                       (long long) write_rc, (char *) (iov->iov_base));
             break;
         }
     }
     return pcmk_rc_ok;
 }
 
 // \return Standard Pacemaker return code
 static int
 remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
 {
     int rc = pcmk_rc_ok;
 
     for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
 #ifdef HAVE_GNUTLS_GNUTLS_H
         if (remote->tls_session) {
             rc = send_tls(remote->tls_session, &(iov[lpc]));
             continue;
         }
 #endif
         if (remote->tcp_socket) {
             rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
         } else {
             rc = ESOCKTNOSUPPORT;
         }
     }
     return rc;
 }
 
 /*!
  * \internal
  * \brief Send an XML message over a Pacemaker Remote connection
  *
  * \param[in,out] remote  Pacemaker Remote connection to use
  * \param[in]     msg     XML to send
  *
  * \return Standard Pacemaker return code
  */
 int
 pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg)
 {
     int rc = pcmk_rc_ok;
     static uint64_t id = 0;
     char *xml_text = NULL;
 
     struct iovec iov[2];
     struct remote_header_v0 *header;
 
     CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
 
     xml_text = dump_xml_unformatted(msg);
     CRM_CHECK(xml_text != NULL, return EINVAL);
 
     header = calloc(1, sizeof(struct remote_header_v0));
     CRM_ASSERT(header != NULL);
 
     iov[0].iov_base = header;
     iov[0].iov_len = sizeof(struct remote_header_v0);
 
     iov[1].iov_base = xml_text;
     iov[1].iov_len = 1 + strlen(xml_text);
 
     id++;
     header->id = id;
     header->endian = ENDIAN_LOCAL;
     header->version = REMOTE_MSG_VERSION;
     header->payload_offset = iov[0].iov_len;
     header->payload_uncompressed = iov[1].iov_len;
     header->size_total = iov[0].iov_len + iov[1].iov_len;
 
     rc = remote_send_iovs(remote, iov, 2);
     if (rc != pcmk_rc_ok) {
         crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
                 pcmk_rc_str(rc), rc);
     }
 
     free(iov[0].iov_base);
     free(iov[1].iov_base);
     return rc;
 }
 
 /*!
  * \internal
  * \brief Obtain the XML from the currently buffered remote connection message
  *
  * \param[in,out] remote  Remote connection possibly with message available
  *
  * \return Newly allocated XML object corresponding to message data, or NULL
  * \note This effectively removes the message from the connection buffer.
  */
 xmlNode *
 pcmk__remote_message_xml(pcmk__remote_t *remote)
 {
     xmlNode *xml = NULL;
     struct remote_header_v0 *header = localized_remote_header(remote);
 
     if (header == NULL) {
         return NULL;
     }
 
     /* Support compression on the receiving end now, in case we ever want to add it later */
     if (header->payload_compressed) {
         int rc = 0;
         unsigned int size_u = 1 + header->payload_uncompressed;
         char *uncompressed = calloc(1, header->payload_offset + size_u);
 
         crm_trace("Decompressing message data %d bytes into %d bytes",
                  header->payload_compressed, size_u);
 
         rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
                                         remote->buffer + header->payload_offset,
                                         header->payload_compressed, 1, 0);
         rc = pcmk__bzlib2rc(rc);
 
         if (rc != pcmk_rc_ok && header->version > REMOTE_MSG_VERSION) {
             crm_warn("Couldn't decompress v%d message, we only understand v%d",
                      header->version, REMOTE_MSG_VERSION);
             free(uncompressed);
             return NULL;
 
         } else if (rc != pcmk_rc_ok) {
             crm_err("Decompression failed: %s " CRM_XS " rc=%d",
                     pcmk_rc_str(rc), rc);
             free(uncompressed);
             return NULL;
         }
 
         CRM_ASSERT(size_u == header->payload_uncompressed);
 
         memcpy(uncompressed, remote->buffer, header->payload_offset);       /* Preserve the header */
         remote->buffer_size = header->payload_offset + size_u;
 
         free(remote->buffer);
         remote->buffer = uncompressed;
         header = localized_remote_header(remote);
     }
 
     /* take ownership of the buffer */
     remote->buffer_offset = 0;
 
     CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
 
     xml = string2xml(remote->buffer + header->payload_offset);
     if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
         crm_warn("Couldn't parse v%d message, we only understand v%d",
                  header->version, REMOTE_MSG_VERSION);
 
     } else if (xml == NULL) {
         crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
     }
 
     return xml;
 }
 
 static int
 get_remote_socket(const pcmk__remote_t *remote)
 {
 #ifdef HAVE_GNUTLS_GNUTLS_H
     if (remote->tls_session) {
         void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
 
         return GPOINTER_TO_INT(sock_ptr);
     }
 #endif
 
     if (remote->tcp_socket) {
         return remote->tcp_socket;
     }
 
     crm_err("Remote connection type undetermined (bug?)");
     return -1;
 }
 
 /*!
  * \internal
  * \brief Wait for a remote session to have data to read
  *
  * \param[in] remote      Connection to check
  * \param[in] timeout_ms  Maximum time (in ms) to wait
  *
  * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
  *         there is data ready to be read, and ETIME if there is no data within
  *         the specified timeout)
  */
 int
 pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
 {
     struct pollfd fds = { 0, };
     int sock = 0;
     int rc = 0;
     time_t start;
     int timeout = timeout_ms;
 
     sock = get_remote_socket(remote);
     if (sock <= 0) {
         crm_trace("No longer connected");
         return ENOTCONN;
     }
 
     start = time(NULL);
     errno = 0;
     do {
         fds.fd = sock;
         fds.events = POLLIN;
 
         /* If we got an EINTR while polling, and we have a
          * specific timeout we are trying to honor, attempt
          * to adjust the timeout to the closest second. */
         if (errno == EINTR && (timeout > 0)) {
             timeout = timeout_ms - ((time(NULL) - start) * 1000);
             if (timeout < 1000) {
                 timeout = 1000;
             }
         }
 
         rc = poll(&fds, 1, timeout);
     } while (rc < 0 && errno == EINTR);
 
     if (rc < 0) {
         return errno;
     }
     return (rc == 0)? ETIME : pcmk_rc_ok;
 }
 
 /*!
  * \internal
  * \brief Read bytes from non-blocking remote connection
  *
  * \param[in,out] remote  Remote connection to read
  *
  * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if
  *         a full message has been received, or EAGAIN for a partial message)
  * \note Use only with non-blocking sockets after polling the socket.
  * \note This function will return when the socket read buffer is empty or an
  *       error is encountered.
  */
 static int
 read_available_remote_data(pcmk__remote_t *remote)
 {
     int rc = pcmk_rc_ok;
     size_t read_len = sizeof(struct remote_header_v0);
     struct remote_header_v0 *header = localized_remote_header(remote);
     bool received = false;
     ssize_t read_rc;
 
     if(header) {
         /* Stop at the end of the current message */
         read_len = header->size_total;
     }
 
     /* automatically grow the buffer when needed */
     if(remote->buffer_size < read_len) {
         remote->buffer_size = 2 * read_len;
         crm_trace("Expanding buffer to %llu bytes",
                   (unsigned long long) remote->buffer_size);
         remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
     }
 
 #ifdef HAVE_GNUTLS_GNUTLS_H
     if (!received && remote->tls_session) {
         read_rc = gnutls_record_recv(*(remote->tls_session),
                                      remote->buffer + remote->buffer_offset,
                                      remote->buffer_size - remote->buffer_offset);
         if (read_rc == GNUTLS_E_INTERRUPTED) {
             rc = EINTR;
         } else if (read_rc == GNUTLS_E_AGAIN) {
             rc = EAGAIN;
         } else if (read_rc < 0) {
             crm_debug("TLS receive failed: %s (%lld)",
                       gnutls_strerror(read_rc), (long long) read_rc);
             rc = EIO;
         }
         received = true;
     }
 #endif
 
     if (!received && remote->tcp_socket) {
         read_rc = read(remote->tcp_socket,
                        remote->buffer + remote->buffer_offset,
                        remote->buffer_size - remote->buffer_offset);
         if (read_rc < 0) {
             rc = errno;
         }
         received = true;
     }
 
     if (!received) {
         crm_err("Remote connection type undetermined (bug?)");
         return ESOCKTNOSUPPORT;
     }
 
     /* process any errors. */
     if (read_rc > 0) {
         remote->buffer_offset += read_rc;
         /* always null terminate buffer, the +1 to alloc always allows for this. */
         remote->buffer[remote->buffer_offset] = '\0';
         crm_trace("Received %lld more bytes (%llu total)",
                   (long long) read_rc,
                   (unsigned long long) remote->buffer_offset);
 
     } else if ((rc == EINTR) || (rc == EAGAIN)) {
         crm_trace("No data available for non-blocking remote read: %s (%d)",
                   pcmk_rc_str(rc), rc);
 
     } else if (read_rc == 0) {
         crm_debug("End of remote data encountered after %llu bytes",
                   (unsigned long long) remote->buffer_offset);
         return ENOTCONN;
 
     } else {
         crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
                   (unsigned long long) remote->buffer_offset,
                   pcmk_rc_str(rc), rc);
         return ENOTCONN;
     }
 
     header = localized_remote_header(remote);
     if(header) {
         if(remote->buffer_offset < header->size_total) {
             crm_trace("Read partial remote message (%llu of %u bytes)",
                       (unsigned long long) remote->buffer_offset,
                       header->size_total);
         } else {
             crm_trace("Read full remote message of %llu bytes",
                       (unsigned long long) remote->buffer_offset);
             return pcmk_rc_ok;
         }
     }
 
     return EAGAIN;
 }
 
 /*!
  * \internal
  * \brief Read one message from a remote connection
  *
  * \param[in,out] remote      Remote connection to read
  * \param[in]     timeout_ms  Fail if message not read in this many milliseconds
  *                            (10s will be used if 0, and 60s if negative)
  *
  * \return Standard Pacemaker return code
  */
 int
 pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
 {
     int rc = pcmk_rc_ok;
     time_t start = time(NULL);
     int remaining_timeout = 0;
 
     if (timeout_ms == 0) {
         timeout_ms = 10000;
     } else if (timeout_ms < 0) {
         timeout_ms = 60000;
     }
 
     remaining_timeout = timeout_ms;
     while (remaining_timeout > 0) {
 
         crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
                   remaining_timeout, timeout_ms);
         rc = pcmk__remote_ready(remote, remaining_timeout);
 
         if (rc == ETIME) {
             crm_err("Timed out (%d ms) while waiting for remote data",
                     remaining_timeout);
             return rc;
 
         } else if (rc != pcmk_rc_ok) {
             crm_debug("Wait for remote data aborted (will retry): %s "
                       CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
 
         } else {
             rc = read_available_remote_data(remote);
             if (rc == pcmk_rc_ok) {
                 return rc;
             } else if (rc == EAGAIN) {
                 crm_trace("Waiting for more remote data");
             } else {
                 crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
                           pcmk_rc_str(rc), rc);
             }
         }
 
         // Don't waste time retrying after fatal errors
         if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
             return rc;
         }
 
         remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
     }
     return ETIME;
 }
 
 struct tcp_async_cb_data {
     int sock;
     int timeout_ms;
     time_t start;
     void *userdata;
     void (*callback) (void *userdata, int rc, int sock);
 };
 
 // \return TRUE if timer should be rescheduled, FALSE otherwise
 static gboolean
 check_connect_finished(gpointer userdata)
 {
     struct tcp_async_cb_data *cb_data = userdata;
     int rc;
 
     fd_set rset, wset;
     struct timeval ts = { 0, };
 
     if (cb_data->start == 0) {
         // Last connect() returned success immediately
         rc = pcmk_rc_ok;
         goto dispatch_done;
     }
 
     // If the socket is ready for reading or writing, the connect succeeded
     FD_ZERO(&rset);
     FD_SET(cb_data->sock, &rset);
     wset = rset;
     rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
 
     if (rc < 0) { // select() error
         rc = errno;
         if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
             if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
                 return TRUE; // There is time left, so reschedule timer
             } else {
                 rc = ETIMEDOUT;
             }
         }
         crm_trace("Could not check socket %d for connection success: %s (%d)",
                   cb_data->sock, pcmk_rc_str(rc), rc);
 
     } else if (rc == 0) { // select() timeout
         if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
             return TRUE; // There is time left, so reschedule timer
         }
         crm_debug("Timed out while waiting for socket %d connection success",
                   cb_data->sock);
         rc = ETIMEDOUT;
 
     // select() returned number of file descriptors that are ready
 
     } else if (FD_ISSET(cb_data->sock, &rset)
                || FD_ISSET(cb_data->sock, &wset)) {
 
         // The socket is ready; check it for connection errors
         int error = 0;
         socklen_t len = sizeof(error);
 
         if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
             rc = errno;
             crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
                       cb_data->sock, pcmk_rc_str(rc), rc);
         } else if (error != 0) {
             rc = error;
             crm_trace("Socket %d connected with error: %s (%d)",
                       cb_data->sock, pcmk_rc_str(rc), rc);
         } else {
             rc = pcmk_rc_ok;
         }
 
     } else { // Should not be possible
         crm_trace("select() succeeded, but socket %d not in resulting "
                   "read/write sets", cb_data->sock);
         rc = EAGAIN;
     }
 
   dispatch_done:
     if (rc == pcmk_rc_ok) {
         crm_trace("Socket %d is connected", cb_data->sock);
     } else {
         close(cb_data->sock);
         cb_data->sock = -1;
     }
 
     if (cb_data->callback) {
         cb_data->callback(cb_data->userdata, rc, cb_data->sock);
     }
     free(cb_data);
     return FALSE; // Do not reschedule timer
 }
 
 /*!
  * \internal
  * \brief Attempt to connect socket, calling callback when done
  *
  * Set a given socket non-blocking, then attempt to connect to it,
  * retrying periodically until success or a timeout is reached.
  * Call a caller-supplied callback function when completed.
  *
  * \param[in]  sock        Newly created socket
  * \param[in]  addr        Socket address information for connect
  * \param[in]  addrlen     Size of socket address information in bytes
  * \param[in]  timeout_ms  Fail if not connected within this much time
  * \param[out] timer_id    If not NULL, store retry timer ID here
  * \param[in]  userdata    User data to pass to callback
  * \param[in]  callback    Function to call when connection attempt completes
  *
  * \return Standard Pacemaker return code
  */
 static int
 connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
                      int timeout_ms, int *timer_id, void *userdata,
                      void (*callback) (void *userdata, int rc, int sock))
 {
     int rc = 0;
     int interval = 500;
     int timer;
     struct tcp_async_cb_data *cb_data = NULL;
 
     rc = pcmk__set_nonblocking(sock);
     if (rc != pcmk_rc_ok) {
         crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
                  pcmk_rc_str(rc), rc);
         return rc;
     }
 
     rc = connect(sock, addr, addrlen);
     if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
         rc = errno;
         crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
                  pcmk_rc_str(rc), rc);
         return rc;
     }
 
     cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
     cb_data->userdata = userdata;
     cb_data->callback = callback;
     cb_data->sock = sock;
     cb_data->timeout_ms = timeout_ms;
 
     if (rc == 0) {
         /* The connect was successful immediately, we still return to mainloop
          * and let this callback get called later. This avoids the user of this api
          * to have to account for the fact the callback could be invoked within this
          * function before returning. */
         cb_data->start = 0;
         interval = 1;
     } else {
         cb_data->start = time(NULL);
     }
 
     /* This timer function does a non-blocking poll on the socket to see if we
      * can use it. Once we can, the connect has completed. This method allows us
      * to connect without blocking the mainloop.
      *
      * @TODO Use a mainloop fd callback for this instead of polling. Something
      *       about the way mainloop is currently polling prevents this from
      *       working at the moment though. (See connect(2) regarding EINPROGRESS
      *       for possible new handling needed.)
      */
     crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
               interval, sock);
     timer = g_timeout_add(interval, check_connect_finished, cb_data);
     if (timer_id) {
         *timer_id = timer;
     }
 
     // timer callback should be taking care of cb_data
     // cppcheck-suppress memleak
     return pcmk_rc_ok;
 }
 
 /*!
  * \internal
  * \brief Attempt once to connect socket and set it non-blocking
  *
  * \param[in]  sock        Newly created socket
  * \param[in]  addr        Socket address information for connect
  * \param[in]  addrlen     Size of socket address information in bytes
  *
  * \return Standard Pacemaker return code
  */
 static int
 connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
 {
     int rc = connect(sock, addr, addrlen);
 
     if (rc < 0) {
         rc = errno;
         crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
                  pcmk_rc_str(rc), rc);
         return rc;
     }
 
     rc = pcmk__set_nonblocking(sock);
     if (rc != pcmk_rc_ok) {
         crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
                  pcmk_rc_str(rc), rc);
         return rc;
     }
 
     return pcmk_ok;
 }
 
 /*!
  * \internal
  * \brief Connect to server at specified TCP port
  *
  * \param[in]  host        Name of server to connect to
  * \param[in]  port        Server port to connect to
  * \param[in]  timeout_ms  If asynchronous, fail if not connected in this time
  * \param[out] timer_id    If asynchronous and this is non-NULL, retry timer ID
  *                         will be put here (for ease of cancelling by caller)
  * \param[out] sock_fd     Where to store socket file descriptor
  * \param[in]  userdata    If asynchronous, data to pass to callback
  * \param[in]  callback    If NULL, attempt a single synchronous connection,
  *                         otherwise retry asynchronously then call this
  *
  * \return Standard Pacemaker return code
  */
 int
 pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                      int *sock_fd, void *userdata,
                      void (*callback) (void *userdata, int rc, int sock))
 {
     char buffer[INET6_ADDRSTRLEN];
     struct addrinfo *res = NULL;
     struct addrinfo *rp = NULL;
     struct addrinfo hints;
     const char *server = host;
     int rc;
     int sock = -1;
 
     CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);
 
     // Get host's IP address(es)
     memset(&hints, 0, sizeof(struct addrinfo));
     hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
     hints.ai_socktype = SOCK_STREAM;
     hints.ai_flags = AI_CANONNAME;
 
     rc = getaddrinfo(server, NULL, &hints, &res);
     rc = pcmk__gaierror2rc(rc);
 
     if (rc != pcmk_rc_ok) {
         crm_err("Unable to get IP address info for %s: %s",
                 server, pcmk_rc_str(rc));
         goto async_cleanup;
     }
 
     if (!res || !res->ai_addr) {
         crm_err("Unable to get IP address info for %s: no result", server);
         rc = ENOTCONN;
         goto async_cleanup;
     }
 
     // getaddrinfo() returns a list of host's addresses, try them in order
     for (rp = res; rp != NULL; rp = rp->ai_next) {
         struct sockaddr *addr = rp->ai_addr;
 
         if (!addr) {
             continue;
         }
 
         if (rp->ai_canonname) {
             server = res->ai_canonname;
         }
         crm_debug("Got canonical name %s for %s", server, host);
 
         sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
         if (sock == -1) {
             rc = errno;
             crm_warn("Could not create socket for remote connection to %s:%d: "
                      "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
             continue;
         }
 
         /* Set port appropriately for address family */
         /* (void*) casts avoid false-positive compiler alignment warnings */
         if (addr->sa_family == AF_INET6) {
             ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
         } else {
             ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
         }
 
         memset(buffer, 0, PCMK__NELEM(buffer));
         pcmk__sockaddr2str(addr, buffer);
         crm_info("Attempting remote connection to %s:%d", buffer, port);
 
         if (callback) {
             if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout,
                                      timer_id, userdata, callback) == pcmk_rc_ok) {
                 goto async_cleanup; /* Success for now, we'll hear back later in the callback */
             }
 
         } else if (connect_socket_once(sock, rp->ai_addr,
                                        rp->ai_addrlen) == pcmk_rc_ok) {
             break;          /* Success */
         }
 
         // Connect failed
         close(sock);
         sock = -1;
         rc = ENOTCONN;
     }
 
 async_cleanup:
 
     if (res) {
         freeaddrinfo(res);
     }
     *sock_fd = sock;
     return rc;
 }
 
 /*!
  * \internal
  * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
  *
  * \param[in]  sa  Socket address for IP
  * \param[out] s   Storage for at least INET6_ADDRSTRLEN bytes
  *
  * \note sa The socket address can be a pointer to struct sockaddr_in (IPv4),
  *          struct sockaddr_in6 (IPv6) or struct sockaddr_storage (either),
  *          as long as its sa_family member is set correctly.
  */
 void
 pcmk__sockaddr2str(const void *sa, char *s)
 {
     switch (((const struct sockaddr *) sa)->sa_family) {
         case AF_INET:
             inet_ntop(AF_INET, &(((const struct sockaddr_in *) sa)->sin_addr),
                       s, INET6_ADDRSTRLEN);
             break;
 
         case AF_INET6:
             inet_ntop(AF_INET6,
                       &(((const struct sockaddr_in6 *) sa)->sin6_addr),
                       s, INET6_ADDRSTRLEN);
             break;
 
         default:
             strcpy(s, "<invalid>");
     }
 }
 
 /*!
  * \internal
  * \brief Accept a client connection on a remote server socket
  *
  * \param[in]  ssock  Server socket file descriptor being listened on
  * \param[out] csock  Where to put new client socket's file descriptor
  *
  * \return Standard Pacemaker return code
  */
 int
 pcmk__accept_remote_connection(int ssock, int *csock)
 {
     int rc;
     struct sockaddr_storage addr;
     socklen_t laddr = sizeof(addr);
     char addr_str[INET6_ADDRSTRLEN];
 
     /* accept the connection */
     memset(&addr, 0, sizeof(addr));
     *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
     if (*csock == -1) {
         rc = errno;
         crm_err("Could not accept remote client connection: %s "
                 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
         return rc;
     }
     pcmk__sockaddr2str(&addr, addr_str);
     crm_info("Accepted new remote client connection from %s", addr_str);
 
     rc = pcmk__set_nonblocking(*csock);
     if (rc != pcmk_rc_ok) {
         crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
                 pcmk_rc_str(rc), rc);
         close(*csock);
         *csock = -1;
         return rc;
     }
 
 #ifdef TCP_USER_TIMEOUT
     if (pcmk__get_sbd_timeout() > 0) {
         // Time to fail and retry before watchdog
         unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2;
 
         rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
                         &optval, sizeof(optval));
         if (rc < 0) {
             rc = errno;
             crm_err("Could not set TCP timeout to %d ms on remote connection: "
                     "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
             close(*csock);
             *csock = -1;
             return rc;
         }
     }
 #endif
 
     return rc;
 }
 
 /*!
  * \brief Get the default remote connection TCP port on this host
  *
  * \return Remote connection TCP port number
  */
 int
 crm_default_remote_port(void)
 {
     static int port = 0;
 
     if (port == 0) {
-        const char *env = getenv("PCMK_remote_port");
+        const char *env = pcmk__env_option(PCMK__ENV_REMOTE_PORT);
 
         if (env) {
             errno = 0;
             port = strtol(env, NULL, 10);
             if (errno || (port < 1) || (port > 65535)) {
-                crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
+                crm_warn("Environment variable PCMK_" PCMK__ENV_REMOTE_PORT
+                         " has invalid value '%s', using %d instead",
                          env, DEFAULT_REMOTE_PORT);
                 port = DEFAULT_REMOTE_PORT;
             }
         } else {
             port = DEFAULT_REMOTE_PORT;
         }
     }
     return port;
 }
diff --git a/lib/pengine/bundle.c b/lib/pengine/bundle.c
index 83a6788089..c8ef255346 100644
--- a/lib/pengine/bundle.c
+++ b/lib/pengine/bundle.c
@@ -1,2219 +1,2219 @@
 /*
  * Copyright 2004-2023 the Pacemaker project contributors
  *
  * The version control history for this file may have further details.
  *
  * This source code is licensed under the GNU Lesser General Public License
  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
  */
 
 #include <crm_internal.h>
 
 #include <ctype.h>
 #include <stdint.h>
 
 #include <crm/pengine/rules.h>
 #include <crm/pengine/status.h>
 #include <crm/pengine/internal.h>
 #include <crm/msg_xml.h>
 #include <crm/common/output.h>
 #include <crm/common/xml_internal.h>
 #include <pe_status_private.h>
 
 enum pe__bundle_mount_flags {
     pe__bundle_mount_none       = 0x00,
 
     // mount instance-specific subdirectory rather than source directly
     pe__bundle_mount_subdir     = 0x01
 };
 
 typedef struct {
     char *source;
     char *target;
     char *options;
     uint32_t flags; // bitmask of pe__bundle_mount_flags
 } pe__bundle_mount_t;
 
 typedef struct {
     char *source;
     char *target;
 } pe__bundle_port_t;
 
 enum pe__container_agent {
     PE__CONTAINER_AGENT_UNKNOWN,
     PE__CONTAINER_AGENT_DOCKER,
     PE__CONTAINER_AGENT_RKT,
     PE__CONTAINER_AGENT_PODMAN,
 };
 
 #define PE__CONTAINER_AGENT_UNKNOWN_S "unknown"
 #define PE__CONTAINER_AGENT_DOCKER_S  "docker"
 #define PE__CONTAINER_AGENT_RKT_S     "rkt"
 #define PE__CONTAINER_AGENT_PODMAN_S  "podman"
 
 typedef struct pe__bundle_variant_data_s {
         int promoted_max;
         int nreplicas;
         int nreplicas_per_host;
         char *prefix;
         char *image;
         const char *ip_last;
         char *host_network;
         char *host_netmask;
         char *control_port;
         char *container_network;
         char *ip_range_start;
         gboolean add_host;
         gchar *container_host_options;
         char *container_command;
         char *launcher_options;
         const char *attribute_target;
 
         pcmk_resource_t *child;
 
         GList *replicas;    // pe__bundle_replica_t *
         GList *ports;       // pe__bundle_port_t *
         GList *mounts;      // pe__bundle_mount_t *
 
         enum pe__container_agent agent_type;
 } pe__bundle_variant_data_t;
 
 #define get_bundle_variant_data(data, rsc)                      \
     CRM_ASSERT(rsc != NULL);                                    \
     CRM_ASSERT(rsc->variant == pcmk_rsc_variant_bundle);        \
     CRM_ASSERT(rsc->variant_opaque != NULL);                    \
     data = (pe__bundle_variant_data_t *) rsc->variant_opaque;
 
 /*!
  * \internal
  * \brief Get maximum number of bundle replicas allowed to run
  *
  * \param[in] rsc  Bundle or bundled resource to check
  *
  * \return Maximum replicas for bundle corresponding to \p rsc
  */
 int
 pe__bundle_max(const pcmk_resource_t *rsc)
 {
     const pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, pe__const_top_resource(rsc, true));
     return bundle_data->nreplicas;
 }
 
 /*!
  * \internal
  * \brief Get the resource inside a bundle
  *
  * \param[in] bundle  Bundle to check
  *
  * \return Resource inside \p bundle if any, otherwise NULL
  */
 pcmk_resource_t *
 pe__bundled_resource(const pcmk_resource_t *rsc)
 {
     const pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, pe__const_top_resource(rsc, true));
     return bundle_data->child;
 }
 
 /*!
  * \internal
  * \brief Get containerized resource corresponding to a given bundle container
  *
  * \param[in] instance  Collective instance that might be a bundle container
  *
  * \return Bundled resource instance inside \p instance if it is a bundle
  *         container instance, otherwise NULL
  */
 const pcmk_resource_t *
 pe__get_rsc_in_container(const pcmk_resource_t *instance)
 {
     const pe__bundle_variant_data_t *data = NULL;
     const pcmk_resource_t *top = pe__const_top_resource(instance, true);
 
     if ((top == NULL) || (top->variant != pcmk_rsc_variant_bundle)) {
         return NULL;
     }
     get_bundle_variant_data(data, top);
 
     for (const GList *iter = data->replicas; iter != NULL; iter = iter->next) {
         const pe__bundle_replica_t *replica = iter->data;
 
         if (instance == replica->container) {
             return replica->child;
         }
     }
     return NULL;
 }
 
 /*!
  * \internal
  * \brief Check whether a given node is created by a bundle
  *
  * \param[in] bundle  Bundle resource to check
  * \param[in] node    Node to check
  *
  * \return true if \p node is an instance of \p bundle, otherwise false
  */
 bool
 pe__node_is_bundle_instance(const pcmk_resource_t *bundle,
                             const pcmk_node_t *node)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, bundle);
     for (GList *iter = bundle_data->replicas; iter != NULL; iter = iter->next) {
         pe__bundle_replica_t *replica = iter->data;
 
         if (pe__same_node(node, replica->node)) {
             return true;
         }
     }
     return false;
 }
 
 /*!
  * \internal
  * \brief Get the container of a bundle's first replica
  *
  * \param[in] bundle  Bundle resource to get container for
  *
  * \return Container resource from first replica of \p bundle if any,
  *         otherwise NULL
  */
 pcmk_resource_t *
 pe__first_container(const pcmk_resource_t *bundle)
 {
     const pe__bundle_variant_data_t *bundle_data = NULL;
     const pe__bundle_replica_t *replica = NULL;
 
     get_bundle_variant_data(bundle_data, bundle);
     if (bundle_data->replicas == NULL) {
         return NULL;
     }
     replica = bundle_data->replicas->data;
     return replica->container;
 }
 
 /*!
  * \internal
  * \brief Iterate over bundle replicas
  *
  * \param[in,out] bundle     Bundle to iterate over
  * \param[in]     fn         Function to call for each replica (its return value
  *                           indicates whether to continue iterating)
  * \param[in,out] user_data  Pointer to pass to \p fn
  */
 void
 pe__foreach_bundle_replica(pcmk_resource_t *bundle,
                            bool (*fn)(pe__bundle_replica_t *, void *),
                            void *user_data)
 {
     const pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, bundle);
     for (GList *iter = bundle_data->replicas; iter != NULL; iter = iter->next) {
         if (!fn((pe__bundle_replica_t *) iter->data, user_data)) {
             break;
         }
     }
 }
 
 /*!
  * \internal
  * \brief Iterate over const bundle replicas
  *
  * \param[in]     bundle     Bundle to iterate over
  * \param[in]     fn         Function to call for each replica (its return value
  *                           indicates whether to continue iterating)
  * \param[in,out] user_data  Pointer to pass to \p fn
  */
 void
 pe__foreach_const_bundle_replica(const pcmk_resource_t *bundle,
                                  bool (*fn)(const pe__bundle_replica_t *,
                                             void *),
                                  void *user_data)
 {
     const pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, bundle);
     for (const GList *iter = bundle_data->replicas; iter != NULL;
          iter = iter->next) {
 
         if (!fn((const pe__bundle_replica_t *) iter->data, user_data)) {
             break;
         }
     }
 }
 
 static char *
 next_ip(const char *last_ip)
 {
     unsigned int oct1 = 0;
     unsigned int oct2 = 0;
     unsigned int oct3 = 0;
     unsigned int oct4 = 0;
     int rc = sscanf(last_ip, "%u.%u.%u.%u", &oct1, &oct2, &oct3, &oct4);
 
     if (rc != 4) {
         /*@ TODO check for IPv6 */
         return NULL;
 
     } else if (oct3 > 253) {
         return NULL;
 
     } else if (oct4 > 253) {
         ++oct3;
         oct4 = 1;
 
     } else {
         ++oct4;
     }
 
     return crm_strdup_printf("%u.%u.%u.%u", oct1, oct2, oct3, oct4);
 }
 
 static void
 allocate_ip(pe__bundle_variant_data_t *data, pe__bundle_replica_t *replica,
             GString *buffer)
 {
     if(data->ip_range_start == NULL) {
         return;
 
     } else if(data->ip_last) {
         replica->ipaddr = next_ip(data->ip_last);
 
     } else {
         replica->ipaddr = strdup(data->ip_range_start);
     }
 
     data->ip_last = replica->ipaddr;
     switch (data->agent_type) {
         case PE__CONTAINER_AGENT_DOCKER:
         case PE__CONTAINER_AGENT_PODMAN:
             if (data->add_host) {
                 g_string_append_printf(buffer, " --add-host=%s-%d:%s",
                                        data->prefix, replica->offset,
                                        replica->ipaddr);
             } else {
                 g_string_append_printf(buffer, " --hosts-entry=%s=%s-%d",
                                        replica->ipaddr, data->prefix,
                                        replica->offset);
             }
             break;
 
         case PE__CONTAINER_AGENT_RKT:
             g_string_append_printf(buffer, " --hosts-entry=%s=%s-%d",
                                    replica->ipaddr, data->prefix,
                                    replica->offset);
             break;
 
         default: // PE__CONTAINER_AGENT_UNKNOWN
             break;
     }
 }
 
 static xmlNode *
 create_resource(const char *name, const char *provider, const char *kind)
 {
     xmlNode *rsc = create_xml_node(NULL, XML_CIB_TAG_RESOURCE);
 
     crm_xml_add(rsc, XML_ATTR_ID, name);
     crm_xml_add(rsc, XML_AGENT_ATTR_CLASS, PCMK_RESOURCE_CLASS_OCF);
     crm_xml_add(rsc, XML_AGENT_ATTR_PROVIDER, provider);
     crm_xml_add(rsc, XML_ATTR_TYPE, kind);
 
     return rsc;
 }
 
 /*!
  * \internal
  * \brief Check whether cluster can manage resource inside container
  *
  * \param[in,out] data  Container variant data
  *
  * \return TRUE if networking configuration is acceptable, FALSE otherwise
  *
  * \note The resource is manageable if an IP range or control port has been
  *       specified. If a control port is used without an IP range, replicas per
  *       host must be 1.
  */
 static bool
 valid_network(pe__bundle_variant_data_t *data)
 {
     if(data->ip_range_start) {
         return TRUE;
     }
     if(data->control_port) {
         if(data->nreplicas_per_host > 1) {
             pe_err("Specifying the 'control-port' for %s requires 'replicas-per-host=1'", data->prefix);
             data->nreplicas_per_host = 1;
             // @TODO to be sure:
             // pe__clear_resource_flags(rsc, pcmk_rsc_unique);
         }
         return TRUE;
     }
     return FALSE;
 }
 
 static int
 create_ip_resource(pcmk_resource_t *parent, pe__bundle_variant_data_t *data,
                    pe__bundle_replica_t *replica)
 {
     if(data->ip_range_start) {
         char *id = NULL;
         xmlNode *xml_ip = NULL;
         xmlNode *xml_obj = NULL;
 
         id = crm_strdup_printf("%s-ip-%s", data->prefix, replica->ipaddr);
         crm_xml_sanitize_id(id);
         xml_ip = create_resource(id, "heartbeat", "IPaddr2");
         free(id);
 
         xml_obj = create_xml_node(xml_ip, XML_TAG_ATTR_SETS);
         crm_xml_set_id(xml_obj, "%s-attributes-%d",
                        data->prefix, replica->offset);
 
         crm_create_nvpair_xml(xml_obj, NULL, "ip", replica->ipaddr);
         if(data->host_network) {
             crm_create_nvpair_xml(xml_obj, NULL, "nic", data->host_network);
         }
 
         if(data->host_netmask) {
             crm_create_nvpair_xml(xml_obj, NULL,
                                   "cidr_netmask", data->host_netmask);
 
         } else {
             crm_create_nvpair_xml(xml_obj, NULL, "cidr_netmask", "32");
         }
 
         xml_obj = create_xml_node(xml_ip, "operations");
         crm_create_op_xml(xml_obj, ID(xml_ip), PCMK_ACTION_MONITOR, "60s",
                           NULL);
 
         // TODO: Other ops? Timeouts and intervals from underlying resource?
 
         if (pe__unpack_resource(xml_ip, &replica->ip, parent,
                                 parent->cluster) != pcmk_rc_ok) {
             return pcmk_rc_unpack_error;
         }
 
         parent->children = g_list_append(parent->children, replica->ip);
     }
     return pcmk_rc_ok;
 }
 
 static const char*
 container_agent_str(enum pe__container_agent t)
 {
     switch (t) {
         case PE__CONTAINER_AGENT_DOCKER: return PE__CONTAINER_AGENT_DOCKER_S;
         case PE__CONTAINER_AGENT_RKT:    return PE__CONTAINER_AGENT_RKT_S;
         case PE__CONTAINER_AGENT_PODMAN: return PE__CONTAINER_AGENT_PODMAN_S;
         default: // PE__CONTAINER_AGENT_UNKNOWN
             break;
     }
     return PE__CONTAINER_AGENT_UNKNOWN_S;
 }
 
 static int
 create_container_resource(pcmk_resource_t *parent,
                           const pe__bundle_variant_data_t *data,
                           pe__bundle_replica_t *replica)
 {
     char *id = NULL;
     xmlNode *xml_container = NULL;
     xmlNode *xml_obj = NULL;
 
     // Agent-specific
     const char *hostname_opt = NULL;
     const char *env_opt = NULL;
     const char *agent_str = NULL;
     int volid = 0;  // rkt-only
 
     GString *buffer = NULL;
     GString *dbuffer = NULL;
 
     // Where syntax differences are drop-in replacements, set them now
     switch (data->agent_type) {
         case PE__CONTAINER_AGENT_DOCKER:
         case PE__CONTAINER_AGENT_PODMAN:
             hostname_opt = "-h ";
             env_opt = "-e ";
             break;
         case PE__CONTAINER_AGENT_RKT:
             hostname_opt = "--hostname=";
             env_opt = "--environment=";
             break;
         default:    // PE__CONTAINER_AGENT_UNKNOWN
             return pcmk_rc_unpack_error;
     }
     agent_str = container_agent_str(data->agent_type);
 
     buffer = g_string_sized_new(4096);
 
     id = crm_strdup_printf("%s-%s-%d", data->prefix, agent_str,
                            replica->offset);
     crm_xml_sanitize_id(id);
     xml_container = create_resource(id, "heartbeat", agent_str);
     free(id);
 
     xml_obj = create_xml_node(xml_container, XML_TAG_ATTR_SETS);
     crm_xml_set_id(xml_obj, "%s-attributes-%d", data->prefix, replica->offset);
 
     crm_create_nvpair_xml(xml_obj, NULL, "image", data->image);
     crm_create_nvpair_xml(xml_obj, NULL, "allow_pull", XML_BOOLEAN_TRUE);
     crm_create_nvpair_xml(xml_obj, NULL, "force_kill", XML_BOOLEAN_FALSE);
     crm_create_nvpair_xml(xml_obj, NULL, "reuse", XML_BOOLEAN_FALSE);
 
     if (data->agent_type == PE__CONTAINER_AGENT_DOCKER) {
         g_string_append(buffer, " --restart=no");
     }
 
     /* Set a container hostname only if we have an IP to map it to. The user can
      * set -h or --uts=host themselves if they want a nicer name for logs, but
      * this makes applications happy who need their  hostname to match the IP
      * they bind to.
      */
     if (data->ip_range_start != NULL) {
         g_string_append_printf(buffer, " %s%s-%d", hostname_opt, data->prefix,
                                replica->offset);
     }
     pcmk__g_strcat(buffer, " ", env_opt, "PCMK_stderr=1", NULL);
 
     if (data->container_network != NULL) {
         pcmk__g_strcat(buffer, " --net=", data->container_network, NULL);
     }
 
     if (data->control_port != NULL) {
-        pcmk__g_strcat(buffer, " ", env_opt, "PCMK_remote_port=",
-                      data->control_port, NULL);
+        pcmk__g_strcat(buffer, " ", env_opt, "PCMK_" PCMK__ENV_REMOTE_PORT "=",
+                       data->control_port, NULL);
     } else {
-        g_string_append_printf(buffer, " %sPCMK_remote_port=%d", env_opt,
-                               DEFAULT_REMOTE_PORT);
+        g_string_append_printf(buffer, " %sPCMK_" PCMK__ENV_REMOTE_PORT "=%d",
+                               env_opt, DEFAULT_REMOTE_PORT);
     }
 
     for (GList *iter = data->mounts; iter != NULL; iter = iter->next) {
         pe__bundle_mount_t *mount = (pe__bundle_mount_t *) iter->data;
         char *source = NULL;
 
         if (pcmk_is_set(mount->flags, pe__bundle_mount_subdir)) {
             source = crm_strdup_printf("%s/%s-%d", mount->source, data->prefix,
                                        replica->offset);
             pcmk__add_separated_word(&dbuffer, 1024, source, ",");
         }
 
         switch (data->agent_type) {
             case PE__CONTAINER_AGENT_DOCKER:
             case PE__CONTAINER_AGENT_PODMAN:
                 pcmk__g_strcat(buffer,
                                " -v ", pcmk__s(source, mount->source),
                                ":", mount->target, NULL);
 
                 if (mount->options != NULL) {
                     pcmk__g_strcat(buffer, ":", mount->options, NULL);
                 }
                 break;
             case PE__CONTAINER_AGENT_RKT:
                 g_string_append_printf(buffer,
                                        " --volume vol%d,kind=host,"
                                        "source=%s%s%s "
                                        "--mount volume=vol%d,target=%s",
                                        volid, pcmk__s(source, mount->source),
                                        (mount->options != NULL)? "," : "",
                                        pcmk__s(mount->options, ""),
                                        volid, mount->target);
                 volid++;
                 break;
             default:
                 break;
         }
         free(source);
     }
 
     for (GList *iter = data->ports; iter != NULL; iter = iter->next) {
         pe__bundle_port_t *port = (pe__bundle_port_t *) iter->data;
 
         switch (data->agent_type) {
             case PE__CONTAINER_AGENT_DOCKER:
             case PE__CONTAINER_AGENT_PODMAN:
                 if (replica->ipaddr != NULL) {
                     pcmk__g_strcat(buffer,
                                    " -p ", replica->ipaddr, ":", port->source,
                                    ":", port->target, NULL);
 
                 } else if (!pcmk__str_eq(data->container_network, "host",
                                          pcmk__str_none)) {
                     // No need to do port mapping if net == host
                     pcmk__g_strcat(buffer,
                                    " -p ", port->source, ":", port->target,
                                    NULL);
                 }
                 break;
             case PE__CONTAINER_AGENT_RKT:
                 if (replica->ipaddr != NULL) {
                     pcmk__g_strcat(buffer,
                                    " --port=", port->target,
                                    ":", replica->ipaddr, ":", port->source,
                                    NULL);
                 } else {
                     pcmk__g_strcat(buffer,
                                    " --port=", port->target, ":", port->source,
                                    NULL);
                 }
                 break;
             default:
                 break;
         }
     }
 
     /* @COMPAT: We should use pcmk__add_word() here, but we can't yet, because
      * it would cause restarts during rolling upgrades.
      *
      * In a previous version of the container resource creation logic, if
      * data->launcher_options is not NULL, we append
      * (" %s", data->launcher_options) even if data->launcher_options is an
      * empty string. Likewise for data->container_host_options. Using
      *
      *     pcmk__add_word(buffer, 0, data->launcher_options)
      *
      * removes that extra trailing space, causing a resource definition change.
      */
     if (data->launcher_options != NULL) {
         pcmk__g_strcat(buffer, " ", data->launcher_options, NULL);
     }
 
     if (data->container_host_options != NULL) {
         pcmk__g_strcat(buffer, " ", data->container_host_options, NULL);
     }
 
     crm_create_nvpair_xml(xml_obj, NULL, "run_opts",
                           (const char *) buffer->str);
     g_string_free(buffer, TRUE);
 
     crm_create_nvpair_xml(xml_obj, NULL, "mount_points",
                           (dbuffer != NULL)? (const char *) dbuffer->str : "");
     if (dbuffer != NULL) {
         g_string_free(dbuffer, TRUE);
     }
 
     if (replica->child != NULL) {
         if (data->container_command != NULL) {
             crm_create_nvpair_xml(xml_obj, NULL, "run_cmd",
                                   data->container_command);
         } else {
             crm_create_nvpair_xml(xml_obj, NULL, "run_cmd",
                                   SBIN_DIR "/pacemaker-remoted");
         }
 
         /* TODO: Allow users to specify their own?
          *
          * We just want to know if the container is alive; we'll monitor the
          * child independently.
          */
         crm_create_nvpair_xml(xml_obj, NULL, "monitor_cmd", "/bin/true");
 #if 0
         /* @TODO Consider supporting the use case where we can start and stop
          * resources, but not proxy local commands (such as setting node
          * attributes), by running the local executor in stand-alone mode.
          * However, this would probably be better done via ACLs as with other
          * Pacemaker Remote nodes.
          */
     } else if ((child != NULL) && data->untrusted) {
         crm_create_nvpair_xml(xml_obj, NULL, "run_cmd",
                               CRM_DAEMON_DIR "/pacemaker-execd");
         crm_create_nvpair_xml(xml_obj, NULL, "monitor_cmd",
                               CRM_DAEMON_DIR "/pacemaker/cts-exec-helper -c poke");
 #endif
     } else {
         if (data->container_command != NULL) {
             crm_create_nvpair_xml(xml_obj, NULL, "run_cmd",
                                   data->container_command);
         }
 
         /* TODO: Allow users to specify their own?
          *
          * We don't know what's in the container, so we just want to know if it
          * is alive.
          */
         crm_create_nvpair_xml(xml_obj, NULL, "monitor_cmd", "/bin/true");
     }
 
     xml_obj = create_xml_node(xml_container, "operations");
     crm_create_op_xml(xml_obj, ID(xml_container), PCMK_ACTION_MONITOR, "60s",
                       NULL);
 
     // TODO: Other ops? Timeouts and intervals from underlying resource?
     if (pe__unpack_resource(xml_container, &replica->container, parent,
                             parent->cluster) != pcmk_rc_ok) {
         return pcmk_rc_unpack_error;
     }
     pe__set_resource_flags(replica->container, pcmk_rsc_replica_container);
     parent->children = g_list_append(parent->children, replica->container);
 
     return pcmk_rc_ok;
 }
 
 /*!
  * \brief Ban a node from a resource's (and its children's) allowed nodes list
  *
  * \param[in,out] rsc    Resource to modify
  * \param[in]     uname  Name of node to ban
  */
 static void
 disallow_node(pcmk_resource_t *rsc, const char *uname)
 {
     gpointer match = g_hash_table_lookup(rsc->allowed_nodes, uname);
 
     if (match) {
         ((pcmk_node_t *) match)->weight = -INFINITY;
         ((pcmk_node_t *) match)->rsc_discover_mode = pcmk_probe_never;
     }
     if (rsc->children) {
         g_list_foreach(rsc->children, (GFunc) disallow_node, (gpointer) uname);
     }
 }
 
 static int
 create_remote_resource(pcmk_resource_t *parent, pe__bundle_variant_data_t *data,
                        pe__bundle_replica_t *replica)
 {
     if (replica->child && valid_network(data)) {
         GHashTableIter gIter;
         pcmk_node_t *node = NULL;
         xmlNode *xml_remote = NULL;
         char *id = crm_strdup_printf("%s-%d", data->prefix, replica->offset);
         char *port_s = NULL;
         const char *uname = NULL;
         const char *connect_name = NULL;
 
         if (pe_find_resource(parent->cluster->resources, id) != NULL) {
             free(id);
             // The biggest hammer we have
             id = crm_strdup_printf("pcmk-internal-%s-remote-%d",
                                    replica->child->id, replica->offset);
             //@TODO return error instead of asserting?
             CRM_ASSERT(pe_find_resource(parent->cluster->resources,
                                         id) == NULL);
         }
 
         /* REMOTE_CONTAINER_HACK: Using "#uname" as the server name when the
          * connection does not have its own IP is a magic string that we use to
          * support nested remotes (i.e. a bundle running on a remote node).
          */
         connect_name = (replica->ipaddr? replica->ipaddr : "#uname");
 
         if (data->control_port == NULL) {
             port_s = pcmk__itoa(DEFAULT_REMOTE_PORT);
         }
 
         /* This sets replica->container as replica->remote's container, which is
          * similar to what happens with guest nodes. This is how the scheduler
          * knows that the bundle node is fenced by recovering the container, and
          * that remote should be ordered relative to the container.
          */
         xml_remote = pe_create_remote_xml(NULL, id, replica->container->id,
                                           NULL, NULL, NULL,
                                           connect_name, (data->control_port?
                                           data->control_port : port_s));
         free(port_s);
 
         /* Abandon our created ID, and pull the copy from the XML, because we
          * need something that will get freed during data set cleanup to use as
          * the node ID and uname.
          */
         free(id);
         id = NULL;
         uname = ID(xml_remote);
 
         /* Ensure a node has been created for the guest (it may have already
          * been, if it has a permanent node attribute), and ensure its weight is
          * -INFINITY so no other resources can run on it.
          */
         node = pe_find_node(parent->cluster->nodes, uname);
         if (node == NULL) {
             node = pe_create_node(uname, uname, "remote", "-INFINITY",
                                   parent->cluster);
         } else {
             node->weight = -INFINITY;
         }
         node->rsc_discover_mode = pcmk_probe_never;
 
         /* unpack_remote_nodes() ensures that each remote node and guest node
          * has a pcmk_node_t entry. Ideally, it would do the same for bundle
          * nodes. Unfortunately, a bundle has to be mostly unpacked before it's
          * obvious what nodes will be needed, so we do it just above.
          *
          * Worse, that means that the node may have been utilized while
          * unpacking other resources, without our weight correction. The most
          * likely place for this to happen is when pe__unpack_resource() calls
          * resource_location() to set a default score in symmetric clusters.
          * This adds a node *copy* to each resource's allowed nodes, and these
          * copies will have the wrong weight.
          *
          * As a hacky workaround, fix those copies here.
          *
          * @TODO Possible alternative: ensure bundles are unpacked before other
          * resources, so the weight is correct before any copies are made.
          */
         g_list_foreach(parent->cluster->resources, (GFunc) disallow_node,
                        (gpointer) uname);
 
         replica->node = pe__copy_node(node);
         replica->node->weight = 500;
         replica->node->rsc_discover_mode = pcmk_probe_exclusive;
 
         /* Ensure the node shows up as allowed and with the correct discovery set */
         if (replica->child->allowed_nodes != NULL) {
             g_hash_table_destroy(replica->child->allowed_nodes);
         }
         replica->child->allowed_nodes = pcmk__strkey_table(NULL, free);
         g_hash_table_insert(replica->child->allowed_nodes,
                             (gpointer) replica->node->details->id,
                             pe__copy_node(replica->node));
 
         {
             pcmk_node_t *copy = pe__copy_node(replica->node);
             copy->weight = -INFINITY;
             g_hash_table_insert(replica->child->parent->allowed_nodes,
                                 (gpointer) replica->node->details->id, copy);
         }
         if (pe__unpack_resource(xml_remote, &replica->remote, parent,
                                 parent->cluster) != pcmk_rc_ok) {
             return pcmk_rc_unpack_error;
         }
 
         g_hash_table_iter_init(&gIter, replica->remote->allowed_nodes);
         while (g_hash_table_iter_next(&gIter, NULL, (void **)&node)) {
             if (pe__is_guest_or_remote_node(node)) {
                 /* Remote resources can only run on 'normal' cluster node */
                 node->weight = -INFINITY;
             }
         }
 
         replica->node->details->remote_rsc = replica->remote;
 
         // Ensure pe__is_guest_node() functions correctly immediately
         replica->remote->container = replica->container;
 
         /* A bundle's #kind is closer to "container" (guest node) than the
          * "remote" set by pe_create_node().
          */
         g_hash_table_insert(replica->node->details->attrs,
                             strdup(CRM_ATTR_KIND), strdup("container"));
 
         /* One effect of this is that setup_container() will add
          * replica->remote to replica->container's fillers, which will make
          * pe__resource_contains_guest_node() true for replica->container.
          *
          * replica->child does NOT get added to replica->container's fillers.
          * The only noticeable effect if it did would be for its fail count to
          * be taken into account when checking replica->container's migration
          * threshold.
          */
         parent->children = g_list_append(parent->children, replica->remote);
     }
     return pcmk_rc_ok;
 }
 
 static int
 create_replica_resources(pcmk_resource_t *parent, pe__bundle_variant_data_t *data,
                          pe__bundle_replica_t *replica)
 {
     int rc = pcmk_rc_ok;
 
     rc = create_container_resource(parent, data, replica);
     if (rc != pcmk_rc_ok) {
         return rc;
     }
 
     rc = create_ip_resource(parent, data, replica);
     if (rc != pcmk_rc_ok) {
         return rc;
     }
 
     rc = create_remote_resource(parent, data, replica);
     if (rc != pcmk_rc_ok) {
         return rc;
     }
 
     if ((replica->child != NULL) && (replica->ipaddr != NULL)) {
         add_hash_param(replica->child->meta, "external-ip", replica->ipaddr);
     }
 
     if (replica->remote != NULL) {
         /*
          * Allow the remote connection resource to be allocated to a
          * different node than the one on which the container is active.
          *
          * This makes it possible to have Pacemaker Remote nodes running
          * containers with pacemaker-remoted inside in order to start
          * services inside those containers.
          */
         pe__set_resource_flags(replica->remote,
                                pcmk_rsc_remote_nesting_allowed);
     }
     return rc;
 }
 
 static void
 mount_add(pe__bundle_variant_data_t *bundle_data, const char *source,
           const char *target, const char *options, uint32_t flags)
 {
     pe__bundle_mount_t *mount = calloc(1, sizeof(pe__bundle_mount_t));
 
     CRM_ASSERT(mount != NULL);
     mount->source = strdup(source);
     mount->target = strdup(target);
     pcmk__str_update(&mount->options, options);
     mount->flags = flags;
     bundle_data->mounts = g_list_append(bundle_data->mounts, mount);
 }
 
 static void
 mount_free(pe__bundle_mount_t *mount)
 {
     free(mount->source);
     free(mount->target);
     free(mount->options);
     free(mount);
 }
 
 static void
 port_free(pe__bundle_port_t *port)
 {
     free(port->source);
     free(port->target);
     free(port);
 }
 
 static pe__bundle_replica_t *
 replica_for_remote(pcmk_resource_t *remote)
 {
     pcmk_resource_t *top = remote;
     pe__bundle_variant_data_t *bundle_data = NULL;
 
     if (top == NULL) {
         return NULL;
     }
 
     while (top->parent != NULL) {
         top = top->parent;
     }
 
     get_bundle_variant_data(bundle_data, top);
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
 
         if (replica->remote == remote) {
             return replica;
         }
     }
     CRM_LOG_ASSERT(FALSE);
     return NULL;
 }
 
 bool
 pe__bundle_needs_remote_name(pcmk_resource_t *rsc)
 {
     const char *value;
     GHashTable *params = NULL;
 
     if (rsc == NULL) {
         return false;
     }
 
     // Use NULL node since pcmk__bundle_expand() uses that to set value
     params = pe_rsc_params(rsc, NULL, rsc->cluster);
     value = g_hash_table_lookup(params, XML_RSC_ATTR_REMOTE_RA_ADDR);
 
     return pcmk__str_eq(value, "#uname", pcmk__str_casei)
            && xml_contains_remote_node(rsc->xml);
 }
 
 const char *
 pe__add_bundle_remote_name(pcmk_resource_t *rsc, pcmk_scheduler_t *data_set,
                            xmlNode *xml, const char *field)
 {
     // REMOTE_CONTAINER_HACK: Allow remote nodes that start containers with pacemaker remote inside
 
     pcmk_node_t *node = NULL;
     pe__bundle_replica_t *replica = NULL;
 
     if (!pe__bundle_needs_remote_name(rsc)) {
         return NULL;
     }
 
     replica = replica_for_remote(rsc);
     if (replica == NULL) {
         return NULL;
     }
 
     node = replica->container->allocated_to;
     if (node == NULL) {
         /* If it won't be running anywhere after the
          * transition, go with where it's running now.
          */
         node = pe__current_node(replica->container);
     }
 
     if(node == NULL) {
         crm_trace("Cannot determine address for bundle connection %s", rsc->id);
         return NULL;
     }
 
     crm_trace("Setting address for bundle connection %s to bundle host %s",
               rsc->id, pe__node_name(node));
     if(xml != NULL && field != NULL) {
         crm_xml_add(xml, field, node->details->uname);
     }
 
     return node->details->uname;
 }
 
 #define pe__set_bundle_mount_flags(mount_xml, flags, flags_to_set) do {     \
         flags = pcmk__set_flags_as(__func__, __LINE__, LOG_TRACE,           \
                                    "Bundle mount", ID(mount_xml), flags,    \
                                    (flags_to_set), #flags_to_set);          \
     } while (0)
 
 gboolean
 pe__unpack_bundle(pcmk_resource_t *rsc, pcmk_scheduler_t *data_set)
 {
     const char *value = NULL;
     xmlNode *xml_obj = NULL;
     xmlNode *xml_resource = NULL;
     pe__bundle_variant_data_t *bundle_data = NULL;
     bool need_log_mount = TRUE;
 
     CRM_ASSERT(rsc != NULL);
     pe_rsc_trace(rsc, "Processing resource %s...", rsc->id);
 
     bundle_data = calloc(1, sizeof(pe__bundle_variant_data_t));
     rsc->variant_opaque = bundle_data;
     bundle_data->prefix = strdup(rsc->id);
 
     xml_obj = first_named_child(rsc->xml, PE__CONTAINER_AGENT_DOCKER_S);
     if (xml_obj != NULL) {
         bundle_data->agent_type = PE__CONTAINER_AGENT_DOCKER;
     } else {
         xml_obj = first_named_child(rsc->xml, PE__CONTAINER_AGENT_RKT_S);
         if (xml_obj != NULL) {
             bundle_data->agent_type = PE__CONTAINER_AGENT_RKT;
         } else {
             xml_obj = first_named_child(rsc->xml, PE__CONTAINER_AGENT_PODMAN_S);
             if (xml_obj != NULL) {
                 bundle_data->agent_type = PE__CONTAINER_AGENT_PODMAN;
             } else {
                 return FALSE;
             }
         }
     }
 
     // Use 0 for default, minimum, and invalid promoted-max
     value = crm_element_value(xml_obj, PCMK_META_PROMOTED_MAX);
     if (value == NULL) {
         // @COMPAT deprecated since 2.0.0
         value = crm_element_value(xml_obj, "masters");
     }
     pcmk__scan_min_int(value, &bundle_data->promoted_max, 0);
 
     // Default replicas to promoted-max if it was specified and 1 otherwise
     value = crm_element_value(xml_obj, "replicas");
     if ((value == NULL) && (bundle_data->promoted_max > 0)) {
         bundle_data->nreplicas = bundle_data->promoted_max;
     } else {
         pcmk__scan_min_int(value, &bundle_data->nreplicas, 1);
     }
 
     /*
      * Communication between containers on the same host via the
      * floating IPs only works if the container is started with:
      *   --userland-proxy=false --ip-masq=false
      */
     value = crm_element_value(xml_obj, "replicas-per-host");
     pcmk__scan_min_int(value, &bundle_data->nreplicas_per_host, 1);
     if (bundle_data->nreplicas_per_host == 1) {
         pe__clear_resource_flags(rsc, pcmk_rsc_unique);
     }
 
     bundle_data->container_command = crm_element_value_copy(xml_obj, "run-command");
     bundle_data->launcher_options = crm_element_value_copy(xml_obj, "options");
     bundle_data->image = crm_element_value_copy(xml_obj, "image");
     bundle_data->container_network = crm_element_value_copy(xml_obj, "network");
 
     xml_obj = first_named_child(rsc->xml, "network");
     if(xml_obj) {
 
         bundle_data->ip_range_start = crm_element_value_copy(xml_obj, "ip-range-start");
         bundle_data->host_netmask = crm_element_value_copy(xml_obj, "host-netmask");
         bundle_data->host_network = crm_element_value_copy(xml_obj, "host-interface");
         bundle_data->control_port = crm_element_value_copy(xml_obj, "control-port");
         value = crm_element_value(xml_obj, "add-host");
         if (crm_str_to_boolean(value, &bundle_data->add_host) != 1) {
             bundle_data->add_host = TRUE;
         }
 
         for (xmlNode *xml_child = pcmk__xe_first_child(xml_obj); xml_child != NULL;
              xml_child = pcmk__xe_next(xml_child)) {
 
             pe__bundle_port_t *port = calloc(1, sizeof(pe__bundle_port_t));
             port->source = crm_element_value_copy(xml_child, "port");
 
             if(port->source == NULL) {
                 port->source = crm_element_value_copy(xml_child, "range");
             } else {
                 port->target = crm_element_value_copy(xml_child, "internal-port");
             }
 
             if(port->source != NULL && strlen(port->source) > 0) {
                 if(port->target == NULL) {
                     port->target = strdup(port->source);
                 }
                 bundle_data->ports = g_list_append(bundle_data->ports, port);
 
             } else {
                 pe_err("Invalid port directive %s", ID(xml_child));
                 port_free(port);
             }
         }
     }
 
     xml_obj = first_named_child(rsc->xml, "storage");
     for (xmlNode *xml_child = pcmk__xe_first_child(xml_obj); xml_child != NULL;
          xml_child = pcmk__xe_next(xml_child)) {
 
         const char *source = crm_element_value(xml_child, "source-dir");
         const char *target = crm_element_value(xml_child, "target-dir");
         const char *options = crm_element_value(xml_child, "options");
         int flags = pe__bundle_mount_none;
 
         if (source == NULL) {
             source = crm_element_value(xml_child, "source-dir-root");
             pe__set_bundle_mount_flags(xml_child, flags,
                                        pe__bundle_mount_subdir);
         }
 
         if (source && target) {
             mount_add(bundle_data, source, target, options, flags);
             if (strcmp(target, "/var/log") == 0) {
                 need_log_mount = FALSE;
             }
         } else {
             pe_err("Invalid mount directive %s", ID(xml_child));
         }
     }
 
     xml_obj = first_named_child(rsc->xml, "primitive");
     if (xml_obj && valid_network(bundle_data)) {
         char *value = NULL;
         xmlNode *xml_set = NULL;
 
         xml_resource = create_xml_node(NULL, XML_CIB_TAG_INCARNATION);
 
         /* @COMPAT We no longer use the <master> tag, but we need to keep it as
          * part of the resource name, so that bundles don't restart in a rolling
          * upgrade. (It also avoids needing to change regression tests.)
          */
         crm_xml_set_id(xml_resource, "%s-%s", bundle_data->prefix,
                       (bundle_data->promoted_max? "master"
                       : (const char *)xml_resource->name));
 
         xml_set = create_xml_node(xml_resource, XML_TAG_META_SETS);
         crm_xml_set_id(xml_set, "%s-%s-meta", bundle_data->prefix, xml_resource->name);
 
         crm_create_nvpair_xml(xml_set, NULL,
                               XML_RSC_ATTR_ORDERED, XML_BOOLEAN_TRUE);
 
         value = pcmk__itoa(bundle_data->nreplicas);
         crm_create_nvpair_xml(xml_set, NULL, PCMK_META_CLONE_MAX, value);
         free(value);
 
         value = pcmk__itoa(bundle_data->nreplicas_per_host);
         crm_create_nvpair_xml(xml_set, NULL, PCMK_META_CLONE_NODE_MAX, value);
         free(value);
 
         crm_create_nvpair_xml(xml_set, NULL, XML_RSC_ATTR_UNIQUE,
                               pcmk__btoa(bundle_data->nreplicas_per_host > 1));
 
         if (bundle_data->promoted_max) {
             crm_create_nvpair_xml(xml_set, NULL,
                                   XML_RSC_ATTR_PROMOTABLE, XML_BOOLEAN_TRUE);
 
             value = pcmk__itoa(bundle_data->promoted_max);
             crm_create_nvpair_xml(xml_set, NULL, PCMK_META_PROMOTED_MAX, value);
             free(value);
         }
 
         //crm_xml_add(xml_obj, XML_ATTR_ID, bundle_data->prefix);
         add_node_copy(xml_resource, xml_obj);
 
     } else if(xml_obj) {
         pe_err("Cannot control %s inside %s without either ip-range-start or control-port",
                rsc->id, ID(xml_obj));
         return FALSE;
     }
 
     if(xml_resource) {
         int lpc = 0;
         GList *childIter = NULL;
         pe__bundle_port_t *port = NULL;
         GString *buffer = NULL;
 
         if (pe__unpack_resource(xml_resource, &(bundle_data->child), rsc,
                                 data_set) != pcmk_rc_ok) {
             return FALSE;
         }
 
         /* Currently, we always map the default authentication key location
          * into the same location inside the container.
          *
          * Ideally, we would respect the host's PCMK_authkey_location, but:
          * - it may be different on different nodes;
          * - the actual connection will do extra checking to make sure the key
          *   file exists and is readable, that we can't do here on the DC
          * - tools such as crm_resource and crm_simulate may not have the same
          *   environment variables as the cluster, causing operation digests to
          *   differ
          *
          * Always using the default location inside the container is fine,
          * because we control the pacemaker_remote environment, and it avoids
          * having to pass another environment variable to the container.
          *
          * @TODO A better solution may be to have only pacemaker_remote use the
          * environment variable, and have the cluster nodes use a new
          * cluster option for key location. This would introduce the limitation
          * of the location being the same on all cluster nodes, but that's
          * reasonable.
          */
         mount_add(bundle_data, DEFAULT_REMOTE_KEY_LOCATION,
                   DEFAULT_REMOTE_KEY_LOCATION, NULL, pe__bundle_mount_none);
 
         if (need_log_mount) {
             mount_add(bundle_data, CRM_BUNDLE_DIR, "/var/log", NULL,
                       pe__bundle_mount_subdir);
         }
 
         port = calloc(1, sizeof(pe__bundle_port_t));
         if(bundle_data->control_port) {
             port->source = strdup(bundle_data->control_port);
         } else {
             /* If we wanted to respect PCMK_remote_port, we could use
              * crm_default_remote_port() here and elsewhere in this file instead
              * of DEFAULT_REMOTE_PORT.
              *
              * However, it gains nothing, since we control both the container
              * environment and the connection resource parameters, and the user
              * can use a different port if desired by setting control-port.
              */
             port->source = pcmk__itoa(DEFAULT_REMOTE_PORT);
         }
         port->target = strdup(port->source);
         bundle_data->ports = g_list_append(bundle_data->ports, port);
 
         buffer = g_string_sized_new(1024);
         for (childIter = bundle_data->child->children; childIter != NULL;
              childIter = childIter->next) {
 
             pe__bundle_replica_t *replica = calloc(1, sizeof(pe__bundle_replica_t));
 
             replica->child = childIter->data;
             replica->child->exclusive_discover = TRUE;
             replica->offset = lpc++;
 
             // Ensure the child's notify gets set based on the underlying primitive's value
             if (pcmk_is_set(replica->child->flags, pcmk_rsc_notify)) {
                 pe__set_resource_flags(bundle_data->child, pcmk_rsc_notify);
             }
 
             allocate_ip(bundle_data, replica, buffer);
             bundle_data->replicas = g_list_append(bundle_data->replicas,
                                                   replica);
             bundle_data->attribute_target = g_hash_table_lookup(replica->child->meta,
                                                                 XML_RSC_ATTR_TARGET);
         }
         bundle_data->container_host_options = g_string_free(buffer, FALSE);
 
         if (bundle_data->attribute_target) {
             g_hash_table_replace(rsc->meta, strdup(XML_RSC_ATTR_TARGET),
                                  strdup(bundle_data->attribute_target));
             g_hash_table_replace(bundle_data->child->meta,
                                  strdup(XML_RSC_ATTR_TARGET),
                                  strdup(bundle_data->attribute_target));
         }
 
     } else {
         // Just a naked container, no pacemaker-remote
         GString *buffer = g_string_sized_new(1024);
 
         for (int lpc = 0; lpc < bundle_data->nreplicas; lpc++) {
             pe__bundle_replica_t *replica = calloc(1, sizeof(pe__bundle_replica_t));
 
             replica->offset = lpc;
             allocate_ip(bundle_data, replica, buffer);
             bundle_data->replicas = g_list_append(bundle_data->replicas,
                                                   replica);
         }
         bundle_data->container_host_options = g_string_free(buffer, FALSE);
     }
 
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
 
         if (create_replica_resources(rsc, bundle_data, replica) != pcmk_rc_ok) {
             pe_err("Failed unpacking resource %s", rsc->id);
             rsc->fns->free(rsc);
             return FALSE;
         }
 
         /* Utilization needs special handling for bundles. It makes no sense for
          * the inner primitive to have utilization, because it is tied
          * one-to-one to the guest node created by the container resource -- and
          * there's no way to set capacities for that guest node anyway.
          *
          * What the user really wants is to configure utilization for the
          * container. However, the schema only allows utilization for
          * primitives, and the container resource is implicit anyway, so the
          * user can *only* configure utilization for the inner primitive. If
          * they do, move the primitive's utilization values to the container.
          *
          * @TODO This means that bundles without an inner primitive can't have
          * utilization. An alternative might be to allow utilization values in
          * the top-level bundle XML in the schema, and copy those to each
          * container.
          */
         if (replica->child != NULL) {
             GHashTable *empty = replica->container->utilization;
 
             replica->container->utilization = replica->child->utilization;
             replica->child->utilization = empty;
         }
     }
 
     if (bundle_data->child) {
         rsc->children = g_list_append(rsc->children, bundle_data->child);
     }
     return TRUE;
 }
 
 static int
 replica_resource_active(pcmk_resource_t *rsc, gboolean all)
 {
     if (rsc) {
         gboolean child_active = rsc->fns->active(rsc, all);
 
         if (child_active && !all) {
             return TRUE;
         } else if (!child_active && all) {
             return FALSE;
         }
     }
     return -1;
 }
 
 gboolean
 pe__bundle_active(pcmk_resource_t *rsc, gboolean all)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
     GList *iter = NULL;
 
     get_bundle_variant_data(bundle_data, rsc);
     for (iter = bundle_data->replicas; iter != NULL; iter = iter->next) {
         pe__bundle_replica_t *replica = iter->data;
         int rsc_active;
 
         rsc_active = replica_resource_active(replica->ip, all);
         if (rsc_active >= 0) {
             return (gboolean) rsc_active;
         }
 
         rsc_active = replica_resource_active(replica->child, all);
         if (rsc_active >= 0) {
             return (gboolean) rsc_active;
         }
 
         rsc_active = replica_resource_active(replica->container, all);
         if (rsc_active >= 0) {
             return (gboolean) rsc_active;
         }
 
         rsc_active = replica_resource_active(replica->remote, all);
         if (rsc_active >= 0) {
             return (gboolean) rsc_active;
         }
     }
 
     /* If "all" is TRUE, we've already checked that no resources were inactive,
      * so return TRUE; if "all" is FALSE, we didn't find any active resources,
      * so return FALSE.
      */
     return all;
 }
 
 /*!
  * \internal
  * \brief Find the bundle replica corresponding to a given node
  *
  * \param[in] bundle  Top-level bundle resource
  * \param[in] node    Node to search for
  *
  * \return Bundle replica if found, NULL otherwise
  */
 pcmk_resource_t *
 pe__find_bundle_replica(const pcmk_resource_t *bundle, const pcmk_node_t *node)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
     CRM_ASSERT(bundle && node);
 
     get_bundle_variant_data(bundle_data, bundle);
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
 
         CRM_ASSERT(replica && replica->node);
         if (replica->node->details == node->details) {
             return replica->child;
         }
     }
     return NULL;
 }
 
 /*!
  * \internal
  * \deprecated This function will be removed in a future release
  */
 static void
 print_rsc_in_list(pcmk_resource_t *rsc, const char *pre_text, long options,
                   void *print_data)
 {
     if (rsc != NULL) {
         if (options & pe_print_html) {
             status_print("<li>");
         }
         rsc->fns->print(rsc, pre_text, options, print_data);
         if (options & pe_print_html) {
             status_print("</li>\n");
         }
     }
 }
 
 /*!
  * \internal
  * \deprecated This function will be removed in a future release
  */
 static void
 bundle_print_xml(pcmk_resource_t *rsc, const char *pre_text, long options,
                  void *print_data)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
     char *child_text = NULL;
     CRM_CHECK(rsc != NULL, return);
 
     if (pre_text == NULL) {
         pre_text = "";
     }
     child_text = crm_strdup_printf("%s        ", pre_text);
 
     get_bundle_variant_data(bundle_data, rsc);
 
     status_print("%s<bundle ", pre_text);
     status_print(XML_ATTR_ID "=\"%s\" ", rsc->id);
     status_print("type=\"%s\" ", container_agent_str(bundle_data->agent_type));
     status_print("image=\"%s\" ", bundle_data->image);
     status_print("unique=\"%s\" ", pe__rsc_bool_str(rsc, pcmk_rsc_unique));
     status_print("managed=\"%s\" ",
                  pe__rsc_bool_str(rsc, pcmk_rsc_managed));
     status_print("failed=\"%s\" ", pe__rsc_bool_str(rsc, pcmk_rsc_failed));
     status_print(">\n");
 
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
 
         CRM_ASSERT(replica);
         status_print("%s    <replica " XML_ATTR_ID "=\"%d\">\n",
                      pre_text, replica->offset);
         print_rsc_in_list(replica->ip, child_text, options, print_data);
         print_rsc_in_list(replica->child, child_text, options, print_data);
         print_rsc_in_list(replica->container, child_text, options, print_data);
         print_rsc_in_list(replica->remote, child_text, options, print_data);
         status_print("%s    </replica>\n", pre_text);
     }
     status_print("%s</bundle>\n", pre_text);
     free(child_text);
 }
 
 PCMK__OUTPUT_ARGS("bundle", "uint32_t", "pcmk_resource_t *", "GList *",
                   "GList *")
 int
 pe__bundle_xml(pcmk__output_t *out, va_list args)
 {
     uint32_t show_opts = va_arg(args, uint32_t);
     pcmk_resource_t *rsc = va_arg(args, pcmk_resource_t *);
     GList *only_node = va_arg(args, GList *);
     GList *only_rsc = va_arg(args, GList *);
 
     pe__bundle_variant_data_t *bundle_data = NULL;
     int rc = pcmk_rc_no_output;
     gboolean printed_header = FALSE;
     gboolean print_everything = TRUE;
 
     const char *desc = NULL;
 
     CRM_ASSERT(rsc != NULL);
     
     get_bundle_variant_data(bundle_data, rsc);
 
     if (rsc->fns->is_filtered(rsc, only_rsc, TRUE)) {
         return rc;
     }
 
     print_everything = pcmk__str_in_list(rsc->id, only_rsc, pcmk__str_star_matches);
 
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
         char *id = NULL;
         gboolean print_ip, print_child, print_ctnr, print_remote;
 
         CRM_ASSERT(replica);
 
         if (pcmk__rsc_filtered_by_node(replica->container, only_node)) {
             continue;
         }
 
         print_ip = replica->ip != NULL &&
                    !replica->ip->fns->is_filtered(replica->ip, only_rsc, print_everything);
         print_child = replica->child != NULL &&
                       !replica->child->fns->is_filtered(replica->child, only_rsc, print_everything);
         print_ctnr = !replica->container->fns->is_filtered(replica->container, only_rsc, print_everything);
         print_remote = replica->remote != NULL &&
                        !replica->remote->fns->is_filtered(replica->remote, only_rsc, print_everything);
 
         if (!print_everything && !print_ip && !print_child && !print_ctnr && !print_remote) {
             continue;
         }
 
         if (!printed_header) {
             printed_header = TRUE;
 
             desc = pe__resource_description(rsc, show_opts);
 
             rc = pe__name_and_nvpairs_xml(out, true, "bundle", 8,
                      "id", rsc->id,
                      "type", container_agent_str(bundle_data->agent_type),
                      "image", bundle_data->image,
                      "unique", pe__rsc_bool_str(rsc, pcmk_rsc_unique),
                      "maintenance",
                      pe__rsc_bool_str(rsc, pcmk_rsc_maintenance),
                      "managed", pe__rsc_bool_str(rsc, pcmk_rsc_managed),
                      "failed", pe__rsc_bool_str(rsc, pcmk_rsc_failed),
                      "description", desc);
             CRM_ASSERT(rc == pcmk_rc_ok);
         }
 
         id = pcmk__itoa(replica->offset);
         rc = pe__name_and_nvpairs_xml(out, true, "replica", 1, "id", id);
         free(id);
         CRM_ASSERT(rc == pcmk_rc_ok);
 
         if (print_ip) {
             out->message(out, crm_map_element_name(replica->ip->xml), show_opts,
                          replica->ip, only_node, only_rsc);
         }
 
         if (print_child) {
             out->message(out, crm_map_element_name(replica->child->xml), show_opts,
                          replica->child, only_node, only_rsc);
         }
 
         if (print_ctnr) {
             out->message(out, crm_map_element_name(replica->container->xml), show_opts,
                          replica->container, only_node, only_rsc);
         }
 
         if (print_remote) {
             out->message(out, crm_map_element_name(replica->remote->xml), show_opts,
                          replica->remote, only_node, only_rsc);
         }
 
         pcmk__output_xml_pop_parent(out); // replica
     }
 
     if (printed_header) {
         pcmk__output_xml_pop_parent(out); // bundle
     }
 
     return rc;
 }
 
 static void
 pe__bundle_replica_output_html(pcmk__output_t *out, pe__bundle_replica_t *replica,
                                pcmk_node_t *node, uint32_t show_opts)
 {
     pcmk_resource_t *rsc = replica->child;
 
     int offset = 0;
     char buffer[LINE_MAX];
 
     if(rsc == NULL) {
         rsc = replica->container;
     }
 
     if (replica->remote) {
         offset += snprintf(buffer + offset, LINE_MAX - offset, "%s",
                            rsc_printable_id(replica->remote));
     } else {
         offset += snprintf(buffer + offset, LINE_MAX - offset, "%s",
                            rsc_printable_id(replica->container));
     }
     if (replica->ipaddr) {
         offset += snprintf(buffer + offset, LINE_MAX - offset, " (%s)",
                            replica->ipaddr);
     }
 
     pe__common_output_html(out, rsc, buffer, node, show_opts);
 }
 
 /*!
  * \internal
  * \brief Get a string describing a resource's unmanaged state or lack thereof
  *
  * \param[in] rsc  Resource to describe
  *
  * \return A string indicating that a resource is in maintenance mode or
  *         otherwise unmanaged, or an empty string otherwise
  */
 static const char *
 get_unmanaged_str(const pcmk_resource_t *rsc)
 {
     if (pcmk_is_set(rsc->flags, pcmk_rsc_maintenance)) {
         return " (maintenance)";
     }
     if (!pcmk_is_set(rsc->flags, pcmk_rsc_managed)) {
         return " (unmanaged)";
     }
     return "";
 }
 
 PCMK__OUTPUT_ARGS("bundle", "uint32_t", "pcmk_resource_t *", "GList *",
                   "GList *")
 int
 pe__bundle_html(pcmk__output_t *out, va_list args)
 {
     uint32_t show_opts = va_arg(args, uint32_t);
     pcmk_resource_t *rsc = va_arg(args, pcmk_resource_t *);
     GList *only_node = va_arg(args, GList *);
     GList *only_rsc = va_arg(args, GList *);
 
     const char *desc = NULL;
     pe__bundle_variant_data_t *bundle_data = NULL;
     int rc = pcmk_rc_no_output;
     gboolean print_everything = TRUE;
 
     CRM_ASSERT(rsc != NULL);
 
     get_bundle_variant_data(bundle_data, rsc);
 
     desc = pe__resource_description(rsc, show_opts);
 
     if (rsc->fns->is_filtered(rsc, only_rsc, TRUE)) {
         return rc;
     }
 
     print_everything = pcmk__str_in_list(rsc->id, only_rsc, pcmk__str_star_matches);
 
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
         gboolean print_ip, print_child, print_ctnr, print_remote;
 
         CRM_ASSERT(replica);
 
         if (pcmk__rsc_filtered_by_node(replica->container, only_node)) {
             continue;
         }
 
         print_ip = replica->ip != NULL &&
                    !replica->ip->fns->is_filtered(replica->ip, only_rsc, print_everything);
         print_child = replica->child != NULL &&
                       !replica->child->fns->is_filtered(replica->child, only_rsc, print_everything);
         print_ctnr = !replica->container->fns->is_filtered(replica->container, only_rsc, print_everything);
         print_remote = replica->remote != NULL &&
                        !replica->remote->fns->is_filtered(replica->remote, only_rsc, print_everything);
 
         if (pcmk_is_set(show_opts, pcmk_show_implicit_rscs) ||
             (print_everything == FALSE && (print_ip || print_child || print_ctnr || print_remote))) {
             /* The text output messages used below require pe_print_implicit to
              * be set to do anything.
              */
             uint32_t new_show_opts = show_opts | pcmk_show_implicit_rscs;
 
             PCMK__OUTPUT_LIST_HEADER(out, FALSE, rc, "Container bundle%s: %s [%s]%s%s%s%s%s",
                                      (bundle_data->nreplicas > 1)? " set" : "",
                                      rsc->id, bundle_data->image,
                                      pcmk_is_set(rsc->flags, pcmk_rsc_unique)? " (unique)" : "",
                                      desc ? " (" : "", desc ? desc : "", desc ? ")" : "",
                                      get_unmanaged_str(rsc));
 
             if (pcmk__list_of_multiple(bundle_data->replicas)) {
                 out->begin_list(out, NULL, NULL, "Replica[%d]", replica->offset);
             }
 
             if (print_ip) {
                 out->message(out, crm_map_element_name(replica->ip->xml),
                              new_show_opts, replica->ip, only_node, only_rsc);
             }
 
             if (print_child) {
                 out->message(out, crm_map_element_name(replica->child->xml),
                              new_show_opts, replica->child, only_node, only_rsc);
             }
 
             if (print_ctnr) {
                 out->message(out, crm_map_element_name(replica->container->xml),
                              new_show_opts, replica->container, only_node, only_rsc);
             }
 
             if (print_remote) {
                 out->message(out, crm_map_element_name(replica->remote->xml),
                              new_show_opts, replica->remote, only_node, only_rsc);
             }
 
             if (pcmk__list_of_multiple(bundle_data->replicas)) {
                 out->end_list(out);
             }
         } else if (print_everything == FALSE && !(print_ip || print_child || print_ctnr || print_remote)) {
             continue;
         } else {
             PCMK__OUTPUT_LIST_HEADER(out, FALSE, rc, "Container bundle%s: %s [%s]%s%s%s%s%s",
                                      (bundle_data->nreplicas > 1)? " set" : "",
                                      rsc->id, bundle_data->image,
                                      pcmk_is_set(rsc->flags, pcmk_rsc_unique)? " (unique)" : "",
                                      desc ? " (" : "", desc ? desc : "", desc ? ")" : "",
                                      get_unmanaged_str(rsc));
 
             pe__bundle_replica_output_html(out, replica, pe__current_node(replica->container),
                                            show_opts);
         }
     }
 
     PCMK__OUTPUT_LIST_FOOTER(out, rc);
     return rc;
 }
 
 static void
 pe__bundle_replica_output_text(pcmk__output_t *out, pe__bundle_replica_t *replica,
                                pcmk_node_t *node, uint32_t show_opts)
 {
     const pcmk_resource_t *rsc = replica->child;
 
     int offset = 0;
     char buffer[LINE_MAX];
 
     if(rsc == NULL) {
         rsc = replica->container;
     }
 
     if (replica->remote) {
         offset += snprintf(buffer + offset, LINE_MAX - offset, "%s",
                            rsc_printable_id(replica->remote));
     } else {
         offset += snprintf(buffer + offset, LINE_MAX - offset, "%s",
                            rsc_printable_id(replica->container));
     }
     if (replica->ipaddr) {
         offset += snprintf(buffer + offset, LINE_MAX - offset, " (%s)",
                            replica->ipaddr);
     }
 
     pe__common_output_text(out, rsc, buffer, node, show_opts);
 }
 
 PCMK__OUTPUT_ARGS("bundle", "uint32_t", "pcmk_resource_t *", "GList *",
                   "GList *")
 int
 pe__bundle_text(pcmk__output_t *out, va_list args)
 {
     uint32_t show_opts = va_arg(args, uint32_t);
     pcmk_resource_t *rsc = va_arg(args, pcmk_resource_t *);
     GList *only_node = va_arg(args, GList *);
     GList *only_rsc = va_arg(args, GList *);
 
     const char *desc = NULL;
     pe__bundle_variant_data_t *bundle_data = NULL;
     int rc = pcmk_rc_no_output;
     gboolean print_everything = TRUE;
 
     desc = pe__resource_description(rsc, show_opts);
     
     get_bundle_variant_data(bundle_data, rsc);
 
     CRM_ASSERT(rsc != NULL);
 
     if (rsc->fns->is_filtered(rsc, only_rsc, TRUE)) {
         return rc;
     }
 
     print_everything = pcmk__str_in_list(rsc->id, only_rsc, pcmk__str_star_matches);
 
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
         gboolean print_ip, print_child, print_ctnr, print_remote;
 
         CRM_ASSERT(replica);
 
         if (pcmk__rsc_filtered_by_node(replica->container, only_node)) {
             continue;
         }
 
         print_ip = replica->ip != NULL &&
                    !replica->ip->fns->is_filtered(replica->ip, only_rsc, print_everything);
         print_child = replica->child != NULL &&
                       !replica->child->fns->is_filtered(replica->child, only_rsc, print_everything);
         print_ctnr = !replica->container->fns->is_filtered(replica->container, only_rsc, print_everything);
         print_remote = replica->remote != NULL &&
                        !replica->remote->fns->is_filtered(replica->remote, only_rsc, print_everything);
 
         if (pcmk_is_set(show_opts, pcmk_show_implicit_rscs) ||
             (print_everything == FALSE && (print_ip || print_child || print_ctnr || print_remote))) {
             /* The text output messages used below require pe_print_implicit to
              * be set to do anything.
              */
             uint32_t new_show_opts = show_opts | pcmk_show_implicit_rscs;
 
             PCMK__OUTPUT_LIST_HEADER(out, FALSE, rc, "Container bundle%s: %s [%s]%s%s%s%s%s",
                                      (bundle_data->nreplicas > 1)? " set" : "",
                                      rsc->id, bundle_data->image,
                                      pcmk_is_set(rsc->flags, pcmk_rsc_unique)? " (unique)" : "",
                                      desc ? " (" : "", desc ? desc : "", desc ? ")" : "",
                                      get_unmanaged_str(rsc));
 
             if (pcmk__list_of_multiple(bundle_data->replicas)) {
                 out->list_item(out, NULL, "Replica[%d]", replica->offset);
             }
 
             out->begin_list(out, NULL, NULL, NULL);
 
             if (print_ip) {
                 out->message(out, crm_map_element_name(replica->ip->xml),
                              new_show_opts, replica->ip, only_node, only_rsc);
             }
 
             if (print_child) {
                 out->message(out, crm_map_element_name(replica->child->xml),
                              new_show_opts, replica->child, only_node, only_rsc);
             }
 
             if (print_ctnr) {
                 out->message(out, crm_map_element_name(replica->container->xml),
                              new_show_opts, replica->container, only_node, only_rsc);
             }
 
             if (print_remote) {
                 out->message(out, crm_map_element_name(replica->remote->xml),
                              new_show_opts, replica->remote, only_node, only_rsc);
             }
 
             out->end_list(out);
         } else if (print_everything == FALSE && !(print_ip || print_child || print_ctnr || print_remote)) {
             continue;
         } else {
             PCMK__OUTPUT_LIST_HEADER(out, FALSE, rc, "Container bundle%s: %s [%s]%s%s%s%s%s",
                                      (bundle_data->nreplicas > 1)? " set" : "",
                                      rsc->id, bundle_data->image,
                                      pcmk_is_set(rsc->flags, pcmk_rsc_unique)? " (unique)" : "",
                                      desc ? " (" : "", desc ? desc : "", desc ? ")" : "",
                                      get_unmanaged_str(rsc));
 
             pe__bundle_replica_output_text(out, replica, pe__current_node(replica->container),
                                            show_opts);
         }
     }
 
     PCMK__OUTPUT_LIST_FOOTER(out, rc);
     return rc;
 }
 
 /*!
  * \internal
  * \deprecated This function will be removed in a future release
  */
 static void
 print_bundle_replica(pe__bundle_replica_t *replica, const char *pre_text,
                      long options, void *print_data)
 {
     pcmk_node_t *node = NULL;
     pcmk_resource_t *rsc = replica->child;
 
     int offset = 0;
     char buffer[LINE_MAX];
 
     if(rsc == NULL) {
         rsc = replica->container;
     }
 
     if (replica->remote) {
         offset += snprintf(buffer + offset, LINE_MAX - offset, "%s",
                            rsc_printable_id(replica->remote));
     } else {
         offset += snprintf(buffer + offset, LINE_MAX - offset, "%s",
                            rsc_printable_id(replica->container));
     }
     if (replica->ipaddr) {
         offset += snprintf(buffer + offset, LINE_MAX - offset, " (%s)",
                            replica->ipaddr);
     }
 
     node = pe__current_node(replica->container);
     common_print(rsc, pre_text, buffer, node, options, print_data);
 }
 
 /*!
  * \internal
  * \deprecated This function will be removed in a future release
  */
 void
 pe__print_bundle(pcmk_resource_t *rsc, const char *pre_text, long options,
                  void *print_data)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
     char *child_text = NULL;
     CRM_CHECK(rsc != NULL, return);
 
     if (options & pe_print_xml) {
         bundle_print_xml(rsc, pre_text, options, print_data);
         return;
     }
 
     get_bundle_variant_data(bundle_data, rsc);
 
     if (pre_text == NULL) {
         pre_text = " ";
     }
 
     status_print("%sContainer bundle%s: %s [%s]%s%s\n",
                  pre_text, ((bundle_data->nreplicas > 1)? " set" : ""),
                  rsc->id, bundle_data->image,
                  pcmk_is_set(rsc->flags, pcmk_rsc_unique)? " (unique)" : "",
                  pcmk_is_set(rsc->flags, pcmk_rsc_managed)? "" : " (unmanaged)");
     if (options & pe_print_html) {
         status_print("<br />\n<ul>\n");
     }
 
 
     for (GList *gIter = bundle_data->replicas; gIter != NULL;
          gIter = gIter->next) {
         pe__bundle_replica_t *replica = gIter->data;
 
         CRM_ASSERT(replica);
         if (options & pe_print_html) {
             status_print("<li>");
         }
 
         if (pcmk_is_set(options, pe_print_implicit)) {
             child_text = crm_strdup_printf("     %s", pre_text);
             if (pcmk__list_of_multiple(bundle_data->replicas)) {
                 status_print("  %sReplica[%d]\n", pre_text, replica->offset);
             }
             if (options & pe_print_html) {
                 status_print("<br />\n<ul>\n");
             }
             print_rsc_in_list(replica->ip, child_text, options, print_data);
             print_rsc_in_list(replica->container, child_text, options, print_data);
             print_rsc_in_list(replica->remote, child_text, options, print_data);
             print_rsc_in_list(replica->child, child_text, options, print_data);
             if (options & pe_print_html) {
                 status_print("</ul>\n");
             }
         } else {
             child_text = crm_strdup_printf("%s  ", pre_text);
             print_bundle_replica(replica, child_text, options, print_data);
         }
         free(child_text);
 
         if (options & pe_print_html) {
             status_print("</li>\n");
         }
     }
     if (options & pe_print_html) {
         status_print("</ul>\n");
     }
 }
 
 static void
 free_bundle_replica(pe__bundle_replica_t *replica)
 {
     if (replica == NULL) {
         return;
     }
 
     if (replica->node) {
         free(replica->node);
         replica->node = NULL;
     }
 
     if (replica->ip) {
         free_xml(replica->ip->xml);
         replica->ip->xml = NULL;
         replica->ip->fns->free(replica->ip);
         replica->ip = NULL;
     }
     if (replica->container) {
         free_xml(replica->container->xml);
         replica->container->xml = NULL;
         replica->container->fns->free(replica->container);
         replica->container = NULL;
     }
     if (replica->remote) {
         free_xml(replica->remote->xml);
         replica->remote->xml = NULL;
         replica->remote->fns->free(replica->remote);
         replica->remote = NULL;
     }
     free(replica->ipaddr);
     free(replica);
 }
 
 void
 pe__free_bundle(pcmk_resource_t *rsc)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
     CRM_CHECK(rsc != NULL, return);
 
     get_bundle_variant_data(bundle_data, rsc);
     pe_rsc_trace(rsc, "Freeing %s", rsc->id);
 
     free(bundle_data->prefix);
     free(bundle_data->image);
     free(bundle_data->control_port);
     free(bundle_data->host_network);
     free(bundle_data->host_netmask);
     free(bundle_data->ip_range_start);
     free(bundle_data->container_network);
     free(bundle_data->launcher_options);
     free(bundle_data->container_command);
     g_free(bundle_data->container_host_options);
 
     g_list_free_full(bundle_data->replicas,
                      (GDestroyNotify) free_bundle_replica);
     g_list_free_full(bundle_data->mounts, (GDestroyNotify)mount_free);
     g_list_free_full(bundle_data->ports, (GDestroyNotify)port_free);
     g_list_free(rsc->children);
 
     if(bundle_data->child) {
         free_xml(bundle_data->child->xml);
         bundle_data->child->xml = NULL;
         bundle_data->child->fns->free(bundle_data->child);
     }
     common_free(rsc);
 }
 
 enum rsc_role_e
 pe__bundle_resource_state(const pcmk_resource_t *rsc, gboolean current)
 {
     enum rsc_role_e container_role = pcmk_role_unknown;
     return container_role;
 }
 
 /*!
  * \brief Get the number of configured replicas in a bundle
  *
  * \param[in] rsc  Bundle resource
  *
  * \return Number of configured replicas, or 0 on error
  */
 int
 pe_bundle_replicas(const pcmk_resource_t *rsc)
 {
     if ((rsc == NULL) || (rsc->variant != pcmk_rsc_variant_bundle)) {
         return 0;
     } else {
         pe__bundle_variant_data_t *bundle_data = NULL;
 
         get_bundle_variant_data(bundle_data, rsc);
         return bundle_data->nreplicas;
     }
 }
 
 void
 pe__count_bundle(pcmk_resource_t *rsc)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, rsc);
     for (GList *item = bundle_data->replicas; item != NULL; item = item->next) {
         pe__bundle_replica_t *replica = item->data;
 
         if (replica->ip) {
             replica->ip->fns->count(replica->ip);
         }
         if (replica->child) {
             replica->child->fns->count(replica->child);
         }
         if (replica->container) {
             replica->container->fns->count(replica->container);
         }
         if (replica->remote) {
             replica->remote->fns->count(replica->remote);
         }
     }
 }
 
 gboolean
 pe__bundle_is_filtered(const pcmk_resource_t *rsc, GList *only_rsc,
                        gboolean check_parent)
 {
     gboolean passes = FALSE;
     pe__bundle_variant_data_t *bundle_data = NULL;
 
     if (pcmk__str_in_list(rsc_printable_id(rsc), only_rsc, pcmk__str_star_matches)) {
         passes = TRUE;
     } else {
         get_bundle_variant_data(bundle_data, rsc);
 
         for (GList *gIter = bundle_data->replicas; gIter != NULL; gIter = gIter->next) {
             pe__bundle_replica_t *replica = gIter->data;
 
             if (replica->ip != NULL && !replica->ip->fns->is_filtered(replica->ip, only_rsc, FALSE)) {
                 passes = TRUE;
                 break;
             } else if (replica->child != NULL && !replica->child->fns->is_filtered(replica->child, only_rsc, FALSE)) {
                 passes = TRUE;
                 break;
             } else if (!replica->container->fns->is_filtered(replica->container, only_rsc, FALSE)) {
                 passes = TRUE;
                 break;
             } else if (replica->remote != NULL && !replica->remote->fns->is_filtered(replica->remote, only_rsc, FALSE)) {
                 passes = TRUE;
                 break;
             }
         }
     }
 
     return !passes;
 }
 
 /*!
  * \internal
  * \brief Get a list of a bundle's containers
  *
  * \param[in] bundle  Bundle resource
  *
  * \return Newly created list of \p bundle's containers
  * \note It is the caller's responsibility to free the result with
  *       g_list_free().
  */
 GList *
 pe__bundle_containers(const pcmk_resource_t *bundle)
 {
     GList *containers = NULL;
     const pe__bundle_variant_data_t *data = NULL;
 
     get_bundle_variant_data(data, bundle);
     for (GList *iter = data->replicas; iter != NULL; iter = iter->next) {
         pe__bundle_replica_t *replica = iter->data;
 
         containers = g_list_append(containers, replica->container);
     }
     return containers;
 }
 
 // Bundle implementation of resource_object_functions_t:active_node()
 pcmk_node_t *
 pe__bundle_active_node(const pcmk_resource_t *rsc, unsigned int *count_all,
                        unsigned int *count_clean)
 {
     pcmk_node_t *active = NULL;
     pcmk_node_t *node = NULL;
     pcmk_resource_t *container = NULL;
     GList *containers = NULL;
     GList *iter = NULL;
     GHashTable *nodes = NULL;
     const pe__bundle_variant_data_t *data = NULL;
 
     if (count_all != NULL) {
         *count_all = 0;
     }
     if (count_clean != NULL) {
         *count_clean = 0;
     }
     if (rsc == NULL) {
         return NULL;
     }
 
     /* For the purposes of this method, we only care about where the bundle's
      * containers are active, so build a list of active containers.
      */
     get_bundle_variant_data(data, rsc);
     for (iter = data->replicas; iter != NULL; iter = iter->next) {
         pe__bundle_replica_t *replica = iter->data;
 
         if (replica->container->running_on != NULL) {
             containers = g_list_append(containers, replica->container);
         }
     }
     if (containers == NULL) {
         return NULL;
     }
 
     /* If the bundle has only a single active container, just use that
      * container's method. If live migration is ever supported for bundle
      * containers, this will allow us to prefer the migration source when there
      * is only one container and it is migrating. For now, this just lets us
      * avoid creating the nodes table.
      */
     if (pcmk__list_of_1(containers)) {
         container = containers->data;
         node = container->fns->active_node(container, count_all, count_clean);
         g_list_free(containers);
         return node;
     }
 
     // Add all containers' active nodes to a hash table (for uniqueness)
     nodes = g_hash_table_new(NULL, NULL);
     for (iter = containers; iter != NULL; iter = iter->next) {
         container = iter->data;
 
         for (GList *node_iter = container->running_on; node_iter != NULL;
              node_iter = node_iter->next) {
             node = node_iter->data;
 
             // If insert returns true, we haven't counted this node yet
             if (g_hash_table_insert(nodes, (gpointer) node->details,
                                     (gpointer) node)
                 && !pe__count_active_node(rsc, node, &active, count_all,
                                           count_clean)) {
                 goto done;
             }
         }
     }
 
 done:
     g_list_free(containers);
     g_hash_table_destroy(nodes);
     return active;
 }
 
 /*!
  * \internal
  * \brief Get maximum bundle resource instances per node
  *
  * \param[in] rsc  Bundle resource to check
  *
  * \return Maximum number of \p rsc instances that can be active on one node
  */
 unsigned int
 pe__bundle_max_per_node(const pcmk_resource_t *rsc)
 {
     pe__bundle_variant_data_t *bundle_data = NULL;
 
     get_bundle_variant_data(bundle_data, rsc);
     CRM_ASSERT(bundle_data->nreplicas_per_host >= 0);
     return (unsigned int) bundle_data->nreplicas_per_host;
 }