summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/hotkeys.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/hotkeys.c')
-rw-r--r--examples/redis-unstable/src/hotkeys.c614
1 files changed, 614 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/hotkeys.c b/examples/redis-unstable/src/hotkeys.c
new file mode 100644
index 0000000..9b6726f
--- /dev/null
+++ b/examples/redis-unstable/src/hotkeys.c
@@ -0,0 +1,614 @@
1/* Hotkey tracking related functionality
2 *
3 * Copyright (c) 2026-Present, Redis Ltd.
4 * All rights reserved.
5 *
6 * Licensed under your choice of (a) the Redis Source Available License 2.0
7 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
8 * GNU Affero General Public License v3 (AGPLv3).
9 */
10
11#include "server.h"
12#include "chk.h"
13#include "cluster.h"
14#include <sys/resource.h>
15
/* Round count up to the nearest power of two. Values <= 1 yield 1.
 *
 * The result is returned as an int, so the largest power of two we can hand
 * back is 2^30 (assuming the usual 32-bit int). Requests above that saturate
 * at 2^30 instead of shifting into, or past, the sign bit, which would be
 * undefined behavior. */
static inline int nearestNextPowerOf2(unsigned int count) {
    const unsigned int max_pow2 = 1u << 30;

    if (count <= 1) return 1;
    if (count > max_pow2) return (int)max_pow2;

    /* At most 30 doublings; plain unsigned arithmetic keeps this portable
     * and free of compiler builtins. */
    unsigned int pow2 = 1;
    while (pow2 < count) pow2 <<= 1;
    return (int)pow2;
}
20
21/* Initialize the hotkeys structure and start tracking. If tracking keys in
22 * specific slots is desired the user should pass along an already allocated and
23 * populated slots array. The hotkeys structure takes ownership of the array and
24 * will free it upon release. On failure the slots memory is released. */
25hotkeyStats *hotkeyStatsCreate(int count, int duration, int sample_ratio,
26 int *slots, int slots_count, uint64_t tracked_metrics)
27{
28 serverAssert(tracked_metrics & (HOTKEYS_TRACK_CPU | HOTKEYS_TRACK_NET));
29
30 hotkeyStats *hotkeys = zcalloc(sizeof(hotkeyStats));
31
32 /* We track count * 10 keys for better accuracy. Numbuckets is roughly 10
33 * times the elements we track (actually num_buckets == 7-8 * count is
34 * enough) again for better accuracy. Note the CHK implementation uses a
35 * power of 2 numbuckets for better cache locality. */
36 if (tracked_metrics & HOTKEYS_TRACK_CPU)
37 hotkeys->cpu = chkTopKCreate(count * 10, nearestNextPowerOf2((unsigned)count * 100), 1.08);
38
39 if (tracked_metrics & HOTKEYS_TRACK_NET)
40 hotkeys->net = chkTopKCreate(count * 10, nearestNextPowerOf2((unsigned)count * 100), 1.08);
41
42 hotkeys->tracked_metrics = tracked_metrics;
43 hotkeys->tracking_count = count;
44 hotkeys->duration = duration;
45 hotkeys->sample_ratio = sample_ratio;
46 hotkeys->slots = slots;
47 hotkeys->numslots = slots_count;
48 hotkeys->active = 1;
49 hotkeys->keys_result = (getKeysResult)GETKEYS_RESULT_INIT;
50 hotkeys->start = server.mstime;
51
52 /* Store initial rusage for CPU time tracking */
53 struct rusage rusage;
54 getrusage(RUSAGE_SELF, &rusage);
55 hotkeys->ru_utime = rusage.ru_utime;
56 hotkeys->ru_stime = rusage.ru_stime;
57
58 return hotkeys;
59}
60
61void hotkeyStatsRelease(hotkeyStats *hotkeys) {
62 if (!hotkeys) return;
63 if (hotkeys->cpu) chkTopKRelease(hotkeys->cpu);
64 if (hotkeys->net) chkTopKRelease(hotkeys->net);
65 zfree(hotkeys->slots);
66 getKeysFreeResult(&hotkeys->keys_result);
67
68 zfree(hotkeys);
69}
70
71/* Helper function for hotkey tracking to check if a slot is in the selected
72 * slots list. If numslots is 0 then all slots are selected. */
73static inline int isSlotSelected(hotkeyStats *hotkeys, int slot) {
74 if (hotkeys->numslots == 0) return 1;
75 for (int i = 0; i < hotkeys->numslots; i++) {
76 if (hotkeys->slots[i] == slot) return 1;
77 }
78 return 0;
79}
80
81/* Preparation for updates of the hotkeyStats for the current command, f.e
82 * cache the current client and the getKeysResult. */
83void hotkeyStatsPreCurrentCmd(hotkeyStats *hotkeys, client *c) {
84 if (!hotkeys || !hotkeys->active) return;
85
86 robj **argv = c->original_argv ? c->original_argv : c->argv;
87 int argc = c->original_argv ? c->original_argc : c->argc;
88
89 hotkeys->keys_result = (getKeysResult)GETKEYS_RESULT_INIT;
90 if (getKeysFromCommandWithSpecs(c->realcmd, argv, argc, GET_KEYSPEC_DEFAULT,
91 &hotkeys->keys_result) == 0)
92 {
93 return;
94 }
95
96 /* Check if command is sampled */
97 hotkeys->is_sampled = 1;
98 if (hotkeys->sample_ratio > 1 &&
99 (double)rand() / RAND_MAX >= 1.0 / hotkeys->sample_ratio)
100 {
101 hotkeys->is_sampled = 0;
102 }
103
104 hotkeys->is_in_selected_slots = isSlotSelected(hotkeys, c->slot);
105
106 hotkeys->current_client = c;
107}
108
109/* Update the hotkeyStats with passed metrics. This can be called multiple times
110 * between the calls to hotkeyStatsPreCurrentCmd and hotkeyStatsPostCurrentCmd */
111void hotkeyStatsUpdateCurrentCmd(hotkeyStats *hotkeys, hotkeyMetrics metrics) {
112 if (!hotkeys || !hotkeys->active) return;
113 if (hotkeys->keys_result.numkeys == 0) return;
114
115 /* Don't update stats for nested calls */
116 if (server.execution_nesting) return;
117
118 serverAssert(hotkeys->current_client);
119
120 int numkeys = hotkeys->keys_result.numkeys;
121 uint64_t duration_per_key = metrics.cpu_time_usec / numkeys;
122 uint64_t total_bytes = metrics.net_bytes;
123 uint64_t bytes_per_key = total_bytes / numkeys;
124
125 /* Update statistics counters */
126 hotkeys->time_all_commands_all_slots += metrics.cpu_time_usec;
127 hotkeys->net_bytes_all_commands_all_slots += total_bytes;
128
129 if (hotkeys->is_in_selected_slots) {
130 hotkeys->time_all_commands_selected_slots += metrics.cpu_time_usec;
131 hotkeys->net_bytes_all_commands_selected_slots += total_bytes;
132
133 if (hotkeys->is_sampled && hotkeys->sample_ratio > 1) {
134 hotkeys->time_sampled_commands_selected_slots += metrics.cpu_time_usec;
135 hotkeys->net_bytes_sampled_commands_selected_slots += total_bytes;
136 }
137 }
138
139 /* Only add keys to topK structure if command was sampled and is in selected
140 * slots. */
141 if (!hotkeys->is_sampled || !hotkeys->is_in_selected_slots) {
142 return;
143 }
144
145 mstime_t start_time = ustime();
146
147 /* Keys we've cached in the keys_result only track positions in the client's
148 * argv array so we must fetch it. */
149 client *c = hotkeys->current_client;
150 robj **argv = c->original_argv ? c->original_argv : c->argv;
151
152 /* Add all keys to topK structure */
153 for (int i = 0; i < numkeys; ++i) {
154 int pos = hotkeys->keys_result.keys[i].pos;
155
156 if (hotkeys->tracked_metrics & HOTKEYS_TRACK_CPU) {
157 sds ret = chkTopKUpdate(hotkeys->cpu, argv[pos]->ptr, sdslen(argv[pos]->ptr), duration_per_key);
158 if (ret) sdsfree(ret);
159 }
160
161 if (hotkeys->tracked_metrics & HOTKEYS_TRACK_NET) {
162 sds ret = chkTopKUpdate(hotkeys->net, argv[pos]->ptr, sdslen(argv[pos]->ptr), bytes_per_key);
163 if (ret) sdsfree(ret);
164 }
165 }
166
167 /* Track CPU time spent updating the topk structures. */
168 mstime_t end_time = ustime();
169 hotkeys->cpu_time += (end_time - start_time)/1000;
170}
171
172/* Some cleanup work for hotkeyStats after the command has finished execution */
173void hotkeyStatsPostCurrentCmd(hotkeyStats *hotkeys) {
174 if (!hotkeys || !hotkeys->active) return;
175
176 getKeysFreeResult(&hotkeys->keys_result);
177 hotkeys->keys_result = (getKeysResult)GETKEYS_RESULT_INIT;
178
179 hotkeys->current_client = NULL;
180 hotkeys->is_sampled = 0;
181 hotkeys->is_in_selected_slots = 0;
182}
183
184size_t hotkeysGetMemoryUsage(hotkeyStats *hotkeys) {
185 if (!hotkeys) return 0;
186
187 size_t memory_usage = sizeof(hotkeyStats);
188 if (hotkeys->cpu) {
189 memory_usage += chkTopKGetMemoryUsage(hotkeys->cpu);
190 }
191 if (hotkeys->net) {
192 memory_usage += chkTopKGetMemoryUsage(hotkeys->net);
193 }
194 /* Add memory for slots array if present */
195 if (hotkeys->slots) {
196 memory_usage += sizeof(int) * hotkeys->numslots;
197 }
198
199 return memory_usage;
200}
201
/* Return a - b in whole milliseconds. The microsecond difference is
 * normalized to be non-negative by borrowing one second when needed, and is
 * then truncated to milliseconds. */
static int64_t time_diff_ms(struct timeval a, struct timeval b) {
    int64_t whole_sec = (int64_t)(a.tv_sec - b.tv_sec);
    int64_t frac_usec = (int64_t)(a.tv_usec - b.tv_usec);

    /* Borrow one second if the microsecond part went negative. */
    const int borrow = (frac_usec < 0);
    whole_sec -= borrow;
    frac_usec += borrow ? 1000000 : 0;

    return whole_sec * 1000 + frac_usec / 1000;
}
213
214/* HOTKEYS command implementation
215 *
216 * HOTKEYS START
217 * <METRICS count [CPU] [NET]>
218 * [COUNT k]
219 * [DURATION duration]
220 * [SAMPLE ratio]
221 * [SLOTS count slot…]
222 * HOTKEYS STOP
223 * HOTKEYS RESET
224 * HOTKEYS GET
225 */
226void hotkeysCommand(client *c) {
227 if (c->argc < 2) {
228 addReplyError(c, "HOTKEYS subcommand required");
229 return;
230 }
231
232 char *sub = c->argv[1]->ptr;
233
234 if (!strcasecmp(sub, "START")) {
235 /* HOTKEYS START
236 * <METRICS count [CPU] [NET]>
237 * [COUNT k]
238 * [DURATION seconds]
239 * [SAMPLE ratio]
240 * [SLOTS count slot…] */
241 /* Return error if a session is already started */
242 if (server.hotkeys && server.hotkeys->active) {
243 addReplyError(c, "hotkey tracking session already in progress");
244 return;
245 }
246
247 /* METRICS is required and must be the first argument */
248 if (c->argc < 4 || strcasecmp(c->argv[2]->ptr, "METRICS")) {
249 addReplyError(c, "METRICS parameter is required");
250 return;
251 }
252
253 long metrics_count;
254 char errmsg[128];
255 snprintf(errmsg, 128, "METRICS count must be > 0 and <= %d", HOTKEYS_METRICS_COUNT);
256 if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, HOTKEYS_METRICS_COUNT,
257 &metrics_count, errmsg) != C_OK)
258 {
259 return;
260 }
261
262 uint64_t tracked_metrics = 0;
263
264 int j = 4;
265
266 /* Parse CPU and NET tokens */
267 int metrics_parsed = 0;
268 int valid_metrics = 0;
269 while (j < c->argc && metrics_parsed < metrics_count) {
270 if (!strcasecmp(c->argv[j]->ptr, "CPU")) {
271 if (tracked_metrics & HOTKEYS_TRACK_CPU) {
272 addReplyError(c, "METRICS CPU defined more than once!");
273 return;
274 }
275 tracked_metrics |= HOTKEYS_TRACK_CPU;
276 ++valid_metrics;
277 } else if (!strcasecmp(c->argv[j]->ptr, "NET")) {
278 if (tracked_metrics & HOTKEYS_TRACK_NET) {
279 addReplyError(c, "METRICS NET defined more than once!");
280 return;
281 }
282 tracked_metrics |= HOTKEYS_TRACK_NET;
283 ++valid_metrics;
284 }
285 ++metrics_parsed;
286 ++j;
287 }
288
289 if (metrics_parsed != metrics_count) {
290 addReplyError(c, "METRICS count does not match number of metric types provided");
291 return;
292 }
293
294 if (valid_metrics == 0) {
295 addReplyError(c, "METRICS no valid metrics passed. Supported: CPU|NET");
296 return;
297 }
298
299 int count = 10; /* default */
300 long duration = 0; /* default: no auto-stop */
301 int sample_ratio = 1; /* default: track every key */
302 int slots_count = 0;
303 int *slots = NULL;
304 while (j < c->argc) {
305 int moreargs = (c->argc-1) - j;
306 if (moreargs && !strcasecmp(c->argv[j]->ptr, "COUNT")) {
307 long count_val;
308 if (getRangeLongFromObjectOrReply(c, c->argv[j+1], 1, 64,
309 &count_val, "COUNT must be between 1 and 64") != C_OK)
310 {
311 zfree(slots);
312 return;
313 }
314 count = (int)count_val;
315 j += 2;
316 } else if (moreargs && !strcasecmp(c->argv[j]->ptr, "DURATION")) {
317 /* Arbitrary 1 million seconds limit, so we don't overflow the
318 * duration member which is kept in milliseconds */
319 if (getRangeLongFromObjectOrReply(c, c->argv[j+1], 1, 1000000,
320 &duration, "DURATION must be between 1 and 1000000") != C_OK)
321 {
322 zfree(slots);
323 return;
324 }
325 duration *= 1000;
326 j += 2;
327 } else if (moreargs && !strcasecmp(c->argv[j]->ptr, "SAMPLE")) {
328 long ratio_val;
329 if (getRangeLongFromObjectOrReply(c, c->argv[j+1], 1, INT_MAX,
330 &ratio_val, "SAMPLE ratio must be positive") != C_OK)
331 {
332 zfree(slots);
333 return;
334 }
335 sample_ratio = (int)ratio_val;
336 j += 2;
337 } else if (moreargs && !strcasecmp(c->argv[j]->ptr, "SLOTS")) {
338 if (slots) {
339 addReplyError(c, "SLOTS parameter already specified");
340 zfree(slots);
341 return;
342 }
343 long slots_count_val;
344 char msg[64];
345 snprintf(msg, 64, "SLOTS count must be between 1 and %d",
346 CLUSTER_SLOTS);
347 if (getRangeLongFromObjectOrReply(c, c->argv[j+1], 1,
348 CLUSTER_SLOTS, &slots_count_val, msg) != C_OK)
349 {
350 return;
351 }
352 slots_count = (int)slots_count_val;
353
354 /* Parse slot numbers */
355 if (j + 1 + slots_count >= c->argc) {
356 addReplyError(c, "not enough slot numbers provided");
357 return;
358 }
359 slots = zmalloc(sizeof(int) * slots_count);
360 for (int i = 0; i < slots_count; i++) {
361 long slot_val;
362 if ((slot_val = getSlotOrReply(c, c->argv[j+2+i])) == -1) {
363 zfree(slots);
364 return;
365 }
366 /* Check for duplicate slot indices */
367 for (int k = 0; k < i; ++k) {
368 if (slots[k] == slot_val) {
369 addReplyError(c, "duplicate slot number");
370 zfree(slots);
371 return;
372 }
373 }
374
375 slots[i] = (int)slot_val;
376 }
377 j += 2 + slots_count;
378 } else {
379 addReplyError(c, "syntax error");
380 if (slots) zfree(slots);
381 return;
382 }
383 }
384
385 hotkeyStats *hotkeys = hotkeyStatsCreate(count, duration, sample_ratio,
386 slots, slots_count, tracked_metrics);
387
388 hotkeyStatsRelease(server.hotkeys);
389 server.hotkeys = hotkeys;
390
391 addReply(c, shared.ok);
392
393 } else if (!strcasecmp(sub, "STOP")) {
394 /* HOTKEYS STOP */
395 if (c->argc != 2) {
396 addReplyError(c, "wrong number of arguments for 'hotkeys|stop' command");
397 return;
398 }
399
400 if (!server.hotkeys || !server.hotkeys->active) {
401 addReplyNull(c);
402 return;
403 }
404
405 server.hotkeys->active = 0;
406 server.hotkeys->duration = server.mstime - server.hotkeys->start;
407 addReply(c, shared.ok);
408
409 } else if (!strcasecmp(sub, "GET")) {
410 /* HOTKEYS GET */
411 if (c->argc != 2) {
412 addReplyError(c, "wrong number of arguments for 'hotkeys|get' command");
413 return;
414 }
415
416 /* If no tracking is started, return (nil) */
417 if (!server.hotkeys) {
418 addReplyNull(c);
419 return;
420 }
421
422 serverAssert(server.hotkeys->tracked_metrics);
423
424 /* Calculate duration */
425 int duration = 0;
426 if (!server.hotkeys->active) {
427 duration = server.hotkeys->duration;
428 } else {
429 duration = server.mstime - server.hotkeys->start;
430 }
431
432 /* Get total CPU time using rusage (RUSAGE_SELF) -
433 * only if CPU tracking is enabled */
434 uint64_t total_cpu_user_msec = 0;
435 uint64_t total_cpu_sys_msec = 0;
436 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_CPU) {
437 struct rusage current_ru;
438 getrusage(RUSAGE_SELF, &current_ru);
439
440 /* Calculate difference in user and sys time */
441 total_cpu_user_msec = time_diff_ms(current_ru.ru_utime, server.hotkeys->ru_utime);
442 total_cpu_sys_msec = time_diff_ms(current_ru.ru_stime, server.hotkeys->ru_stime);
443 }
444
445 /* Get totals and lists for enabled metrics */
446 uint64_t total_net_bytes = 0;
447 chkHeapBucket *cpu = NULL;
448 chkHeapBucket *net = NULL;
449 int cpu_count = 0;
450 int net_count = 0;
451
452 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_CPU) {
453 cpu = chkTopKList(server.hotkeys->cpu);
454 for (int i = 0; i < server.hotkeys->tracking_count; ++i) {
455 if (cpu[i].count == 0) break;
456 cpu_count++;
457 }
458 }
459
460 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_NET) {
461 total_net_bytes = server.hotkeys->net->total;
462 net = chkTopKList(server.hotkeys->net);
463 for (int i = 0; i < server.hotkeys->tracking_count; ++i) {
464 if (net[i].count == 0) break;
465 net_count++;
466 }
467 }
468
469 int has_selected_slots = (server.hotkeys->numslots > 0);
470 int has_sampling = (server.hotkeys->sample_ratio > 1);
471
472 int total_len = 14;
473 void *arraylenptr = addReplyDeferredLen(c);
474
475 /* tracking-active */
476 addReplyBulkCString(c, "tracking-active");
477 addReplyLongLong(c, server.hotkeys->active ? 1 : 0);
478
479 /* sample-ratio */
480 addReplyBulkCString(c, "sample-ratio");
481 addReplyLongLong(c, server.hotkeys->sample_ratio);
482
483 /* selected-slots */
484 addReplyBulkCString(c, "selected-slots");
485 addReplyArrayLen(c, server.hotkeys->numslots);
486 for (int i = 0; i < server.hotkeys->numslots; i++) {
487 addReplyLongLong(c, server.hotkeys->slots[i]);
488 }
489
490 /* sampled-command-selected-slots-ms (conditional) */
491 if (has_sampling && has_selected_slots) {
492 addReplyBulkCString(c, "sampled-command-selected-slots-ms");
493 addReplyLongLong(c, server.hotkeys->time_sampled_commands_selected_slots / 1000);
494
495 total_len += 2;
496 }
497
498 /* all-commands-selected-slots-ms (conditional) */
499 if (has_selected_slots) {
500 addReplyBulkCString(c, "all-commands-selected-slots-ms");
501 addReplyLongLong(c, server.hotkeys->time_all_commands_selected_slots / 1000);
502
503 total_len += 2;
504 }
505
506 /* all-commands-all-slots-ms */
507 addReplyBulkCString(c, "all-commands-all-slots-ms");
508 addReplyLongLong(c, server.hotkeys->time_all_commands_all_slots / 1000);
509
510 /* net-bytes-sampled-commands-selected-slots (conditional) */
511 if (has_sampling && has_selected_slots) {
512 addReplyBulkCString(c, "net-bytes-sampled-commands-selected-slots");
513 addReplyLongLong(c, server.hotkeys->net_bytes_sampled_commands_selected_slots);
514
515 total_len += 2;
516 }
517
518 /* net-bytes-all-commands-selected-slots (conditional) */
519 if (has_selected_slots) {
520 addReplyBulkCString(c, "net-bytes-all-commands-selected-slots");
521 addReplyLongLong(c,
522 server.hotkeys->net_bytes_all_commands_selected_slots);
523
524 total_len += 2;
525 }
526
527 /* net-bytes-all-commands-all-slots */
528 addReplyBulkCString(c, "net-bytes-all-commands-all-slots");
529 addReplyLongLong(c, server.hotkeys->net_bytes_all_commands_all_slots);
530
531 /* collection-start-time-unix-ms */
532 addReplyBulkCString(c, "collection-start-time-unix-ms");
533 addReplyLongLong(c, server.hotkeys->start);
534
535 /* collection-duration-ms */
536 addReplyBulkCString(c, "collection-duration-ms");
537 addReplyLongLong(c, duration);
538
539 /* total-cpu-time-user-ms (in milliseconds) - only if CPU tracking is enabled */
540 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_CPU) {
541 addReplyBulkCString(c, "total-cpu-time-user-ms");
542 addReplyLongLong(c, total_cpu_user_msec);
543
544 /* total-cpu-time-sys-ms (in milliseconds) */
545 addReplyBulkCString(c, "total-cpu-time-sys-ms");
546 addReplyLongLong(c, total_cpu_sys_msec);
547
548 total_len += 4;
549 }
550
551 /* total-net-bytes - only if NET tracking is enabled */
552 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_NET) {
553 addReplyBulkCString(c, "total-net-bytes");
554 addReplyLongLong(c, total_net_bytes);
555
556 total_len += 2;
557 }
558
559 /* by-cpu-time - only if CPU tracking is enabled */
560 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_CPU) {
561 addReplyBulkCString(c, "by-cpu-time");
562 /* Nested array of key-value pairs */
563 addReplyArrayLen(c, 2 * cpu_count);
564 for (int i = 0; i < cpu_count; ++i) {
565 addReplyBulkCBuffer(c, cpu[i].item, sdslen(cpu[i].item));
566 /* Return raw microsec value */
567 addReplyLongLong(c, cpu[i].count);
568 }
569 zfree(cpu);
570
571 total_len += 2;
572 }
573
574 /* by-net-bytes - only if NET tracking is enabled */
575 if (server.hotkeys->tracked_metrics & HOTKEYS_TRACK_NET) {
576 addReplyBulkCString(c, "by-net-bytes");
577 /* Nested array of key-value pairs */
578 addReplyArrayLen(c, 2 * net_count);
579 for (int i = 0; i < net_count; ++i) {
580 addReplyBulkCBuffer(c, net[i].item, sdslen(net[i].item));
581 /* Return raw byte value */
582 addReplyLongLong(c, net[i].count);
583 }
584 zfree(net);
585
586 total_len += 2;
587 }
588
589 setDeferredArrayLen(c, arraylenptr, total_len);
590
591 } else if (!strcasecmp(sub, "RESET")) {
592 /* HOTKEYS RESET */
593 if (c->argc != 2) {
594 addReplyError(c,
595 "wrong number of arguments for 'hotkeys|reset' command");
596 return;
597 }
598
599 /* Return error if session is in progress and not yet completed */
600 if (server.hotkeys && server.hotkeys->active) {
601 addReplyError(c,
602 "hotkey tracking session in progress, stop tracking first");
603 return;
604 }
605
606 /* Release the resources used for hotkey tracking */
607 hotkeyStatsRelease(server.hotkeys);
608 server.hotkeys = NULL;
609
610 addReply(c, shared.ok);
611 } else {
612 addReplyError(c, "unknown subcommand or wrong number of arguments");
613 }
614}