aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/script.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/script.c')
-rw-r--r--examples/redis-unstable/src/script.c699
1 files changed, 0 insertions, 699 deletions
diff --git a/examples/redis-unstable/src/script.c b/examples/redis-unstable/src/script.c
deleted file mode 100644
index f222b89..0000000
--- a/examples/redis-unstable/src/script.c
+++ /dev/null
@@ -1,699 +0,0 @@
1/*
2 * Copyright (c) 2009-Present, Redis Ltd.
3 * All rights reserved.
4 *
5 * Copyright (c) 2024-present, Valkey contributors.
6 * All rights reserved.
7 *
8 * Licensed under your choice of (a) the Redis Source Available License 2.0
9 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
10 * GNU Affero General Public License v3 (AGPLv3).
11 *
12 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
13 */
14
15#include "server.h"
16#include "script.h"
17#include "cluster.h"
18#include "cluster_slot_stats.h"
19
20#include <lua.h>
21#include <lauxlib.h>
22
23scriptFlag scripts_flags_def[] = {
24 {.flag = SCRIPT_FLAG_NO_WRITES, .str = "no-writes"},
25 {.flag = SCRIPT_FLAG_ALLOW_OOM, .str = "allow-oom"},
26 {.flag = SCRIPT_FLAG_ALLOW_STALE, .str = "allow-stale"},
27 {.flag = SCRIPT_FLAG_NO_CLUSTER, .str = "no-cluster"},
28 {.flag = SCRIPT_FLAG_ALLOW_CROSS_SLOT, .str = "allow-cross-slot-keys"},
29 {.flag = 0, .str = NULL}, /* flags array end */
30};
31
32/* On script invocation, holding the current run context */
33static scriptRunCtx *curr_run_ctx = NULL;
34
35static void exitScriptTimedoutMode(scriptRunCtx *run_ctx) {
36 serverAssert(run_ctx == curr_run_ctx);
37 serverAssert(scriptIsTimedout());
38 run_ctx->flags &= ~SCRIPT_TIMEDOUT;
39 blockingOperationEnds();
40 /* if we are a replica and we have an active master, set it for continue processing */
41 if (server.masterhost && server.master) {
42 /* Master running in IO thread needs to be sent to main thread so that
43 * it can process any pending commands ASAP without waiting for the next
44 * read.
45 * We don't queue the client for reprocessing in this case as it will
46 * create contention with main thread when it deals with unblocked
47 * clients - see comment above queueClientForReprocessing. */
48 if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) {
49 pauseIOThread(server.master->tid);
50 enqueuePendingClientsToMainThread(server.master, 0);
51 resumeIOThread(server.master->tid);
52 return;
53 }
54
55 queueClientForReprocessing(server.master);
56 }
57}
58
59static void enterScriptTimedoutMode(scriptRunCtx *run_ctx) {
60 serverAssert(run_ctx == curr_run_ctx);
61 serverAssert(!scriptIsTimedout());
62 /* Mark script as timedout */
63 run_ctx->flags |= SCRIPT_TIMEDOUT;
64 blockingOperationStarts();
65}
66
67#if defined(USE_JEMALLOC)
68/* When lua uses jemalloc, pass in luaAlloc as a parameter of lua_newstate. */
69static void *luaAlloc(void *ud, void *ptr, size_t osize, size_t nsize) {
70 UNUSED(osize);
71
72 unsigned int tcache = (unsigned int)(uintptr_t)ud;
73 if (nsize == 0) {
74 zfree_with_flags(ptr, MALLOCX_ARENA(server.lua_arena) | MALLOCX_TCACHE(tcache));
75 return NULL;
76 } else {
77 return zrealloc_with_flags(ptr, nsize, MALLOCX_ARENA(server.lua_arena) | MALLOCX_TCACHE(tcache));
78 }
79}
80
81/* Create a lua interpreter, and use jemalloc as lua memory allocator. */
82lua_State *createLuaState(void) {
83 /* Every time a lua VM is created, a new private tcache is created for use.
84 * This private tcache will be destroyed after the lua VM is closed. */
85 unsigned int tcache;
86 size_t sz = sizeof(unsigned int);
87 int err = je_mallctl("tcache.create", (void *)&tcache, &sz, NULL, 0);
88 if (err) {
89 serverLog(LL_WARNING, "Failed creating the lua jemalloc tcache (err=%d).", err);
90 exit(1);
91 }
92
93 /* We pass tcache as ud so that it is not bound to the server. */
94 return lua_newstate(luaAlloc, (void *)(uintptr_t)tcache);
95}
96
97/* Under jemalloc we need to create a new arena for lua to avoid blocking
98 * defragger. */
99void luaEnvInit(void) {
100 unsigned int arena;
101 size_t sz = sizeof(unsigned int);
102 int err = je_mallctl("arenas.create", (void *)&arena, &sz, NULL, 0);
103 if (err) {
104 serverLog(LL_WARNING, "Failed creating the lua jemalloc arena (err=%d).", err);
105 exit(1);
106 }
107 server.lua_arena = arena;
108}
109
110#else
111
112/* Create a lua interpreter and use glibc (default) as lua memory allocator. */
113lua_State *createLuaState(void) {
114 return lua_open();
115}
116
117/* There is nothing to set up under glib. */
118void luaEnvInit(void) {
119 server.lua_arena = UINT_MAX;
120}
121
122#endif
123
124int scriptIsTimedout(void) {
125 return scriptIsRunning() && (curr_run_ctx->flags & SCRIPT_TIMEDOUT);
126}
127
128client* scriptGetClient(void) {
129 serverAssert(scriptIsRunning());
130 return curr_run_ctx->c;
131}
132
133client* scriptGetCaller(void) {
134 serverAssert(scriptIsRunning());
135 return curr_run_ctx->original_client;
136}
137
138/* interrupt function for scripts, should be call
139 * from time to time to reply some special command (like ping)
140 * and also check if the run should be terminated. */
141int scriptInterrupt(scriptRunCtx *run_ctx) {
142 if (run_ctx->flags & SCRIPT_TIMEDOUT) {
143 /* script already timedout
144 we just need to precess some events and return */
145 processEventsWhileBlocked();
146 return (run_ctx->flags & SCRIPT_KILLED) ? SCRIPT_KILL : SCRIPT_CONTINUE;
147 }
148
149 long long elapsed = elapsedMs(run_ctx->start_time);
150 if (elapsed < server.busy_reply_threshold) {
151 return SCRIPT_CONTINUE;
152 }
153
154 serverLog(LL_WARNING,
155 "Slow script detected: still in execution after %lld milliseconds. "
156 "You can try killing the script using the %s command. Script name is: %s.",
157 elapsed, (run_ctx->flags & SCRIPT_EVAL_MODE) ? "SCRIPT KILL" : "FUNCTION KILL", run_ctx->funcname);
158
159 enterScriptTimedoutMode(run_ctx);
160 /* Once the script timeouts we reenter the event loop to permit others
161 * some commands execution. For this reason
162 * we need to mask the client executing the script from the event loop.
163 * If we don't do that the client may disconnect and could no longer be
164 * here when the EVAL command will return. */
165 protectClient(run_ctx->original_client);
166
167 processEventsWhileBlocked();
168
169 return (run_ctx->flags & SCRIPT_KILLED) ? SCRIPT_KILL : SCRIPT_CONTINUE;
170}
171
172uint64_t scriptFlagsToCmdFlags(uint64_t cmd_flags, uint64_t script_flags) {
173 /* If the script declared flags, clear the ones from the command and use the ones it declared.*/
174 cmd_flags &= ~(CMD_STALE | CMD_DENYOOM | CMD_WRITE);
175
176 /* NO_WRITES implies ALLOW_OOM */
177 if (!(script_flags & (SCRIPT_FLAG_ALLOW_OOM | SCRIPT_FLAG_NO_WRITES)))
178 cmd_flags |= CMD_DENYOOM;
179 if (!(script_flags & SCRIPT_FLAG_NO_WRITES))
180 cmd_flags |= CMD_WRITE;
181 if (script_flags & SCRIPT_FLAG_ALLOW_STALE)
182 cmd_flags |= CMD_STALE;
183
184 /* In addition the MAY_REPLICATE flag is set for these commands, but
185 * if we have flags we know if it's gonna do any writes or not. */
186 cmd_flags &= ~CMD_MAY_REPLICATE;
187
188 return cmd_flags;
189}
190
191/* Prepare the given run ctx for execution */
192int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *caller, const char *funcname, uint64_t script_flags, int ro) {
193 serverAssert(!curr_run_ctx);
194 int client_allow_oom = !!(caller->flags & CLIENT_ALLOW_OOM);
195
196 int running_stale = server.masterhost &&
197 server.repl_state != REPL_STATE_CONNECTED &&
198 server.repl_serve_stale_data == 0;
199 int obey_client = mustObeyClient(caller);
200
201 if (!(script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE)) {
202 if ((script_flags & SCRIPT_FLAG_NO_CLUSTER) && server.cluster_enabled) {
203 addReplyError(caller, "Can not run script on cluster, 'no-cluster' flag is set.");
204 return C_ERR;
205 }
206
207 /* Can't run script with 'non-cluster' flag as above when cluster is enabled. */
208 if (script_flags & SCRIPT_FLAG_NO_CLUSTER) {
209 server.stat_cluster_incompatible_ops++;
210 }
211
212 if (running_stale && !(script_flags & SCRIPT_FLAG_ALLOW_STALE)) {
213 addReplyError(caller, "-MASTERDOWN Link with MASTER is down, "
214 "replica-serve-stale-data is set to 'no' "
215 "and 'allow-stale' flag is not set on the script.");
216 return C_ERR;
217 }
218
219 if (!(script_flags & SCRIPT_FLAG_NO_WRITES)) {
220 /* Script may perform writes we need to verify:
221 * 1. we are not a readonly replica
222 * 2. no disk error detected
223 * 3. command is not `fcall_ro`/`eval[sha]_ro` */
224 if (server.masterhost && server.repl_slave_ro && !obey_client) {
225 addReplyError(caller, "-READONLY Can not run script with write flag on readonly replica");
226 return C_ERR;
227 }
228
229 /* Deny writes if we're unable to persist. */
230 int deny_write_type = writeCommandsDeniedByDiskError();
231 if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
232 if (deny_write_type == DISK_ERROR_TYPE_RDB)
233 addReplyError(caller, "-MISCONF Redis is configured to save RDB snapshots, "
234 "but it's currently unable to persist to disk. "
235 "Writable scripts are blocked. Use 'no-writes' flag for read only scripts.");
236 else
237 addReplyErrorFormat(caller, "-MISCONF Redis is configured to persist data to AOF, "
238 "but it's currently unable to persist to disk. "
239 "Writable scripts are blocked. Use 'no-writes' flag for read only scripts. "
240 "AOF error: %s", strerror(server.aof_last_write_errno));
241 return C_ERR;
242 }
243
244 if (ro) {
245 addReplyError(caller, "Can not execute a script with write flag using *_ro command.");
246 return C_ERR;
247 }
248
249 /* Don't accept write commands if there are not enough good slaves and
250 * user configured the min-slaves-to-write option. */
251 if (!checkGoodReplicasStatus()) {
252 addReplyErrorObject(caller, shared.noreplicaserr);
253 return C_ERR;
254 }
255 }
256
257 /* Check OOM state. the no-writes flag imply allow-oom. we tested it
258 * after the no-write error, so no need to mention it in the error reply. */
259 if (!client_allow_oom && server.pre_command_oom_state && server.maxmemory &&
260 !(script_flags & (SCRIPT_FLAG_ALLOW_OOM|SCRIPT_FLAG_NO_WRITES)))
261 {
262 addReplyError(caller, "-OOM allow-oom flag is not set on the script, "
263 "can not run it when used memory > 'maxmemory'");
264 return C_ERR;
265 }
266
267 } else {
268 /* Special handling for backwards compatibility (no shebang eval[sha]) mode */
269 if (running_stale) {
270 addReplyErrorObject(caller, shared.masterdownerr);
271 return C_ERR;
272 }
273 }
274
275 run_ctx->c = engine_client;
276 run_ctx->original_client = caller;
277 run_ctx->funcname = funcname;
278 run_ctx->slot = caller->slot;
279 run_ctx->cluster_compatibility_check_slot = caller->cluster_compatibility_check_slot;
280
281 client *script_client = run_ctx->c;
282 client *curr_client = run_ctx->original_client;
283
284 /* Select the right DB in the context of the Lua client */
285 selectDb(script_client, curr_client->db->id);
286 script_client->resp = 2; /* Default is RESP2, scripts can change it. */
287
288 /* If we are in MULTI context, flag Lua client as CLIENT_MULTI. */
289 if (curr_client->flags & CLIENT_MULTI) {
290 script_client->flags |= CLIENT_MULTI;
291 }
292
293 run_ctx->start_time = getMonotonicUs();
294
295 run_ctx->flags = 0;
296 run_ctx->repl_flags = PROPAGATE_AOF | PROPAGATE_REPL;
297
298 if (ro || (!(script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE) && (script_flags & SCRIPT_FLAG_NO_WRITES))) {
299 /* On fcall_ro or on functions that do not have the 'write'
300 * flag, we will not allow write commands. */
301 run_ctx->flags |= SCRIPT_READ_ONLY;
302 }
303 if (client_allow_oom || (!(script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE) && (script_flags & SCRIPT_FLAG_ALLOW_OOM))) {
304 /* Note: we don't need to test the no-writes flag here and set this run_ctx flag,
305 * since only write commands can are deny-oom. */
306 run_ctx->flags |= SCRIPT_ALLOW_OOM;
307 }
308
309 if ((script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE) || (script_flags & SCRIPT_FLAG_ALLOW_CROSS_SLOT)) {
310 run_ctx->flags |= SCRIPT_ALLOW_CROSS_SLOT;
311 }
312
313 /* set the curr_run_ctx so we can use it to kill the script if needed */
314 curr_run_ctx = run_ctx;
315
316 return C_OK;
317}
318
319/* Reset the given run ctx after execution */
320void scriptResetRun(scriptRunCtx *run_ctx) {
321 serverAssert(curr_run_ctx);
322
323 /* After the script done, remove the MULTI state. */
324 run_ctx->c->flags &= ~CLIENT_MULTI;
325
326 if (scriptIsTimedout()) {
327 exitScriptTimedoutMode(run_ctx);
328 /* Restore the client that was protected when the script timeout
329 * was detected. */
330 unprotectClient(run_ctx->original_client);
331 }
332
333 run_ctx->slot = -1;
334 run_ctx->cluster_compatibility_check_slot = -2;
335
336 preventCommandPropagation(run_ctx->original_client);
337
338 /* unset curr_run_ctx so we will know there is no running script */
339 curr_run_ctx = NULL;
340}
341
342/* return true if a script is currently running */
343int scriptIsRunning(void) {
344 return curr_run_ctx != NULL;
345}
346
347const char* scriptCurrFunction(void) {
348 serverAssert(scriptIsRunning());
349 return curr_run_ctx->funcname;
350}
351
352int scriptIsEval(void) {
353 serverAssert(scriptIsRunning());
354 return curr_run_ctx->flags & SCRIPT_EVAL_MODE;
355}
356
357/* Kill the current running script */
358void scriptKill(client *c, int is_eval) {
359 if (!curr_run_ctx) {
360 addReplyError(c, "-NOTBUSY No scripts in execution right now.");
361 return;
362 }
363 if (mustObeyClient(curr_run_ctx->original_client)) {
364 addReplyError(c,
365 "-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed.");
366 return;
367 }
368 if (curr_run_ctx->flags & SCRIPT_WRITE_DIRTY) {
369 addReplyError(c,
370 "-UNKILLABLE Sorry the script already executed write "
371 "commands against the dataset. You can either wait the "
372 "script termination or kill the server in a hard way "
373 "using the SHUTDOWN NOSAVE command.");
374 return;
375 }
376 if (is_eval && !(curr_run_ctx->flags & SCRIPT_EVAL_MODE)) {
377 /* Kill a function with 'SCRIPT KILL' is not allow */
378 addReplyErrorObject(c, shared.slowscripterr);
379 return;
380 }
381 if (!is_eval && (curr_run_ctx->flags & SCRIPT_EVAL_MODE)) {
382 /* Kill an eval with 'FUNCTION KILL' is not allow */
383 addReplyErrorObject(c, shared.slowevalerr);
384 return;
385 }
386 curr_run_ctx->flags |= SCRIPT_KILLED;
387 addReply(c, shared.ok);
388}
389
390static int scriptVerifyCommandArity(struct redisCommand *cmd, int argc, sds *err) {
391 if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity))) {
392 if (cmd)
393 *err = sdsnew("Wrong number of args calling Redis command from script");
394 else
395 *err = sdsnew("Unknown Redis command called from script");
396 return C_ERR;
397 }
398 return C_OK;
399}
400
401static int scriptVerifyACL(client *c, sds *err) {
402 /* Check the ACLs. */
403 int acl_errpos;
404 int acl_retval = ACLCheckAllPerm(c, &acl_errpos);
405 if (acl_retval != ACL_OK) {
406 addACLLogEntry(c,acl_retval,ACL_LOG_CTX_LUA,acl_errpos,NULL,NULL);
407 sds msg = getAclErrorMessage(acl_retval, c->user, c->cmd, c->argv[acl_errpos]->ptr, 0);
408 *err = sdscatsds(sdsnew("ACL failure in script: "), msg);
409 sdsfree(msg);
410 return C_ERR;
411 }
412 return C_OK;
413}
414
415static int scriptVerifyWriteCommandAllow(scriptRunCtx *run_ctx, char **err) {
416
417 /* A write command, on an RO command or an RO script is rejected ASAP.
418 * Note: For scripts, we consider may-replicate commands as write commands.
419 * This also makes it possible to allow read-only scripts to be run during
420 * CLIENT PAUSE WRITE. */
421 if (run_ctx->flags & SCRIPT_READ_ONLY &&
422 (run_ctx->c->cmd->flags & (CMD_WRITE|CMD_MAY_REPLICATE)))
423 {
424 *err = sdsnew("Write commands are not allowed from read-only scripts.");
425 return C_ERR;
426 }
427
428 /* The other checks below are on the server state and are only relevant for
429 * write commands, return if this is not a write command. */
430 if (!(run_ctx->c->cmd->flags & CMD_WRITE))
431 return C_OK;
432
433 /* If the script already made a modification to the dataset, we can't
434 * fail it on unpredictable error state. */
435 if ((run_ctx->flags & SCRIPT_WRITE_DIRTY))
436 return C_OK;
437
438 /* Write commands are forbidden against read-only slaves, or if a
439 * command marked as non-deterministic was already called in the context
440 * of this script. */
441 int deny_write_type = writeCommandsDeniedByDiskError();
442
443 if (server.masterhost && server.repl_slave_ro &&
444 !mustObeyClient(run_ctx->original_client))
445 {
446 *err = sdsdup(shared.roslaveerr->ptr);
447 return C_ERR;
448 }
449
450 if (deny_write_type != DISK_ERROR_TYPE_NONE) {
451 *err = writeCommandsGetDiskErrorMessage(deny_write_type);
452 return C_ERR;
453 }
454
455 /* Don't accept write commands if there are not enough good slaves and
456 * user configured the min-slaves-to-write option. Note this only reachable
457 * for Eval scripts that didn't declare flags, see the other check in
458 * scriptPrepareForRun */
459 if (!checkGoodReplicasStatus()) {
460 *err = sdsdup(shared.noreplicaserr->ptr);
461 return C_ERR;
462 }
463
464 return C_OK;
465}
466
467static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) {
468 if (run_ctx->flags & SCRIPT_ALLOW_OOM) {
469 /* Allow running any command even if OOM reached */
470 return C_OK;
471 }
472
473 /* If we reached the memory limit configured via maxmemory, commands that
474 * could enlarge the memory usage are not allowed, but only if this is the
475 * first write in the context of this script, otherwise we can't stop
476 * in the middle. */
477
478 if (server.maxmemory && /* Maxmemory is actually enabled. */
479 !mustObeyClient(run_ctx->original_client) && /* Don't care about mem for replicas or AOF. */
480 !(run_ctx->flags & SCRIPT_WRITE_DIRTY) && /* Script had no side effects so far. */
481 server.pre_command_oom_state && /* Detected OOM when script start. */
482 (run_ctx->c->cmd->flags & CMD_DENYOOM))
483 {
484 *err = sdsdup(shared.oomerr->ptr);
485 return C_ERR;
486 }
487
488 return C_OK;
489}
490
491static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *original_c, sds *err) {
492 if (!server.cluster_enabled || mustObeyClient(original_c)) {
493 return C_OK;
494 }
495 /* If this is a Redis Cluster node, we need to make sure the script is not
496 * trying to access non-local keys, with the exception of commands
497 * received from our master or when loading the AOF back in memory. */
498 int error_code;
499 /* Duplicate relevant flags in the script client. */
500 c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
501 c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
502 const uint64_t cmd_flags = getCommandFlags(c);
503 int hashslot = -1;
504 if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, NULL, 0, cmd_flags, &error_code) != getMyClusterNode()) {
505 if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
506 *err = sdsnew(
507 "Script attempted to execute a write command while the "
508 "cluster is down and readonly");
509 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
510 *err = sdsnew("Script attempted to execute a command while the "
511 "cluster is down");
512 } else if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
513 *err = sdscatfmt(sdsempty(),
514 "Command '%S' in script attempted to access keys that don't hash to the same slot",
515 c->cmd->fullname);
516 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
517 /* The request spawns multiple keys in the same slot,
518 * but the slot is not "stable" currently as there is
519 * a migration or import in progress. */
520 *err = sdscatfmt(sdsempty(),
521 "Unable to execute command '%S' in script "
522 "because undeclared keys were accessed during rehashing of the slot",
523 c->cmd->fullname);
524 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
525 *err = sdsnew("Script attempted to access a slot not served");
526 } else {
527 /* error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK */
528 *err = sdsnew("Script attempted to access a non local key in a "
529 "cluster node");
530 }
531 return C_ERR;
532 }
533
534 /* If the script declared keys in advanced, the cross slot error would have
535 * already been thrown. This is only checking for cross slot keys being accessed
536 * that weren't pre-declared. */
537 if (hashslot != -1 && !(run_ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) {
538 if (run_ctx->slot == -1) {
539 run_ctx->slot = hashslot;
540 } else if (run_ctx->slot != hashslot) {
541 *err = sdsnew("Script attempted to access keys that do not hash to "
542 "the same slot");
543 return C_ERR;
544 }
545 }
546
547 c->slot = hashslot;
548 original_c->slot = hashslot;
549
550 return C_OK;
551}
552
553static void scriptCheckClusterCompatibility(scriptRunCtx *run_ctx, client *c) {
554 int hashslot = -1;
555
556 /* If we don't need to detect for this script or slot violation already
557 * detected and reported for this script, exit */
558 if (run_ctx->cluster_compatibility_check_slot == -2) return;
559
560 if (!areCommandKeysInSameSlot(c, &hashslot)) {
561 server.stat_cluster_incompatible_ops++;
562 /* Already found cross slot usage, skip the check for the rest of the script */
563 run_ctx->cluster_compatibility_check_slot = -2;
564 } else {
565 /* Check whether the declared keys and the accessed keys belong to the same slot.
566 * If having SCRIPT_ALLOW_CROSS_SLOT flag, skip this check since it's allowed
567 * in cluster mode, but it may fail when the slot doesn't belong to the node. */
568 if (hashslot != -1 && !(run_ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) {
569 if (run_ctx->cluster_compatibility_check_slot == -1) {
570 run_ctx->cluster_compatibility_check_slot = hashslot;
571 } else if (run_ctx->cluster_compatibility_check_slot != hashslot) {
572 server.stat_cluster_incompatible_ops++;
573 /* Already found cross slot usage, skip the check for the rest of the script */
574 run_ctx->cluster_compatibility_check_slot = -2;
575 }
576 }
577 }
578}
579
580/* set RESP for a given run_ctx */
581int scriptSetResp(scriptRunCtx *run_ctx, int resp) {
582 if (resp != 2 && resp != 3) {
583 return C_ERR;
584 }
585
586 run_ctx->c->resp = resp;
587 return C_OK;
588}
589
590/* set Repl for a given run_ctx
591 * either: PROPAGATE_AOF | PROPAGATE_REPL*/
592int scriptSetRepl(scriptRunCtx *run_ctx, int repl) {
593 if ((repl & ~(PROPAGATE_AOF | PROPAGATE_REPL)) != 0) {
594 return C_ERR;
595 }
596 run_ctx->repl_flags = repl;
597 return C_OK;
598}
599
600static int scriptVerifyAllowStale(client *c, sds *err) {
601 if (!server.masterhost) {
602 /* Not a replica, stale is irrelevant */
603 return C_OK;
604 }
605
606 if (server.repl_state == REPL_STATE_CONNECTED) {
607 /* Connected to replica, stale is irrelevant */
608 return C_OK;
609 }
610
611 if (server.repl_serve_stale_data == 1) {
612 /* Disconnected from replica but allow to serve data */
613 return C_OK;
614 }
615
616 if (c->cmd->flags & CMD_STALE) {
617 /* Command is allow while stale */
618 return C_OK;
619 }
620
621 /* On stale replica, can not run the command */
622 *err = sdsnew("Can not execute the command on a stale replica");
623 return C_ERR;
624}
625
626/* Call a Redis command.
627 * The reply is written to the run_ctx client and it is
628 * up to the engine to take and parse.
629 * The err out variable is set only if error occurs and describe the error.
630 * If err is set on reply is written to the run_ctx client. */
631void scriptCall(scriptRunCtx *run_ctx, sds *err) {
632 client *c = run_ctx->c;
633
634 /* Setup our fake client for command execution */
635 c->user = run_ctx->original_client->user;
636
637 /* Process module hooks */
638 moduleCallCommandFilters(c);
639
640 struct redisCommand *cmd = lookupCommand(c->argv, c->argc);
641 c->cmd = c->lastcmd = c->realcmd = cmd;
642 if (scriptVerifyCommandArity(cmd, c->argc, err) != C_OK) {
643 goto error;
644 }
645
646 /* There are commands that are not allowed inside scripts. */
647 if (!server.script_disable_deny_script && (cmd->flags & CMD_NOSCRIPT)) {
648 *err = sdsnew("This Redis command is not allowed from script");
649 goto error;
650 }
651
652 if (scriptVerifyAllowStale(c, err) != C_OK) {
653 goto error;
654 }
655
656 if (scriptVerifyACL(c, err) != C_OK) {
657 goto error;
658 }
659
660 if (scriptVerifyWriteCommandAllow(run_ctx, err) != C_OK) {
661 goto error;
662 }
663
664 if (scriptVerifyOOM(run_ctx, err) != C_OK) {
665 goto error;
666 }
667
668 if (cmd->flags & CMD_WRITE) {
669 /* signify that we already change the data in this execution */
670 run_ctx->flags |= SCRIPT_WRITE_DIRTY;
671 }
672
673 if (scriptVerifyClusterState(run_ctx, c, run_ctx->original_client, err) != C_OK) {
674 goto error;
675 }
676
677 scriptCheckClusterCompatibility(run_ctx, c);
678
679 int call_flags = CMD_CALL_NONE;
680 if (run_ctx->repl_flags & PROPAGATE_AOF) {
681 call_flags |= CMD_CALL_PROPAGATE_AOF;
682 }
683 if (run_ctx->repl_flags & PROPAGATE_REPL) {
684 call_flags |= CMD_CALL_PROPAGATE_REPL;
685 }
686 call(c, call_flags);
687 serverAssert((c->flags & CLIENT_BLOCKED) == 0);
688 clusterSlotStatsInvalidateSlotIfApplicable(run_ctx);
689 return;
690
691error:
692 afterErrorReply(c, *err, sdslen(*err), 0);
693 incrCommandStatsOnError(cmd, ERROR_COMMAND_REJECTED);
694}
695
696long long scriptRunDuration(void) {
697 serverAssert(scriptIsRunning());
698 return elapsedMs(curr_run_ctx->start_time);
699}