diff --git a/crmd/throttle.c b/crmd/throttle.c index c4611af7c4..aa28d099ac 100644 --- a/crmd/throttle.c +++ b/crmd/throttle.c @@ -1,408 +1,606 @@ /* * Copyright (C) 2013 Andrew Beekhof * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include + +#include +#include + +#include #include +#include #include #include #include #include #include + enum throttle_state_e { throttle_high = 0x0100, throttle_med = 0x0010, throttle_low = 0x0001, throttle_none = 0x0000, }; struct throttle_record_s { int cores; enum throttle_state_e mode; char *node; }; int throttle_job_max = 0; float throttle_load_target = 0.0; #define THROTTLE_FACTOR_LOW 1.2 #define THROTTLE_FACTOR_MEDIUM 1.6 #define THROTTLE_FACTOR_HIGH 2.0 GHashTable *throttle_records = NULL; mainloop_timer_t *throttle_timer = NULL; int throttle_num_cores(void) { static int cores = 0; char buffer[256]; FILE *stream = NULL; const char *cpufile = "/proc/cpuinfo"; if(cores) { return cores; } stream = fopen(cpufile, "r"); if(stream == NULL) { int rc = errno; crm_warn("Couldn't read %s, assuming a single processor: %s (%d)", cpufile, pcmk_strerror(rc), rc); return 1; } while (fgets(buffer, sizeof(buffer), stream)) { if(strstr(buffer, "processor") == buffer) { cores++; } } if(cores == 0) { crm_warn("No processors found in %s, assuming 1", cpufile); return 1; } fclose(stream); return cores; } +static char *find_cib_loadfile(void) +{ + DIR *dp; + struct dirent *entry; + struct stat statbuf; + char *match = NULL; + + dp = opendir("/proc"); + if (!dp) { + /* no proc directory to search through */ + crm_notice("Can not read /proc directory to track existing components"); + return FALSE; + } + + while ((entry = readdir(dp)) != NULL) { + char procpath[128]; + char value[64]; + char key[16]; + FILE *file; + int pid; + + strcpy(procpath, "/proc/"); + /* strlen("/proc/") + strlen("/status") + 1 = 14 + * 128 - 14 = 114 */ + strncat(procpath, entry->d_name, 114); + + if (lstat(procpath, &statbuf)) { + continue; + } + if (!S_ISDIR(statbuf.st_mode) || !isdigit(entry->d_name[0])) { + continue; + } + + strcat(procpath, "/status"); + + file = fopen(procpath, "r"); + if (!file) { + continue; + } + if (fscanf(file, "%15s%63s", key, value) != 2) { + fclose(file); + continue; + } + fclose(file); + + if (safe_str_neq("cib", value)) { + continue; + } + + pid = atoi(entry->d_name); + if (pid <= 0) { + continue; + } + + match = g_strdup_printf("/proc/%d/stat", pid); + break; + } + + closedir(dp); + return match; +} + +static bool throttle_cib_load(float *load) +{ +/* + /proc/[pid]/stat + Status information about the process. This is used by ps(1). It is defined in /usr/src/linux/fs/proc/array.c. + + The fields, in order, with their proper scanf(3) format specifiers, are: + + pid %d (1) The process ID. + + comm %s (2) The filename of the executable, in parentheses. This is visible whether or not the executable is swapped out. + + state %c (3) One character from the string "RSDZTW" where R is running, S is sleeping in an interruptible wait, D is waiting in uninterruptible disk sleep, Z is zombie, T is traced or stopped (on a signal), and W is paging. + + ppid %d (4) The PID of the parent. + + pgrp %d (5) The process group ID of the process. + + session %d (6) The session ID of the process. + + tty_nr %d (7) The controlling terminal of the process. (The minor device number is contained in the combination of bits 31 to 20 and 7 to 0; the major device number is in bits 15 to 8.) + + tpgid %d (8) The ID of the foreground process group of the controlling terminal of the process. + + flags %u (%lu before Linux 2.6.22) + (9) The kernel flags word of the process. For bit meanings, see the PF_* defines in the Linux kernel source file include/linux/sched.h. Details depend on the kernel version. + + minflt %lu (10) The number of minor faults the process has made which have not required loading a memory page from disk. + + cminflt %lu (11) The number of minor faults that the process's waited-for children have made. + + majflt %lu (12) The number of major faults the process has made which have required loading a memory page from disk. + + cmajflt %lu (13) The number of major faults that the process's waited-for children have made. + + utime %lu (14) Amount of time that this process has been scheduled in user mode, measured in clock ticks (divide by sysconf(_SC_CLK_TCK)). This includes guest time, guest_time (time spent running a virtual CPU, see below), so that applications that are not aware of the guest time field do not lose that time from their calculations. + + stime %lu (15) Amount of time that this process has been scheduled in kernel mode, measured in clock ticks (divide by sysconf(_SC_CLK_TCK)). + */ + + static char *loadfile = NULL; + static time_t last_call = 0; + static long ticks_per_s = 0; + static unsigned long last_utime, last_stime; + + char buffer[64*1024]; + FILE *stream = NULL; + time_t now = time(NULL); + + if(load == NULL) { + return FALSE; + } else { + *load = 0.0; + } + + if(loadfile == NULL) { + last_call = 0; + last_utime = 0; + last_stime = 0; + loadfile = find_cib_loadfile(); + ticks_per_s = sysconf(_SC_CLK_TCK); + crm_trace("Found %s", loadfile); + } + + stream = fopen(loadfile, "r"); + if(stream == NULL) { + int rc = errno; + + crm_warn("Couldn't read %s: %s (%d)", loadfile, pcmk_strerror(rc), rc); + free(loadfile); loadfile = NULL; + return FALSE; + } + + if(fgets(buffer, sizeof(buffer), stream)) { + char *comm = calloc(1, 256); + char state = 0; + int rc = 0, pid = 0, ppid = 0, pgrp = 0, session = 0, tty_nr = 0, tpgid = 0; + unsigned long flags = 0, minflt = 0, cminflt = 0, majflt = 0, cmajflt = 0, utime = 0, stime = 0; + + rc = sscanf(buffer, "%d %[^ ] %c %d %d %d %d %d %lu %lu %lu %lu %lu %lu %lu", + &pid, comm, &state, + &ppid, &pgrp, &session, &tty_nr, &tpgid, + &flags, &minflt, &cminflt, &majflt, &cmajflt, &utime, &stime); + + if(rc != 15) { + crm_err("Only %d of 15 fields found in %s", rc, loadfile); + return FALSE; + + } else if(last_call > 0 + && last_call < now + && last_utime <= utime + && last_stime <= stime) { + + time_t elapsed = now - last_call; + unsigned long delta_utime = utime - last_utime; + unsigned long delta_stime = stime - last_stime; + + *load = (delta_utime + delta_stime); /* Cast to a float before division */ + *load /= ticks_per_s; + *load /= elapsed; + crm_debug("Current CIB load from %lu ticks in %ds is %f (@%lu tps)", delta_utime + delta_stime, elapsed, *load, ticks_per_s); + + } else { + crm_debug("Init %lu + %lu ticks at %d (%lu tps)", utime, stime, now, ticks_per_s); + } + + last_call = now; + last_utime = utime; + last_stime = stime; + + fclose(stream); + return TRUE; + } + + fclose(stream); + return FALSE; +} + static bool throttle_load_avg(float *load) { char buffer[256]; FILE *stream = NULL; const char *loadfile = "/proc/loadavg"; if(load == NULL) { return FALSE; } stream = fopen(loadfile, "r"); if(stream == NULL) { int rc = errno; crm_warn("Couldn't read %s: %s (%d)", loadfile, pcmk_strerror(rc), rc); return FALSE; } if(fgets(buffer, sizeof(buffer), stream)) { char *nl = strstr(buffer, "\n"); /* Grab the 1-minute average, ignore the rest */ *load = strtof(buffer, NULL); if(nl) { nl[0] = 0; } crm_debug("Current load is %f (full: %s)", *load, buffer); + fclose(stream); + return TRUE; } fclose(stream); - return TRUE; + return FALSE; } static bool throttle_io_load(float *load, unsigned int *blocked) { char buffer[64*1024]; FILE *stream = NULL; const char *loadfile = "/proc/stat"; if(load == NULL) { return FALSE; } stream = fopen(loadfile, "r"); if(stream == NULL) { int rc = errno; crm_warn("Couldn't read %s: %s (%d)", loadfile, pcmk_strerror(rc), rc); return FALSE; } if(fgets(buffer, sizeof(buffer), stream)) { /* Borrowed from procps-ng's sysinfo.c */ char *b = NULL; long long cpu_use = 0; long long cpu_nic = 0; long long cpu_sys = 0; long long cpu_idl = 0; long long cpu_iow = 0; /* not separated out until the 2.5.41 kernel */ long long cpu_xxx = 0; /* not separated out until the 2.6.0-test4 kernel */ long long cpu_yyy = 0; /* not separated out until the 2.6.0-test4 kernel */ long long cpu_zzz = 0; /* not separated out until the 2.6.11 kernel */ long long divo2 = 0; long long duse = 0; long long dsys = 0; long long didl =0; long long diow =0; long long dstl = 0; long long Div = 0; b = strstr(buffer, "cpu "); if(b) sscanf(b, "cpu %Lu %Lu %Lu %Lu %Lu %Lu %Lu %Lu", &cpu_use, &cpu_nic, &cpu_sys, &cpu_idl, &cpu_iow, &cpu_xxx, &cpu_yyy, &cpu_zzz); if(blocked) { b = strstr(buffer, "procs_blocked "); if(b) sscanf(b, "procs_blocked %u", blocked); } duse = cpu_use + cpu_nic; dsys = cpu_sys + cpu_xxx + cpu_yyy; didl = cpu_idl; diow = cpu_iow; dstl = cpu_zzz; Div = duse + dsys + didl + diow + dstl; if (!Div) Div = 1, didl = 1; divo2 = Div / 2UL; /* vmstat output: * * procs -----------memory---------- ---swap-- -----io---- -system-- ----cpu---- * r b swpd free buff cache si so bi bo in cs us sy id wa * 1 0 5537800 958592 204180 1737740 1 1 12 15 0 0 2 1 97 0 * * The last four columns are calculated as: * * (unsigned)( (100*duse + divo2) / Div ), * (unsigned)( (100*dsys + divo2) / Div ), * (unsigned)( (100*didl + divo2) / Div ), * (unsigned)( (100*diow + divo2) / Div ) * */ *load = (diow + divo2) / Div; crm_debug("Current IO load is %f", *load); + + fclose(stream); + return TRUE; } fclose(stream); - return load; + return FALSE; } static enum throttle_state_e -throttle_mode(void) +throttle_handle_load(float load, const char *desc) +{ + if(load > THROTTLE_FACTOR_HIGH * throttle_load_target) { + crm_notice("High %s detected: %f (limit: %f)", desc, load, throttle_load_target); + return throttle_high; + + } else if(load > THROTTLE_FACTOR_MEDIUM * throttle_load_target) { + crm_info("Moderate %s detected: %f (limit: %f)", desc, load, throttle_load_target); + return throttle_med; + + } else if(load > THROTTLE_FACTOR_LOW * throttle_load_target) { + crm_debug("Noticable %s detected: %f (limit: %f)", desc, load, throttle_load_target); + return throttle_low; + } + + crm_trace("Negligable %s detected: %f (limit: %f)", desc, load, throttle_load_target); + return throttle_none; +} + +static enum throttle_state_e +throttle_mode(void) { float load; unsigned int blocked = 0; int cores = throttle_num_cores(); enum throttle_state_e mode = throttle_none; if(throttle_load_target <= 0) { /* If we ever make this a valid value, the cluster will at least behave as expected */ return mode; } if(throttle_load_avg(&load)) { - float simple_load = 0.0; + float simple = load / cores; + mode |= throttle_handle_load(simple, "CPU load"); + } - if(cores) { - simple_load = load / cores; - } else { - simple_load = load; - } + if(throttle_cib_load(&load)) { + const char *desc = "CIB load"; - if(simple_load > THROTTLE_FACTOR_HIGH * throttle_load_target) { - crm_notice("High CPU load detected: %f (limit: %f)", simple_load, throttle_load_target); + if(load > 0.9) { + crm_notice("High %s detected: %f", desc, load); mode |= throttle_high; - } else if(simple_load > THROTTLE_FACTOR_MEDIUM * throttle_load_target) { - crm_info("Moderate CPU load detected: %f (limit: %f)", simple_load, throttle_load_target); + + } else if(load > 0.8) { + crm_info("Moderate %s detected: %f", desc, load); mode |= throttle_med; - } else if(simple_load > THROTTLE_FACTOR_LOW * throttle_load_target) { - crm_debug("Noticable CPU load detected: %f (limit: %f)", simple_load, throttle_load_target); + + } else if(load > 0.7) { + crm_debug("Noticable %s detected: %f", desc, load); mode |= throttle_low; + + } else { + crm_trace("Negligable %s detected: %f", desc, load); } } if(throttle_io_load(&load, &blocked)) { float blocked_ratio = 0.0; - if(load > THROTTLE_FACTOR_HIGH * throttle_load_target) { - crm_notice("High IO load detected: %f (limit: %f)", load, throttle_load_target); - mode |= throttle_high; - } else if(load > THROTTLE_FACTOR_MEDIUM * throttle_load_target) { - crm_info("Moderate IO load detected: %f (limit: %f)", load, throttle_load_target); - mode |= throttle_med; - } else if(load > THROTTLE_FACTOR_LOW * throttle_load_target) { - crm_info("Noticable IO load detected: %f (limit: %f)", load, throttle_load_target); - mode |= throttle_low; - } + mode |= throttle_handle_load(load, "IO load"); if(cores) { blocked_ratio = blocked / cores; } else { blocked_ratio = blocked; } - if(blocked_ratio > THROTTLE_FACTOR_HIGH * throttle_load_target) { - crm_notice("High IO indicator detected: %f (limit: %f)", blocked_ratio, throttle_load_target); - mode |= throttle_high; - } else if(blocked_ratio > THROTTLE_FACTOR_MEDIUM * throttle_load_target) { - crm_info("Moderate IO indicator detected: %f (limit: %f)", blocked_ratio, throttle_load_target); - mode |= throttle_med; - } else if(blocked_ratio > THROTTLE_FACTOR_LOW * throttle_load_target) { - crm_debug("Noticable IO indicator detected: %f (limit: %f)", blocked_ratio, throttle_load_target); - mode |= throttle_low; - } + mode |= throttle_handle_load(blocked_ratio, "blocked IO ratio"); } if(mode & throttle_high) { return throttle_high; } else if(mode & throttle_med) { return throttle_med; } else if(mode & throttle_low) { return throttle_low; } return throttle_none; } static void throttle_send_command(enum throttle_state_e mode) { xmlNode *xml = NULL; int cores = throttle_num_cores(); xml = create_request(CRM_OP_THROTTLE, NULL, NULL, CRM_SYSTEM_CRMD, CRM_SYSTEM_CRMD, NULL); crm_xml_add_int(xml, F_CRM_THROTTLE_MODE, mode); crm_xml_add_int(xml, F_CRM_THROTTLE_CORES, cores); send_cluster_message(NULL, crm_msg_crmd, xml, TRUE); free_xml(xml); crm_info("Updated throttle state to %.4x", mode); } static gboolean throttle_timer_cb(gpointer data) { static bool send_updates = FALSE; static enum throttle_state_e last = throttle_none; enum throttle_state_e now = throttle_mode(); if(send_updates == FALSE) { /* Optimize for the true case */ if(compare_version(fsa_our_dc_version, "3.0.8") < 0) { crm_trace("DC version %s doesn't support throttling", fsa_our_dc_version); } else { send_updates = TRUE; } } if(send_updates && now != last) { crm_debug("New throttle mode: %.4x (was %.4x)", now, last); throttle_send_command(now); last = now; } return TRUE; } static void throttle_record_free(gpointer p) { struct throttle_record_s *r = p; free(r->node); free(r); } void throttle_init(void) { float load = 0.0; throttle_load_avg(&load); throttle_records = g_hash_table_new_full(crm_str_hash, g_str_equal, NULL, throttle_record_free); throttle_timer = mainloop_timer_add("throttle", 30* 1000, TRUE, throttle_timer_cb, NULL); mainloop_timer_start(throttle_timer); } void throttle_fini(void) { mainloop_timer_del(throttle_timer); throttle_timer = NULL; g_hash_table_destroy(throttle_records); throttle_records = NULL; } int throttle_get_job_limit(const char *node) { int jobs = 1; int job_max = throttle_job_max; struct throttle_record_s *r = NULL; r = g_hash_table_lookup(throttle_records, node); if(r == NULL) { r = calloc(1, sizeof(struct throttle_record_s)); r->node = strdup(node); r->mode = throttle_mode(); r->cores = throttle_num_cores(); crm_trace("Defaulting to local values for unknown node %s", node); g_hash_table_insert(throttle_records, r->node, r); } if(job_max <= 0) { job_max = r->cores * 2; } switch(r->mode) { case throttle_high: jobs = 1; /* At least one job must always be allowed */ break; case throttle_med: jobs = QB_MAX(1, job_max / 4); break; case throttle_low: jobs = QB_MAX(1, job_max / 2); break; case throttle_none: jobs = QB_MAX(1, job_max); break; default: crm_err("Unknown throttle mode %.4x on %s", r->mode, node); break; } return jobs; } void throttle_update(xmlNode *xml) { int cores = 0; enum throttle_state_e mode = 0; struct throttle_record_s *r = NULL; const char *from = crm_element_value(xml, F_CRM_HOST_FROM); crm_element_value_int(xml, F_CRM_THROTTLE_MODE, (int*)&mode); crm_element_value_int(xml, F_CRM_THROTTLE_CORES, &cores); r = g_hash_table_lookup(throttle_records, from); if(r == NULL) { r = calloc(1, sizeof(struct throttle_record_s)); r->node = strdup(from); g_hash_table_insert(throttle_records, r->node, r); } r->cores = cores; r->mode = mode; crm_debug("Host %s has %d cores and throttle mode %.4x. New job limit is %d", from, cores, mode, throttle_get_job_limit(from)); }