/*
 * Copyright (c) 2009-Present, Redis Ltd.
 * All rights reserved.
 *
 * Copyright (c) 2024-present, Valkey contributors.
 * All rights reserved.
 *
 * Licensed under your choice of (a) the Redis Source Available License 2.0
 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
 * GNU Affero General Public License v3 (AGPLv3).
 *
 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
 */

#include "cluster_slot_stats.h"
#include "cluster.h"

/* Metrics that can be requested from getSlotStat() and used as the ORDERBY
 * sort key. INVALID is the initial "nothing parsed yet" value used by
 * clusterSlotStatsCommand(). */
typedef enum {
    KEY_COUNT,
    CPU_USEC,
    MEMORY_BYTES,
    NETWORK_BYTES_IN,
    NETWORK_BYTES_OUT,
    SLOT_STAT_COUNT,
    INVALID
} slotStatType;

/* -----------------------------------------------------------------------------
 * CLUSTER SLOT-STATS command
 * -------------------------------------------------------------------------- */

/* Struct used to temporarily hold slot statistics for sorting. */
typedef struct {
    int slot;
    uint64_t stat;
} slotStatForSort;

/* Walks [start_slot, end_slot] (both inclusive) and increments
 * assigned_slots[slot] for every slot covered by the node returned by
 * clusterNodeGetMaster(getMyClusterNode()).
 * Returns the number of slots that were marked. */
static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
    clusterNode *primary = clusterNodeGetMaster(getMyClusterNode());
    int assigned_slots_count = 0;
    for (int slot = start_slot; slot <= end_slot; slot++) {
        if (!clusterNodeCoversSlot(primary, slot)) continue;
        assigned_slots[slot]++;
        assigned_slots_count++;
    }
    return assigned_slots_count;
}

/* Thin wrapper around kvstoreGetDictMeta() for server.db->keys.
 * Callers in this file that pass createIfNeeded == 0 handle a NULL result
 * (reporting zero, or returning early). */
static inline kvstoreDictMetadata *getSlotMeta(int slot, int createIfNeeded) {
    return kvstoreGetDictMeta(server.db->keys, slot, createIfNeeded);
}

/* Returns the current value of `stat_type` for `slot`.
 * Metadata-backed metrics read as 0 when the slot has no metadata.
 * Panics on a stat type that is not one of the five real metrics. */
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
    kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
    switch (stat_type) {
    case KEY_COUNT: return countKeysInSlot(slot);
    case CPU_USEC: return meta ? meta->cpu_usec : 0;
    case MEMORY_BYTES: return meta ? meta->alloc_size : 0;
    case NETWORK_BYTES_IN: return meta ? meta->network_bytes_in : 0;
    case NETWORK_BYTES_OUT: return meta ? meta->network_bytes_out : 0;
    default: serverPanic("Invalid slot stat type %d was found.", stat_type);
    }
}

/* Compare by stat in ascending order. If stat is the same, compare by slot in ascending order.
 *
 * The stats are uint64_t, so the ordering is decided with explicit
 * comparisons: returning their difference through an int would truncate it
 * and could report the wrong sign (or a false tie) for large gaps. */
static int slotStatForSortAscCmp(const void *a, const void *b) {
    const slotStatForSort *entry_a = a;
    const slotStatForSort *entry_b = b;
    if (entry_a->stat == entry_b->stat) {
        return entry_a->slot - entry_b->slot;
    }
    return entry_a->stat < entry_b->stat ? -1 : 1;
}

/* Compare by stat in descending order. If stat is the same, compare by slot in ascending order.
 *
 * Uses explicit comparisons for the same reason as slotStatForSortAscCmp():
 * a uint64_t difference does not fit in the int return value. */
static int slotStatForSortDescCmp(const void *a, const void *b) {
    const slotStatForSort *entry_a = a;
    const slotStatForSort *entry_b = b;
    if (entry_b->stat == entry_a->stat) {
        return entry_a->slot - entry_b->slot;
    }
    return entry_b->stat < entry_a->stat ? -1 : 1;
}

/* Fills slot_stats[] with one entry per slot covered by the node returned by
 * clusterNodeGetMaster(getMyClusterNode()), using `order_by` as the stat,
 * then sorts the filled prefix (descending when `desc` is non-zero).
 * slot_stats must have room for CLUSTER_SLOTS entries. */
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
    clusterNode *primary = clusterNodeGetMaster(getMyClusterNode());
    int i = 0;

    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (!clusterNodeCoversSlot(primary, slot)) continue;
        slot_stats[i].slot = slot;
        slot_stats[i].stat = getSlotStat(slot, order_by);
        i++;
    }
    qsort(slot_stats, i, sizeof(slotStatForSort), desc ? slotStatForSortDescCmp : slotStatForSortAscCmp);
}

/* Emits the reply entry for a single slot: a 2-element array holding the slot
 * number and a map of its statistics. Which optional metrics appear in the
 * map depends on server.cluster_slot_stats_enabled (and, for memory, also on
 * server.memory_tracking_per_slot). */
static void addReplySlotStat(client *c, int slot) {
    int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU;
    int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET;
    int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) &&
                      server.memory_tracking_per_slot;

    addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
                             * and 1st index represents (map) usage statistics. */
    addReplyLongLong(c, slot);

    /* Nested map representing slot usage statistics. */
    addReplyMapLen(c, 1 +                      /* key-count */
                          (mem_enabled ? 1 : 0) + /* memory-bytes */
                          (cpu_enabled ? 1 : 0) + /* cpu-usec */
                          (net_enabled ? 2 : 0)); /* network-bytes-in/out */
    addReplyBulkCString(c, "key-count");
    addReplyLongLong(c, countKeysInSlot(slot));

    /* Any additional metrics aside from key-count come with a performance trade-off,
     * and are aggregated and returned based on its server config. */
    kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
    if (mem_enabled) {
        addReplyBulkCString(c, "memory-bytes");
        addReplyLongLong(c, meta ? meta->alloc_size : 0);
    }
    if (cpu_enabled) {
        addReplyBulkCString(c, "cpu-usec");
        addReplyLongLong(c, meta ? meta->cpu_usec : 0);
    }
    if (net_enabled) {
        addReplyBulkCString(c, "network-bytes-in");
        addReplyLongLong(c, meta ? meta->network_bytes_in : 0);
        addReplyBulkCString(c, "network-bytes-out");
        addReplyLongLong(c, meta ? meta->network_bytes_out : 0);
    }
}

/* Adds reply for the SLOTSRANGE variant.
 * Response is ordered in ascending slot number.
 * `len` is the number of marked entries of assigned_slots within
 * [start_slot, end_slot] and is used as the reply array length. */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int start_slot, int end_slot, int len) {
    addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

    for (int slot = start_slot; slot <= end_slot; slot++) {
        if (assigned_slots[slot]) addReplySlotStat(c, slot);
    }
}

/* Replies with the first min(limit, getMyShardSlotCount()) entries of the
 * already sorted slot_stats[] array. */
static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
    int num_slots_assigned = getMyShardSlotCount();
    int len = min(limit, num_slots_assigned);
    addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

    for (int i = 0; i < len; i++) {
        addReplySlotStat(c, slot_stats[i].slot);
    }
}

/* Egress bytes are only accounted when network slot-stats are enabled and the
 * client currently carries a valid slot. */
static int canAddNetworkBytesOut(client *c) {
    return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
    if (!canAddNetworkBytesOut(c)) return;

    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
    meta->network_bytes_out += c->net_output_bytes_curr_cmd;
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes.
 * `len` may be negative, in which case the counter is adjusted downwards. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
    client *c = server.current_client;
    if (c == NULL || !canAddNetworkBytesOut(c)) return;

    /* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */
    len *= (long long)listLength(server.slaves);
    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    serverAssert(clusterNodeIsMaster(getMyClusterNode()));
    kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
    /* We sometimes want to adjust the counter downwards (for example when we want to undo accounting for
     * SELECT commands that don't belong to any slot) so let's make sure we don't underflow the counter. */
    debugServerAssert(len >= 0 || meta->network_bytes_out >= (uint64_t)-len);
    meta->network_bytes_out += len;
}

/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica
 * count. */
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
    clusterSlotStatsUpdateNetworkBytesOutForReplication(len);
}

/* Decrement network bytes out for replication stream.
 * This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command.
 * This will decrement `len` value times the active replica count. */
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
    clusterSlotStatsUpdateNetworkBytesOutForReplication(-len);
}

/* Upon SPUBLISH, two egress events are triggered.
 * 1) Internal propagation, for clients that are subscribed to the current node.
 * 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
 *    This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
 * This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
    /* For a blocked client, c->slot could be pre-filled.
     * Thus c->slot is backed-up for restoration after aggregation is completed. */
    int save_slot = c->slot;
    c->slot = slot;
    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
        meta->network_bytes_out += c->net_output_bytes_curr_cmd;
    }

    /* For sharded pubsub, the client's network bytes metrics must be reset here,
     * as resetClient() is not called until subscription ends. */
    c->net_output_bytes_curr_cmd = 0;
    c->slot = save_slot;
}

/* Adds reply for the ORDERBY variant.
 * Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
    slotStatForSort slot_stats[CLUSTER_SLOTS];
    collectAndSortSlotStats(slot_stats, order_by, desc);
    addReplySortedSlotStats(c, slot_stats, limit);
}

/* Resets applicable slot statistics.
 * Only cpu_usec and the two network counters are cleared; alloc_size is left
 * untouched. Does nothing when the slot has no metadata. */
void clusterSlotStatReset(int slot) {
    kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
    if (!meta) return;
    meta->cpu_usec = 0;
    meta->network_bytes_in = 0;
    meta->network_bytes_out = 0;
    kvstoreFreeDictIfNeeded(server.db->keys, slot);
}

/* Applies clusterSlotStatReset() to every slot in [0, CLUSTER_SLOTS). */
void clusterSlotStatResetAll(void) {
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) clusterSlotStatReset(slot);
}

/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped.
 * This is due to their unique callstack, where the c->duration for
 * EXEC, EVAL and FCALL already includes all of its nested commands.
 * Meaning, the accumulation of cpu-usec for these nested commands
 * would equate to repeating the same calculation twice.
 */
static int canAddCpuDuration(client *c) {
    return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_CPU) && /* CPU tracking should be enabled. */
           c->slot != INVALID_CLUSTER_SLOT &&                 /* Command should be slot specific. */
           (!server.execution_nesting ||                      /* Either command should not be nested, */
            (c->realcmd->flags & CMD_BLOCKING));              /* or it must be due to unblocking. */
}

/* Adds `duration` to the cpu_usec counter of the client's slot, provided
 * canAddCpuDuration() allows it. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
    if (!canAddCpuDuration(c)) return;

    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
    meta->cpu_usec += duration;
}

/* For cross-slot scripting, its caller client's slot must be invalidated,
 * such that its slot-stats aggregation is bypassed. */
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
    if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return;

    /* Use the same sentinel that the canAdd*() checks in this file test for. */
    ctx->original_client->slot = INVALID_CLUSTER_SLOT;
}

static int canAddNetworkBytesIn(client *c) {
    /* First, network tracking must be enabled.
     * Second, command should target a specific slot.
     * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
     * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
     * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
    return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT &&
           !(c->flags & CLIENT_BLOCKED) && !server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
 * calculated earlier within networking.c layer.
 *
 * Note: Below function should only be called once c->slot is parsed.
 * Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
 * */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
    if (!canAddNetworkBytesIn(c)) return;

    if (c->cmd->proc == execCommand) {
        /* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
        c->net_input_bytes_curr_cmd += 15;
    }

    /* Same slot-range guard as the other accumulators in this file. */
    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
    meta->network_bytes_in += c->net_input_bytes_curr_cmd;
}

/* Entry point for CLUSTER SLOT-STATS. Two forms are accepted:
 *   CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot
 *   CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC]
 * Any other shape is answered with a subcommand syntax error. */
void clusterSlotStatsCommand(client *c) {
    if (!server.cluster_enabled) {
        addReplyError(c, "This instance has cluster support disabled");
        return;
    }

    /* Parse additional arguments. */
    if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
        /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
        int start_slot, end_slot;
        if ((start_slot = getSlotOrReply(c, c->argv[3])) == -1 ||
            (end_slot = getSlotOrReply(c, c->argv[4])) == -1) {
            return;
        }
        if (start_slot > end_slot) {
            addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", start_slot,
                                end_slot);
            return;
        }
        /* Initialize slot assignment array. */
        unsigned char assigned_slots[CLUSTER_SLOTS] = {0};
        int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, start_slot, end_slot);
        addReplySlotsRange(c, assigned_slots, start_slot, end_slot, assigned_slots_count);

    } else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
        /* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
        int desc = 1;
        slotStatType order_by = INVALID;
        int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU;
        int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET;
        int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) &&
                          server.memory_tracking_per_slot;

        /* A metric is only accepted as sort key when its tracking is enabled;
         * key-count is always available. */
        if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
            order_by = KEY_COUNT;
        } else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && cpu_enabled) {
            order_by = CPU_USEC;
        } else if (!strcasecmp(c->argv[3]->ptr, "memory-bytes") && mem_enabled) {
            order_by = MEMORY_BYTES;
        } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && net_enabled) {
            order_by = NETWORK_BYTES_IN;
        } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && net_enabled) {
            order_by = NETWORK_BYTES_OUT;
        } else {
            addReplyError(c, "Unrecognized sort metric for ORDERBY.");
            return;
        }

        int i = 4; /* Next argument index, following ORDERBY */
        int limit_counter = 0, asc_desc_counter = 0;
        long limit = CLUSTER_SLOTS;
        while (i < c->argc) {
            int moreargs = c->argc > i + 1;
            if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
                if (getRangeLongFromObjectOrReply(
                        c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit,
                        "Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
                    return;
                }
                i++; /* Skip the LIMIT value; the keyword itself is skipped below. */
                limit_counter++;
            } else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
                desc = 0;
                asc_desc_counter++;
            } else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
                desc = 1;
                asc_desc_counter++;
            } else {
                addReplyErrorObject(c, shared.syntaxerr);
                return;
            }
            if (limit_counter > 1 || asc_desc_counter > 1) {
                addReplyError(c, "Multiple filters of the same type are disallowed.");
                return;
            }
            i++;
        }
        addReplyOrderBy(c, order_by, limit, desc);

    } else {
        addReplySubcommandSyntaxError(c);
    }
}