diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/functions.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/functions.c')
| -rw-r--r-- | examples/redis-unstable/src/functions.c | 1138 |
1 files changed, 0 insertions, 1138 deletions
diff --git a/examples/redis-unstable/src/functions.c b/examples/redis-unstable/src/functions.c deleted file mode 100644 index 58ae815..0000000 --- a/examples/redis-unstable/src/functions.c +++ /dev/null | |||
| @@ -1,1138 +0,0 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (c) 2011-Present, Redis Ltd. | ||
| 3 | * All rights reserved. | ||
| 4 | * | ||
| 5 | * Licensed under your choice of (a) the Redis Source Available License 2.0 | ||
| 6 | * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the | ||
| 7 | * GNU Affero General Public License v3 (AGPLv3). | ||
| 8 | */ | ||
| 9 | |||
| 10 | #include "functions.h" | ||
| 11 | #include "sds.h" | ||
| 12 | #include "dict.h" | ||
| 13 | #include "adlist.h" | ||
| 14 | #include "atomicvar.h" | ||
| 15 | |||
| 16 | #define LOAD_TIMEOUT_MS 500 | ||
| 17 | |||
| 18 | typedef enum { | ||
| 19 | restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace | ||
| 20 | } restorePolicy; | ||
| 21 | |||
| 22 | static size_t engine_cache_memory = 0; | ||
| 23 | |||
| 24 | /* Forward declaration */ | ||
| 25 | static void engineFunctionDispose(dict *d, void *obj); | ||
| 26 | static void engineStatsDispose(dict *d, void *obj); | ||
| 27 | static void engineLibraryDispose(dict *d, void *obj); | ||
| 28 | static void engineDispose(dict *d, void *obj); | ||
| 29 | static int functionsVerifyName(sds name); | ||
| 30 | |||
| 31 | typedef struct functionsLibEngineStats { | ||
| 32 | size_t n_lib; | ||
| 33 | size_t n_functions; | ||
| 34 | } functionsLibEngineStats; | ||
| 35 | |||
| 36 | struct functionsLibCtx { | ||
| 37 | dict *libraries; /* Library name -> Library object */ | ||
| 38 | dict *functions; /* Function name -> Function object that can be used to run the function */ | ||
| 39 | size_t cache_memory; /* Overhead memory (structs, dictionaries, ..) used by all the functions */ | ||
| 40 | dict *engines_stats; /* Per engine statistics */ | ||
| 41 | }; | ||
| 42 | |||
| 43 | typedef struct functionsLibMataData { | ||
| 44 | sds engine; | ||
| 45 | sds name; | ||
| 46 | sds code; | ||
| 47 | } functionsLibMataData; | ||
| 48 | |||
| 49 | dictType engineDictType = { | ||
| 50 | dictSdsCaseHash, /* hash function */ | ||
| 51 | dictSdsDup, /* key dup */ | ||
| 52 | NULL, /* val dup */ | ||
| 53 | dictSdsKeyCaseCompare, /* key compare */ | ||
| 54 | dictSdsDestructor, /* key destructor */ | ||
| 55 | engineDispose, /* val destructor */ | ||
| 56 | NULL /* allow to expand */ | ||
| 57 | }; | ||
| 58 | |||
| 59 | dictType functionDictType = { | ||
| 60 | dictSdsCaseHash, /* hash function */ | ||
| 61 | dictSdsDup, /* key dup */ | ||
| 62 | NULL, /* val dup */ | ||
| 63 | dictSdsKeyCaseCompare,/* key compare */ | ||
| 64 | dictSdsDestructor, /* key destructor */ | ||
| 65 | NULL, /* val destructor */ | ||
| 66 | NULL /* allow to expand */ | ||
| 67 | }; | ||
| 68 | |||
| 69 | dictType engineStatsDictType = { | ||
| 70 | dictSdsCaseHash, /* hash function */ | ||
| 71 | dictSdsDup, /* key dup */ | ||
| 72 | NULL, /* val dup */ | ||
| 73 | dictSdsKeyCaseCompare,/* key compare */ | ||
| 74 | dictSdsDestructor, /* key destructor */ | ||
| 75 | engineStatsDispose, /* val destructor */ | ||
| 76 | NULL /* allow to expand */ | ||
| 77 | }; | ||
| 78 | |||
| 79 | dictType libraryFunctionDictType = { | ||
| 80 | dictSdsHash, /* hash function */ | ||
| 81 | dictSdsDup, /* key dup */ | ||
| 82 | NULL, /* val dup */ | ||
| 83 | dictSdsKeyCompare, /* key compare */ | ||
| 84 | dictSdsDestructor, /* key destructor */ | ||
| 85 | engineFunctionDispose,/* val destructor */ | ||
| 86 | NULL /* allow to expand */ | ||
| 87 | }; | ||
| 88 | |||
| 89 | dictType librariesDictType = { | ||
| 90 | dictSdsHash, /* hash function */ | ||
| 91 | dictSdsDup, /* key dup */ | ||
| 92 | NULL, /* val dup */ | ||
| 93 | dictSdsKeyCompare, /* key compare */ | ||
| 94 | dictSdsDestructor, /* key destructor */ | ||
| 95 | engineLibraryDispose, /* val destructor */ | ||
| 96 | NULL /* allow to expand */ | ||
| 97 | }; | ||
| 98 | |||
| 99 | /* Dictionary of engines */ | ||
| 100 | static dict *engines = NULL; | ||
| 101 | |||
| 102 | /* Libraries Ctx. */ | ||
| 103 | static functionsLibCtx *curr_functions_lib_ctx = NULL; | ||
| 104 | |||
| 105 | static size_t functionMallocSize(functionInfo *fi) { | ||
| 106 | return zmalloc_size(fi) + sdsZmallocSize(fi->name) | ||
| 107 | + (fi->desc ? sdsZmallocSize(fi->desc) : 0) | ||
| 108 | + fi->li->ei->engine->get_function_memory_overhead(fi->function); | ||
| 109 | } | ||
| 110 | |||
| 111 | static size_t libraryMallocSize(functionLibInfo *li) { | ||
| 112 | return zmalloc_size(li) + sdsZmallocSize(li->name) | ||
| 113 | + sdsZmallocSize(li->code); | ||
| 114 | } | ||
| 115 | |||
| 116 | static void engineStatsDispose(dict *d, void *obj) { | ||
| 117 | UNUSED(d); | ||
| 118 | functionsLibEngineStats *stats = obj; | ||
| 119 | zfree(stats); | ||
| 120 | } | ||
| 121 | |||
| 122 | /* Dispose function memory */ | ||
| 123 | static void engineFunctionDispose(dict *d, void *obj) { | ||
| 124 | UNUSED(d); | ||
| 125 | if (!obj) { | ||
| 126 | return; | ||
| 127 | } | ||
| 128 | functionInfo *fi = obj; | ||
| 129 | sdsfree(fi->name); | ||
| 130 | if (fi->desc) { | ||
| 131 | sdsfree(fi->desc); | ||
| 132 | } | ||
| 133 | engine *engine = fi->li->ei->engine; | ||
| 134 | engine->free_function(engine->engine_ctx, fi->function); | ||
| 135 | zfree(fi); | ||
| 136 | } | ||
| 137 | |||
| 138 | static void engineLibraryFree(functionLibInfo* li) { | ||
| 139 | if (!li) { | ||
| 140 | return; | ||
| 141 | } | ||
| 142 | dictRelease(li->functions); | ||
| 143 | sdsfree(li->name); | ||
| 144 | sdsfree(li->code); | ||
| 145 | zfree(li); | ||
| 146 | } | ||
| 147 | |||
| 148 | static void engineLibraryFreeGeneric(void *li) { | ||
| 149 | engineLibraryFree((functionLibInfo *)li); | ||
| 150 | } | ||
| 151 | |||
| 152 | static void engineLibraryDispose(dict *d, void *obj) { | ||
| 153 | UNUSED(d); | ||
| 154 | engineLibraryFree(obj); | ||
| 155 | } | ||
| 156 | |||
| 157 | static void engineDispose(dict *d, void *obj) { | ||
| 158 | UNUSED(d); | ||
| 159 | engineInfo *ei = obj; | ||
| 160 | freeClient(ei->c); | ||
| 161 | sdsfree(ei->name); | ||
| 162 | ei->engine->free_ctx(ei->engine->engine_ctx); | ||
| 163 | zfree(ei->engine); | ||
| 164 | zfree(ei); | ||
| 165 | } | ||
| 166 | |||
| 167 | /* Clear all the functions from the given library ctx */ | ||
| 168 | void functionsLibCtxClear(functionsLibCtx *lib_ctx) { | ||
| 169 | dictEmpty(lib_ctx->functions, NULL); | ||
| 170 | dictEmpty(lib_ctx->libraries, NULL); | ||
| 171 | dictIterator iter; | ||
| 172 | dictEntry *entry = NULL; | ||
| 173 | dictInitIterator(&iter, lib_ctx->engines_stats); | ||
| 174 | while ((entry = dictNext(&iter))) { | ||
| 175 | functionsLibEngineStats *stats = dictGetVal(entry); | ||
| 176 | stats->n_functions = 0; | ||
| 177 | stats->n_lib = 0; | ||
| 178 | } | ||
| 179 | dictResetIterator(&iter); | ||
| 180 | lib_ctx->cache_memory = 0; | ||
| 181 | } | ||
| 182 | |||
| 183 | void functionsLibCtxClearCurrent(int async) { | ||
| 184 | if (async) { | ||
| 185 | functionsLibCtx *old_l_ctx = curr_functions_lib_ctx; | ||
| 186 | dict *old_engines = engines; | ||
| 187 | freeFunctionsAsync(old_l_ctx, old_engines); | ||
| 188 | } else { | ||
| 189 | functionsLibCtxFree(curr_functions_lib_ctx); | ||
| 190 | dictRelease(engines); | ||
| 191 | } | ||
| 192 | functionsInit(); | ||
| 193 | } | ||
| 194 | |||
| 195 | /* Free the given functions ctx */ | ||
| 196 | void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) { | ||
| 197 | functionsLibCtxClear(functions_lib_ctx); | ||
| 198 | dictRelease(functions_lib_ctx->functions); | ||
| 199 | dictRelease(functions_lib_ctx->libraries); | ||
| 200 | dictRelease(functions_lib_ctx->engines_stats); | ||
| 201 | zfree(functions_lib_ctx); | ||
| 202 | } | ||
| 203 | |||
| 204 | /* Swap the current functions ctx with the given one. | ||
| 205 | * Free the old functions ctx. */ | ||
| 206 | void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) { | ||
| 207 | functionsLibCtxFree(curr_functions_lib_ctx); | ||
| 208 | curr_functions_lib_ctx = new_lib_ctx; | ||
| 209 | } | ||
| 210 | |||
| 211 | /* return the current functions ctx */ | ||
| 212 | functionsLibCtx* functionsLibCtxGetCurrent(void) { | ||
| 213 | return curr_functions_lib_ctx; | ||
| 214 | } | ||
| 215 | |||
| 216 | /* Create a new functions ctx */ | ||
| 217 | functionsLibCtx* functionsLibCtxCreate(void) { | ||
| 218 | functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx)); | ||
| 219 | ret->libraries = dictCreate(&librariesDictType); | ||
| 220 | ret->functions = dictCreate(&functionDictType); | ||
| 221 | ret->engines_stats = dictCreate(&engineStatsDictType); | ||
| 222 | dictIterator iter; | ||
| 223 | dictEntry *entry = NULL; | ||
| 224 | dictInitIterator(&iter, engines); | ||
| 225 | while ((entry = dictNext(&iter))) { | ||
| 226 | engineInfo *ei = dictGetVal(entry); | ||
| 227 | functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); | ||
| 228 | dictAdd(ret->engines_stats, ei->name, stats); | ||
| 229 | } | ||
| 230 | dictResetIterator(&iter); | ||
| 231 | ret->cache_memory = 0; | ||
| 232 | return ret; | ||
| 233 | } | ||
| 234 | |||
| 235 | /* | ||
| 236 | * Creating a function inside the given library. | ||
| 237 | * On success, return C_OK. | ||
| 238 | * On error, return C_ERR and set err output parameter with a relevant error message. | ||
| 239 | * | ||
| 240 | * Note: the code assumes 'name' is NULL terminated but not require it to be binary safe. | ||
| 241 | * the function will verify that the given name is following the naming format | ||
| 242 | * and return an error if its not. | ||
| 243 | */ | ||
| 244 | int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err) { | ||
| 245 | if (functionsVerifyName(name) != C_OK) { | ||
| 246 | *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long"); | ||
| 247 | return C_ERR; | ||
| 248 | } | ||
| 249 | |||
| 250 | if (dictFetchValue(li->functions, name)) { | ||
| 251 | *err = sdsnew("Function already exists in the library"); | ||
| 252 | return C_ERR; | ||
| 253 | } | ||
| 254 | |||
| 255 | functionInfo *fi = zmalloc(sizeof(*fi)); | ||
| 256 | *fi = (functionInfo) { | ||
| 257 | .name = name, | ||
| 258 | .function = function, | ||
| 259 | .li = li, | ||
| 260 | .desc = desc, | ||
| 261 | .f_flags = f_flags, | ||
| 262 | }; | ||
| 263 | |||
| 264 | int res = dictAdd(li->functions, fi->name, fi); | ||
| 265 | serverAssert(res == DICT_OK); | ||
| 266 | |||
| 267 | return C_OK; | ||
| 268 | } | ||
| 269 | |||
| 270 | static functionLibInfo* engineLibraryCreate(sds name, engineInfo *ei, sds code) { | ||
| 271 | functionLibInfo *li = zmalloc(sizeof(*li)); | ||
| 272 | *li = (functionLibInfo) { | ||
| 273 | .name = sdsdup(name), | ||
| 274 | .functions = dictCreate(&libraryFunctionDictType), | ||
| 275 | .ei = ei, | ||
| 276 | .code = sdsdup(code), | ||
| 277 | }; | ||
| 278 | return li; | ||
| 279 | } | ||
| 280 | |||
| 281 | static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo* li) { | ||
| 282 | dictIterator iter; | ||
| 283 | dictEntry *entry = NULL; | ||
| 284 | dictInitIterator(&iter, li->functions); | ||
| 285 | while ((entry = dictNext(&iter))) { | ||
| 286 | functionInfo *fi = dictGetVal(entry); | ||
| 287 | int ret = dictDelete(lib_ctx->functions, fi->name); | ||
| 288 | serverAssert(ret == DICT_OK); | ||
| 289 | lib_ctx->cache_memory -= functionMallocSize(fi); | ||
| 290 | } | ||
| 291 | dictResetIterator(&iter); | ||
| 292 | entry = dictUnlink(lib_ctx->libraries, li->name); | ||
| 293 | dictSetVal(lib_ctx->libraries, entry, NULL); | ||
| 294 | dictFreeUnlinkedEntry(lib_ctx->libraries, entry); | ||
| 295 | lib_ctx->cache_memory -= libraryMallocSize(li); | ||
| 296 | |||
| 297 | /* update stats */ | ||
| 298 | functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); | ||
| 299 | serverAssert(stats); | ||
| 300 | stats->n_lib--; | ||
| 301 | stats->n_functions -= dictSize(li->functions); | ||
| 302 | } | ||
| 303 | |||
| 304 | static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo* li) { | ||
| 305 | dictIterator iter; | ||
| 306 | dictEntry *entry = NULL; | ||
| 307 | dictInitIterator(&iter, li->functions); | ||
| 308 | while ((entry = dictNext(&iter))) { | ||
| 309 | functionInfo *fi = dictGetVal(entry); | ||
| 310 | dictAdd(lib_ctx->functions, fi->name, fi); | ||
| 311 | lib_ctx->cache_memory += functionMallocSize(fi); | ||
| 312 | } | ||
| 313 | dictResetIterator(&iter); | ||
| 314 | |||
| 315 | dictAdd(lib_ctx->libraries, li->name, li); | ||
| 316 | lib_ctx->cache_memory += libraryMallocSize(li); | ||
| 317 | |||
| 318 | /* update stats */ | ||
| 319 | functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); | ||
| 320 | serverAssert(stats); | ||
| 321 | stats->n_lib++; | ||
| 322 | stats->n_functions += dictSize(li->functions); | ||
| 323 | } | ||
| 324 | |||
| 325 | /* Takes all libraries from lib_ctx_src and add to lib_ctx_dst. | ||
| 326 | * On collision, if 'replace' argument is true, replace the existing library with the new one. | ||
| 327 | * Otherwise abort and leave 'lib_ctx_dst' and 'lib_ctx_src' untouched. | ||
| 328 | * Return C_OK on success and C_ERR if aborted. If C_ERR is returned, set a relevant | ||
| 329 | * error message on the 'err' out parameter. | ||
| 330 | * */ | ||
| 331 | static int libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_lib_ctx_src, int replace, sds *err) { | ||
| 332 | int ret = C_ERR; | ||
| 333 | dictIterator iter; | ||
| 334 | /* Stores the libraries we need to replace in case a revert is required. | ||
| 335 | * Only initialized when needed */ | ||
| 336 | list *old_libraries_list = NULL; | ||
| 337 | dictEntry *entry = NULL; | ||
| 338 | dictInitIterator(&iter, functions_lib_ctx_src->libraries); | ||
| 339 | while ((entry = dictNext(&iter))) { | ||
| 340 | functionLibInfo *li = dictGetVal(entry); | ||
| 341 | functionLibInfo *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name); | ||
| 342 | if (old_li) { | ||
| 343 | if (!replace) { | ||
| 344 | /* library already exists, failed the restore. */ | ||
| 345 | *err = sdscatfmt(sdsempty(), "Library %s already exists", li->name); | ||
| 346 | dictResetIterator(&iter); | ||
| 347 | goto done; | ||
| 348 | } else { | ||
| 349 | if (!old_libraries_list) { | ||
| 350 | old_libraries_list = listCreate(); | ||
| 351 | listSetFreeMethod(old_libraries_list, engineLibraryFreeGeneric); | ||
| 352 | } | ||
| 353 | libraryUnlink(functions_lib_ctx_dst, old_li); | ||
| 354 | listAddNodeTail(old_libraries_list, old_li); | ||
| 355 | } | ||
| 356 | } | ||
| 357 | } | ||
| 358 | dictResetIterator(&iter); | ||
| 359 | |||
| 360 | /* Make sure no functions collision */ | ||
| 361 | dictInitIterator(&iter, functions_lib_ctx_src->functions); | ||
| 362 | while ((entry = dictNext(&iter))) { | ||
| 363 | functionInfo *fi = dictGetVal(entry); | ||
| 364 | if (dictFetchValue(functions_lib_ctx_dst->functions, fi->name)) { | ||
| 365 | *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); | ||
| 366 | dictResetIterator(&iter); | ||
| 367 | goto done; | ||
| 368 | } | ||
| 369 | } | ||
| 370 | dictResetIterator(&iter); | ||
| 371 | |||
| 372 | /* No collision, it is safe to link all the new libraries. */ | ||
| 373 | dictInitIterator(&iter, functions_lib_ctx_src->libraries); | ||
| 374 | while ((entry = dictNext(&iter))) { | ||
| 375 | functionLibInfo *li = dictGetVal(entry); | ||
| 376 | libraryLink(functions_lib_ctx_dst, li); | ||
| 377 | dictSetVal(functions_lib_ctx_src->libraries, entry, NULL); | ||
| 378 | } | ||
| 379 | dictResetIterator(&iter); | ||
| 380 | |||
| 381 | functionsLibCtxClear(functions_lib_ctx_src); | ||
| 382 | if (old_libraries_list) { | ||
| 383 | listRelease(old_libraries_list); | ||
| 384 | old_libraries_list = NULL; | ||
| 385 | } | ||
| 386 | ret = C_OK; | ||
| 387 | |||
| 388 | done: | ||
| 389 | if (old_libraries_list) { | ||
| 390 | /* Link back all libraries on tmp_l_ctx */ | ||
| 391 | while (listLength(old_libraries_list) > 0) { | ||
| 392 | listNode *head = listFirst(old_libraries_list); | ||
| 393 | functionLibInfo *li = listNodeValue(head); | ||
| 394 | listNodeValue(head) = NULL; | ||
| 395 | libraryLink(functions_lib_ctx_dst, li); | ||
| 396 | listDelNode(old_libraries_list, head); | ||
| 397 | } | ||
| 398 | listRelease(old_libraries_list); | ||
| 399 | } | ||
| 400 | return ret; | ||
| 401 | } | ||
| 402 | |||
| 403 | /* Register an engine, should be called once by the engine on startup and give the following: | ||
| 404 | * | ||
| 405 | * - engine_name - name of the engine to register | ||
| 406 | * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */ | ||
| 407 | int functionsRegisterEngine(const char *engine_name, engine *engine) { | ||
| 408 | sds engine_name_sds = sdsnew(engine_name); | ||
| 409 | if (dictFetchValue(engines, engine_name_sds)) { | ||
| 410 | serverLog(LL_WARNING, "Same engine was registered twice"); | ||
| 411 | sdsfree(engine_name_sds); | ||
| 412 | return C_ERR; | ||
| 413 | } | ||
| 414 | |||
| 415 | client *c = createClient(NULL); | ||
| 416 | c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT); | ||
| 417 | engineInfo *ei = zmalloc(sizeof(*ei)); | ||
| 418 | *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,}; | ||
| 419 | |||
| 420 | dictAdd(engines, engine_name_sds, ei); | ||
| 421 | |||
| 422 | engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) + | ||
| 423 | zmalloc_size(engine) + | ||
| 424 | engine->get_engine_memory_overhead(engine->engine_ctx); | ||
| 425 | |||
| 426 | return C_OK; | ||
| 427 | } | ||
| 428 | |||
| 429 | /* | ||
| 430 | * FUNCTION STATS | ||
| 431 | */ | ||
| 432 | void functionStatsCommand(client *c) { | ||
| 433 | if (scriptIsRunning() && scriptIsEval()) { | ||
| 434 | addReplyErrorObject(c, shared.slowevalerr); | ||
| 435 | return; | ||
| 436 | } | ||
| 437 | |||
| 438 | addReplyMapLen(c, 2); | ||
| 439 | |||
| 440 | addReplyBulkCString(c, "running_script"); | ||
| 441 | if (!scriptIsRunning()) { | ||
| 442 | addReplyNull(c); | ||
| 443 | } else { | ||
| 444 | addReplyMapLen(c, 3); | ||
| 445 | addReplyBulkCString(c, "name"); | ||
| 446 | addReplyBulkCString(c, scriptCurrFunction()); | ||
| 447 | addReplyBulkCString(c, "command"); | ||
| 448 | client *script_client = scriptGetCaller(); | ||
| 449 | addReplyArrayLen(c, script_client->argc); | ||
| 450 | for (int i = 0 ; i < script_client->argc ; ++i) { | ||
| 451 | addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr)); | ||
| 452 | } | ||
| 453 | addReplyBulkCString(c, "duration_ms"); | ||
| 454 | addReplyLongLong(c, scriptRunDuration()); | ||
| 455 | } | ||
| 456 | |||
| 457 | addReplyBulkCString(c, "engines"); | ||
| 458 | addReplyMapLen(c, dictSize(engines)); | ||
| 459 | dictIterator iter; | ||
| 460 | dictEntry *entry = NULL; | ||
| 461 | dictInitIterator(&iter, engines); | ||
| 462 | while ((entry = dictNext(&iter))) { | ||
| 463 | engineInfo *ei = dictGetVal(entry); | ||
| 464 | addReplyBulkCString(c, ei->name); | ||
| 465 | addReplyMapLen(c, 2); | ||
| 466 | functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name); | ||
| 467 | addReplyBulkCString(c, "libraries_count"); | ||
| 468 | addReplyLongLong(c, e_stats->n_lib); | ||
| 469 | addReplyBulkCString(c, "functions_count"); | ||
| 470 | addReplyLongLong(c, e_stats->n_functions); | ||
| 471 | } | ||
| 472 | dictResetIterator(&iter); | ||
| 473 | } | ||
| 474 | |||
| 475 | static void functionListReplyFlags(client *c, functionInfo *fi) { | ||
| 476 | /* First count the number of flags we have */ | ||
| 477 | int flagcount = 0; | ||
| 478 | for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) { | ||
| 479 | if (fi->f_flags & flag->flag) { | ||
| 480 | ++flagcount; | ||
| 481 | } | ||
| 482 | } | ||
| 483 | |||
| 484 | addReplySetLen(c, flagcount); | ||
| 485 | |||
| 486 | for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) { | ||
| 487 | if (fi->f_flags & flag->flag) { | ||
| 488 | addReplyStatus(c, flag->str); | ||
| 489 | } | ||
| 490 | } | ||
| 491 | } | ||
| 492 | |||
| 493 | /* | ||
| 494 | * FUNCTION LIST [LIBRARYNAME PATTERN] [WITHCODE] | ||
| 495 | * | ||
| 496 | * Return general information about all the libraries: | ||
| 497 | * * Library name | ||
| 498 | * * The engine used to run the Library | ||
| 499 | * * Functions list | ||
| 500 | * * Library code (if WITHCODE is given) | ||
| 501 | * | ||
| 502 | * It is also possible to given library name pattern using | ||
| 503 | * LIBRARYNAME argument, if given, return only libraries | ||
| 504 | * that matches the given pattern. | ||
| 505 | */ | ||
| 506 | void functionListCommand(client *c) { | ||
| 507 | int with_code = 0; | ||
| 508 | sds library_name = NULL; | ||
| 509 | for (int i = 2 ; i < c->argc ; ++i) { | ||
| 510 | robj *next_arg = c->argv[i]; | ||
| 511 | if (!with_code && !strcasecmp(next_arg->ptr, "withcode")) { | ||
| 512 | with_code = 1; | ||
| 513 | continue; | ||
| 514 | } | ||
| 515 | if (!library_name && !strcasecmp(next_arg->ptr, "libraryname")) { | ||
| 516 | if (i >= c->argc - 1) { | ||
| 517 | addReplyError(c, "library name argument was not given"); | ||
| 518 | return; | ||
| 519 | } | ||
| 520 | library_name = c->argv[++i]->ptr; | ||
| 521 | continue; | ||
| 522 | } | ||
| 523 | addReplyErrorSds(c, sdscatfmt(sdsempty(), "Unknown argument %s", next_arg->ptr)); | ||
| 524 | return; | ||
| 525 | } | ||
| 526 | size_t reply_len = 0; | ||
| 527 | void *len_ptr = NULL; | ||
| 528 | if (library_name) { | ||
| 529 | len_ptr = addReplyDeferredLen(c); | ||
| 530 | } else { | ||
| 531 | /* If no pattern is asked we know the reply len and we can just set it */ | ||
| 532 | addReplyArrayLen(c, dictSize(curr_functions_lib_ctx->libraries)); | ||
| 533 | } | ||
| 534 | dictIterator iter; | ||
| 535 | dictEntry *entry = NULL; | ||
| 536 | dictInitIterator(&iter, curr_functions_lib_ctx->libraries); | ||
| 537 | while ((entry = dictNext(&iter))) { | ||
| 538 | functionLibInfo *li = dictGetVal(entry); | ||
| 539 | if (library_name) { | ||
| 540 | if (!stringmatchlen(library_name, sdslen(library_name), li->name, sdslen(li->name), 1)) { | ||
| 541 | continue; | ||
| 542 | } | ||
| 543 | } | ||
| 544 | ++reply_len; | ||
| 545 | addReplyMapLen(c, with_code? 4 : 3); | ||
| 546 | addReplyBulkCString(c, "library_name"); | ||
| 547 | addReplyBulkCBuffer(c, li->name, sdslen(li->name)); | ||
| 548 | addReplyBulkCString(c, "engine"); | ||
| 549 | addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name)); | ||
| 550 | |||
| 551 | addReplyBulkCString(c, "functions"); | ||
| 552 | addReplyArrayLen(c, dictSize(li->functions)); | ||
| 553 | dictIterator functions_iter; | ||
| 554 | dictEntry *function_entry = NULL; | ||
| 555 | dictInitIterator(&functions_iter, li->functions); | ||
| 556 | while ((function_entry = dictNext(&functions_iter))) { | ||
| 557 | functionInfo *fi = dictGetVal(function_entry); | ||
| 558 | addReplyMapLen(c, 3); | ||
| 559 | addReplyBulkCString(c, "name"); | ||
| 560 | addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); | ||
| 561 | addReplyBulkCString(c, "description"); | ||
| 562 | if (fi->desc) { | ||
| 563 | addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); | ||
| 564 | } else { | ||
| 565 | addReplyNull(c); | ||
| 566 | } | ||
| 567 | addReplyBulkCString(c, "flags"); | ||
| 568 | functionListReplyFlags(c, fi); | ||
| 569 | } | ||
| 570 | dictResetIterator(&functions_iter); | ||
| 571 | |||
| 572 | if (with_code) { | ||
| 573 | addReplyBulkCString(c, "library_code"); | ||
| 574 | addReplyBulkCBuffer(c, li->code, sdslen(li->code)); | ||
| 575 | } | ||
| 576 | } | ||
| 577 | dictResetIterator(&iter); | ||
| 578 | if (len_ptr) { | ||
| 579 | setDeferredArrayLen(c, len_ptr, reply_len); | ||
| 580 | } | ||
| 581 | } | ||
| 582 | |||
| 583 | /* | ||
| 584 | * FUNCTION DELETE <LIBRARY NAME> | ||
| 585 | */ | ||
| 586 | void functionDeleteCommand(client *c) { | ||
| 587 | robj *function_name = c->argv[2]; | ||
| 588 | functionLibInfo *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr); | ||
| 589 | if (!li) { | ||
| 590 | addReplyError(c, "Library not found"); | ||
| 591 | return; | ||
| 592 | } | ||
| 593 | |||
| 594 | libraryUnlink(curr_functions_lib_ctx, li); | ||
| 595 | engineLibraryFree(li); | ||
| 596 | /* Indicate that the command changed the data so it will be replicated and | ||
| 597 | * counted as a data change (for persistence configuration) */ | ||
| 598 | server.dirty++; | ||
| 599 | addReply(c, shared.ok); | ||
| 600 | } | ||
| 601 | |||
| 602 | /* FUNCTION KILL */ | ||
| 603 | void functionKillCommand(client *c) { | ||
| 604 | scriptKill(c, 0); | ||
| 605 | } | ||
| 606 | |||
| 607 | /* Try to extract command flags if we can, returns the modified flags. | ||
| 608 | * Note that it does not guarantee the command arguments are right. */ | ||
| 609 | uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) { | ||
| 610 | robj *function_name = c->argv[1]; | ||
| 611 | c->cur_script = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); | ||
| 612 | if (!c->cur_script) | ||
| 613 | return cmd_flags; | ||
| 614 | functionInfo *fi = dictGetVal(c->cur_script); | ||
| 615 | uint64_t script_flags = fi->f_flags; | ||
| 616 | return scriptFlagsToCmdFlags(cmd_flags, script_flags); | ||
| 617 | } | ||
| 618 | |||
| 619 | static void fcallCommandGeneric(client *c, int ro) { | ||
| 620 | /* Functions need to be fed to monitors before the commands they execute. */ | ||
| 621 | replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); | ||
| 622 | |||
| 623 | robj *function_name = c->argv[1]; | ||
| 624 | dictEntry *de = c->cur_script; | ||
| 625 | if (!de) | ||
| 626 | de = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); | ||
| 627 | if (!de) { | ||
| 628 | addReplyError(c, "Function not found"); | ||
| 629 | return; | ||
| 630 | } | ||
| 631 | functionInfo *fi = dictGetVal(de); | ||
| 632 | engine *engine = fi->li->ei->engine; | ||
| 633 | |||
| 634 | long long numkeys; | ||
| 635 | /* Get the number of arguments that are keys */ | ||
| 636 | if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { | ||
| 637 | addReplyError(c, "Bad number of keys provided"); | ||
| 638 | return; | ||
| 639 | } | ||
| 640 | if (numkeys > (c->argc - 3)) { | ||
| 641 | addReplyError(c, "Number of keys can't be greater than number of args"); | ||
| 642 | return; | ||
| 643 | } else if (numkeys < 0) { | ||
| 644 | addReplyError(c, "Number of keys can't be negative"); | ||
| 645 | return; | ||
| 646 | } | ||
| 647 | |||
| 648 | scriptRunCtx run_ctx; | ||
| 649 | |||
| 650 | if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) | ||
| 651 | return; | ||
| 652 | |||
| 653 | engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, | ||
| 654 | c->argv + 3 + numkeys, c->argc - 3 - numkeys); | ||
| 655 | scriptResetRun(&run_ctx); | ||
| 656 | } | ||
| 657 | |||
| 658 | /* | ||
| 659 | * FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> | ||
| 660 | */ | ||
| 661 | void fcallCommand(client *c) { | ||
| 662 | fcallCommandGeneric(c, 0); | ||
| 663 | } | ||
| 664 | |||
| 665 | /* | ||
| 666 | * FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> | ||
| 667 | */ | ||
| 668 | void fcallroCommand(client *c) { | ||
| 669 | fcallCommandGeneric(c, 1); | ||
| 670 | } | ||
| 671 | |||
| 672 | /* | ||
| 673 | * Returns a binary payload representing all the libraries. | ||
| 674 | * Can be loaded using FUNCTION RESTORE | ||
| 675 | * | ||
| 676 | * The payload structure is the same as on RDB. Each library | ||
| 677 | * is saved separately with the following information: | ||
| 678 | * * Library name | ||
| 679 | * * Engine name | ||
| 680 | * * Library code | ||
| 681 | * RDB_OPCODE_FUNCTION2 is saved before each library to present | ||
| 682 | * that the payload is a library. | ||
| 683 | * RDB version and crc64 is saved at the end of the payload. | ||
| 684 | * The RDB version is saved for backward compatibility. | ||
| 685 | * crc64 is saved so we can verify the payload content. | ||
| 686 | */ | ||
| 687 | void createFunctionDumpPayload(rio *payload) { | ||
| 688 | uint64_t crc; | ||
| 689 | unsigned char buf[2]; | ||
| 690 | |||
| 691 | rioInitWithBuffer(payload, sdsempty()); | ||
| 692 | |||
| 693 | rdbSaveFunctions(payload); | ||
| 694 | |||
| 695 | /* RDB version */ | ||
| 696 | buf[0] = RDB_VERSION & 0xff; | ||
| 697 | buf[1] = (RDB_VERSION >> 8) & 0xff; | ||
| 698 | payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr, buf, 2); | ||
| 699 | |||
| 700 | /* CRC64 */ | ||
| 701 | crc = crc64(0, (unsigned char*) payload->io.buffer.ptr, | ||
| 702 | sdslen(payload->io.buffer.ptr)); | ||
| 703 | memrev64ifbe(&crc); | ||
| 704 | payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr, &crc, 8); | ||
| 705 | } | ||
| 706 | |||
| 707 | /* | ||
| 708 | * FUNCTION DUMP | ||
| 709 | */ | ||
| 710 | void functionDumpCommand(client *c) { | ||
| 711 | rio payload; | ||
| 712 | createFunctionDumpPayload(&payload); | ||
| 713 | |||
| 714 | addReplyBulkSds(c, payload.io.buffer.ptr); | ||
| 715 | } | ||
| 716 | |||
| 717 | /* | ||
| 718 | * FUNCTION RESTORE <payload> [FLUSH|APPEND|REPLACE] | ||
| 719 | * | ||
| 720 | * Restore the libraries represented by the give payload. | ||
| 721 | * Restore policy to can be given to control how to handle existing libraries (default APPEND): | ||
| 722 | * * FLUSH: delete all existing libraries. | ||
| 723 | * * APPEND: appends the restored libraries to the existing libraries. On collision, abort. | ||
| 724 | * * REPLACE: appends the restored libraries to the existing libraries. | ||
| 725 | * On collision, replace the old libraries with the new libraries. | ||
| 726 | */ | ||
| 727 | void functionRestoreCommand(client *c) { | ||
| 728 | if (c->argc > 4) { | ||
| 729 | addReplySubcommandSyntaxError(c); | ||
| 730 | return; | ||
| 731 | } | ||
| 732 | |||
| 733 | restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */ | ||
| 734 | sds data = c->argv[2]->ptr; | ||
| 735 | size_t data_len = sdslen(data); | ||
| 736 | rio payload; | ||
| 737 | sds err = NULL; | ||
| 738 | |||
| 739 | if (c->argc == 4) { | ||
| 740 | const char *restore_policy_str = c->argv[3]->ptr; | ||
| 741 | if (!strcasecmp(restore_policy_str, "append")) { | ||
| 742 | restore_replicy = restorePolicy_Append; | ||
| 743 | } else if (!strcasecmp(restore_policy_str, "replace")) { | ||
| 744 | restore_replicy = restorePolicy_Replace; | ||
| 745 | } else if (!strcasecmp(restore_policy_str, "flush")) { | ||
| 746 | restore_replicy = restorePolicy_Flush; | ||
| 747 | } else { | ||
| 748 | addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE."); | ||
| 749 | return; | ||
| 750 | } | ||
| 751 | } | ||
| 752 | |||
| 753 | uint16_t rdbver; | ||
| 754 | if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) { | ||
| 755 | addReplyError(c, "DUMP payload version or checksum are wrong"); | ||
| 756 | return; | ||
| 757 | } | ||
| 758 | |||
| 759 | functionsLibCtx *functions_lib_ctx = functionsLibCtxCreate(); | ||
| 760 | rioInitWithBuffer(&payload, data); | ||
| 761 | |||
| 762 | /* Read until reaching last 10 bytes that should contain RDB version and checksum. */ | ||
| 763 | while (data_len - payload.io.buffer.pos > 10) { | ||
| 764 | int type; | ||
| 765 | if ((type = rdbLoadType(&payload)) == -1) { | ||
| 766 | err = sdsnew("can not read data type"); | ||
| 767 | goto load_error; | ||
| 768 | } | ||
| 769 | if (type == RDB_OPCODE_FUNCTION_PRE_GA) { | ||
| 770 | err = sdsnew("Pre-GA function format not supported"); | ||
| 771 | goto load_error; | ||
| 772 | } | ||
| 773 | if (type != RDB_OPCODE_FUNCTION2) { | ||
| 774 | err = sdsnew("given type is not a function"); | ||
| 775 | goto load_error; | ||
| 776 | } | ||
| 777 | if (rdbFunctionLoad(&payload, rdbver, functions_lib_ctx, RDBFLAGS_NONE, &err) != C_OK) { | ||
| 778 | if (!err) { | ||
| 779 | err = sdsnew("failed loading the given functions payload"); | ||
| 780 | } | ||
| 781 | goto load_error; | ||
| 782 | } | ||
| 783 | } | ||
| 784 | |||
| 785 | if (restore_replicy == restorePolicy_Flush) { | ||
| 786 | functionsLibCtxSwapWithCurrent(functions_lib_ctx); | ||
| 787 | functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */ | ||
| 788 | } else { | ||
| 789 | if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) != C_OK) { | ||
| 790 | goto load_error; | ||
| 791 | } | ||
| 792 | } | ||
| 793 | |||
| 794 | /* Indicate that the command changed the data so it will be replicated and | ||
| 795 | * counted as a data change (for persistence configuration) */ | ||
| 796 | server.dirty++; | ||
| 797 | |||
| 798 | load_error: | ||
| 799 | if (err) { | ||
| 800 | addReplyErrorSds(c, err); | ||
| 801 | } else { | ||
| 802 | addReply(c, shared.ok); | ||
| 803 | } | ||
| 804 | if (functions_lib_ctx) { | ||
| 805 | functionsLibCtxFree(functions_lib_ctx); | ||
| 806 | } | ||
| 807 | } | ||
| 808 | |||
| 809 | /* FUNCTION FLUSH [ASYNC | SYNC] */ | ||
| 810 | void functionFlushCommand(client *c) { | ||
| 811 | if (c->argc > 3) { | ||
| 812 | addReplySubcommandSyntaxError(c); | ||
| 813 | return; | ||
| 814 | } | ||
| 815 | int async = 0; | ||
| 816 | if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) { | ||
| 817 | async = 0; | ||
| 818 | } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) { | ||
| 819 | async = 1; | ||
| 820 | } else if (c->argc == 2) { | ||
| 821 | async = server.lazyfree_lazy_user_flush ? 1 : 0; | ||
| 822 | } else { | ||
| 823 | addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option"); | ||
| 824 | return; | ||
| 825 | } | ||
| 826 | |||
| 827 | functionsLibCtxClearCurrent(async); | ||
| 828 | |||
| 829 | /* Indicate that the command changed the data so it will be replicated and | ||
| 830 | * counted as a data change (for persistence configuration) */ | ||
| 831 | server.dirty++; | ||
| 832 | addReply(c,shared.ok); | ||
| 833 | } | ||
| 834 | |||
| 835 | /* FUNCTION HELP */ | ||
| 836 | void functionHelpCommand(client *c) { | ||
| 837 | const char *help[] = { | ||
| 838 | "LOAD [REPLACE] <FUNCTION CODE>", | ||
| 839 | " Create a new library with the given library name and code.", | ||
| 840 | "DELETE <LIBRARY NAME>", | ||
| 841 | " Delete the given library.", | ||
| 842 | "LIST [LIBRARYNAME PATTERN] [WITHCODE]", | ||
| 843 | " Return general information on all the libraries:", | ||
| 844 | " * Library name", | ||
| 845 | " * The engine used to run the Library", | ||
| 846 | " * Functions list", | ||
| 847 | " * Library code (if WITHCODE is given)", | ||
| 848 | " It also possible to get only function that matches a pattern using LIBRARYNAME argument.", | ||
| 849 | "STATS", | ||
| 850 | " Return information about the current function running:", | ||
| 851 | " * Function name", | ||
| 852 | " * Command used to run the function", | ||
| 853 | " * Duration in MS that the function is running", | ||
| 854 | " If no function is running, return nil", | ||
| 855 | " In addition, returns a list of available engines.", | ||
| 856 | "KILL", | ||
| 857 | " Kill the current running function.", | ||
| 858 | "FLUSH [ASYNC|SYNC]", | ||
| 859 | " Delete all the libraries.", | ||
| 860 | " When called without the optional mode argument, the behavior is determined by the", | ||
| 861 | " lazyfree-lazy-user-flush configuration directive. Valid modes are:", | ||
| 862 | " * ASYNC: Asynchronously flush the libraries.", | ||
| 863 | " * SYNC: Synchronously flush the libraries.", | ||
| 864 | "DUMP", | ||
| 865 | " Return a serialized payload representing the current libraries, can be restored using FUNCTION RESTORE command", | ||
| 866 | "RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]", | ||
| 867 | " Restore the libraries represented by the given payload, it is possible to give a restore policy to", | ||
| 868 | " control how to handle existing libraries (default APPEND):", | ||
| 869 | " * FLUSH: delete all existing libraries.", | ||
| 870 | " * APPEND: appends the restored libraries to the existing libraries. On collision, abort.", | ||
| 871 | " * REPLACE: appends the restored libraries to the existing libraries, On collision, replace the old", | ||
| 872 | " libraries with the new libraries (notice that even on this option there is a chance of failure", | ||
| 873 | " in case of functions name collision with another library).", | ||
| 874 | NULL }; | ||
| 875 | addReplyHelp(c, help); | ||
| 876 | } | ||
| 877 | |||
| 878 | /* Verify that the function name is of the format: [a-zA-Z0-9_][a-zA-Z0-9_]? */ | ||
| 879 | static int functionsVerifyName(sds name) { | ||
| 880 | if (sdslen(name) == 0) { | ||
| 881 | return C_ERR; | ||
| 882 | } | ||
| 883 | for (size_t i = 0 ; i < sdslen(name) ; ++i) { | ||
| 884 | char curr_char = name[i]; | ||
| 885 | if ((curr_char >= 'a' && curr_char <= 'z') || | ||
| 886 | (curr_char >= 'A' && curr_char <= 'Z') || | ||
| 887 | (curr_char >= '0' && curr_char <= '9') || | ||
| 888 | (curr_char == '_')) | ||
| 889 | { | ||
| 890 | continue; | ||
| 891 | } | ||
| 892 | return C_ERR; | ||
| 893 | } | ||
| 894 | return C_OK; | ||
| 895 | } | ||
| 896 | |||
| 897 | int functionExtractLibMetaData(sds payload, functionsLibMataData *md, sds *err) { | ||
| 898 | sds name = NULL; | ||
| 899 | sds engine = NULL; | ||
| 900 | if (strncmp(payload, "#!", 2) != 0) { | ||
| 901 | *err = sdsnew("Missing library metadata"); | ||
| 902 | return C_ERR; | ||
| 903 | } | ||
| 904 | char *shebang_end = strchr(payload, '\n'); | ||
| 905 | if (shebang_end == NULL) { | ||
| 906 | *err = sdsnew("Invalid library metadata"); | ||
| 907 | return C_ERR; | ||
| 908 | } | ||
| 909 | size_t shebang_len = shebang_end - payload; | ||
| 910 | sds shebang = sdsnewlen(payload, shebang_len); | ||
| 911 | int numparts; | ||
| 912 | sds *parts = sdssplitargs(shebang, &numparts); | ||
| 913 | sdsfree(shebang); | ||
| 914 | if (!parts || numparts == 0) { | ||
| 915 | *err = sdsnew("Invalid library metadata"); | ||
| 916 | sdsfreesplitres(parts, numparts); | ||
| 917 | return C_ERR; | ||
| 918 | } | ||
| 919 | engine = sdsdup(parts[0]); | ||
| 920 | sdsrange(engine, 2, -1); | ||
| 921 | for (int i = 1 ; i < numparts ; ++i) { | ||
| 922 | sds part = parts[i]; | ||
| 923 | if (strncasecmp(part, "name=", 5) == 0) { | ||
| 924 | if (name) { | ||
| 925 | *err = sdscatfmt(sdsempty(), "Invalid metadata value, name argument was given multiple times"); | ||
| 926 | goto error; | ||
| 927 | } | ||
| 928 | name = sdsdup(part); | ||
| 929 | sdsrange(name, 5, -1); | ||
| 930 | continue; | ||
| 931 | } | ||
| 932 | *err = sdscatfmt(sdsempty(), "Invalid metadata value given: %s", part); | ||
| 933 | goto error; | ||
| 934 | } | ||
| 935 | |||
| 936 | if (!name) { | ||
| 937 | *err = sdsnew("Library name was not given"); | ||
| 938 | goto error; | ||
| 939 | } | ||
| 940 | |||
| 941 | sdsfreesplitres(parts, numparts); | ||
| 942 | |||
| 943 | md->name = name; | ||
| 944 | md->code = sdsnewlen(shebang_end, sdslen(payload) - shebang_len); | ||
| 945 | md->engine = engine; | ||
| 946 | |||
| 947 | return C_OK; | ||
| 948 | |||
| 949 | error: | ||
| 950 | if (name) sdsfree(name); | ||
| 951 | if (engine) sdsfree(engine); | ||
| 952 | sdsfreesplitres(parts, numparts); | ||
| 953 | return C_ERR; | ||
| 954 | } | ||
| 955 | |||
| 956 | void functionFreeLibMetaData(functionsLibMataData *md) { | ||
| 957 | if (md->code) sdsfree(md->code); | ||
| 958 | if (md->name) sdsfree(md->name); | ||
| 959 | if (md->engine) sdsfree(md->engine); | ||
| 960 | } | ||
| 961 | |||
| 962 | /* Compile and save the given library, return the loaded library name on success | ||
| 963 | * and NULL on failure. In case on failure the err out param is set with relevant error message */ | ||
| 964 | sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx, size_t timeout) { | ||
| 965 | dictIterator iter; | ||
| 966 | dictEntry *entry = NULL; | ||
| 967 | functionLibInfo *new_li = NULL; | ||
| 968 | functionLibInfo *old_li = NULL; | ||
| 969 | functionsLibMataData md = {0}; | ||
| 970 | if (functionExtractLibMetaData(code, &md, err) != C_OK) { | ||
| 971 | return NULL; | ||
| 972 | } | ||
| 973 | |||
| 974 | if (functionsVerifyName(md.name)) { | ||
| 975 | *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long"); | ||
| 976 | goto error; | ||
| 977 | } | ||
| 978 | |||
| 979 | engineInfo *ei = dictFetchValue(engines, md.engine); | ||
| 980 | if (!ei) { | ||
| 981 | *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); | ||
| 982 | goto error; | ||
| 983 | } | ||
| 984 | engine *engine = ei->engine; | ||
| 985 | |||
| 986 | old_li = dictFetchValue(lib_ctx->libraries, md.name); | ||
| 987 | if (old_li && !replace) { | ||
| 988 | old_li = NULL; | ||
| 989 | *err = sdscatfmt(sdsempty(), "Library '%S' already exists", md.name); | ||
| 990 | goto error; | ||
| 991 | } | ||
| 992 | |||
| 993 | if (old_li) { | ||
| 994 | libraryUnlink(lib_ctx, old_li); | ||
| 995 | } | ||
| 996 | |||
| 997 | new_li = engineLibraryCreate(md.name, ei, code); | ||
| 998 | if (engine->create(engine->engine_ctx, new_li, md.code, timeout, err) != C_OK) { | ||
| 999 | goto error; | ||
| 1000 | } | ||
| 1001 | |||
| 1002 | if (dictSize(new_li->functions) == 0) { | ||
| 1003 | *err = sdsnew("No functions registered"); | ||
| 1004 | goto error; | ||
| 1005 | } | ||
| 1006 | |||
| 1007 | /* Verify no duplicate functions */ | ||
| 1008 | dictInitIterator(&iter, new_li->functions); | ||
| 1009 | while ((entry = dictNext(&iter))) { | ||
| 1010 | functionInfo *fi = dictGetVal(entry); | ||
| 1011 | if (dictFetchValue(lib_ctx->functions, fi->name)) { | ||
| 1012 | /* functions name collision, abort. */ | ||
| 1013 | *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); | ||
| 1014 | dictResetIterator(&iter); | ||
| 1015 | goto error; | ||
| 1016 | } | ||
| 1017 | } | ||
| 1018 | dictResetIterator(&iter); | ||
| 1019 | |||
| 1020 | libraryLink(lib_ctx, new_li); | ||
| 1021 | |||
| 1022 | if (old_li) { | ||
| 1023 | engineLibraryFree(old_li); | ||
| 1024 | } | ||
| 1025 | |||
| 1026 | sds loaded_lib_name = md.name; | ||
| 1027 | md.name = NULL; | ||
| 1028 | functionFreeLibMetaData(&md); | ||
| 1029 | |||
| 1030 | return loaded_lib_name; | ||
| 1031 | |||
| 1032 | error: | ||
| 1033 | if (new_li) engineLibraryFree(new_li); | ||
| 1034 | if (old_li) libraryLink(lib_ctx, old_li); | ||
| 1035 | functionFreeLibMetaData(&md); | ||
| 1036 | return NULL; | ||
| 1037 | } | ||
| 1038 | |||
| 1039 | /* | ||
| 1040 | * FUNCTION LOAD [REPLACE] <LIBRARY CODE> | ||
| 1041 | * REPLACE - optional, replace existing library | ||
| 1042 | * LIBRARY CODE - library code to pass to the engine | ||
| 1043 | */ | ||
| 1044 | void functionLoadCommand(client *c) { | ||
| 1045 | int replace = 0; | ||
| 1046 | int argc_pos = 2; | ||
| 1047 | while (argc_pos < c->argc - 1) { | ||
| 1048 | robj *next_arg = c->argv[argc_pos++]; | ||
| 1049 | if (!strcasecmp(next_arg->ptr, "replace")) { | ||
| 1050 | replace = 1; | ||
| 1051 | continue; | ||
| 1052 | } | ||
| 1053 | addReplyErrorFormat(c, "Unknown option given: %s", (char*)next_arg->ptr); | ||
| 1054 | return; | ||
| 1055 | } | ||
| 1056 | |||
| 1057 | if (argc_pos >= c->argc) { | ||
| 1058 | addReplyError(c, "Function code is missing"); | ||
| 1059 | return; | ||
| 1060 | } | ||
| 1061 | |||
| 1062 | robj *code = c->argv[argc_pos]; | ||
| 1063 | sds err = NULL; | ||
| 1064 | sds library_name = NULL; | ||
| 1065 | size_t timeout = LOAD_TIMEOUT_MS; | ||
| 1066 | if (mustObeyClient(c)) { | ||
| 1067 | timeout = 0; | ||
| 1068 | } | ||
| 1069 | if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx, timeout))) | ||
| 1070 | { | ||
| 1071 | addReplyErrorSds(c, err); | ||
| 1072 | return; | ||
| 1073 | } | ||
| 1074 | /* Indicate that the command changed the data so it will be replicated and | ||
| 1075 | * counted as a data change (for persistence configuration) */ | ||
| 1076 | server.dirty++; | ||
| 1077 | addReplyBulkSds(c, library_name); | ||
| 1078 | } | ||
| 1079 | |||
| 1080 | /* Return memory usage of all the engines combine */ | ||
| 1081 | unsigned long functionsMemoryVM(void) { | ||
| 1082 | dictIterator iter; | ||
| 1083 | dictEntry *entry = NULL; | ||
| 1084 | size_t engines_memory = 0; | ||
| 1085 | |||
| 1086 | dictInitIterator(&iter, engines); | ||
| 1087 | while ((entry = dictNext(&iter))) { | ||
| 1088 | engineInfo *ei = dictGetVal(entry); | ||
| 1089 | engine *engine = ei->engine; | ||
| 1090 | engines_memory += engine->get_used_memory(engine->engine_ctx); | ||
| 1091 | } | ||
| 1092 | dictResetIterator(&iter); | ||
| 1093 | |||
| 1094 | return engines_memory; | ||
| 1095 | } | ||
| 1096 | |||
| 1097 | /* Return memory overhead of all the engines combine */ | ||
| 1098 | unsigned long functionsMemoryEngine(void) { | ||
| 1099 | size_t memory_overhead = dictMemUsage(engines); | ||
| 1100 | memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); | ||
| 1101 | memory_overhead += sizeof(functionsLibCtx); | ||
| 1102 | memory_overhead += curr_functions_lib_ctx->cache_memory; | ||
| 1103 | memory_overhead += engine_cache_memory; | ||
| 1104 | |||
| 1105 | return memory_overhead; | ||
| 1106 | } | ||
| 1107 | |||
| 1108 | /* Returns the number of functions */ | ||
| 1109 | unsigned long functionsNum(void) { | ||
| 1110 | return dictSize(curr_functions_lib_ctx->functions); | ||
| 1111 | } | ||
| 1112 | |||
| 1113 | unsigned long functionsLibNum(void) { | ||
| 1114 | return dictSize(curr_functions_lib_ctx->libraries); | ||
| 1115 | } | ||
| 1116 | |||
| 1117 | dict* functionsLibGet(void) { | ||
| 1118 | return curr_functions_lib_ctx->libraries; | ||
| 1119 | } | ||
| 1120 | |||
| 1121 | size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx) { | ||
| 1122 | return dictSize(functions_ctx->functions); | ||
| 1123 | } | ||
| 1124 | |||
| 1125 | /* Initialize engine data structures. | ||
| 1126 | * Should be called once on server initialization */ | ||
| 1127 | int functionsInit(void) { | ||
| 1128 | engines = dictCreate(&engineDictType); | ||
| 1129 | |||
| 1130 | if (luaEngineInitEngine() != C_OK) { | ||
| 1131 | return C_ERR; | ||
| 1132 | } | ||
| 1133 | |||
| 1134 | /* Must be initialized after engines initialization */ | ||
| 1135 | curr_functions_lib_ctx = functionsLibCtxCreate(); | ||
| 1136 | |||
| 1137 | return C_OK; | ||
| 1138 | } | ||
