diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/cluster_slot_stats.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/cluster_slot_stats.c')
| -rw-r--r-- | examples/redis-unstable/src/cluster_slot_stats.c | 373 |
1 files changed, 0 insertions, 373 deletions
diff --git a/examples/redis-unstable/src/cluster_slot_stats.c b/examples/redis-unstable/src/cluster_slot_stats.c deleted file mode 100644 index 3d7c39a..0000000 --- a/examples/redis-unstable/src/cluster_slot_stats.c +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Copyright (c) 2009-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -#include "cluster_slot_stats.h" -#include "cluster.h" - -typedef enum { - KEY_COUNT, - CPU_USEC, - MEMORY_BYTES, - NETWORK_BYTES_IN, - NETWORK_BYTES_OUT, - SLOT_STAT_COUNT, - INVALID -} slotStatType; - -/* ----------------------------------------------------------------------------- - * CLUSTER SLOT-STATS command - * -------------------------------------------------------------------------- */ - -/* Struct used to temporarily hold slot statistics for sorting. */ -typedef struct { - int slot; - uint64_t stat; -} slotStatForSort; - -static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) { - clusterNode *primary = clusterNodeGetMaster(getMyClusterNode()); - int assigned_slots_count = 0; - for (int slot = start_slot; slot <= end_slot; slot++) { - if (!clusterNodeCoversSlot(primary, slot)) continue; - assigned_slots[slot]++; - assigned_slots_count++; - } - return assigned_slots_count; -} - -static inline kvstoreDictMetadata *getSlotMeta(int slot, int createIfNeeded) { - return kvstoreGetDictMeta(server.db->keys, slot, createIfNeeded); -} - -static uint64_t getSlotStat(int slot, slotStatType stat_type) { - kvstoreDictMetadata *meta = getSlotMeta(slot, 0); - switch (stat_type) { - case KEY_COUNT: return countKeysInSlot(slot); - case CPU_USEC: return meta ? meta->cpu_usec : 0; - case MEMORY_BYTES: return meta ? meta->alloc_size : 0; - case NETWORK_BYTES_IN: return meta ? meta->network_bytes_in : 0; - case NETWORK_BYTES_OUT: return meta ? meta->network_bytes_out : 0; - default: serverPanic("Invalid slot stat type %d was found.", stat_type); - } -} - -/* Compare by stat in ascending order. If stat is the same, compare by slot in ascending order. */ -static int slotStatForSortAscCmp(const void *a, const void *b) { - const slotStatForSort *entry_a = a; - const slotStatForSort *entry_b = b; - if (entry_a->stat == entry_b->stat) { - return entry_a->slot - entry_b->slot; - } - return entry_a->stat - entry_b->stat; -} - -/* Compare by stat in descending order. If stat is the same, compare by slot in ascending order. */ -static int slotStatForSortDescCmp(const void *a, const void *b) { - const slotStatForSort *entry_a = a; - const slotStatForSort *entry_b = b; - if (entry_b->stat == entry_a->stat) { - return entry_a->slot - entry_b->slot; - } - return entry_b->stat - entry_a->stat; -} - -static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) { - clusterNode *primary = clusterNodeGetMaster(getMyClusterNode()); - int i = 0; - for (int slot = 0; slot < CLUSTER_SLOTS; slot++) { - if (!clusterNodeCoversSlot(primary, slot)) continue; - slot_stats[i].slot = slot; - slot_stats[i].stat = getSlotStat(slot, order_by); - i++; - } - qsort(slot_stats, i, sizeof(slotStatForSort), desc ? slotStatForSortDescCmp : slotStatForSortAscCmp); -} - -static void addReplySlotStat(client *c, int slot) { - int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU; - int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET; - int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot; - - addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot, - * and 1st index represents (map) usage statistics. */ - addReplyLongLong(c, slot); - /* Nested map representing slot usage statistics. */ - addReplyMapLen(c, 1 + /* key-count */ - (mem_enabled ? 1 : 0) + /* memory-bytes */ - (cpu_enabled ? 1 : 0) + /* cpu-usec */ - (net_enabled ? 2 : 0)); /* network-bytes-in/out */ - addReplyBulkCString(c, "key-count"); - addReplyLongLong(c, countKeysInSlot(slot)); - - /* Any additional metrics aside from key-count come with a performance trade-off, - * and are aggregated and returned based on its server config. */ - kvstoreDictMetadata *meta = getSlotMeta(slot, 0); - if (mem_enabled) { - addReplyBulkCString(c, "memory-bytes"); - addReplyLongLong(c, meta ? meta->alloc_size : 0); - } - if (cpu_enabled) { - addReplyBulkCString(c, "cpu-usec"); - addReplyLongLong(c, meta ? meta->cpu_usec : 0); - } - if (net_enabled) { - addReplyBulkCString(c, "network-bytes-in"); - addReplyLongLong(c, meta ? meta->network_bytes_in : 0); - addReplyBulkCString(c, "network-bytes-out"); - addReplyLongLong(c, meta ? meta->network_bytes_out : 0); - } -} - -/* Adds reply for the SLOTSRANGE variant. - * Response is ordered in ascending slot number. */ -static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int start_slot, int end_slot, int len) { - addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */ - - for (int slot = start_slot; slot <= end_slot; slot++) { - if (assigned_slots[slot]) addReplySlotStat(c, slot); - } -} - -static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) { - int num_slots_assigned = getMyShardSlotCount(); - int len = min(limit, num_slots_assigned); - addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */ - - for (int i = 0; i < len; i++) { - addReplySlotStat(c, slot_stats[i].slot); - } -} - -static int canAddNetworkBytesOut(client *c) { - return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT; -} - -/* Accumulates egress bytes upon sending RESP responses back to user clients. */ -void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { - if (!canAddNetworkBytesOut(c)) return; - - serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); - kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); - meta->network_bytes_out += c->net_output_bytes_curr_cmd; -} - -/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ -static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { - client *c = server.current_client; - if (c == NULL || !canAddNetworkBytesOut(c)) return; - - /* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */ - len *= (long long)listLength(server.slaves); - serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); - serverAssert(clusterNodeIsMaster(getMyClusterNode())); - kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); - /* We sometimes want to adjust the counter downwards (for example when we want to undo accounting for - * SELECT commands that don't belong to any slot) so let's make sure we don't underflow the counter. */ - debugServerAssert(len >= 0 || meta->network_bytes_out >= (uint64_t)-len); - meta->network_bytes_out += len; -} - -/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica - * count. */ -void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) { - clusterSlotStatsUpdateNetworkBytesOutForReplication(len); -} - -/* Decrement network bytes out for replication stream. - * This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command. - * This will decrement `len` value times the active replica count. */ -void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { - clusterSlotStatsUpdateNetworkBytesOutForReplication(-len); -} - -/* Upon SPUBLISH, two egress events are triggered. - * 1) Internal propagation, for clients that are subscribed to the current node. - * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). - * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation. - * This function covers the internal propagation component. */ -void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { - /* For a blocked client, c->slot could be pre-filled. - * Thus c->slot is backed-up for restoration after aggregation is completed. */ - int save_slot = c->slot; - c->slot = slot; - if (canAddNetworkBytesOut(c)) { - serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); - kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); - meta->network_bytes_out += c->net_output_bytes_curr_cmd; - } - /* For sharded pubsub, the client's network bytes metrics must be reset here, - * as resetClient() is not called until subscription ends. */ - c->net_output_bytes_curr_cmd = 0; - c->slot = save_slot; -} - -/* Adds reply for the ORDERBY variant. - * Response is ordered based on the sort result. */ -static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) { - slotStatForSort slot_stats[CLUSTER_SLOTS]; - collectAndSortSlotStats(slot_stats, order_by, desc); - addReplySortedSlotStats(c, slot_stats, limit); -} - -/* Resets applicable slot statistics. */ -void clusterSlotStatReset(int slot) { - kvstoreDictMetadata *meta = getSlotMeta(slot, 0); - if (!meta) return; - meta->cpu_usec = 0; - meta->network_bytes_in = 0; - meta->network_bytes_out = 0; - kvstoreFreeDictIfNeeded(server.db->keys, slot); -} - -void clusterSlotStatResetAll(void) { - for (int slot = 0; slot < CLUSTER_SLOTS; slot++) - clusterSlotStatReset(slot); -} - -/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped. - * This is due to their unique callstack, where the c->duration for - * EXEC, EVAL and FCALL already includes all of its nested commands. - * Meaning, the accumulation of cpu-usec for these nested commands - * would equate to repeating the same calculation twice. - */ -static int canAddCpuDuration(client *c) { - return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_CPU) && /* CPU tracking should be enabled. */ - c->slot != INVALID_CLUSTER_SLOT && /* Command should be slot specific. */ - (!server.execution_nesting || /* Either command should not be nested, */ - (c->realcmd->flags & CMD_BLOCKING)); /* or it must be due to unblocking. */ -} - -void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) { - if (!canAddCpuDuration(c)) return; - - serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); - kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); - meta->cpu_usec += duration; -} - -/* For cross-slot scripting, its caller client's slot must be invalidated, - * such that its slot-stats aggregation is bypassed. */ -void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) { - if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return; - - ctx->original_client->slot = -1; -} - -static int canAddNetworkBytesIn(client *c) { - /* First, network tracking must be enabled. - * Second, command should target a specific slot. - * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. - * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of - * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */ - return clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET) && c->slot != INVALID_CLUSTER_SLOT && - !(c->flags & CLIENT_BLOCKED) && !server.in_exec; -} - -/* Adds network ingress bytes of the current command in execution, - * calculated earlier within networking.c layer. - * - * Note: Below function should only be called once c->slot is parsed. - * Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure. - * */ -void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) { - if (!canAddNetworkBytesIn(c)) return; - - if (c->cmd->proc == execCommand) { - /* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */ - c->net_input_bytes_curr_cmd += 15; - } - - kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1); - meta->network_bytes_in += c->net_input_bytes_curr_cmd; -} - -void clusterSlotStatsCommand(client *c) { - if (!server.cluster_enabled) { - addReplyError(c, "This instance has cluster support disabled"); - return; - } - - /* Parse additional arguments. */ - if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) { - /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */ - int start_slot, end_slot; - if ((start_slot = getSlotOrReply(c, c->argv[3])) == -1 || - (end_slot = getSlotOrReply(c, c->argv[4])) == -1) { - return; - } - if (start_slot > end_slot) { - addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", start_slot, end_slot); - return; - } - /* Initialize slot assignment array. */ - unsigned char assigned_slots[CLUSTER_SLOTS] = {0}; - int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, start_slot, end_slot); - addReplySlotsRange(c, assigned_slots, start_slot, end_slot, assigned_slots_count); - - } else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) { - /* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */ - int desc = 1; - slotStatType order_by = INVALID; - int cpu_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU; - int net_enabled = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET; - int mem_enabled = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot; - if (!strcasecmp(c->argv[3]->ptr, "key-count")) { - order_by = KEY_COUNT; - } else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && cpu_enabled) { - order_by = CPU_USEC; - } else if (!strcasecmp(c->argv[3]->ptr, "memory-bytes") && mem_enabled) { - order_by = MEMORY_BYTES; - } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && net_enabled) { - order_by = NETWORK_BYTES_IN; - } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && net_enabled) { - order_by = NETWORK_BYTES_OUT; - } else { - addReplyError(c, "Unrecognized sort metric for ORDERBY."); - return; - } - int i = 4; /* Next argument index, following ORDERBY */ - int limit_counter = 0, asc_desc_counter = 0; - long limit = CLUSTER_SLOTS; - while (i < c->argc) { - int moreargs = c->argc > i + 1; - if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) { - if (getRangeLongFromObjectOrReply( - c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit, - "Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) { - return; - } - i++; - limit_counter++; - } else if (!strcasecmp(c->argv[i]->ptr, "asc")) { - desc = 0; - asc_desc_counter++; - } else if (!strcasecmp(c->argv[i]->ptr, "desc")) { - desc = 1; - asc_desc_counter++; - } else { - addReplyErrorObject(c, shared.syntaxerr); - return; - } - if (limit_counter > 1 || asc_desc_counter > 1) { - addReplyError(c, "Multiple filters of the same type are disallowed."); - return; - } - i++; - } - addReplyOrderBy(c, order_by, limit, desc); - - } else { - addReplySubcommandSyntaxError(c); - } -} |
