aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/multi.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/multi.c')
-rw-r--r--examples/redis-unstable/src/multi.c509
1 files changed, 0 insertions, 509 deletions
diff --git a/examples/redis-unstable/src/multi.c b/examples/redis-unstable/src/multi.c
deleted file mode 100644
index cd8783d..0000000
--- a/examples/redis-unstable/src/multi.c
+++ /dev/null
@@ -1,509 +0,0 @@
1/*
2 * Copyright (c) 2009-Present, Redis Ltd.
3 * All rights reserved.
4 *
5 * Licensed under your choice of (a) the Redis Source Available License 2.0
6 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
7 * GNU Affero General Public License v3 (AGPLv3).
8 */
9
10#include "server.h"
11#include "cluster.h"
12
13/* ================================ MULTI/EXEC ============================== */
14
15/* Client state initialization for MULTI/EXEC */
16void initClientMultiState(client *c) {
17 c->mstate.commands = NULL;
18 c->mstate.count = 0;
19 c->mstate.cmd_flags = 0;
20 c->mstate.cmd_inv_flags = 0;
21 c->mstate.argv_len_sums = 0;
22 c->mstate.alloc_count = 0;
23 c->mstate.executing_cmd = -1;
24}
25
26/* Release all the resources associated with MULTI/EXEC state */
27void freeClientMultiState(client *c) {
28 for (int i = 0; i < c->mstate.count; i++) {
29 freePendingCommand(c, c->mstate.commands[i]);
30 }
31 zfree(c->mstate.commands);
32}
33
34/* Add a new command into the MULTI commands queue */
35void queueMultiCommand(client *c, uint64_t cmd_flags) {
36 /* No sense to waste memory if the transaction is already aborted.
37 * this is useful in case client sends these in a pipeline, or doesn't
38 * bother to read previous responses and didn't notice the multi was already
39 * aborted. */
40 if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
41 return;
42 if (c->mstate.count == 0) {
43 /* If a client is using multi/exec, assuming it is used to execute at least
44 * two commands. Hence, creating by default size of 2. */
45 c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2);
46 c->mstate.alloc_count = 2;
47 }
48 if (c->mstate.count == c->mstate.alloc_count) {
49 c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
50 c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count));
51 }
52
53 /* Move the pending command into the multi-state.
54 * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up
55 * later, but set the value to NULL to indicate it has been moved out and should not be freed. */
56 pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds);
57 c->current_pending_cmd = NULL;
58 pendingCommand **mc = c->mstate.commands + c->mstate.count;
59 *mc = pcmd;
60
61 c->mstate.count++;
62 c->mstate.cmd_flags |= cmd_flags;
63 c->mstate.cmd_inv_flags |= ~cmd_flags;
64 c->mstate.argv_len_sums += (*mc)->argv_len_sum;
65 c->all_argv_len_sum -= (*mc)->argv_len_sum;
66
67 (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */
68 /* to subtract it from there later. */
69
70 /* Reset the client's args since we moved them into the mstate and shouldn't
71 * reference them from 'c' anymore. */
72 c->argv = NULL;
73 c->argc = 0;
74 c->argv_len = 0;
75}
76
77void discardTransaction(client *c) {
78 freeClientMultiState(c);
79 initClientMultiState(c);
80 c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
81 unwatchAllKeys(c);
82}
83
84/* Flag the transaction as DIRTY_EXEC so that EXEC will fail.
85 * Should be called every time there is an error while queueing a command. */
86void flagTransaction(client *c) {
87 if (c->flags & CLIENT_MULTI)
88 c->flags |= CLIENT_DIRTY_EXEC;
89}
90
91void multiCommand(client *c) {
92 if (c->flags & CLIENT_MULTI) {
93 addReplyError(c,"MULTI calls can not be nested");
94 return;
95 }
96 c->flags |= CLIENT_MULTI;
97
98 addReply(c,shared.ok);
99}
100
101void discardCommand(client *c) {
102 if (!(c->flags & CLIENT_MULTI)) {
103 addReplyError(c,"DISCARD without MULTI");
104 return;
105 }
106 discardTransaction(c);
107 addReply(c,shared.ok);
108}
109
110/* Aborts a transaction, with a specific error message.
111 * The transaction is always aborted with -EXECABORT so that the client knows
112 * the server exited the multi state, but the actual reason for the abort is
113 * included too.
114 * Note: 'error' may or may not end with \r\n. see addReplyErrorFormat. */
115void execCommandAbort(client *c, sds error) {
116 discardTransaction(c);
117
118 if (error[0] == '-') error++;
119 addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
120
121 /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
122 * already, and didn't send any of the queued commands, now we'll just send
123 * EXEC so it is clear that the transaction is over. */
124 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
125}
126
127void execCommand(client *c) {
128 int j;
129 robj **orig_argv;
130 int orig_argc, orig_argv_len;
131 size_t orig_all_argv_len_sum;
132 struct redisCommand *orig_cmd;
133
134 if (!(c->flags & CLIENT_MULTI)) {
135 addReplyError(c,"EXEC without MULTI");
136 return;
137 }
138
139 /* EXEC with expired watched key is disallowed*/
140 if (isWatchedKeyExpired(c)) {
141 c->flags |= (CLIENT_DIRTY_CAS);
142 }
143
144 /* Check if we need to abort the EXEC because:
145 * 1) Some WATCHed key was touched.
146 * 2) There was a previous error while queueing commands.
147 * A failed EXEC in the first case returns a multi bulk nil object
148 * (technically it is not an error but a special behavior), while
149 * in the second an EXECABORT error is returned. */
150 if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
151 if (c->flags & CLIENT_DIRTY_EXEC) {
152 addReplyErrorObject(c, shared.execaborterr);
153 } else {
154 addReply(c, shared.nullarray[c->resp]);
155 }
156
157 discardTransaction(c);
158 return;
159 }
160
161 uint64_t old_flags = c->flags;
162
163 /* we do not want to allow blocking commands inside multi */
164 c->flags |= CLIENT_DENY_BLOCKING;
165
166 /* Exec all the queued commands */
167 unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
168
169 server.in_exec = 1;
170
171 orig_argv = c->argv;
172 orig_argv_len = c->argv_len;
173 orig_argc = c->argc;
174 orig_cmd = c->cmd;
175
176 /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field.
177 * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */
178 orig_all_argv_len_sum = c->all_argv_len_sum;
179
180 c->all_argv_len_sum = c->mstate.argv_len_sums;
181
182 /* Skip ACL check for the AOF client while server loading. */
183 int skip_acl_check = server.loading && c->id == CLIENT_ID_AOF;
184
185 addReplyArrayLen(c,c->mstate.count);
186 for (j = 0; j < c->mstate.count; j++) {
187 c->argc = c->mstate.commands[j]->argc;
188 c->argv = c->mstate.commands[j]->argv;
189 c->argv_len = c->mstate.commands[j]->argv_len;
190 c->cmd = c->realcmd = c->mstate.commands[j]->cmd;
191
192 /* ACL permissions are also checked at the time of execution in case
193 * they were changed after the commands were queued. */
194 int acl_errpos;
195 int acl_retval = ACL_OK;
196 if (!skip_acl_check) {
197 acl_retval = ACLCheckAllPerm(c,&acl_errpos);
198 }
199 if (acl_retval != ACL_OK) {
200 char *reason;
201 switch (acl_retval) {
202 case ACL_DENIED_CMD:
203 reason = "no permission to execute the command or subcommand";
204 break;
205 case ACL_DENIED_KEY:
206 reason = "no permission to touch the specified keys";
207 break;
208 case ACL_DENIED_CHANNEL:
209 reason = "no permission to access one of the channels used "
210 "as arguments";
211 break;
212 default:
213 reason = "no permission";
214 break;
215 }
216 addACLLogEntry(c,acl_retval,ACL_LOG_CTX_MULTI,acl_errpos,NULL,NULL);
217 addReplyErrorFormat(c,
218 "-NOPERM ACLs rules changed between the moment the "
219 "transaction was accumulated and the EXEC call. "
220 "This command is no longer allowed for the "
221 "following reason: %s", reason);
222 } else {
223 c->mstate.executing_cmd = j;
224 if (c->id == CLIENT_ID_AOF)
225 call(c,CMD_CALL_NONE);
226 else
227 call(c,CMD_CALL_FULL);
228
229 serverAssert((c->flags & CLIENT_BLOCKED) == 0);
230 }
231
232 /* Commands may alter argc/argv, restore mstate. */
233 c->mstate.commands[j]->argc = c->argc;
234 c->mstate.commands[j]->argv = c->argv;
235 c->mstate.commands[j]->argv_len = c->argv_len;
236 c->mstate.commands[j]->cmd = c->cmd;
237 }
238
239 // restore old DENY_BLOCKING value
240 if (!(old_flags & CLIENT_DENY_BLOCKING))
241 c->flags &= ~CLIENT_DENY_BLOCKING;
242
243 c->argv = orig_argv;
244 c->argv_len = orig_argv_len;
245 c->argc = orig_argc;
246 c->cmd = c->realcmd = orig_cmd;
247 c->all_argv_len_sum = orig_all_argv_len_sum;
248 discardTransaction(c);
249
250 server.in_exec = 0;
251}
252
253/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
254 *
255 * The implementation uses a per-DB hash table mapping keys to list of clients
256 * WATCHing those keys, so that given a key that is going to be modified
257 * we can mark all the associated clients as dirty.
258 *
259 * Also every client contains a list of WATCHed keys so that's possible to
260 * un-watch such keys when the client is freed or when UNWATCH is called. */
261
262/* The watchedKey struct is included in two lists: the client->watched_keys list,
263 * and db->watched_keys dict (each value in that dict is a list of watchedKey structs).
264 * The list in the client struct is a plain list, where each node's value is a pointer to a watchedKey.
265 * The list in the db db->watched_keys is different, the listnode member that's embedded in this struct
266 * is the node in the dict. And the value inside that listnode is a pointer to the that list, and we can use
267 * struct member offset math to get from the listnode to the watchedKey struct.
268 * This is done to avoid the need for listSearchKey and dictFind when we remove from the list. */
269typedef struct watchedKey {
270 listNode node;
271 robj *key;
272 redisDb *db;
273 client *client;
274 unsigned expired:1; /* Flag that we're watching an already expired key. */
275} watchedKey;
276
277/* Attach a watchedKey to the list of clients watching that key. */
278static inline void watchedKeyLinkToClients(list *clients, watchedKey *wk) {
279 wk->node.value = clients; /* Point the value back to the list */
280 listLinkNodeTail(clients, &wk->node); /* Link the embedded node */
281}
282
283/* Get the list of clients watching that key. */
284static inline list *watchedKeyGetClients(watchedKey *wk) {
285 return listNodeValue(&wk->node); /* embedded node->value points back to the list */
286}
287
288/* Get the node with wk->client in the list of clients watching that key. Actually it
289 * is just the embedded node. */
290static inline listNode *watchedKeyGetClientNode(watchedKey *wk) {
291 return &wk->node;
292}
293
294/* Watch for the specified key */
295void watchForKey(client *c, robj *key) {
296 list *clients = NULL;
297 listIter li;
298 listNode *ln;
299 watchedKey *wk;
300
301 if (listLength(c->watched_keys) == 0) server.watching_clients++;
302
303 /* Check if we are already watching for this key */
304 listRewind(c->watched_keys,&li);
305 while((ln = listNext(&li))) {
306 wk = listNodeValue(ln);
307 if (wk->db == c->db && equalStringObjects(key,wk->key))
308 return; /* Key already watched */
309 }
310 /* This key is not already watched in this DB. Let's add it */
311 clients = dictFetchValue(c->db->watched_keys,key);
312 if (!clients) {
313 clients = listCreate();
314 dictAdd(c->db->watched_keys,key,clients);
315 incrRefCount(key);
316 }
317 /* Add the new key to the list of keys watched by this client */
318 wk = zmalloc(sizeof(*wk));
319 wk->key = key;
320 wk->client = c;
321 wk->db = c->db;
322 wk->expired = keyIsExpired(c->db, key->ptr, NULL);
323 incrRefCount(key);
324 listAddNodeTail(c->watched_keys, wk);
325 watchedKeyLinkToClients(clients, wk);
326}
327
328/* Unwatch all the keys watched by this client. To clean the EXEC dirty
329 * flag is up to the caller. */
330void unwatchAllKeys(client *c) {
331 listIter li;
332 listNode *ln;
333
334 if (listLength(c->watched_keys) == 0) return;
335 listRewind(c->watched_keys,&li);
336 while((ln = listNext(&li))) {
337 list *clients;
338 watchedKey *wk;
339
340 /* Remove the client's wk from the list of clients watching the key. */
341 wk = listNodeValue(ln);
342 clients = watchedKeyGetClients(wk);
343 serverAssertWithInfo(c,NULL,clients != NULL);
344 listUnlinkNode(clients, watchedKeyGetClientNode(wk));
345 /* Kill the entry at all if this was the only client */
346 if (listLength(clients) == 0)
347 dictDelete(wk->db->watched_keys, wk->key);
348 /* Remove this watched key from the client->watched list */
349 listDelNode(c->watched_keys,ln);
350 decrRefCount(wk->key);
351 zfree(wk);
352 }
353 server.watching_clients--;
354}
355
356/* Iterates over the watched_keys list and looks for an expired key. Keys which
357 * were expired already when WATCH was called are ignored. */
358int isWatchedKeyExpired(client *c) {
359 listIter li;
360 listNode *ln;
361 watchedKey *wk;
362 if (listLength(c->watched_keys) == 0) return 0;
363 listRewind(c->watched_keys,&li);
364 while ((ln = listNext(&li))) {
365 wk = listNodeValue(ln);
366 if (wk->expired) continue; /* was expired when WATCH was called */
367 if (keyIsExpired(wk->db, wk->key->ptr, NULL)) return 1;
368 }
369
370 return 0;
371}
372
373/* "Touch" a key, so that if this key is being WATCHed by some client the
374 * next EXEC will fail.
375 *
376 * Sanitizer suppression: IO threads also read c->flags, but never modify
377 * it or read the CLIENT_DIRTY_CAS bit, main thread just only modifies
378 * this bit, so there is actually no real data race. */
379REDIS_NO_SANITIZE("thread")
380void touchWatchedKey(redisDb *db, robj *key) {
381 list *clients;
382 listIter li;
383 listNode *ln;
384
385 if (dictSize(db->watched_keys) == 0) return;
386 clients = dictFetchValue(db->watched_keys, key);
387 if (!clients) return;
388
389 /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
390 /* Check if we are already watching for this key */
391 listRewind(clients,&li);
392 while((ln = listNext(&li))) {
393 watchedKey *wk = redis_member2struct(watchedKey, node, ln);
394 client *c = wk->client;
395
396 if (wk->expired) {
397 /* The key was already expired when WATCH was called. */
398 if (db == wk->db &&
399 equalStringObjects(key, wk->key) &&
400 dbFind(db, key->ptr) == NULL)
401 {
402 /* Already expired key is deleted, so logically no change. Clear
403 * the flag. Deleted keys are not flagged as expired. */
404 wk->expired = 0;
405 goto skip_client;
406 }
407 break;
408 }
409
410 c->flags |= CLIENT_DIRTY_CAS;
411 /* As the client is marked as dirty, there is no point in getting here
412 * again in case that key (or others) are modified again (or keep the
413 * memory overhead till EXEC). */
414 unwatchAllKeys(c);
415
416 skip_client:
417 continue;
418 }
419}
420
421/* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
422 * It may happen in the following situations:
423 * - FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication.
424 * - Atomic slot migration trimming phase. In this case, 'slots' is set and only
425 * keys in the specified slots are touched.
426 *
427 * replaced_with: for SWAPDB, the WATCH should be invalidated if
428 * the key exists in either of them, and skipped only if it
429 * doesn't exist in both. */
430REDIS_NO_SANITIZE("thread")
431void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with, struct slotRangeArray *slots) {
432 listIter li;
433 listNode *ln;
434 dictEntry *de;
435
436 if (dictSize(emptied->watched_keys) == 0) return;
437
438 dictIterator di;
439 dictInitSafeIterator(&di, emptied->watched_keys);
440 while((de = dictNext(&di)) != NULL) {
441 robj *key = dictGetKey(de);
442 if (slots && !slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr))))
443 continue;
444 int exists_in_emptied = dbFind(emptied, key->ptr) != NULL;
445 if (exists_in_emptied ||
446 (replaced_with && dbFind(replaced_with, key->ptr) != NULL))
447 {
448 list *clients = dictGetVal(de);
449 if (!clients) continue;
450 listRewind(clients,&li);
451 while((ln = listNext(&li))) {
452 watchedKey *wk = redis_member2struct(watchedKey, node, ln);
453 if (wk->expired) {
454 if (!replaced_with || !dbFind(replaced_with, key->ptr)) {
455 /* Expired key now deleted. No logical change. Clear the
456 * flag. Deleted keys are not flagged as expired. */
457 wk->expired = 0;
458 continue;
459 } else if (keyIsExpired(replaced_with, key->ptr, NULL)) {
460 /* Expired key remains expired. */
461 continue;
462 }
463 } else if (!exists_in_emptied && keyIsExpired(replaced_with, key->ptr, NULL)) {
464 /* Non-existing key is replaced with an expired key. */
465 wk->expired = 1;
466 continue;
467 }
468 client *c = wk->client;
469 c->flags |= CLIENT_DIRTY_CAS;
470 /* Note - we could potentially call unwatchAllKeys for this specific client in order to reduce
471 * the total number of iterations. BUT this could also free the current next entry pointer
472 * held by the iterator and can lead to use-after-free. */
473 }
474 }
475 }
476 dictResetIterator(&di);
477}
478
479void watchCommand(client *c) {
480 int j;
481
482 if (c->flags & CLIENT_MULTI) {
483 addReplyError(c,"WATCH inside MULTI is not allowed");
484 return;
485 }
486 /* No point in watching if the client is already dirty. */
487 if (c->flags & CLIENT_DIRTY_CAS) {
488 addReply(c,shared.ok);
489 return;
490 }
491 for (j = 1; j < c->argc; j++)
492 watchForKey(c,c->argv[j]);
493 addReply(c,shared.ok);
494}
495
496void unwatchCommand(client *c) {
497 unwatchAllKeys(c);
498 c->flags &= (~CLIENT_DIRTY_CAS);
499 addReply(c,shared.ok);
500}
501
502size_t multiStateMemOverhead(client *c) {
503 size_t mem = c->mstate.argv_len_sums;
504 /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
505 mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
506 /* Reserved memory for queued multi commands. */
507 mem += c->mstate.alloc_count * sizeof(pendingCommand);
508 return mem;
509}