summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/cluster_slot_stats.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/cluster_slot_stats.c
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/cluster_slot_stats.c')
-rw-r--r--examples/redis-unstable/src/cluster_slot_stats.c373
1 files changed, 373 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/cluster_slot_stats.c b/examples/redis-unstable/src/cluster_slot_stats.c
new file mode 100644
index 0000000..3d7c39a
--- /dev/null
+++ b/examples/redis-unstable/src/cluster_slot_stats.c
@@ -0,0 +1,373 @@
+/*
+ * Copyright (c) 2009-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+#include "cluster_slot_stats.h"
+#include "cluster.h"
+
+typedef enum {
+ KEY_COUNT,
+ CPU_USEC,
+ MEMORY_BYTES,
+ NETWORK_BYTES_IN,
+ NETWORK_BYTES_OUT,
+ SLOT_STAT_COUNT,
+ INVALID
+} slotStatType;
+
+/* -----------------------------------------------------------------------------
+ * CLUSTER SLOT-STATS command
+ * -------------------------------------------------------------------------- */
+
+/* Struct used to temporarily hold slot statistics for sorting. */
+typedef struct {
+ int slot;
+ uint64_t stat;
+} slotStatForSort;
+
+static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
+ clusterNode *primary = clusterNodeGetMaster(getMyClusterNode());
+ int assigned_slots_count = 0;
+ for (int slot = start_slot; slot <= end_slot; slot++) {
+ if (!clusterNodeCoversSlot(primary, slot)) continue;
+ assigned_slots[slot]++;
+ assigned_slots_count++;
+ }
+ return assigned_slots_count;
+}
+
+static inline kvstoreDictMetadata *getSlotMeta(int slot, int createIfNeeded) {
+ return kvstoreGetDictMeta(server.db->keys, slot, createIfNeeded);
+}
+
+static uint64_t getSlotStat(int slot, slotStatType stat_type) {
+ kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
+ switch (stat_type) {
+ case KEY_COUNT: return countKeysInSlot(slot);
+ case CPU_USEC: return meta ? meta->cpu_usec : 0;
+ case MEMORY_BYTES: return meta ? meta->alloc_size : 0;
+ case NETWORK_BYTES_IN: return meta ? meta->network_bytes_in : 0;
+ case NETWORK_BYTES_OUT: return meta ? meta->network_bytes_out : 0;
+ default: serverPanic("Invalid slot stat type %d was found.", stat_type);
+ }
+}
+
+/* Compare by stat in ascending order. If stat is the same, compare by slot in ascending order. */
+static int slotStatForSortAscCmp(const void *a, const void *b) {
+ const slotStatForSort *entry_a = a;
+ const slotStatForSort *entry_b = b;
+ if (entry_a->stat == entry_b->stat) {
+ return entry_a->slot - entry_b->slot;
+ }
+ return entry_a->stat - entry_b->stat;
+}
+
+/* Compare by stat in descending order. If stat is the same, compare by slot in ascending order. */
+static int slotStatForSortDescCmp(const void *a, const void *b) {
+ const slotStatForSort *entry_a = a;
+ const slotStatForSort *entry_b = b;
+ if (entry_b->stat == entry_a->stat) {
+ return entry_a->slot - entry_b->slot;
+ }
+ return entry_b->stat - entry_a->stat;
+}
+
+static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
+ clusterNode *primary = clusterNodeGetMaster(getMyClusterNode());
+ int i = 0;
+ for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
+ if (!clusterNodeCoversSlot(primary, slot)) continue;
+ slot_stats[i].slot = slot;
+ slot_stats[i].stat = getSlotStat(slot, order_by);
+ i++;
+ }
+ qsort(slot_stats, i, sizeof(slotStatForSort), desc ? slotStatForSortDescCmp : slotStatForSortAscCmp);
+}
+
+static void addReplySlotStat(client *c, int slot) {
+ int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU;
+ int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET;
+ int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot;
+
+ addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
+ * and 1st index represents (map) usage statistics. */
+ addReplyLongLong(c, slot);
+ /* Nested map representing slot usage statistics. */
+ addReplyMapLen(c, 1 + /* key-count */
+ (mem_enabled ? 1 : 0) + /* memory-bytes */
+ (cpu_enabled ? 1 : 0) + /* cpu-usec */
+ (net_enabled ? 2 : 0)); /* network-bytes-in/out */
+ addReplyBulkCString(c, "key-count");
+ addReplyLongLong(c, countKeysInSlot(slot));
+
+ /* Any additional metrics aside from key-count come with a performance trade-off,
+ * and are aggregated and returned based on its server config. */
+ kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
+ if (mem_enabled) {
+ addReplyBulkCString(c, "memory-bytes");
+ addReplyLongLong(c, meta ? meta->alloc_size : 0);
+ }
+ if (cpu_enabled) {
+ addReplyBulkCString(c, "cpu-usec");
+ addReplyLongLong(c, meta ? meta->cpu_usec : 0);
+ }
+ if (net_enabled) {
+ addReplyBulkCString(c, "network-bytes-in");
+ addReplyLongLong(c, meta ? meta->network_bytes_in : 0);
+ addReplyBulkCString(c, "network-bytes-out");
+ addReplyLongLong(c, meta ? meta->network_bytes_out : 0);
+ }
+}
+
+/* Adds reply for the SLOTSRANGE variant.
+ * Response is ordered in ascending slot number. */
+static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int start_slot, int end_slot, int len) {
+ addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */
+
+ for (int slot = start_slot; slot <= end_slot; slot++) {
+ if (assigned_slots[slot]) addReplySlotStat(c, slot);
+ }
+}
+
+static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
+ int num_slots_assigned = getMyShardSlotCount();
+ int len = min(limit, num_slots_assigned);
+ addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */
+
+ for (int i = 0; i < len; i++) {
+ addReplySlotStat(c, slot_stats[i].slot);
+ }
+}
+
+static int canAddNetworkBytesOut(client *c) {
+ return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT;
+}
+
+/* Accumulates egress bytes upon sending RESP responses back to user clients. */
+void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
+ if (!canAddNetworkBytesOut(c)) return;
+
+ serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
+ kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
+ meta->network_bytes_out += c->net_output_bytes_curr_cmd;
+}
+
+/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
+static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
+ client *c = server.current_client;
+ if (c == NULL || !canAddNetworkBytesOut(c)) return;
+
+ /* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */
+ len *= (long long)listLength(server.slaves);
+ serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
+ serverAssert(clusterNodeIsMaster(getMyClusterNode()));
+ kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
+ /* We sometimes want to adjust the counter downwards (for example when we want to undo accounting for
+ * SELECT commands that don't belong to any slot) so let's make sure we don't underflow the counter. */
+ debugServerAssert(len >= 0 || meta->network_bytes_out >= (uint64_t)-len);
+ meta->network_bytes_out += len;
+}
+
+/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica
+ * count. */
+void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
+ clusterSlotStatsUpdateNetworkBytesOutForReplication(len);
+}
+
+/* Decrement network bytes out for replication stream.
+ * This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command.
+ * This will decrement `len` value times the active replica count. */
+void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
+ clusterSlotStatsUpdateNetworkBytesOutForReplication(-len);
+}
+
+/* Upon SPUBLISH, two egress events are triggered.
+ * 1) Internal propagation, for clients that are subscribed to the current node.
+ * 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
+ * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
+ * This function covers the internal propagation component. */
+void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
+ /* For a blocked client, c->slot could be pre-filled.
+ * Thus c->slot is backed-up for restoration after aggregation is completed. */
+ int save_slot = c->slot;
+ c->slot = slot;
+ if (canAddNetworkBytesOut(c)) {
+ serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
+ kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
+ meta->network_bytes_out += c->net_output_bytes_curr_cmd;
+ }
+ /* For sharded pubsub, the client's network bytes metrics must be reset here,
+ * as resetClient() is not called until subscription ends. */
+ c->net_output_bytes_curr_cmd = 0;
+ c->slot = save_slot;
+}
+
+/* Adds reply for the ORDERBY variant.
+ * Response is ordered based on the sort result. */
+static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
+ slotStatForSort slot_stats[CLUSTER_SLOTS];
+ collectAndSortSlotStats(slot_stats, order_by, desc);
+ addReplySortedSlotStats(c, slot_stats, limit);
+}
+
+/* Resets applicable slot statistics. */
+void clusterSlotStatReset(int slot) {
+ kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
+ if (!meta) return;
+ meta->cpu_usec = 0;
+ meta->network_bytes_in = 0;
+ meta->network_bytes_out = 0;
+ kvstoreFreeDictIfNeeded(server.db->keys, slot);
+}
+
+void clusterSlotStatResetAll(void) {
+ for (int slot = 0; slot < CLUSTER_SLOTS; slot++)
+ clusterSlotStatReset(slot);
+}
+
+/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped.
+ * This is due to their unique callstack, where the c->duration for
+ * EXEC, EVAL and FCALL already includes all of its nested commands.
+ * Meaning, the accumulation of cpu-usec for these nested commands
+ * would equate to repeating the same calculation twice.
+ */
+static int canAddCpuDuration(client *c) {
+ return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_CPU) && /* CPU tracking should be enabled. */
+ c->slot != INVALID_CLUSTER_SLOT && /* Command should be slot specific. */
+ (!server.execution_nesting || /* Either command should not be nested, */
+ (c->realcmd->flags & CMD_BLOCKING)); /* or it must be due to unblocking. */
+}
+
+void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
+ if (!canAddCpuDuration(c)) return;
+
+ serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
+ kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
+ meta->cpu_usec += duration;
+}
+
+/* For cross-slot scripting, its caller client's slot must be invalidated,
+ * such that its slot-stats aggregation is bypassed. */
+void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
+ if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return;
+
+ ctx->original_client->slot = -1;
+}
+
+static int canAddNetworkBytesIn(client *c) {
+ /* First, network tracking must be enabled.
+ * Second, command should target a specific slot.
+ * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
+ * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
+ * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
+ return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT &&
+ !(c->flags & CLIENT_BLOCKED) && !server.in_exec;
+}
+
+/* Adds network ingress bytes of the current command in execution,
+ * calculated earlier within networking.c layer.
+ *
+ * Note: Below function should only be called once c->slot is parsed.
+ * Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
+ * */
+void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
+ if (!canAddNetworkBytesIn(c)) return;
+
+ if (c->cmd->proc == execCommand) {
+ /* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
+ c->net_input_bytes_curr_cmd += 15;
+ }
+
+ kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
+ meta->network_bytes_in += c->net_input_bytes_curr_cmd;
+}
+
+void clusterSlotStatsCommand(client *c) {
+ if (!server.cluster_enabled) {
+ addReplyError(c, "This instance has cluster support disabled");
+ return;
+ }
+
+ /* Parse additional arguments. */
+ if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
+ /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
+ int start_slot, end_slot;
+ if ((start_slot = getSlotOrReply(c, c->argv[3])) == -1 ||
+ (end_slot = getSlotOrReply(c, c->argv[4])) == -1) {
+ return;
+ }
+ if (start_slot > end_slot) {
+ addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", start_slot, end_slot);
+ return;
+ }
+ /* Initialize slot assignment array. */
+ unsigned char assigned_slots[CLUSTER_SLOTS] = {0};
+ int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, start_slot, end_slot);
+ addReplySlotsRange(c, assigned_slots, start_slot, end_slot, assigned_slots_count);
+
+ } else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
+ /* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
+ int desc = 1;
+ slotStatType order_by = INVALID;
+ int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU;
+ int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET;
+ int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot;
+ if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
+ order_by = KEY_COUNT;
+ } else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && cpu_enabled) {
+ order_by = CPU_USEC;
+ } else if (!strcasecmp(c->argv[3]->ptr, "memory-bytes") && mem_enabled) {
+ order_by = MEMORY_BYTES;
+ } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && net_enabled) {
+ order_by = NETWORK_BYTES_IN;
+ } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && net_enabled) {
+ order_by = NETWORK_BYTES_OUT;
+ } else {
+ addReplyError(c, "Unrecognized sort metric for ORDERBY.");
+ return;
+ }
+ int i = 4; /* Next argument index, following ORDERBY */
+ int limit_counter = 0, asc_desc_counter = 0;
+ long limit = CLUSTER_SLOTS;
+ while (i < c->argc) {
+ int moreargs = c->argc > i + 1;
+ if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
+ if (getRangeLongFromObjectOrReply(
+ c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit,
+ "Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
+ return;
+ }
+ i++;
+ limit_counter++;
+ } else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
+ desc = 0;
+ asc_desc_counter++;
+ } else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
+ desc = 1;
+ asc_desc_counter++;
+ } else {
+ addReplyErrorObject(c, shared.syntaxerr);
+ return;
+ }
+ if (limit_counter > 1 || asc_desc_counter > 1) {
+ addReplyError(c, "Multiple filters of the same type are disallowed.");
+ return;
+ }
+ i++;
+ }
+ addReplyOrderBy(c, order_by, limit, desc);
+
+ } else {
+ addReplySubcommandSyntaxError(c);
+ }
+}