diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/cluster_slot_stats.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/cluster_slot_stats.c')
| -rw-r--r-- | examples/redis-unstable/src/cluster_slot_stats.c | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/cluster_slot_stats.c b/examples/redis-unstable/src/cluster_slot_stats.c new file mode 100644 index 0000000..3d7c39a --- /dev/null +++ b/examples/redis-unstable/src/cluster_slot_stats.c @@ -0,0 +1,373 @@ +/* + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + +#include "cluster_slot_stats.h" +#include "cluster.h" + +typedef enum { + KEY_COUNT, + CPU_USEC, + MEMORY_BYTES, + NETWORK_BYTES_IN, + NETWORK_BYTES_OUT, + SLOT_STAT_COUNT, + INVALID +} slotStatType; + +/* ----------------------------------------------------------------------------- + * CLUSTER SLOT-STATS command + * -------------------------------------------------------------------------- */ + +/* Struct used to temporarily hold slot statistics for sorting. */ +typedef struct { + int slot; + uint64_t stat; +} slotStatForSort; + +static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) { + clusterNode *primary = clusterNodeGetMaster(getMyClusterNode()); + int assigned_slots_count = 0; + for (int slot = start_slot; slot <= end_slot; slot++) { + if (!clusterNodeCoversSlot(primary, slot)) continue; + assigned_slots[slot]++; + assigned_slots_count++; + } + return assigned_slots_count; +} + +static inline kvstoreDictMetadata *getSlotMeta(int slot, int createIfNeeded) { + return kvstoreGetDictMeta(server.db->keys, slot, createIfNeeded); +} + +static uint64_t getSlotStat(int slot, slotStatType stat_type) { + kvstoreDictMetadata *meta = getSlotMeta(slot, 0); + switch (stat_type) { + case KEY_COUNT: return countKeysInSlot(slot); + case CPU_USEC: return meta ? meta->cpu_usec : 0; + case MEMORY_BYTES: return meta ? meta->alloc_size : 0; + case NETWORK_BYTES_IN: return meta ? meta->network_bytes_in : 0; + case NETWORK_BYTES_OUT: return meta ? meta->network_bytes_out : 0; + default: serverPanic("Invalid slot stat type %d was found.", stat_type); + } +} + +/* Compare by stat in ascending order. If stat is the same, compare by slot in ascending order. */ +static int slotStatForSortAscCmp(const void *a, const void *b) { + const slotStatForSort *entry_a = a; + const slotStatForSort *entry_b = b; + if (entry_a->stat == entry_b->stat) { + return entry_a->slot - entry_b->slot; + } + return entry_a->stat - entry_b->stat; +} + +/* Compare by stat in descending order. If stat is the same, compare by slot in ascending order. */ +static int slotStatForSortDescCmp(const void *a, const void *b) { + const slotStatForSort *entry_a = a; + const slotStatForSort *entry_b = b; + if (entry_b->stat == entry_a->stat) { + return entry_a->slot - entry_b->slot; + } + return entry_b->stat - entry_a->stat; +} + +static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) { + clusterNode *primary = clusterNodeGetMaster(getMyClusterNode()); + int i = 0; + for (int slot = 0; slot < CLUSTER_SLOTS; slot++) { + if (!clusterNodeCoversSlot(primary, slot)) continue; + slot_stats[i].slot = slot; + slot_stats[i].stat = getSlotStat(slot, order_by); + i++; + } + qsort(slot_stats, i, sizeof(slotStatForSort), desc ? slotStatForSortDescCmp : slotStatForSortAscCmp); +} + +static void addReplySlotStat(client *c, int slot) { + int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU; + int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET; + int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot; + + addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot, + * and 1st index represents (map) usage statistics. */ + addReplyLongLong(c, slot); + /* Nested map representing slot usage statistics. */ + addReplyMapLen(c, 1 + /* key-count */ + (mem_enabled ? 1 : 0) + /* memory-bytes */ + (cpu_enabled ? 1 : 0) + /* cpu-usec */ + (net_enabled ? 2 : 0)); /* network-bytes-in/out */ + addReplyBulkCString(c, "key-count"); + addReplyLongLong(c, countKeysInSlot(slot)); + + /* Any additional metrics aside from key-count come with a performance trade-off, + * and are aggregated and returned based on its server config. */ + kvstoreDictMetadata *meta = getSlotMeta(slot, 0); + if (mem_enabled) { + addReplyBulkCString(c, "memory-bytes"); + addReplyLongLong(c, meta ? meta->alloc_size : 0); + } + if (cpu_enabled) { + addReplyBulkCString(c, "cpu-usec"); + addReplyLongLong(c, meta ? meta->cpu_usec : 0); + } + if (net_enabled) { + addReplyBulkCString(c, "network-bytes-in"); + addReplyLongLong(c, meta ? meta->network_bytes_in : 0); + addReplyBulkCString(c, "network-bytes-out"); + addReplyLongLong(c, meta ? meta->network_bytes_out : 0); + } +} + +/* Adds reply for the SLOTSRANGE variant. + * Response is ordered in ascending slot number. */ +static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int start_slot, int end_slot, int len) { + addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */ + + for (int slot = start_slot; slot <= end_slot; slot++) { + if (assigned_slots[slot]) addReplySlotStat(c, slot); + } +} + +static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) { + int num_slots_assigned = getMyShardSlotCount(); + int len = min(limit, num_slots_assigned); + addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */ + + for (int i = 0; i < len; i++) { + addReplySlotStat(c, slot_stats[i].slot); + } +} + +static int canAddNetworkBytesOut(client *c) { + return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT; +} + +/* Accumulates egress bytes upon sending RESP responses back to user clients. */ +void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { + if (!canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); + meta->network_bytes_out += c->net_output_bytes_curr_cmd; +} + +/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ +static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { + client *c = server.current_client; + if (c == NULL || !canAddNetworkBytesOut(c)) return; + + /* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */ + len *= (long long)listLength(server.slaves); + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + serverAssert(clusterNodeIsMaster(getMyClusterNode())); + kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); + /* We sometimes want to adjust the counter downwards (for example when we want to undo accounting for + * SELECT commands that don't belong to any slot) so let's make sure we don't underflow the counter. */ + debugServerAssert(len >= 0 || meta->network_bytes_out >= (uint64_t)-len); + meta->network_bytes_out += len; +} + +/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica + * count. */ +void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) { + clusterSlotStatsUpdateNetworkBytesOutForReplication(len); +} + +/* Decrement network bytes out for replication stream. + * This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command. + * This will decrement `len` value times the active replica count. */ +void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { + clusterSlotStatsUpdateNetworkBytesOutForReplication(-len); +} + +/* Upon SPUBLISH, two egress events are triggered. + * 1) Internal propagation, for clients that are subscribed to the current node. + * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). + * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation. + * This function covers the internal propagation component. */ +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { + /* For a blocked client, c->slot could be pre-filled. + * Thus c->slot is backed-up for restoration after aggregation is completed. */ + int save_slot = c->slot; + c->slot = slot; + if (canAddNetworkBytesOut(c)) { + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); + meta->network_bytes_out += c->net_output_bytes_curr_cmd; + } + /* For sharded pubsub, the client's network bytes metrics must be reset here, + * as resetClient() is not called until subscription ends. */ + c->net_output_bytes_curr_cmd = 0; + c->slot = save_slot; +} + +/* Adds reply for the ORDERBY variant. + * Response is ordered based on the sort result. */ +static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) { + slotStatForSort slot_stats[CLUSTER_SLOTS]; + collectAndSortSlotStats(slot_stats, order_by, desc); + addReplySortedSlotStats(c, slot_stats, limit); +} + +/* Resets applicable slot statistics. */ +void clusterSlotStatReset(int slot) { + kvstoreDictMetadata *meta = getSlotMeta(slot, 0); + if (!meta) return; + meta->cpu_usec = 0; + meta->network_bytes_in = 0; + meta->network_bytes_out = 0; + kvstoreFreeDictIfNeeded(server.db->keys, slot); +} + +void clusterSlotStatResetAll(void) { + for (int slot = 0; slot < CLUSTER_SLOTS; slot++) + clusterSlotStatReset(slot); +} + +/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped. + * This is due to their unique callstack, where the c->duration for + * EXEC, EVAL and FCALL already includes all of its nested commands. + * Meaning, the accumulation of cpu-usec for these nested commands + * would equate to repeating the same calculation twice. + */ +static int canAddCpuDuration(client *c) { + return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_CPU) && /* CPU tracking should be enabled. */ + c->slot != INVALID_CLUSTER_SLOT && /* Command should be slot specific. */ + (!server.execution_nesting || /* Either command should not be nested, */ + (c->realcmd->flags & CMD_BLOCKING)); /* or it must be due to unblocking. */ +} + +void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) { + if (!canAddCpuDuration(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); + meta->cpu_usec += duration; +} + +/* For cross-slot scripting, its caller client's slot must be invalidated, + * such that its slot-stats aggregation is bypassed. */ +void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) { + if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return; + + ctx->original_client->slot = -1; +} + +static int canAddNetworkBytesIn(client *c) { + /* First, network tracking must be enabled. + * Second, command should target a specific slot. + * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. + * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of + * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */ + return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT && + !(c->flags & CLIENT_BLOCKED) && !server.in_exec; +} + +/* Adds network ingress bytes of the current command in execution, + * calculated earlier within networking.c layer. + * + * Note: Below function should only be called once c->slot is parsed. + * Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure. + * */ +void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) { + if (!canAddNetworkBytesIn(c)) return; + + if (c->cmd->proc == execCommand) { + /* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */ + c->net_input_bytes_curr_cmd += 15; + } + + kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); + meta->network_bytes_in += c->net_input_bytes_curr_cmd; +} + +void clusterSlotStatsCommand(client *c) { + if (!server.cluster_enabled) { + addReplyError(c, "This instance has cluster support disabled"); + return; + } + + /* Parse additional arguments. */ + if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) { + /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */ + int start_slot, end_slot; + if ((start_slot = getSlotOrReply(c, c->argv[3])) == -1 || + (end_slot = getSlotOrReply(c, c->argv[4])) == -1) { + return; + } + if (start_slot > end_slot) { + addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", start_slot, end_slot); + return; + } + /* Initialize slot assignment array. */ + unsigned char assigned_slots[CLUSTER_SLOTS] = {0}; + int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, start_slot, end_slot); + addReplySlotsRange(c, assigned_slots, start_slot, end_slot, assigned_slots_count); + + } else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) { + /* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */ + int desc = 1; + slotStatType order_by = INVALID; + int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU; + int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET; + int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot; + if (!strcasecmp(c->argv[3]->ptr, "key-count")) { + order_by = KEY_COUNT; + } else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && cpu_enabled) { + order_by = CPU_USEC; + } else if (!strcasecmp(c->argv[3]->ptr, "memory-bytes") && mem_enabled) { + order_by = MEMORY_BYTES; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && net_enabled) { + order_by = NETWORK_BYTES_IN; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && net_enabled) { + order_by = NETWORK_BYTES_OUT; + } else { + addReplyError(c, "Unrecognized sort metric for ORDERBY."); + return; + } + int i = 4; /* Next argument index, following ORDERBY */ + int limit_counter = 0, asc_desc_counter = 0; + long limit = CLUSTER_SLOTS; + while (i < c->argc) { + int moreargs = c->argc > i + 1; + if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) { + if (getRangeLongFromObjectOrReply( + c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit, + "Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) { + return; + } + i++; + limit_counter++; + } else if (!strcasecmp(c->argv[i]->ptr, "asc")) { + desc = 0; + asc_desc_counter++; + } else if (!strcasecmp(c->argv[i]->ptr, "desc")) { + desc = 1; + asc_desc_counter++; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + if (limit_counter > 1 || asc_desc_counter > 1) { + addReplyError(c, "Multiple filters of the same type are disallowed."); + return; + } + i++; + } + addReplyOrderBy(c, order_by, limit, desc); + + } else { + addReplySubcommandSyntaxError(c); + } +} |
