1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#include "cluster_slot_stats.h"
#include "cluster.h"
/* Metrics by which per-slot statistics can be queried or sorted.
 * SLOT_STAT_COUNT is the number of real metrics listed before it;
 * INVALID is the "no metric selected" sentinel used while parsing ORDERBY. */
typedef enum {
    KEY_COUNT = 0,
    CPU_USEC = 1,
    MEMORY_BYTES = 2,
    NETWORK_BYTES_IN = 3,
    NETWORK_BYTES_OUT = 4,
    SLOT_STAT_COUNT = 5, /* Number of metrics above. */
    INVALID = 6          /* Sentinel: no metric chosen. */
} slotStatType;
/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */
/* A (slot, metric value) pair. An array of these is filled and qsort()ed
 * to answer CLUSTER SLOT-STATS ORDERBY. */
typedef struct {
    int slot;      /* Slot number. */
    uint64_t stat; /* Value of the metric being sorted on. */
} slotStatForSort;
/* For every slot in [start_slot, end_slot] served by the primary of this node's
 * shard, bumps assigned_slots[slot] (callers pass a zeroed array, so the flag
 * becomes 1). Returns the number of slots flagged. */
static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
    clusterNode *owner = clusterNodeGetMaster(getMyClusterNode());
    int marked = 0;
    int s = start_slot;
    while (s <= end_slot) {
        if (clusterNodeCoversSlot(owner, s)) {
            assigned_slots[s] += 1;
            marked += 1;
        }
        s++;
    }
    return marked;
}
/* Returns the per-slot dict metadata of the first database's keyspace kvstore.
 * With createIfNeeded == 0 the result may be NULL (callers check for that). */
static inline kvstoreDictMetadata *getSlotMeta(int slot, int createIfNeeded) {
    kvstoreDictMetadata *slot_meta = kvstoreGetDictMeta(server.db[0].keys, slot, createIfNeeded);
    return slot_meta;
}
/* Returns the value of metric `stat_type` for `slot`. Metrics other than
 * key-count read the slot's metadata and yield 0 when it does not exist.
 * Panics on an unknown metric. */
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
    kvstoreDictMetadata *m = getSlotMeta(slot, 0);
    if (stat_type == KEY_COUNT) return countKeysInSlot(slot);
    if (stat_type == CPU_USEC) return m == NULL ? 0 : m->cpu_usec;
    if (stat_type == MEMORY_BYTES) return m == NULL ? 0 : m->alloc_size;
    if (stat_type == NETWORK_BYTES_IN) return m == NULL ? 0 : m->network_bytes_in;
    if (stat_type == NETWORK_BYTES_OUT) return m == NULL ? 0 : m->network_bytes_out;
    serverPanic("Invalid slot stat type %d was found.", stat_type);
}
/* qsort() comparator: orders by stat ascending and breaks ties by slot ascending.
 *
 * `stat` is uint64_t, so it is compared with relational operators instead of
 * subtraction. The difference of two uint64_t values wraps around, and once it
 * is narrowed to int it can have the wrong sign (e.g. 0x80000000 vs 0) or even
 * be 0 for unequal values (e.g. 0 vs 1 << 32). That violates qsort()'s ordering
 * contract for large cpu-usec / network-bytes counters. */
static int slotStatForSortAscCmp(const void *a, const void *b) {
    const slotStatForSort *entry_a = a;
    const slotStatForSort *entry_b = b;
    if (entry_a->stat < entry_b->stat) return -1;
    if (entry_a->stat > entry_b->stat) return 1;
    return (entry_a->slot > entry_b->slot) - (entry_a->slot < entry_b->slot);
}
/* qsort() comparator: orders by stat descending and breaks ties by slot ascending.
 *
 * `stat` is uint64_t, so it is compared with relational operators instead of
 * subtraction. The difference of two uint64_t values wraps around, and once it
 * is narrowed to int it can have the wrong sign or be 0 for unequal values.
 * That violates qsort()'s ordering contract for large counters. */
static int slotStatForSortDescCmp(const void *a, const void *b) {
    const slotStatForSort *entry_a = a;
    const slotStatForSort *entry_b = b;
    if (entry_a->stat > entry_b->stat) return -1;
    if (entry_a->stat < entry_b->stat) return 1;
    return (entry_a->slot > entry_b->slot) - (entry_a->slot < entry_b->slot);
}
/* Fills the prefix of slot_stats with one entry per slot covered by this
 * shard's primary (visited in ascending slot order). Each entry carries that
 * slot's `order_by` metric. The filled prefix is then sorted descending when
 * desc != 0, otherwise ascending (ties broken by slot number). */
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
    clusterNode *owner = clusterNodeGetMaster(getMyClusterNode());
    int (*cmp)(const void *, const void *) = desc ? slotStatForSortDescCmp : slotStatForSortAscCmp;
    int filled = 0;
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (clusterNodeCoversSlot(owner, slot)) {
            slotStatForSort *entry = &slot_stats[filled++];
            entry->slot = slot;
            entry->stat = getSlotStat(slot, order_by);
        }
    }
    qsort(slot_stats, filled, sizeof(*slot_stats), cmp);
}
/* Emits one `[slot, {metric: value, ...}]` pair for `slot`.
 * key-count is always present. memory-bytes, cpu-usec and
 * network-bytes-in/out follow, in that order, only when the matching tracking
 * is enabled (memory also requires server.memory_tracking_per_slot).
 * Metrics of a slot without metadata are reported as 0. */
static void addReplySlotStat(client *c, int slot) {
    const int track_cpu = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU;
    const int track_net = server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET;
    const int track_mem = (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot;

    int fields = 1;            /* key-count */
    if (track_mem) fields += 1; /* memory-bytes */
    if (track_cpu) fields += 1; /* cpu-usec */
    if (track_net) fields += 2; /* network-bytes-in + network-bytes-out */

    /* Outer 2-element array: the slot number, then its statistics map. */
    addReplyArrayLen(c, 2);
    addReplyLongLong(c, slot);
    addReplyMapLen(c, fields);

    addReplyBulkCString(c, "key-count");
    addReplyLongLong(c, countKeysInSlot(slot));

    /* The remaining metrics carry a runtime cost, so they are only collected
     * and reported when enabled by configuration. */
    kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
    if (track_mem) {
        addReplyBulkCString(c, "memory-bytes");
        addReplyLongLong(c, meta == NULL ? 0 : meta->alloc_size);
    }
    if (track_cpu) {
        addReplyBulkCString(c, "cpu-usec");
        addReplyLongLong(c, meta == NULL ? 0 : meta->cpu_usec);
    }
    if (track_net) {
        addReplyBulkCString(c, "network-bytes-in");
        addReplyLongLong(c, meta == NULL ? 0 : meta->network_bytes_in);
        addReplyBulkCString(c, "network-bytes-out");
        addReplyLongLong(c, meta == NULL ? 0 : meta->network_bytes_out);
    }
}
/* Replies to the SLOTSRANGE variant: an array of `len` entries, one per
 * non-zero assigned_slots[] flag in [start_slot, end_slot], in ascending slot
 * order. `len` is expected to be that flag count (as returned by
 * markSlotsAssignedToMyShard). */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int start_slot, int end_slot, int len) {
    /* The top-level reply is an array because entry order is significant. */
    addReplyArrayLen(c, len);
    for (int s = start_slot; s <= end_slot; ++s) {
        if (!assigned_slots[s]) continue;
        addReplySlotStat(c, s);
    }
}
/* Replies with the first min(limit, slots owned by this shard) entries of the
 * already-sorted slot_stats array, preserving their order. */
static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
    const int owned = getMyShardSlotCount();
    const int count = min(limit, owned);
    /* The top-level reply is an array because entry order is significant. */
    addReplyArrayLen(c, count);
    const slotStatForSort *end = slot_stats + count;
    for (const slotStatForSort *it = slot_stats; it < end; ++it) {
        addReplySlotStat(c, it->slot);
    }
}
/* Non-zero when network stats tracking is enabled and the client's current
 * command is bound to a single slot. */
static int canAddNetworkBytesOut(client *c) {
    if (!clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET)) return 0;
    return c->slot != INVALID_CLUSTER_SLOT;
}
/* Charges the egress bytes of a user client's current command
 * (c->net_output_bytes_curr_cmd) to the network-bytes-out counter of its slot.
 * Does nothing unless canAddNetworkBytesOut() accepts the client. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        getSlotMeta(c->slot, 1)->network_bytes_out += c->net_output_bytes_curr_cmd;
    }
}
/* Adjusts the network-bytes-out counter of server.current_client's slot by
 * `len` bytes for each entry in server.slaves. This is replication-stream
 * egress and only applies on primaries, which is asserted. A negative `len`
 * undoes earlier accounting. Skipped when there is no current client or
 * canAddNetworkBytesOut() rejects it. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
    client *cur = server.current_client;
    if (cur == NULL) return;
    if (!canAddNetworkBytesOut(cur)) return;

    /* The same bytes are sent to every replica, so scale by the replica count. */
    const long long total = len * (long long)listLength(server.slaves);
    serverAssert(cur->slot >= 0 && cur->slot < CLUSTER_SLOTS);
    serverAssert(clusterNodeIsMaster(getMyClusterNode()));

    kvstoreDictMetadata *meta = getSlotMeta(cur->slot, 1);
    /* Downward adjustments (e.g. removing SELECT, which belongs to no slot)
     * must never take the unsigned counter below zero. */
    debugServerAssert(total >= 0 || meta->network_bytes_out >= (uint64_t)-total);
    meta->network_bytes_out += total;
}
/* Adds replication-stream egress: the current client's slot network-bytes-out
 * grows by `len` multiplied by the number of replicas. */
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
    const long long delta = len;
    clusterSlotStatsUpdateNetworkBytesOutForReplication(delta);
}
/* Removes replication-stream egress that belongs to no particular slot (e.g. a
 * SELECT command). The current client's slot network-bytes-out shrinks by
 * `len` multiplied by the number of replicas. */
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
    const long long delta = -len;
    clusterSlotStatsUpdateNetworkBytesOutForReplication(delta);
}
/* An SPUBLISH produces two kinds of egress:
 *   1) internal propagation, to clients subscribed on this node;
 *   2) external propagation, to the other nodes of the same shard (primary or
 *      replica).
 * Only (1) is accounted here. (2) is not aggregated, consistent with how
 * server.stat_net_output_bytes is aggregated.
 * The bytes are charged to `slot`; the subscriber's per-command egress counter
 * is then cleared. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
    /* c->slot may already be set (e.g. for a blocked client), so remember it
     * and put it back once accounting is done. */
    const int prev_slot = c->slot;
    c->slot = slot;

    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        kvstoreDictMetadata *slot_meta = getSlotMeta(c->slot, 1);
        slot_meta->network_bytes_out += c->net_output_bytes_curr_cmd;
    }

    /* resetClient() does not run for a subscriber until its subscription
     * ends, so the per-command egress counter is cleared here instead. */
    c->net_output_bytes_curr_cmd = 0;
    c->slot = prev_slot;
}
/* Replies to the ORDERBY variant: slots owned by this shard, sorted by
 * `order_by` (descending when desc != 0) and truncated to `limit` entries. */
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
    /* Stack buffer sized for every slot; only the prefix for slots owned by
     * this shard gets filled and sorted. */
    slotStatForSort sorted[CLUSTER_SLOTS];
    collectAndSortSlotStats(sorted, order_by, desc);
    addReplySortedSlotStats(c, sorted, limit);
}
/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
kvstoreDictMetadata *meta = getSlotMeta(slot, 0);
if (!meta) return;
meta->cpu_usec = 0;
meta->network_bytes_in = 0;
meta->network_bytes_out = 0;
kvstoreFreeDictIfNeeded(server.db->keys, slot);
}
/* Applies clusterSlotStatReset() to every slot in [0, CLUSTER_SLOTS). */
void clusterSlotStatResetAll(void) {
    int slot = 0;
    while (slot < CLUSTER_SLOTS) {
        clusterSlotStatReset(slot);
        slot++;
    }
}
/* Decides whether a command's duration should be added to cpu-usec.
 * Commands nested inside EXEC, EVAL or FCALL are skipped: the duration of
 * those outer commands already covers everything they run, so counting the
 * nested ones as well would charge the same time twice. A nested blocking
 * command is still counted, because its nesting comes from being unblocked. */
static int canAddCpuDuration(client *c) {
    if (!clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_CPU)) return 0; /* CPU tracking disabled. */
    if (c->slot == INVALID_CLUSTER_SLOT) return 0;                   /* Not slot specific. */
    if (server.execution_nesting == 0) return 1;                     /* Top-level command. */
    return (c->realcmd->flags & CMD_BLOCKING) != 0;                  /* Nested due to unblocking. */
}
/* Adds `duration` microseconds to the cpu-usec counter of the client's slot,
 * when canAddCpuDuration() allows it. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
    if (canAddCpuDuration(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        getSlotMeta(c->slot, 1)->cpu_usec += duration;
    }
}
/* For cross-slot scripting (SCRIPT_ALLOW_CROSS_SLOT), the caller client's slot
 * is invalidated so that slot-stats aggregation is bypassed for it. The
 * canAdd*() checks skip any client whose slot equals INVALID_CLUSTER_SLOT. */
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
    if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return;
    /* Use the exact sentinel the canAdd*() checks compare against rather than
     * a bare -1. Any other value would fall through to the slot range asserts
     * instead of being skipped. */
    ctx->original_client->slot = INVALID_CLUSTER_SLOT;
}
/* Decides whether the current command's ingress bytes should be accounted. */
static int canAddNetworkBytesIn(client *c) {
    /* Network tracking must be enabled. */
    if (!clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET)) return 0;
    /* The command must target a specific slot. */
    if (c->slot == INVALID_CLUSTER_SLOT) return 0;
    /* A blocked client is skipped to avoid counting it again when it is unblocked. */
    if (c->flags & CLIENT_BLOCKED) return 0;
    /* Inside MULTI/EXEC, the nested call()'s afterCommand() would otherwise
     * add EXEC's 14-byte RESP a second time. */
    return !server.in_exec;
}
/* Adds the ingress bytes of the command currently executing
 * (c->net_input_bytes_curr_cmd, computed earlier in the networking layer) to
 * the network-bytes-in counter of its slot.
 *
 * Must be called only once c->slot is parsed; otherwise canAddNetworkBytesIn()
 * rejects the client and nothing is recorded.
 *
 * For EXEC, the bytes of the matching MULTI request are added as well. They are
 * folded into c->net_input_bytes_curr_cmd before being charged. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
    if (!canAddNetworkBytesIn(c)) return;
    if (c->cmd->proc == execCommand) {
        /* Accumulate the corresponding MULTI RESP; sizeof - 1 drops the NUL (15 bytes). */
        c->net_input_bytes_curr_cmd += sizeof("*1\r\n$5\r\nmulti\r\n") - 1;
    }
    /* Same bounds check the other per-slot accumulators perform before
     * indexing slot metadata. */
    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    kvstoreDictMetadata *meta = getSlotMeta(c->slot, 1);
    meta->network_bytes_in += c->net_input_bytes_curr_cmd;
}
/* CLUSTER SLOT-STATS command handler. Supported forms:
 *   CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot
 *   CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC]
 * key-count is always accepted as an ORDERBY metric. cpu-usec, memory-bytes and
 * network-bytes-in/out are accepted only while their tracking is enabled.
 * ORDERBY defaults to DESC with a limit of CLUSTER_SLOTS. LIMIT and ASC/DESC
 * may each appear at most once. */
void clusterSlotStatsCommand(client *c) {
    if (!server.cluster_enabled) {
        addReplyError(c, "This instance has cluster support disabled");
        return;
    }

    const int is_slotsrange = c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange");
    const int is_orderby = !is_slotsrange && c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby");

    if (is_slotsrange) {
        /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
        int first = getSlotOrReply(c, c->argv[3]);
        if (first == -1) return;
        int last = getSlotOrReply(c, c->argv[4]);
        if (last == -1) return;
        if (first > last) {
            addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", first, last);
            return;
        }
        /* One flag per slot, set for the slots in range owned by this shard. */
        unsigned char owned[CLUSTER_SLOTS] = {0};
        const int owned_count = markSlotsAssignedToMyShard(owned, first, last);
        addReplySlotsRange(c, owned, first, last, owned_count);
        return;
    }

    if (!is_orderby) {
        addReplySubcommandSyntaxError(c);
        return;
    }

    /* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
    const struct {
        const char *name;
        slotStatType type;
        int available;
    } metrics[] = {
        {"key-count", KEY_COUNT, 1},
        {"cpu-usec", CPU_USEC, (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_CPU) != 0},
        {"memory-bytes", MEMORY_BYTES,
         (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_MEM) && server.memory_tracking_per_slot},
        {"network-bytes-in", NETWORK_BYTES_IN, (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET) != 0},
        {"network-bytes-out", NETWORK_BYTES_OUT, (server.cluster_slot_stats_enabled & CLUSTER_SLOT_STATS_NET) != 0},
    };

    const char *wanted = c->argv[3]->ptr;
    slotStatType order_by = INVALID;
    for (size_t k = 0; k < sizeof(metrics) / sizeof(metrics[0]); k++) {
        if (metrics[k].available && !strcasecmp(wanted, metrics[k].name)) {
            order_by = metrics[k].type;
            break;
        }
    }
    if (order_by == INVALID) {
        addReplyError(c, "Unrecognized sort metric for ORDERBY.");
        return;
    }

    int desc = 1;
    long limit = CLUSTER_SLOTS;
    int limit_seen = 0, direction_seen = 0;
    /* Options start right after the metric name. */
    for (int j = 4; j < c->argc; j++) {
        const char *opt = c->argv[j]->ptr;
        const int has_value = j + 1 < c->argc;
        if (has_value && !strcasecmp(opt, "limit")) {
            if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 1, CLUSTER_SLOTS, &limit,
                                              "Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
                return;
            }
            j++; /* The value following LIMIT has been consumed. */
            limit_seen++;
        } else if (!strcasecmp(opt, "asc")) {
            desc = 0;
            direction_seen++;
        } else if (!strcasecmp(opt, "desc")) {
            desc = 1;
            direction_seen++;
        } else {
            addReplyErrorObject(c, shared.syntaxerr);
            return;
        }
        if (limit_seen > 1 || direction_seen > 1) {
            addReplyError(c, "Multiple filters of the same type are disallowed.");
            return;
        }
    }

    addReplyOrderBy(c, order_by, limit, desc);
}
|