aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/functions.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
commitdcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/functions.c
parent58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
downloadcrep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/functions.c')
-rw-r--r--examples/redis-unstable/src/functions.c1138
1 files changed, 0 insertions, 1138 deletions
diff --git a/examples/redis-unstable/src/functions.c b/examples/redis-unstable/src/functions.c
deleted file mode 100644
index 58ae815..0000000
--- a/examples/redis-unstable/src/functions.c
+++ /dev/null
@@ -1,1138 +0,0 @@
1/*
2 * Copyright (c) 2011-Present, Redis Ltd.
3 * All rights reserved.
4 *
5 * Licensed under your choice of (a) the Redis Source Available License 2.0
6 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
7 * GNU Affero General Public License v3 (AGPLv3).
8 */
9
10#include "functions.h"
11#include "sds.h"
12#include "dict.h"
13#include "adlist.h"
14#include "atomicvar.h"
15
16#define LOAD_TIMEOUT_MS 500
17
18typedef enum {
19 restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace
20} restorePolicy;
21
22static size_t engine_cache_memory = 0;
23
24/* Forward declaration */
25static void engineFunctionDispose(dict *d, void *obj);
26static void engineStatsDispose(dict *d, void *obj);
27static void engineLibraryDispose(dict *d, void *obj);
28static void engineDispose(dict *d, void *obj);
29static int functionsVerifyName(sds name);
30
31typedef struct functionsLibEngineStats {
32 size_t n_lib;
33 size_t n_functions;
34} functionsLibEngineStats;
35
36struct functionsLibCtx {
37 dict *libraries; /* Library name -> Library object */
38 dict *functions; /* Function name -> Function object that can be used to run the function */
39 size_t cache_memory; /* Overhead memory (structs, dictionaries, ..) used by all the functions */
40 dict *engines_stats; /* Per engine statistics */
41};
42
43typedef struct functionsLibMataData {
44 sds engine;
45 sds name;
46 sds code;
47} functionsLibMataData;
48
49dictType engineDictType = {
50 dictSdsCaseHash, /* hash function */
51 dictSdsDup, /* key dup */
52 NULL, /* val dup */
53 dictSdsKeyCaseCompare, /* key compare */
54 dictSdsDestructor, /* key destructor */
55 engineDispose, /* val destructor */
56 NULL /* allow to expand */
57};
58
59dictType functionDictType = {
60 dictSdsCaseHash, /* hash function */
61 dictSdsDup, /* key dup */
62 NULL, /* val dup */
63 dictSdsKeyCaseCompare,/* key compare */
64 dictSdsDestructor, /* key destructor */
65 NULL, /* val destructor */
66 NULL /* allow to expand */
67};
68
69dictType engineStatsDictType = {
70 dictSdsCaseHash, /* hash function */
71 dictSdsDup, /* key dup */
72 NULL, /* val dup */
73 dictSdsKeyCaseCompare,/* key compare */
74 dictSdsDestructor, /* key destructor */
75 engineStatsDispose, /* val destructor */
76 NULL /* allow to expand */
77};
78
79dictType libraryFunctionDictType = {
80 dictSdsHash, /* hash function */
81 dictSdsDup, /* key dup */
82 NULL, /* val dup */
83 dictSdsKeyCompare, /* key compare */
84 dictSdsDestructor, /* key destructor */
85 engineFunctionDispose,/* val destructor */
86 NULL /* allow to expand */
87};
88
89dictType librariesDictType = {
90 dictSdsHash, /* hash function */
91 dictSdsDup, /* key dup */
92 NULL, /* val dup */
93 dictSdsKeyCompare, /* key compare */
94 dictSdsDestructor, /* key destructor */
95 engineLibraryDispose, /* val destructor */
96 NULL /* allow to expand */
97};
98
99/* Dictionary of engines */
100static dict *engines = NULL;
101
102/* Libraries Ctx. */
103static functionsLibCtx *curr_functions_lib_ctx = NULL;
104
105static size_t functionMallocSize(functionInfo *fi) {
106 return zmalloc_size(fi) + sdsZmallocSize(fi->name)
107 + (fi->desc ? sdsZmallocSize(fi->desc) : 0)
108 + fi->li->ei->engine->get_function_memory_overhead(fi->function);
109}
110
111static size_t libraryMallocSize(functionLibInfo *li) {
112 return zmalloc_size(li) + sdsZmallocSize(li->name)
113 + sdsZmallocSize(li->code);
114}
115
116static void engineStatsDispose(dict *d, void *obj) {
117 UNUSED(d);
118 functionsLibEngineStats *stats = obj;
119 zfree(stats);
120}
121
122/* Dispose function memory */
123static void engineFunctionDispose(dict *d, void *obj) {
124 UNUSED(d);
125 if (!obj) {
126 return;
127 }
128 functionInfo *fi = obj;
129 sdsfree(fi->name);
130 if (fi->desc) {
131 sdsfree(fi->desc);
132 }
133 engine *engine = fi->li->ei->engine;
134 engine->free_function(engine->engine_ctx, fi->function);
135 zfree(fi);
136}
137
138static void engineLibraryFree(functionLibInfo* li) {
139 if (!li) {
140 return;
141 }
142 dictRelease(li->functions);
143 sdsfree(li->name);
144 sdsfree(li->code);
145 zfree(li);
146}
147
148static void engineLibraryFreeGeneric(void *li) {
149 engineLibraryFree((functionLibInfo *)li);
150}
151
152static void engineLibraryDispose(dict *d, void *obj) {
153 UNUSED(d);
154 engineLibraryFree(obj);
155}
156
157static void engineDispose(dict *d, void *obj) {
158 UNUSED(d);
159 engineInfo *ei = obj;
160 freeClient(ei->c);
161 sdsfree(ei->name);
162 ei->engine->free_ctx(ei->engine->engine_ctx);
163 zfree(ei->engine);
164 zfree(ei);
165}
166
167/* Clear all the functions from the given library ctx */
168void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
169 dictEmpty(lib_ctx->functions, NULL);
170 dictEmpty(lib_ctx->libraries, NULL);
171 dictIterator iter;
172 dictEntry *entry = NULL;
173 dictInitIterator(&iter, lib_ctx->engines_stats);
174 while ((entry = dictNext(&iter))) {
175 functionsLibEngineStats *stats = dictGetVal(entry);
176 stats->n_functions = 0;
177 stats->n_lib = 0;
178 }
179 dictResetIterator(&iter);
180 lib_ctx->cache_memory = 0;
181}
182
183void functionsLibCtxClearCurrent(int async) {
184 if (async) {
185 functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
186 dict *old_engines = engines;
187 freeFunctionsAsync(old_l_ctx, old_engines);
188 } else {
189 functionsLibCtxFree(curr_functions_lib_ctx);
190 dictRelease(engines);
191 }
192 functionsInit();
193}
194
195/* Free the given functions ctx */
196void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
197 functionsLibCtxClear(functions_lib_ctx);
198 dictRelease(functions_lib_ctx->functions);
199 dictRelease(functions_lib_ctx->libraries);
200 dictRelease(functions_lib_ctx->engines_stats);
201 zfree(functions_lib_ctx);
202}
203
204/* Swap the current functions ctx with the given one.
205 * Free the old functions ctx. */
206void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) {
207 functionsLibCtxFree(curr_functions_lib_ctx);
208 curr_functions_lib_ctx = new_lib_ctx;
209}
210
211/* return the current functions ctx */
212functionsLibCtx* functionsLibCtxGetCurrent(void) {
213 return curr_functions_lib_ctx;
214}
215
216/* Create a new functions ctx */
217functionsLibCtx* functionsLibCtxCreate(void) {
218 functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx));
219 ret->libraries = dictCreate(&librariesDictType);
220 ret->functions = dictCreate(&functionDictType);
221 ret->engines_stats = dictCreate(&engineStatsDictType);
222 dictIterator iter;
223 dictEntry *entry = NULL;
224 dictInitIterator(&iter, engines);
225 while ((entry = dictNext(&iter))) {
226 engineInfo *ei = dictGetVal(entry);
227 functionsLibEngineStats *stats = zcalloc(sizeof(*stats));
228 dictAdd(ret->engines_stats, ei->name, stats);
229 }
230 dictResetIterator(&iter);
231 ret->cache_memory = 0;
232 return ret;
233}
234
235/*
236 * Creating a function inside the given library.
237 * On success, return C_OK.
238 * On error, return C_ERR and set err output parameter with a relevant error message.
239 *
240 * Note: the code assumes 'name' is NULL terminated but not require it to be binary safe.
241 * the function will verify that the given name is following the naming format
242 * and return an error if its not.
243 */
244int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err) {
245 if (functionsVerifyName(name) != C_OK) {
246 *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long");
247 return C_ERR;
248 }
249
250 if (dictFetchValue(li->functions, name)) {
251 *err = sdsnew("Function already exists in the library");
252 return C_ERR;
253 }
254
255 functionInfo *fi = zmalloc(sizeof(*fi));
256 *fi = (functionInfo) {
257 .name = name,
258 .function = function,
259 .li = li,
260 .desc = desc,
261 .f_flags = f_flags,
262 };
263
264 int res = dictAdd(li->functions, fi->name, fi);
265 serverAssert(res == DICT_OK);
266
267 return C_OK;
268}
269
270static functionLibInfo* engineLibraryCreate(sds name, engineInfo *ei, sds code) {
271 functionLibInfo *li = zmalloc(sizeof(*li));
272 *li = (functionLibInfo) {
273 .name = sdsdup(name),
274 .functions = dictCreate(&libraryFunctionDictType),
275 .ei = ei,
276 .code = sdsdup(code),
277 };
278 return li;
279}
280
281static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo* li) {
282 dictIterator iter;
283 dictEntry *entry = NULL;
284 dictInitIterator(&iter, li->functions);
285 while ((entry = dictNext(&iter))) {
286 functionInfo *fi = dictGetVal(entry);
287 int ret = dictDelete(lib_ctx->functions, fi->name);
288 serverAssert(ret == DICT_OK);
289 lib_ctx->cache_memory -= functionMallocSize(fi);
290 }
291 dictResetIterator(&iter);
292 entry = dictUnlink(lib_ctx->libraries, li->name);
293 dictSetVal(lib_ctx->libraries, entry, NULL);
294 dictFreeUnlinkedEntry(lib_ctx->libraries, entry);
295 lib_ctx->cache_memory -= libraryMallocSize(li);
296
297 /* update stats */
298 functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name);
299 serverAssert(stats);
300 stats->n_lib--;
301 stats->n_functions -= dictSize(li->functions);
302}
303
304static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo* li) {
305 dictIterator iter;
306 dictEntry *entry = NULL;
307 dictInitIterator(&iter, li->functions);
308 while ((entry = dictNext(&iter))) {
309 functionInfo *fi = dictGetVal(entry);
310 dictAdd(lib_ctx->functions, fi->name, fi);
311 lib_ctx->cache_memory += functionMallocSize(fi);
312 }
313 dictResetIterator(&iter);
314
315 dictAdd(lib_ctx->libraries, li->name, li);
316 lib_ctx->cache_memory += libraryMallocSize(li);
317
318 /* update stats */
319 functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name);
320 serverAssert(stats);
321 stats->n_lib++;
322 stats->n_functions += dictSize(li->functions);
323}
324
325/* Takes all libraries from lib_ctx_src and add to lib_ctx_dst.
326 * On collision, if 'replace' argument is true, replace the existing library with the new one.
327 * Otherwise abort and leave 'lib_ctx_dst' and 'lib_ctx_src' untouched.
328 * Return C_OK on success and C_ERR if aborted. If C_ERR is returned, set a relevant
329 * error message on the 'err' out parameter.
330 * */
331static int libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_lib_ctx_src, int replace, sds *err) {
332 int ret = C_ERR;
333 dictIterator iter;
334 /* Stores the libraries we need to replace in case a revert is required.
335 * Only initialized when needed */
336 list *old_libraries_list = NULL;
337 dictEntry *entry = NULL;
338 dictInitIterator(&iter, functions_lib_ctx_src->libraries);
339 while ((entry = dictNext(&iter))) {
340 functionLibInfo *li = dictGetVal(entry);
341 functionLibInfo *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name);
342 if (old_li) {
343 if (!replace) {
344 /* library already exists, failed the restore. */
345 *err = sdscatfmt(sdsempty(), "Library %s already exists", li->name);
346 dictResetIterator(&iter);
347 goto done;
348 } else {
349 if (!old_libraries_list) {
350 old_libraries_list = listCreate();
351 listSetFreeMethod(old_libraries_list, engineLibraryFreeGeneric);
352 }
353 libraryUnlink(functions_lib_ctx_dst, old_li);
354 listAddNodeTail(old_libraries_list, old_li);
355 }
356 }
357 }
358 dictResetIterator(&iter);
359
360 /* Make sure no functions collision */
361 dictInitIterator(&iter, functions_lib_ctx_src->functions);
362 while ((entry = dictNext(&iter))) {
363 functionInfo *fi = dictGetVal(entry);
364 if (dictFetchValue(functions_lib_ctx_dst->functions, fi->name)) {
365 *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
366 dictResetIterator(&iter);
367 goto done;
368 }
369 }
370 dictResetIterator(&iter);
371
372 /* No collision, it is safe to link all the new libraries. */
373 dictInitIterator(&iter, functions_lib_ctx_src->libraries);
374 while ((entry = dictNext(&iter))) {
375 functionLibInfo *li = dictGetVal(entry);
376 libraryLink(functions_lib_ctx_dst, li);
377 dictSetVal(functions_lib_ctx_src->libraries, entry, NULL);
378 }
379 dictResetIterator(&iter);
380
381 functionsLibCtxClear(functions_lib_ctx_src);
382 if (old_libraries_list) {
383 listRelease(old_libraries_list);
384 old_libraries_list = NULL;
385 }
386 ret = C_OK;
387
388done:
389 if (old_libraries_list) {
390 /* Link back all libraries on tmp_l_ctx */
391 while (listLength(old_libraries_list) > 0) {
392 listNode *head = listFirst(old_libraries_list);
393 functionLibInfo *li = listNodeValue(head);
394 listNodeValue(head) = NULL;
395 libraryLink(functions_lib_ctx_dst, li);
396 listDelNode(old_libraries_list, head);
397 }
398 listRelease(old_libraries_list);
399 }
400 return ret;
401}
402
403/* Register an engine, should be called once by the engine on startup and give the following:
404 *
405 * - engine_name - name of the engine to register
406 * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */
407int functionsRegisterEngine(const char *engine_name, engine *engine) {
408 sds engine_name_sds = sdsnew(engine_name);
409 if (dictFetchValue(engines, engine_name_sds)) {
410 serverLog(LL_WARNING, "Same engine was registered twice");
411 sdsfree(engine_name_sds);
412 return C_ERR;
413 }
414
415 client *c = createClient(NULL);
416 c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT);
417 engineInfo *ei = zmalloc(sizeof(*ei));
418 *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,};
419
420 dictAdd(engines, engine_name_sds, ei);
421
422 engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) +
423 zmalloc_size(engine) +
424 engine->get_engine_memory_overhead(engine->engine_ctx);
425
426 return C_OK;
427}
428
429/*
430 * FUNCTION STATS
431 */
432void functionStatsCommand(client *c) {
433 if (scriptIsRunning() && scriptIsEval()) {
434 addReplyErrorObject(c, shared.slowevalerr);
435 return;
436 }
437
438 addReplyMapLen(c, 2);
439
440 addReplyBulkCString(c, "running_script");
441 if (!scriptIsRunning()) {
442 addReplyNull(c);
443 } else {
444 addReplyMapLen(c, 3);
445 addReplyBulkCString(c, "name");
446 addReplyBulkCString(c, scriptCurrFunction());
447 addReplyBulkCString(c, "command");
448 client *script_client = scriptGetCaller();
449 addReplyArrayLen(c, script_client->argc);
450 for (int i = 0 ; i < script_client->argc ; ++i) {
451 addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr));
452 }
453 addReplyBulkCString(c, "duration_ms");
454 addReplyLongLong(c, scriptRunDuration());
455 }
456
457 addReplyBulkCString(c, "engines");
458 addReplyMapLen(c, dictSize(engines));
459 dictIterator iter;
460 dictEntry *entry = NULL;
461 dictInitIterator(&iter, engines);
462 while ((entry = dictNext(&iter))) {
463 engineInfo *ei = dictGetVal(entry);
464 addReplyBulkCString(c, ei->name);
465 addReplyMapLen(c, 2);
466 functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name);
467 addReplyBulkCString(c, "libraries_count");
468 addReplyLongLong(c, e_stats->n_lib);
469 addReplyBulkCString(c, "functions_count");
470 addReplyLongLong(c, e_stats->n_functions);
471 }
472 dictResetIterator(&iter);
473}
474
475static void functionListReplyFlags(client *c, functionInfo *fi) {
476 /* First count the number of flags we have */
477 int flagcount = 0;
478 for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) {
479 if (fi->f_flags & flag->flag) {
480 ++flagcount;
481 }
482 }
483
484 addReplySetLen(c, flagcount);
485
486 for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) {
487 if (fi->f_flags & flag->flag) {
488 addReplyStatus(c, flag->str);
489 }
490 }
491}
492
493/*
494 * FUNCTION LIST [LIBRARYNAME PATTERN] [WITHCODE]
495 *
496 * Return general information about all the libraries:
497 * * Library name
498 * * The engine used to run the Library
499 * * Functions list
500 * * Library code (if WITHCODE is given)
501 *
502 * It is also possible to given library name pattern using
503 * LIBRARYNAME argument, if given, return only libraries
504 * that matches the given pattern.
505 */
506void functionListCommand(client *c) {
507 int with_code = 0;
508 sds library_name = NULL;
509 for (int i = 2 ; i < c->argc ; ++i) {
510 robj *next_arg = c->argv[i];
511 if (!with_code && !strcasecmp(next_arg->ptr, "withcode")) {
512 with_code = 1;
513 continue;
514 }
515 if (!library_name && !strcasecmp(next_arg->ptr, "libraryname")) {
516 if (i >= c->argc - 1) {
517 addReplyError(c, "library name argument was not given");
518 return;
519 }
520 library_name = c->argv[++i]->ptr;
521 continue;
522 }
523 addReplyErrorSds(c, sdscatfmt(sdsempty(), "Unknown argument %s", next_arg->ptr));
524 return;
525 }
526 size_t reply_len = 0;
527 void *len_ptr = NULL;
528 if (library_name) {
529 len_ptr = addReplyDeferredLen(c);
530 } else {
531 /* If no pattern is asked we know the reply len and we can just set it */
532 addReplyArrayLen(c, dictSize(curr_functions_lib_ctx->libraries));
533 }
534 dictIterator iter;
535 dictEntry *entry = NULL;
536 dictInitIterator(&iter, curr_functions_lib_ctx->libraries);
537 while ((entry = dictNext(&iter))) {
538 functionLibInfo *li = dictGetVal(entry);
539 if (library_name) {
540 if (!stringmatchlen(library_name, sdslen(library_name), li->name, sdslen(li->name), 1)) {
541 continue;
542 }
543 }
544 ++reply_len;
545 addReplyMapLen(c, with_code? 4 : 3);
546 addReplyBulkCString(c, "library_name");
547 addReplyBulkCBuffer(c, li->name, sdslen(li->name));
548 addReplyBulkCString(c, "engine");
549 addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name));
550
551 addReplyBulkCString(c, "functions");
552 addReplyArrayLen(c, dictSize(li->functions));
553 dictIterator functions_iter;
554 dictEntry *function_entry = NULL;
555 dictInitIterator(&functions_iter, li->functions);
556 while ((function_entry = dictNext(&functions_iter))) {
557 functionInfo *fi = dictGetVal(function_entry);
558 addReplyMapLen(c, 3);
559 addReplyBulkCString(c, "name");
560 addReplyBulkCBuffer(c, fi->name, sdslen(fi->name));
561 addReplyBulkCString(c, "description");
562 if (fi->desc) {
563 addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc));
564 } else {
565 addReplyNull(c);
566 }
567 addReplyBulkCString(c, "flags");
568 functionListReplyFlags(c, fi);
569 }
570 dictResetIterator(&functions_iter);
571
572 if (with_code) {
573 addReplyBulkCString(c, "library_code");
574 addReplyBulkCBuffer(c, li->code, sdslen(li->code));
575 }
576 }
577 dictResetIterator(&iter);
578 if (len_ptr) {
579 setDeferredArrayLen(c, len_ptr, reply_len);
580 }
581}
582
583/*
584 * FUNCTION DELETE <LIBRARY NAME>
585 */
586void functionDeleteCommand(client *c) {
587 robj *function_name = c->argv[2];
588 functionLibInfo *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr);
589 if (!li) {
590 addReplyError(c, "Library not found");
591 return;
592 }
593
594 libraryUnlink(curr_functions_lib_ctx, li);
595 engineLibraryFree(li);
596 /* Indicate that the command changed the data so it will be replicated and
597 * counted as a data change (for persistence configuration) */
598 server.dirty++;
599 addReply(c, shared.ok);
600}
601
602/* FUNCTION KILL */
603void functionKillCommand(client *c) {
604 scriptKill(c, 0);
605}
606
607/* Try to extract command flags if we can, returns the modified flags.
608 * Note that it does not guarantee the command arguments are right. */
609uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) {
610 robj *function_name = c->argv[1];
611 c->cur_script = dictFind(curr_functions_lib_ctx->functions, function_name->ptr);
612 if (!c->cur_script)
613 return cmd_flags;
614 functionInfo *fi = dictGetVal(c->cur_script);
615 uint64_t script_flags = fi->f_flags;
616 return scriptFlagsToCmdFlags(cmd_flags, script_flags);
617}
618
619static void fcallCommandGeneric(client *c, int ro) {
620 /* Functions need to be fed to monitors before the commands they execute. */
621 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
622
623 robj *function_name = c->argv[1];
624 dictEntry *de = c->cur_script;
625 if (!de)
626 de = dictFind(curr_functions_lib_ctx->functions, function_name->ptr);
627 if (!de) {
628 addReplyError(c, "Function not found");
629 return;
630 }
631 functionInfo *fi = dictGetVal(de);
632 engine *engine = fi->li->ei->engine;
633
634 long long numkeys;
635 /* Get the number of arguments that are keys */
636 if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) {
637 addReplyError(c, "Bad number of keys provided");
638 return;
639 }
640 if (numkeys > (c->argc - 3)) {
641 addReplyError(c, "Number of keys can't be greater than number of args");
642 return;
643 } else if (numkeys < 0) {
644 addReplyError(c, "Number of keys can't be negative");
645 return;
646 }
647
648 scriptRunCtx run_ctx;
649
650 if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK)
651 return;
652
653 engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys,
654 c->argv + 3 + numkeys, c->argc - 3 - numkeys);
655 scriptResetRun(&run_ctx);
656}
657
658/*
659 * FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn>
660 */
661void fcallCommand(client *c) {
662 fcallCommandGeneric(c, 0);
663}
664
665/*
666 * FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn>
667 */
668void fcallroCommand(client *c) {
669 fcallCommandGeneric(c, 1);
670}
671
672/*
673 * Returns a binary payload representing all the libraries.
674 * Can be loaded using FUNCTION RESTORE
675 *
676 * The payload structure is the same as on RDB. Each library
677 * is saved separately with the following information:
678 * * Library name
679 * * Engine name
680 * * Library code
681 * RDB_OPCODE_FUNCTION2 is saved before each library to present
682 * that the payload is a library.
683 * RDB version and crc64 is saved at the end of the payload.
684 * The RDB version is saved for backward compatibility.
685 * crc64 is saved so we can verify the payload content.
686 */
687void createFunctionDumpPayload(rio *payload) {
688 uint64_t crc;
689 unsigned char buf[2];
690
691 rioInitWithBuffer(payload, sdsempty());
692
693 rdbSaveFunctions(payload);
694
695 /* RDB version */
696 buf[0] = RDB_VERSION & 0xff;
697 buf[1] = (RDB_VERSION >> 8) & 0xff;
698 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr, buf, 2);
699
700 /* CRC64 */
701 crc = crc64(0, (unsigned char*) payload->io.buffer.ptr,
702 sdslen(payload->io.buffer.ptr));
703 memrev64ifbe(&crc);
704 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr, &crc, 8);
705}
706
707/*
708 * FUNCTION DUMP
709 */
710void functionDumpCommand(client *c) {
711 rio payload;
712 createFunctionDumpPayload(&payload);
713
714 addReplyBulkSds(c, payload.io.buffer.ptr);
715}
716
717/*
718 * FUNCTION RESTORE <payload> [FLUSH|APPEND|REPLACE]
719 *
720 * Restore the libraries represented by the give payload.
721 * Restore policy to can be given to control how to handle existing libraries (default APPEND):
722 * * FLUSH: delete all existing libraries.
723 * * APPEND: appends the restored libraries to the existing libraries. On collision, abort.
724 * * REPLACE: appends the restored libraries to the existing libraries.
725 * On collision, replace the old libraries with the new libraries.
726 */
727void functionRestoreCommand(client *c) {
728 if (c->argc > 4) {
729 addReplySubcommandSyntaxError(c);
730 return;
731 }
732
733 restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */
734 sds data = c->argv[2]->ptr;
735 size_t data_len = sdslen(data);
736 rio payload;
737 sds err = NULL;
738
739 if (c->argc == 4) {
740 const char *restore_policy_str = c->argv[3]->ptr;
741 if (!strcasecmp(restore_policy_str, "append")) {
742 restore_replicy = restorePolicy_Append;
743 } else if (!strcasecmp(restore_policy_str, "replace")) {
744 restore_replicy = restorePolicy_Replace;
745 } else if (!strcasecmp(restore_policy_str, "flush")) {
746 restore_replicy = restorePolicy_Flush;
747 } else {
748 addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE.");
749 return;
750 }
751 }
752
753 uint16_t rdbver;
754 if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) {
755 addReplyError(c, "DUMP payload version or checksum are wrong");
756 return;
757 }
758
759 functionsLibCtx *functions_lib_ctx = functionsLibCtxCreate();
760 rioInitWithBuffer(&payload, data);
761
762 /* Read until reaching last 10 bytes that should contain RDB version and checksum. */
763 while (data_len - payload.io.buffer.pos > 10) {
764 int type;
765 if ((type = rdbLoadType(&payload)) == -1) {
766 err = sdsnew("can not read data type");
767 goto load_error;
768 }
769 if (type == RDB_OPCODE_FUNCTION_PRE_GA) {
770 err = sdsnew("Pre-GA function format not supported");
771 goto load_error;
772 }
773 if (type != RDB_OPCODE_FUNCTION2) {
774 err = sdsnew("given type is not a function");
775 goto load_error;
776 }
777 if (rdbFunctionLoad(&payload, rdbver, functions_lib_ctx, RDBFLAGS_NONE, &err) != C_OK) {
778 if (!err) {
779 err = sdsnew("failed loading the given functions payload");
780 }
781 goto load_error;
782 }
783 }
784
785 if (restore_replicy == restorePolicy_Flush) {
786 functionsLibCtxSwapWithCurrent(functions_lib_ctx);
787 functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */
788 } else {
789 if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) != C_OK) {
790 goto load_error;
791 }
792 }
793
794 /* Indicate that the command changed the data so it will be replicated and
795 * counted as a data change (for persistence configuration) */
796 server.dirty++;
797
798load_error:
799 if (err) {
800 addReplyErrorSds(c, err);
801 } else {
802 addReply(c, shared.ok);
803 }
804 if (functions_lib_ctx) {
805 functionsLibCtxFree(functions_lib_ctx);
806 }
807}
808
809/* FUNCTION FLUSH [ASYNC | SYNC] */
810void functionFlushCommand(client *c) {
811 if (c->argc > 3) {
812 addReplySubcommandSyntaxError(c);
813 return;
814 }
815 int async = 0;
816 if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) {
817 async = 0;
818 } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) {
819 async = 1;
820 } else if (c->argc == 2) {
821 async = server.lazyfree_lazy_user_flush ? 1 : 0;
822 } else {
823 addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option");
824 return;
825 }
826
827 functionsLibCtxClearCurrent(async);
828
829 /* Indicate that the command changed the data so it will be replicated and
830 * counted as a data change (for persistence configuration) */
831 server.dirty++;
832 addReply(c,shared.ok);
833}
834
835/* FUNCTION HELP */
836void functionHelpCommand(client *c) {
837 const char *help[] = {
838"LOAD [REPLACE] <FUNCTION CODE>",
839" Create a new library with the given library name and code.",
840"DELETE <LIBRARY NAME>",
841" Delete the given library.",
842"LIST [LIBRARYNAME PATTERN] [WITHCODE]",
843" Return general information on all the libraries:",
844" * Library name",
845" * The engine used to run the Library",
846" * Functions list",
847" * Library code (if WITHCODE is given)",
848" It also possible to get only function that matches a pattern using LIBRARYNAME argument.",
849"STATS",
850" Return information about the current function running:",
851" * Function name",
852" * Command used to run the function",
853" * Duration in MS that the function is running",
854" If no function is running, return nil",
855" In addition, returns a list of available engines.",
856"KILL",
857" Kill the current running function.",
858"FLUSH [ASYNC|SYNC]",
859" Delete all the libraries.",
860" When called without the optional mode argument, the behavior is determined by the",
861" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
862" * ASYNC: Asynchronously flush the libraries.",
863" * SYNC: Synchronously flush the libraries.",
864"DUMP",
865" Return a serialized payload representing the current libraries, can be restored using FUNCTION RESTORE command",
866"RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]",
867" Restore the libraries represented by the given payload, it is possible to give a restore policy to",
868" control how to handle existing libraries (default APPEND):",
869" * FLUSH: delete all existing libraries.",
870" * APPEND: appends the restored libraries to the existing libraries. On collision, abort.",
871" * REPLACE: appends the restored libraries to the existing libraries, On collision, replace the old",
872" libraries with the new libraries (notice that even on this option there is a chance of failure",
873" in case of functions name collision with another library).",
874NULL };
875 addReplyHelp(c, help);
876}
877
878/* Verify that the function name is of the format: [a-zA-Z0-9_][a-zA-Z0-9_]? */
879static int functionsVerifyName(sds name) {
880 if (sdslen(name) == 0) {
881 return C_ERR;
882 }
883 for (size_t i = 0 ; i < sdslen(name) ; ++i) {
884 char curr_char = name[i];
885 if ((curr_char >= 'a' && curr_char <= 'z') ||
886 (curr_char >= 'A' && curr_char <= 'Z') ||
887 (curr_char >= '0' && curr_char <= '9') ||
888 (curr_char == '_'))
889 {
890 continue;
891 }
892 return C_ERR;
893 }
894 return C_OK;
895}
896
897int functionExtractLibMetaData(sds payload, functionsLibMataData *md, sds *err) {
898 sds name = NULL;
899 sds engine = NULL;
900 if (strncmp(payload, "#!", 2) != 0) {
901 *err = sdsnew("Missing library metadata");
902 return C_ERR;
903 }
904 char *shebang_end = strchr(payload, '\n');
905 if (shebang_end == NULL) {
906 *err = sdsnew("Invalid library metadata");
907 return C_ERR;
908 }
909 size_t shebang_len = shebang_end - payload;
910 sds shebang = sdsnewlen(payload, shebang_len);
911 int numparts;
912 sds *parts = sdssplitargs(shebang, &numparts);
913 sdsfree(shebang);
914 if (!parts || numparts == 0) {
915 *err = sdsnew("Invalid library metadata");
916 sdsfreesplitres(parts, numparts);
917 return C_ERR;
918 }
919 engine = sdsdup(parts[0]);
920 sdsrange(engine, 2, -1);
921 for (int i = 1 ; i < numparts ; ++i) {
922 sds part = parts[i];
923 if (strncasecmp(part, "name=", 5) == 0) {
924 if (name) {
925 *err = sdscatfmt(sdsempty(), "Invalid metadata value, name argument was given multiple times");
926 goto error;
927 }
928 name = sdsdup(part);
929 sdsrange(name, 5, -1);
930 continue;
931 }
932 *err = sdscatfmt(sdsempty(), "Invalid metadata value given: %s", part);
933 goto error;
934 }
935
936 if (!name) {
937 *err = sdsnew("Library name was not given");
938 goto error;
939 }
940
941 sdsfreesplitres(parts, numparts);
942
943 md->name = name;
944 md->code = sdsnewlen(shebang_end, sdslen(payload) - shebang_len);
945 md->engine = engine;
946
947 return C_OK;
948
949error:
950 if (name) sdsfree(name);
951 if (engine) sdsfree(engine);
952 sdsfreesplitres(parts, numparts);
953 return C_ERR;
954}
955
956void functionFreeLibMetaData(functionsLibMataData *md) {
957 if (md->code) sdsfree(md->code);
958 if (md->name) sdsfree(md->name);
959 if (md->engine) sdsfree(md->engine);
960}
961
962/* Compile and save the given library, return the loaded library name on success
963 * and NULL on failure. In case on failure the err out param is set with relevant error message */
964sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx, size_t timeout) {
965 dictIterator iter;
966 dictEntry *entry = NULL;
967 functionLibInfo *new_li = NULL;
968 functionLibInfo *old_li = NULL;
969 functionsLibMataData md = {0};
970 if (functionExtractLibMetaData(code, &md, err) != C_OK) {
971 return NULL;
972 }
973
974 if (functionsVerifyName(md.name)) {
975 *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long");
976 goto error;
977 }
978
979 engineInfo *ei = dictFetchValue(engines, md.engine);
980 if (!ei) {
981 *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine);
982 goto error;
983 }
984 engine *engine = ei->engine;
985
986 old_li = dictFetchValue(lib_ctx->libraries, md.name);
987 if (old_li && !replace) {
988 old_li = NULL;
989 *err = sdscatfmt(sdsempty(), "Library '%S' already exists", md.name);
990 goto error;
991 }
992
993 if (old_li) {
994 libraryUnlink(lib_ctx, old_li);
995 }
996
997 new_li = engineLibraryCreate(md.name, ei, code);
998 if (engine->create(engine->engine_ctx, new_li, md.code, timeout, err) != C_OK) {
999 goto error;
1000 }
1001
1002 if (dictSize(new_li->functions) == 0) {
1003 *err = sdsnew("No functions registered");
1004 goto error;
1005 }
1006
1007 /* Verify no duplicate functions */
1008 dictInitIterator(&iter, new_li->functions);
1009 while ((entry = dictNext(&iter))) {
1010 functionInfo *fi = dictGetVal(entry);
1011 if (dictFetchValue(lib_ctx->functions, fi->name)) {
1012 /* functions name collision, abort. */
1013 *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
1014 dictResetIterator(&iter);
1015 goto error;
1016 }
1017 }
1018 dictResetIterator(&iter);
1019
1020 libraryLink(lib_ctx, new_li);
1021
1022 if (old_li) {
1023 engineLibraryFree(old_li);
1024 }
1025
1026 sds loaded_lib_name = md.name;
1027 md.name = NULL;
1028 functionFreeLibMetaData(&md);
1029
1030 return loaded_lib_name;
1031
1032error:
1033 if (new_li) engineLibraryFree(new_li);
1034 if (old_li) libraryLink(lib_ctx, old_li);
1035 functionFreeLibMetaData(&md);
1036 return NULL;
1037}
1038
1039/*
1040 * FUNCTION LOAD [REPLACE] <LIBRARY CODE>
1041 * REPLACE - optional, replace existing library
1042 * LIBRARY CODE - library code to pass to the engine
1043 */
1044void functionLoadCommand(client *c) {
1045 int replace = 0;
1046 int argc_pos = 2;
1047 while (argc_pos < c->argc - 1) {
1048 robj *next_arg = c->argv[argc_pos++];
1049 if (!strcasecmp(next_arg->ptr, "replace")) {
1050 replace = 1;
1051 continue;
1052 }
1053 addReplyErrorFormat(c, "Unknown option given: %s", (char*)next_arg->ptr);
1054 return;
1055 }
1056
1057 if (argc_pos >= c->argc) {
1058 addReplyError(c, "Function code is missing");
1059 return;
1060 }
1061
1062 robj *code = c->argv[argc_pos];
1063 sds err = NULL;
1064 sds library_name = NULL;
1065 size_t timeout = LOAD_TIMEOUT_MS;
1066 if (mustObeyClient(c)) {
1067 timeout = 0;
1068 }
1069 if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx, timeout)))
1070 {
1071 addReplyErrorSds(c, err);
1072 return;
1073 }
1074 /* Indicate that the command changed the data so it will be replicated and
1075 * counted as a data change (for persistence configuration) */
1076 server.dirty++;
1077 addReplyBulkSds(c, library_name);
1078}
1079
1080/* Return memory usage of all the engines combine */
1081unsigned long functionsMemoryVM(void) {
1082 dictIterator iter;
1083 dictEntry *entry = NULL;
1084 size_t engines_memory = 0;
1085
1086 dictInitIterator(&iter, engines);
1087 while ((entry = dictNext(&iter))) {
1088 engineInfo *ei = dictGetVal(entry);
1089 engine *engine = ei->engine;
1090 engines_memory += engine->get_used_memory(engine->engine_ctx);
1091 }
1092 dictResetIterator(&iter);
1093
1094 return engines_memory;
1095}
1096
1097/* Return memory overhead of all the engines combine */
1098unsigned long functionsMemoryEngine(void) {
1099 size_t memory_overhead = dictMemUsage(engines);
1100 memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions);
1101 memory_overhead += sizeof(functionsLibCtx);
1102 memory_overhead += curr_functions_lib_ctx->cache_memory;
1103 memory_overhead += engine_cache_memory;
1104
1105 return memory_overhead;
1106}
1107
1108/* Returns the number of functions */
1109unsigned long functionsNum(void) {
1110 return dictSize(curr_functions_lib_ctx->functions);
1111}
1112
1113unsigned long functionsLibNum(void) {
1114 return dictSize(curr_functions_lib_ctx->libraries);
1115}
1116
1117dict* functionsLibGet(void) {
1118 return curr_functions_lib_ctx->libraries;
1119}
1120
1121size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx) {
1122 return dictSize(functions_ctx->functions);
1123}
1124
1125/* Initialize engine data structures.
1126 * Should be called once on server initialization */
1127int functionsInit(void) {
1128 engines = dictCreate(&engineDictType);
1129
1130 if (luaEngineInitEngine() != C_OK) {
1131 return C_ERR;
1132 }
1133
1134 /* Must be initialized after engines initialization */
1135 curr_functions_lib_ctx = functionsLibCtxCreate();
1136
1137 return C_OK;
1138}