Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F4511708
remote.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
36 KB
Referenced Files
None
Subscribers
None
remote.c
View Options
/*
* Copyright (c) 2008 Andrew Beekhof
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#include <crm_internal.h>
#include <crm/crm.h>
#include <sys/param.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <stdlib.h>
#include <errno.h>
#include <glib.h>
#include <bzlib.h>
#include <crm/common/ipcs.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
#include <crm/common/remote_internal.h>
#ifdef HAVE_GNUTLS_GNUTLS_H
# undef KEYFILE
# include <gnutls/gnutls.h>
const int psk_tls_kx_order[] = {
GNUTLS_KX_DHE_PSK,
GNUTLS_KX_PSK,
};
const int anon_tls_kx_order[] = {
GNUTLS_KX_ANON_DH,
GNUTLS_KX_DHE_RSA,
GNUTLS_KX_DHE_DSS,
GNUTLS_KX_RSA,
0
};
#endif
/* Swab macros from linux/swab.h */
#ifdef HAVE_LINUX_SWAB_H
# include <linux/swab.h>
#else
/*
* casts are necessary for constants, because we never know how for sure
* how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
*/
#define __swab16(x) ((uint16_t)( \
(((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
(((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
#define __swab32(x) ((uint32_t)( \
(((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
(((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
(((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
(((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
#define __swab64(x) ((uint64_t)( \
(((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
(((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
(((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
(((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
(((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
(((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
(((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
(((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
#endif
#define REMOTE_MSG_VERSION 1
#define ENDIAN_LOCAL 0xBADADBBD
struct crm_remote_header_v0
{
uint32_t endian; /* Detect messages from hosts with different endian-ness */
uint32_t version;
uint64_t id;
uint64_t flags;
uint32_t size_total;
uint32_t payload_offset;
uint32_t payload_compressed;
uint32_t payload_uncompressed;
/* New fields get added here */
} __attribute__ ((packed));
static struct crm_remote_header_v0 *
crm_remote_header(crm_remote_t * remote)
{
struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
return NULL;
} else if(header->endian != ENDIAN_LOCAL) {
uint32_t endian = __swab32(header->endian);
CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
if(endian != ENDIAN_LOCAL) {
crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
ENDIAN_LOCAL, header->endian, endian);
return NULL;
}
header->id = __swab64(header->id);
header->flags = __swab64(header->flags);
header->endian = __swab32(header->endian);
header->version = __swab32(header->version);
header->size_total = __swab32(header->size_total);
header->payload_offset = __swab32(header->payload_offset);
header->payload_compressed = __swab32(header->payload_compressed);
header->payload_uncompressed = __swab32(header->payload_uncompressed);
}
return header;
}
#ifdef HAVE_GNUTLS_GNUTLS_H
int
crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
{
int rc = 0;
int pollrc = 0;
time_t start = time(NULL);
do {
rc = gnutls_handshake(*remote->tls_session);
if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
pollrc = crm_remote_ready(remote, 1000);
if (pollrc < 0) {
/* poll returned error, there is no hope */
rc = -1;
}
}
} while (((time(NULL) - start) < (timeout_ms / 1000)) &&
(rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
if (rc < 0) {
crm_trace("gnutls_handshake() failed with %d", rc);
}
return rc;
}
/*!
* \internal
* \brief Set minimum prime size required by TLS client
*
* \param[in] session TLS session to affect
*/
static void
pcmk__set_minimum_dh_bits(gnutls_session_t *session)
{
const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
if (dh_min_bits_s) {
int dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
/* This function is deprecated since GnuTLS 3.1.7, in favor of letting
* the priority string imply the DH requirements, but this is the only
* way to give the user control over compatibility with older servers.
*/
if (dh_min_bits > 0) {
crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
dh_min_bits);
gnutls_dh_set_prime_bits(*session, dh_min_bits);
}
}
}
static unsigned int
pcmk__bound_dh_bits(unsigned int dh_bits)
{
const char *dh_min_bits_s = getenv("PCMK_dh_min_bits");
const char *dh_max_bits_s = getenv("PCMK_dh_max_bits");
int dh_min_bits = 0;
int dh_max_bits = 0;
if (dh_min_bits_s) {
dh_min_bits = crm_parse_int(dh_min_bits_s, "0");
}
if (dh_max_bits_s) {
dh_max_bits = crm_parse_int(dh_max_bits_s, "0");
if ((dh_min_bits > 0) && (dh_max_bits > 0)
&& (dh_max_bits < dh_min_bits)) {
crm_warn("Ignoring PCMK_dh_max_bits because it is less than PCMK_dh_min_bits");
dh_max_bits = 0;
}
}
if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) {
return dh_min_bits;
}
if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) {
return dh_max_bits;
}
return dh_bits;
}
/*!
* \internal
* \brief Initialize a new TLS session
*
* \param[in] csock Connected socket for TLS session
* \param[in] conn_type GNUTLS_SERVER or GNUTLS_CLIENT
* \param[in] cred_type GNUTLS_CRD_ANON or GNUTLS_CRD_PSK
* \param[in] credentials TLS session credentials
*
* \return Pointer to newly created session object, or NULL on error
*/
gnutls_session_t *
pcmk__new_tls_session(int csock, unsigned int conn_type,
gnutls_credentials_type_t cred_type, void *credentials)
{
int rc = GNUTLS_E_SUCCESS;
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
const char *prio_base = NULL;
char *prio = NULL;
# endif
gnutls_session_t *session = NULL;
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
/* Determine list of acceptable ciphers, etc. Pacemaker always adds the
* values required for its functionality.
*
* For an example of anonymous authentication, see:
* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
*/
prio_base = getenv("PCMK_tls_priorities");
if (prio_base == NULL) {
prio_base = PCMK_GNUTLS_PRIORITIES;
}
prio = crm_strdup_printf("%s:%s", prio_base,
(cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
# endif
session = gnutls_malloc(sizeof(gnutls_session_t));
if (session == NULL) {
rc = GNUTLS_E_MEMORY_ERROR;
goto error;
}
rc = gnutls_init(session, conn_type);
if (rc != GNUTLS_E_SUCCESS) {
goto error;
}
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
/* @TODO On the server side, it would be more efficient to cache the
* priority with gnutls_priority_init2() and set it with
* gnutls_priority_set() for all sessions.
*/
rc = gnutls_priority_set_direct(*session, prio, NULL);
if (rc != GNUTLS_E_SUCCESS) {
goto error;
}
if (conn_type == GNUTLS_CLIENT) {
pcmk__set_minimum_dh_bits(session);
}
# else
gnutls_set_default_priority(*session);
gnutls_kx_set_priority(*session, (cred_type == GNUTLS_CRD_ANON)? anon_tls_kx_order : psk_tls_kx_order);
# endif
gnutls_transport_set_ptr(*session,
(gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
rc = gnutls_credentials_set(*session, cred_type, credentials);
if (rc != GNUTLS_E_SUCCESS) {
goto error;
}
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
free(prio);
# endif
return session;
error:
{
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
const char *prio_s = prio;
# else
const char *prio_s = "default";
# endif
crm_err("Could not initialize %s TLS %s session: %s "
CRM_XS " rc=%d priority='%s'",
(cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
(conn_type == GNUTLS_SERVER)? "server" : "client",
gnutls_strerror(rc), rc, prio_s);
}
# ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
free(prio);
# endif
if (session != NULL) {
gnutls_free(session);
}
return NULL;
}
/*!
* \internal
* \brief Initialize Diffie-Hellman parameters for a TLS server
*
* \param[out] dh_params Parameter object to initialize
*
* \return GNUTLS_E_SUCCESS on success, GnuTLS error code on error
* \todo The current best practice is to allow the client and server to
* negotiate the Diffie-Hellman parameters via a TLS extension (RFC 7919).
* However, we have to support both older versions of GnuTLS (<3.6) that
* don't support the extension on our side, and older Pacemaker versions
* that don't support the extension on the other side. The next best
* practice would be to use a known good prime (see RFC 5114 section 2.2),
* possibly stored in a file distributed with Pacemaker.
*/
int
pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
{
int rc = GNUTLS_E_SUCCESS;
unsigned int dh_bits = 0;
rc = gnutls_dh_params_init(dh_params);
if (rc != GNUTLS_E_SUCCESS) {
goto error;
}
#ifdef HAVE_GNUTLS_SEC_PARAM_TO_PK_BITS
dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
GNUTLS_SEC_PARAM_NORMAL);
if (dh_bits == 0) {
rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
goto error;
}
#else
dh_bits = 1024;
#endif
dh_bits = pcmk__bound_dh_bits(dh_bits);
crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
dh_bits);
rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
if (rc != GNUTLS_E_SUCCESS) {
goto error;
}
return rc;
error:
crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
CRM_XS " rc=%d", gnutls_strerror(rc), rc);
CRM_ASSERT(rc == GNUTLS_E_SUCCESS);
return rc;
}
/*!
* \internal
* \brief Process handshake data from TLS client
*
* Read as much TLS handshake data as is available.
*
* \param[in] client Client connection
*
* \retval GnuTLS error code on error
* \retval 0 if more data is needed
* \retval 1 if handshake is successfully completed
*/
int
pcmk__read_handshake_data(crm_client_t *client)
{
int rc = 0;
CRM_ASSERT(client && client->remote && client->remote->tls_session);
do {
rc = gnutls_handshake(*client->remote->tls_session);
} while (rc == GNUTLS_E_INTERRUPTED);
if (rc == GNUTLS_E_AGAIN) {
/* No more data is available at the moment. This function should be
* invoked again once the client sends more.
*/
return 0;
} else if (rc != GNUTLS_E_SUCCESS) {
return rc;
}
return 1;
}
static int
crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
{
const char *unsent = buf;
int rc = 0;
int total_send;
if (buf == NULL) {
return -EINVAL;
}
total_send = len;
crm_trace("Message size: %llu", (unsigned long long) len);
while (TRUE) {
rc = gnutls_record_send(*session, unsent, len);
if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
crm_trace("Retrying to send %llu bytes",
(unsigned long long) len);
} else if (rc < 0) {
crm_err("Connection terminated: %s " CRM_XS " rc=%d",
gnutls_strerror(rc), rc);
rc = -ECONNABORTED;
break;
} else if (rc < len) {
crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
len -= rc;
unsent += rc;
} else {
crm_trace("Sent all %d bytes", rc);
break;
}
}
return rc < 0 ? rc : total_send;
}
#endif
static int
crm_send_plaintext(int sock, const char *buf, size_t len)
{
int rc = 0;
const char *unsent = buf;
int total_send;
if (buf == NULL) {
return -EINVAL;
}
total_send = len;
crm_trace("Message on socket %d: size=%llu",
sock, (unsigned long long) len);
retry:
rc = write(sock, unsent, len);
if (rc < 0) {
rc = -errno;
switch (errno) {
case EINTR:
case EAGAIN:
crm_trace("Retry");
goto retry;
default:
crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
break;
}
} else if (rc < len) {
crm_trace("Only sent %d of %llu remaining bytes",
rc, (unsigned long long) len);
len -= rc;
unsent += rc;
goto retry;
} else {
crm_trace("Sent %d bytes: %.100s", rc, buf);
}
return rc < 0 ? rc : total_send;
}
static int
crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
{
int lpc = 0;
int rc = -ESOCKTNOSUPPORT;
for(; lpc < iovs; lpc++) {
#ifdef HAVE_GNUTLS_GNUTLS_H
if (remote->tls_session) {
rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
} else if (remote->tcp_socket) {
#else
if (remote->tcp_socket) {
#endif
rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
} else {
crm_err("Unsupported connection type");
}
}
return rc;
}
int
crm_remote_send(crm_remote_t * remote, xmlNode * msg)
{
int rc = pcmk_ok;
static uint64_t id = 0;
char *xml_text = dump_xml_unformatted(msg);
struct iovec iov[2];
struct crm_remote_header_v0 *header;
if (xml_text == NULL) {
crm_err("Could not send remote message: no message provided");
return -EINVAL;
}
header = calloc(1, sizeof(struct crm_remote_header_v0));
iov[0].iov_base = header;
iov[0].iov_len = sizeof(struct crm_remote_header_v0);
iov[1].iov_base = xml_text;
iov[1].iov_len = 1 + strlen(xml_text);
id++;
header->id = id;
header->endian = ENDIAN_LOCAL;
header->version = REMOTE_MSG_VERSION;
header->payload_offset = iov[0].iov_len;
header->payload_uncompressed = iov[1].iov_len;
header->size_total = iov[0].iov_len + iov[1].iov_len;
crm_trace("Sending len[0]=%d, start=%x",
(int)iov[0].iov_len, *(int*)(void*)xml_text);
rc = crm_remote_sendv(remote, iov, 2);
if (rc < 0) {
crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
pcmk_strerror(rc), rc);
}
free(iov[0].iov_base);
free(iov[1].iov_base);
return rc;
}
/*!
* \internal
* \brief handles the recv buffer and parsing out msgs.
* \note new_data is owned by this function once it is passed in.
*/
xmlNode *
crm_remote_parse_buffer(crm_remote_t * remote)
{
xmlNode *xml = NULL;
struct crm_remote_header_v0 *header = crm_remote_header(remote);
if (remote->buffer == NULL || header == NULL) {
return NULL;
}
/* Support compression on the receiving end now, in case we ever want to add it later */
if (header->payload_compressed) {
int rc = 0;
unsigned int size_u = 1 + header->payload_uncompressed;
char *uncompressed = calloc(1, header->payload_offset + size_u);
crm_trace("Decompressing message data %d bytes into %d bytes",
header->payload_compressed, size_u);
rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
remote->buffer + header->payload_offset,
header->payload_compressed, 1, 0);
if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
crm_warn("Couldn't decompress v%d message, we only understand v%d",
header->version, REMOTE_MSG_VERSION);
free(uncompressed);
return NULL;
} else if (rc != BZ_OK) {
crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
free(uncompressed);
return NULL;
}
CRM_ASSERT(size_u == header->payload_uncompressed);
memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
remote->buffer_size = header->payload_offset + size_u;
free(remote->buffer);
remote->buffer = uncompressed;
header = crm_remote_header(remote);
}
/* take ownership of the buffer */
remote->buffer_offset = 0;
CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
xml = string2xml(remote->buffer + header->payload_offset);
if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
crm_warn("Couldn't parse v%d message, we only understand v%d",
header->version, REMOTE_MSG_VERSION);
} else if (xml == NULL) {
crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
}
return xml;
}
/*!
* \internal
* \brief Wait for a remote session to have data to read
*
* \param[in] remote Connection to check
* \param[in] total_timeout Maximum time (in ms) to wait
*
* \return Positive value if ready to be read, 0 on timeout, -errno on error
*/
int
crm_remote_ready(crm_remote_t *remote, int total_timeout)
{
struct pollfd fds = { 0, };
int sock = 0;
int rc = 0;
time_t start;
int timeout = total_timeout;
#ifdef HAVE_GNUTLS_GNUTLS_H
if (remote->tls_session) {
void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
sock = GPOINTER_TO_INT(sock_ptr);
} else if (remote->tcp_socket) {
#else
if (remote->tcp_socket) {
#endif
sock = remote->tcp_socket;
} else {
crm_err("Unsupported connection type");
}
if (sock <= 0) {
crm_trace("No longer connected");
return -ENOTCONN;
}
start = time(NULL);
errno = 0;
do {
fds.fd = sock;
fds.events = POLLIN;
/* If we got an EINTR while polling, and we have a
* specific timeout we are trying to honor, attempt
* to adjust the timeout to the closest second. */
if (errno == EINTR && (timeout > 0)) {
timeout = total_timeout - ((time(NULL) - start) * 1000);
if (timeout < 1000) {
timeout = 1000;
}
}
rc = poll(&fds, 1, timeout);
} while (rc < 0 && errno == EINTR);
return (rc < 0)? -errno : rc;
}
/*!
* \internal
* \brief Read bytes off non blocking remote connection.
*
* \note only use with NON-Blocking sockets. Should only be used after polling socket.
* This function will return once max_size is met, the socket read buffer
* is empty, or an error is encountered.
*
* \retval number of bytes received
*/
static size_t
crm_remote_recv_once(crm_remote_t * remote)
{
int rc = 0;
size_t read_len = sizeof(struct crm_remote_header_v0);
struct crm_remote_header_v0 *header = crm_remote_header(remote);
if(header) {
/* Stop at the end of the current message */
read_len = header->size_total;
}
/* automatically grow the buffer when needed */
if(remote->buffer_size < read_len) {
remote->buffer_size = 2 * read_len;
crm_trace("Expanding buffer to %llu bytes",
(unsigned long long) remote->buffer_size);
remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
CRM_ASSERT(remote->buffer != NULL);
}
#ifdef HAVE_GNUTLS_GNUTLS_H
if (remote->tls_session) {
rc = gnutls_record_recv(*(remote->tls_session),
remote->buffer + remote->buffer_offset,
remote->buffer_size - remote->buffer_offset);
if (rc == GNUTLS_E_INTERRUPTED) {
rc = -EINTR;
} else if (rc == GNUTLS_E_AGAIN) {
rc = -EAGAIN;
} else if (rc < 0) {
crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
rc = -pcmk_err_generic;
}
} else if (remote->tcp_socket) {
#else
if (remote->tcp_socket) {
#endif
errno = 0;
rc = read(remote->tcp_socket,
remote->buffer + remote->buffer_offset,
remote->buffer_size - remote->buffer_offset);
if(rc < 0) {
rc = -errno;
}
} else {
crm_err("Unsupported connection type");
return -ESOCKTNOSUPPORT;
}
/* process any errors. */
if (rc > 0) {
remote->buffer_offset += rc;
/* always null terminate buffer, the +1 to alloc always allows for this. */
remote->buffer[remote->buffer_offset] = '\0';
crm_trace("Received %u more bytes, %llu total",
rc, (unsigned long long) remote->buffer_offset);
} else if (rc == -EINTR || rc == -EAGAIN) {
crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
} else if (rc == 0) {
crm_debug("EOF encoutered after %llu bytes",
(unsigned long long) remote->buffer_offset);
return -ENOTCONN;
} else {
crm_debug("Error receiving message after %llu bytes: %s (%d)",
(unsigned long long) remote->buffer_offset,
pcmk_strerror(rc), rc);
return -ENOTCONN;
}
header = crm_remote_header(remote);
if(header) {
if(remote->buffer_offset < header->size_total) {
crm_trace("Read less than the advertised length: %llu < %u bytes",
(unsigned long long) remote->buffer_offset,
header->size_total);
} else {
crm_trace("Read full message of %llu bytes",
(unsigned long long) remote->buffer_offset);
return remote->buffer_offset;
}
}
return -EAGAIN;
}
/*!
* \internal
* \brief Read message(s) from a remote connection
*
* \param[in] remote Remote connection to read
* \param[in] total_timeout Fail if message not read in this time (ms)
* \param[out] disconnected Will be set to 1 if disconnect detected
*
* \return TRUE if at least one full message read, FALSE otherwise
*/
gboolean
crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
{
int rc;
time_t start = time(NULL);
int remaining_timeout = 0;
if (total_timeout == 0) {
total_timeout = 10000;
} else if (total_timeout < 0) {
total_timeout = 60000;
}
*disconnected = 0;
remaining_timeout = total_timeout;
while ((remaining_timeout > 0) && !(*disconnected)) {
crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
remaining_timeout, total_timeout);
rc = crm_remote_ready(remote, remaining_timeout);
if (rc == 0) {
crm_err("Timed out (%d ms) while waiting for remote data",
remaining_timeout);
return FALSE;
} else if (rc < 0) {
crm_debug("Wait for remote data aborted, will try again: %s "
CRM_XS " rc=%d", pcmk_strerror(rc), rc);
} else {
rc = crm_remote_recv_once(remote);
if (rc > 0) {
return TRUE;
} else if (rc == -EAGAIN) {
crm_trace("Still waiting for remote data");
} else if (rc < 0) {
crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
pcmk_strerror(rc), rc);
}
}
if (rc == -ENOTCONN) {
*disconnected = 1;
return FALSE;
}
remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
}
return FALSE;
}
struct tcp_async_cb_data {
gboolean success;
int sock;
void *userdata;
void (*callback) (void *userdata, int sock);
int timeout; /*ms */
time_t start;
};
static gboolean
check_connect_finished(gpointer userdata)
{
struct tcp_async_cb_data *cb_data = userdata;
int cb_arg = 0; // socket fd on success, -errno on error
int sock = cb_data->sock;
int error = 0;
fd_set rset, wset;
socklen_t len = sizeof(error);
struct timeval ts = { 0, };
if (cb_data->success == TRUE) {
goto dispatch_done;
}
FD_ZERO(&rset);
FD_SET(sock, &rset);
wset = rset;
crm_trace("fd %d: checking to see if connect finished", sock);
cb_arg = select(sock + 1, &rset, &wset, NULL, &ts);
if (cb_arg < 0) {
cb_arg = -errno;
if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
/* reschedule if there is still time left */
if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
goto reschedule;
} else {
cb_arg = -ETIMEDOUT;
}
}
crm_trace("fd %d: select failed %d connect dispatch ", sock, cb_arg);
goto dispatch_done;
} else if (cb_arg == 0) {
if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
goto reschedule;
}
crm_debug("fd %d: timeout during select", sock);
cb_arg = -ETIMEDOUT;
goto dispatch_done;
} else {
crm_trace("fd %d: select returned success", sock);
cb_arg = 0;
}
/* can we read or write to the socket now? */
if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
cb_arg = -errno;
crm_trace("fd %d: call to getsockopt failed", sock);
goto dispatch_done;
}
if (error) {
crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
cb_arg = -error;
goto dispatch_done;
}
} else {
crm_trace("neither read nor write set after select");
cb_arg = -EAGAIN;
goto dispatch_done;
}
dispatch_done:
if (!cb_arg) {
crm_trace("fd %d: connected", sock);
/* Success, set the return code to the sock to report to the callback */
cb_arg = cb_data->sock;
cb_data->sock = 0;
} else {
close(sock);
}
if (cb_data->callback) {
cb_data->callback(cb_data->userdata, cb_arg);
}
free(cb_data);
return FALSE;
reschedule:
/* will check again next interval */
return TRUE;
}
static int
internal_tcp_connect_async(int sock,
const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
{
int rc = 0;
int interval = 500;
int timer;
struct tcp_async_cb_data *cb_data = NULL;
rc = crm_set_nonblocking(sock);
if (rc < 0) {
crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
pcmk_strerror(rc), rc);
close(sock);
return -1;
}
rc = connect(sock, addr, addrlen);
if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
crm_perror(LOG_WARNING, "connect");
return -1;
}
cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
cb_data->userdata = userdata;
cb_data->callback = callback;
cb_data->sock = sock;
cb_data->timeout = timeout;
cb_data->start = time(NULL);
if (rc == 0) {
/* The connect was successful immediately, we still return to mainloop
* and let this callback get called later. This avoids the user of this api
* to have to account for the fact the callback could be invoked within this
* function before returning. */
cb_data->success = TRUE;
interval = 1;
}
/* Check connect finished is mostly doing a non-block poll on the socket
* to see if we can read/write to it. Once we can, the connect has completed.
* This method allows us to connect to the server without blocking mainloop.
*
* This is a poor man's way of polling to see when the connection finished.
* At some point we should figure out a way to use a mainloop fd callback for this.
* Something about the way mainloop is currently polling prevents this from working at the
* moment though. */
crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
interval, sock);
timer = g_timeout_add(interval, check_connect_finished, cb_data);
if (timer_id) {
*timer_id = timer;
}
return 0;
}
static int
internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
{
int rc = connect(sock, addr, addrlen);
if (rc < 0) {
rc = -errno;
crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
pcmk_strerror(rc), rc);
return rc;
}
rc = crm_set_nonblocking(sock);
if (rc < 0) {
crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
pcmk_strerror(rc), rc);
return rc;
}
return pcmk_ok;
}
/*!
* \internal
* \brief Connect to server at specified TCP port
*
* \param[in] host Name of server to connect to
* \param[in] port Server port to connect to
* \param[in] timeout Report error if not connected in this many milliseconds
* \param[out] timer_id If non-NULL, will be set to timer ID, if asynchronous
* \param[in] userdata Data to pass to callback, if asynchronous
* \param[in] callback If non-NULL, connect asynchronously then call this
*
* \return File descriptor of connected socket on success, -ENOTCONN otherwise
*/
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout,
int *timer_id, void *userdata,
void (*callback) (void *userdata, int sock))
{
char buffer[INET6_ADDRSTRLEN];
struct addrinfo *res = NULL;
struct addrinfo *rp = NULL;
struct addrinfo hints;
const char *server = host;
int ret_ga;
int sock = -ENOTCONN;
// Get host's IP address(es)
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_CANONNAME;
ret_ga = getaddrinfo(server, NULL, &hints, &res);
if (ret_ga) {
crm_err("Unable to get IP address info for %s: %s",
server, gai_strerror(ret_ga));
goto async_cleanup;
}
if (!res || !res->ai_addr) {
crm_err("Unable to get IP address info for %s: no result", server);
goto async_cleanup;
}
// getaddrinfo() returns a list of host's addresses, try them in order
for (rp = res; rp != NULL; rp = rp->ai_next) {
struct sockaddr *addr = rp->ai_addr;
if (!addr) {
continue;
}
if (rp->ai_canonname) {
server = res->ai_canonname;
}
crm_debug("Got canonical name %s for %s", server, host);
sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
crm_perror(LOG_WARNING, "creating socket for connection to %s",
server);
sock = -ENOTCONN;
continue;
}
/* Set port appropriately for address family */
/* (void*) casts avoid false-positive compiler alignment warnings */
if (addr->sa_family == AF_INET6) {
((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
} else {
((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
}
memset(buffer, 0, DIMOF(buffer));
crm_sockaddr2str(addr, buffer);
crm_info("Attempting TCP connection to %s:%d", buffer, port);
if (callback) {
if (internal_tcp_connect_async
(sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
goto async_cleanup; /* Success for now, we'll hear back later in the callback */
}
} else if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
break; /* Success */
}
close(sock);
sock = -ENOTCONN;
}
async_cleanup:
if (res) {
freeaddrinfo(res);
}
return sock;
}
int
crm_remote_tcp_connect(const char *host, int port)
{
return crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);
}
/*!
* \brief Convert an IP address (IPv4 or IPv6) to a string for logging
*
* \param[in] sa Socket address for IP
* \param[out] s Storage for at least INET6_ADDRSTRLEN bytes
*
* \note sa The socket address can be a pointer to struct sockaddr_in (IPv4),
* struct sockaddr_in6 (IPv6) or struct sockaddr_storage (either),
* as long as its sa_family member is set correctly.
*/
void
crm_sockaddr2str(void *sa, char *s)
{
switch (((struct sockaddr*)sa)->sa_family) {
case AF_INET:
inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
s, INET6_ADDRSTRLEN);
break;
case AF_INET6:
inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
s, INET6_ADDRSTRLEN);
break;
default:
strcpy(s, "<invalid>");
}
}
int
crm_remote_accept(int ssock)
{
int csock = 0;
int rc = 0;
unsigned laddr = 0;
struct sockaddr_storage addr;
char addr_str[INET6_ADDRSTRLEN];
#ifdef TCP_USER_TIMEOUT
int optval;
long sbd_timeout = crm_get_sbd_timeout();
#endif
/* accept the connection */
laddr = sizeof(addr);
memset(&addr, 0, sizeof(addr));
csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
crm_sockaddr2str(&addr, addr_str);
crm_info("New remote connection from %s", addr_str);
if (csock == -1) {
crm_err("accept socket failed");
return -1;
}
rc = crm_set_nonblocking(csock);
if (rc < 0) {
crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
pcmk_strerror(rc), rc);
close(csock);
return rc;
}
#ifdef TCP_USER_TIMEOUT
if (sbd_timeout > 0) {
optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
&optval, sizeof(optval));
if (rc < 0) {
crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
optval);
close(csock);
return rc;
}
}
#endif
return csock;
}
/*!
* \brief Get the default remote connection TCP port on this host
*
* \return Remote connection TCP port number
*/
int
crm_default_remote_port()
{
static int port = 0;
if (port == 0) {
const char *env = getenv("PCMK_remote_port");
if (env) {
errno = 0;
port = strtol(env, NULL, 10);
if (errno || (port < 1) || (port > 65535)) {
crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
env, DEFAULT_REMOTE_PORT);
port = DEFAULT_REMOTE_PORT;
}
} else {
port = DEFAULT_REMOTE_PORT;
}
}
return port;
}
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Wed, Jun 25, 2:14 AM (8 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1951841
Default Alt Text
remote.c (36 KB)
Attached To
Mode
rP Pacemaker
Attached
Detach File
Event Timeline
Log In to Comment