summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/multi.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/multi.c')
-rw-r--r--examples/redis-unstable/src/multi.c509
1 files changed, 0 insertions, 509 deletions
diff --git a/examples/redis-unstable/src/multi.c b/examples/redis-unstable/src/multi.c
deleted file mode 100644
index cd8783d..0000000
--- a/examples/redis-unstable/src/multi.c
+++ /dev/null
@@ -1,509 +0,0 @@
-/*
- * Copyright (c) 2009-Present, Redis Ltd.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- */
-
-#include "server.h"
-#include "cluster.h"
-
-/* ================================ MULTI/EXEC ============================== */
-
-/* Client state initialization for MULTI/EXEC */
-void initClientMultiState(client *c) {
- c->mstate.commands = NULL;
- c->mstate.count = 0;
- c->mstate.cmd_flags = 0;
- c->mstate.cmd_inv_flags = 0;
- c->mstate.argv_len_sums = 0;
- c->mstate.alloc_count = 0;
- c->mstate.executing_cmd = -1;
-}
-
-/* Release all the resources associated with MULTI/EXEC state */
-void freeClientMultiState(client *c) {
- for (int i = 0; i < c->mstate.count; i++) {
- freePendingCommand(c, c->mstate.commands[i]);
- }
- zfree(c->mstate.commands);
-}
-
-/* Add a new command into the MULTI commands queue */
-void queueMultiCommand(client *c, uint64_t cmd_flags) {
- /* No sense to waste memory if the transaction is already aborted.
- * this is useful in case client sends these in a pipeline, or doesn't
- * bother to read previous responses and didn't notice the multi was already
- * aborted. */
- if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
- return;
- if (c->mstate.count == 0) {
- /* If a client is using multi/exec, assuming it is used to execute at least
- * two commands. Hence, creating by default size of 2. */
- c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2);
- c->mstate.alloc_count = 2;
- }
- if (c->mstate.count == c->mstate.alloc_count) {
- c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
- c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count));
- }
-
- /* Move the pending command into the multi-state.
- * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up
- * later, but set the value to NULL to indicate it has been moved out and should not be freed. */
- pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds);
- c->current_pending_cmd = NULL;
- pendingCommand **mc = c->mstate.commands + c->mstate.count;
- *mc = pcmd;
-
- c->mstate.count++;
- c->mstate.cmd_flags |= cmd_flags;
- c->mstate.cmd_inv_flags |= ~cmd_flags;
- c->mstate.argv_len_sums += (*mc)->argv_len_sum;
- c->all_argv_len_sum -= (*mc)->argv_len_sum;
-
- (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */
- /* to subtract it from there later. */
-
- /* Reset the client's args since we moved them into the mstate and shouldn't
- * reference them from 'c' anymore. */
- c->argv = NULL;
- c->argc = 0;
- c->argv_len = 0;
-}
-
-void discardTransaction(client *c) {
- freeClientMultiState(c);
- initClientMultiState(c);
- c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
- unwatchAllKeys(c);
-}
-
-/* Flag the transaction as DIRTY_EXEC so that EXEC will fail.
- * Should be called every time there is an error while queueing a command. */
-void flagTransaction(client *c) {
- if (c->flags & CLIENT_MULTI)
- c->flags |= CLIENT_DIRTY_EXEC;
-}
-
-void multiCommand(client *c) {
- if (c->flags & CLIENT_MULTI) {
- addReplyError(c,"MULTI calls can not be nested");
- return;
- }
- c->flags |= CLIENT_MULTI;
-
- addReply(c,shared.ok);
-}
-
-void discardCommand(client *c) {
- if (!(c->flags & CLIENT_MULTI)) {
- addReplyError(c,"DISCARD without MULTI");
- return;
- }
- discardTransaction(c);
- addReply(c,shared.ok);
-}
-
-/* Aborts a transaction, with a specific error message.
- * The transaction is always aborted with -EXECABORT so that the client knows
- * the server exited the multi state, but the actual reason for the abort is
- * included too.
- * Note: 'error' may or may not end with \r\n. see addReplyErrorFormat. */
-void execCommandAbort(client *c, sds error) {
- discardTransaction(c);
-
- if (error[0] == '-') error++;
- addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
-
- /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
- * already, and didn't send any of the queued commands, now we'll just send
- * EXEC so it is clear that the transaction is over. */
- replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
-}
-
-void execCommand(client *c) {
- int j;
- robj **orig_argv;
- int orig_argc, orig_argv_len;
- size_t orig_all_argv_len_sum;
- struct redisCommand *orig_cmd;
-
- if (!(c->flags & CLIENT_MULTI)) {
- addReplyError(c,"EXEC without MULTI");
- return;
- }
-
- /* EXEC with expired watched key is disallowed*/
- if (isWatchedKeyExpired(c)) {
- c->flags |= (CLIENT_DIRTY_CAS);
- }
-
- /* Check if we need to abort the EXEC because:
- * 1) Some WATCHed key was touched.
- * 2) There was a previous error while queueing commands.
- * A failed EXEC in the first case returns a multi bulk nil object
- * (technically it is not an error but a special behavior), while
- * in the second an EXECABORT error is returned. */
- if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
- if (c->flags & CLIENT_DIRTY_EXEC) {
- addReplyErrorObject(c, shared.execaborterr);
- } else {
- addReply(c, shared.nullarray[c->resp]);
- }
-
- discardTransaction(c);
- return;
- }
-
- uint64_t old_flags = c->flags;
-
- /* we do not want to allow blocking commands inside multi */
- c->flags |= CLIENT_DENY_BLOCKING;
-
- /* Exec all the queued commands */
- unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
-
- server.in_exec = 1;
-
- orig_argv = c->argv;
- orig_argv_len = c->argv_len;
- orig_argc = c->argc;
- orig_cmd = c->cmd;
-
- /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field.
- * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */
- orig_all_argv_len_sum = c->all_argv_len_sum;
-
- c->all_argv_len_sum = c->mstate.argv_len_sums;
-
- /* Skip ACL check for the AOF client while server loading. */
- int skip_acl_check = server.loading && c->id == CLIENT_ID_AOF;
-
- addReplyArrayLen(c,c->mstate.count);
- for (j = 0; j < c->mstate.count; j++) {
- c->argc = c->mstate.commands[j]->argc;
- c->argv = c->mstate.commands[j]->argv;
- c->argv_len = c->mstate.commands[j]->argv_len;
- c->cmd = c->realcmd = c->mstate.commands[j]->cmd;
-
- /* ACL permissions are also checked at the time of execution in case
- * they were changed after the commands were queued. */
- int acl_errpos;
- int acl_retval = ACL_OK;
- if (!skip_acl_check) {
- acl_retval = ACLCheckAllPerm(c,&acl_errpos);
- }
- if (acl_retval != ACL_OK) {
- char *reason;
- switch (acl_retval) {
- case ACL_DENIED_CMD:
- reason = "no permission to execute the command or subcommand";
- break;
- case ACL_DENIED_KEY:
- reason = "no permission to touch the specified keys";
- break;
- case ACL_DENIED_CHANNEL:
- reason = "no permission to access one of the channels used "
- "as arguments";
- break;
- default:
- reason = "no permission";
- break;
- }
- addACLLogEntry(c,acl_retval,ACL_LOG_CTX_MULTI,acl_errpos,NULL,NULL);
- addReplyErrorFormat(c,
- "-NOPERM ACLs rules changed between the moment the "
- "transaction was accumulated and the EXEC call. "
- "This command is no longer allowed for the "
- "following reason: %s", reason);
- } else {
- c->mstate.executing_cmd = j;
- if (c->id == CLIENT_ID_AOF)
- call(c,CMD_CALL_NONE);
- else
- call(c,CMD_CALL_FULL);
-
- serverAssert((c->flags & CLIENT_BLOCKED) == 0);
- }
-
- /* Commands may alter argc/argv, restore mstate. */
- c->mstate.commands[j]->argc = c->argc;
- c->mstate.commands[j]->argv = c->argv;
- c->mstate.commands[j]->argv_len = c->argv_len;
- c->mstate.commands[j]->cmd = c->cmd;
- }
-
- // restore old DENY_BLOCKING value
- if (!(old_flags & CLIENT_DENY_BLOCKING))
- c->flags &= ~CLIENT_DENY_BLOCKING;
-
- c->argv = orig_argv;
- c->argv_len = orig_argv_len;
- c->argc = orig_argc;
- c->cmd = c->realcmd = orig_cmd;
- c->all_argv_len_sum = orig_all_argv_len_sum;
- discardTransaction(c);
-
- server.in_exec = 0;
-}
-
-/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
- *
- * The implementation uses a per-DB hash table mapping keys to list of clients
- * WATCHing those keys, so that given a key that is going to be modified
- * we can mark all the associated clients as dirty.
- *
- * Also every client contains a list of WATCHed keys so that's possible to
- * un-watch such keys when the client is freed or when UNWATCH is called. */
-
-/* The watchedKey struct is included in two lists: the client->watched_keys list,
- * and db->watched_keys dict (each value in that dict is a list of watchedKey structs).
- * The list in the client struct is a plain list, where each node's value is a pointer to a watchedKey.
- * The list in the db db->watched_keys is different, the listnode member that's embedded in this struct
- * is the node in the dict. And the value inside that listnode is a pointer to the that list, and we can use
- * struct member offset math to get from the listnode to the watchedKey struct.
- * This is done to avoid the need for listSearchKey and dictFind when we remove from the list. */
-typedef struct watchedKey {
- listNode node;
- robj *key;
- redisDb *db;
- client *client;
- unsigned expired:1; /* Flag that we're watching an already expired key. */
-} watchedKey;
-
-/* Attach a watchedKey to the list of clients watching that key. */
-static inline void watchedKeyLinkToClients(list *clients, watchedKey *wk) {
- wk->node.value = clients; /* Point the value back to the list */
- listLinkNodeTail(clients, &wk->node); /* Link the embedded node */
-}
-
-/* Get the list of clients watching that key. */
-static inline list *watchedKeyGetClients(watchedKey *wk) {
- return listNodeValue(&wk->node); /* embedded node->value points back to the list */
-}
-
-/* Get the node with wk->client in the list of clients watching that key. Actually it
- * is just the embedded node. */
-static inline listNode *watchedKeyGetClientNode(watchedKey *wk) {
- return &wk->node;
-}
-
-/* Watch for the specified key */
-void watchForKey(client *c, robj *key) {
- list *clients = NULL;
- listIter li;
- listNode *ln;
- watchedKey *wk;
-
- if (listLength(c->watched_keys) == 0) server.watching_clients++;
-
- /* Check if we are already watching for this key */
- listRewind(c->watched_keys,&li);
- while((ln = listNext(&li))) {
- wk = listNodeValue(ln);
- if (wk->db == c->db && equalStringObjects(key,wk->key))
- return; /* Key already watched */
- }
- /* This key is not already watched in this DB. Let's add it */
- clients = dictFetchValue(c->db->watched_keys,key);
- if (!clients) {
- clients = listCreate();
- dictAdd(c->db->watched_keys,key,clients);
- incrRefCount(key);
- }
- /* Add the new key to the list of keys watched by this client */
- wk = zmalloc(sizeof(*wk));
- wk->key = key;
- wk->client = c;
- wk->db = c->db;
- wk->expired = keyIsExpired(c->db, key->ptr, NULL);
- incrRefCount(key);
- listAddNodeTail(c->watched_keys, wk);
- watchedKeyLinkToClients(clients, wk);
-}
-
-/* Unwatch all the keys watched by this client. To clean the EXEC dirty
- * flag is up to the caller. */
-void unwatchAllKeys(client *c) {
- listIter li;
- listNode *ln;
-
- if (listLength(c->watched_keys) == 0) return;
- listRewind(c->watched_keys,&li);
- while((ln = listNext(&li))) {
- list *clients;
- watchedKey *wk;
-
- /* Remove the client's wk from the list of clients watching the key. */
- wk = listNodeValue(ln);
- clients = watchedKeyGetClients(wk);
- serverAssertWithInfo(c,NULL,clients != NULL);
- listUnlinkNode(clients, watchedKeyGetClientNode(wk));
- /* Kill the entry at all if this was the only client */
- if (listLength(clients) == 0)
- dictDelete(wk->db->watched_keys, wk->key);
- /* Remove this watched key from the client->watched list */
- listDelNode(c->watched_keys,ln);
- decrRefCount(wk->key);
- zfree(wk);
- }
- server.watching_clients--;
-}
-
-/* Iterates over the watched_keys list and looks for an expired key. Keys which
- * were expired already when WATCH was called are ignored. */
-int isWatchedKeyExpired(client *c) {
- listIter li;
- listNode *ln;
- watchedKey *wk;
- if (listLength(c->watched_keys) == 0) return 0;
- listRewind(c->watched_keys,&li);
- while ((ln = listNext(&li))) {
- wk = listNodeValue(ln);
- if (wk->expired) continue; /* was expired when WATCH was called */
- if (keyIsExpired(wk->db, wk->key->ptr, NULL)) return 1;
- }
-
- return 0;
-}
-
-/* "Touch" a key, so that if this key is being WATCHed by some client the
- * next EXEC will fail.
- *
- * Sanitizer suppression: IO threads also read c->flags, but never modify
- * it or read the CLIENT_DIRTY_CAS bit, main thread just only modifies
- * this bit, so there is actually no real data race. */
-REDIS_NO_SANITIZE("thread")
-void touchWatchedKey(redisDb *db, robj *key) {
- list *clients;
- listIter li;
- listNode *ln;
-
- if (dictSize(db->watched_keys) == 0) return;
- clients = dictFetchValue(db->watched_keys, key);
- if (!clients) return;
-
- /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
- /* Check if we are already watching for this key */
- listRewind(clients,&li);
- while((ln = listNext(&li))) {
- watchedKey *wk = redis_member2struct(watchedKey, node, ln);
- client *c = wk->client;
-
- if (wk->expired) {
- /* The key was already expired when WATCH was called. */
- if (db == wk->db &&
- equalStringObjects(key, wk->key) &&
- dbFind(db, key->ptr) == NULL)
- {
- /* Already expired key is deleted, so logically no change. Clear
- * the flag. Deleted keys are not flagged as expired. */
- wk->expired = 0;
- goto skip_client;
- }
- break;
- }
-
- c->flags |= CLIENT_DIRTY_CAS;
- /* As the client is marked as dirty, there is no point in getting here
- * again in case that key (or others) are modified again (or keep the
- * memory overhead till EXEC). */
- unwatchAllKeys(c);
-
- skip_client:
- continue;
- }
-}
-
-/* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
- * It may happen in the following situations:
- * - FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication.
- * - Atomic slot migration trimming phase. In this case, 'slots' is set and only
- * keys in the specified slots are touched.
- *
- * replaced_with: for SWAPDB, the WATCH should be invalidated if
- * the key exists in either of them, and skipped only if it
- * doesn't exist in both. */
-REDIS_NO_SANITIZE("thread")
-void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with, struct slotRangeArray *slots) {
- listIter li;
- listNode *ln;
- dictEntry *de;
-
- if (dictSize(emptied->watched_keys) == 0) return;
-
- dictIterator di;
- dictInitSafeIterator(&di, emptied->watched_keys);
- while((de = dictNext(&di)) != NULL) {
- robj *key = dictGetKey(de);
- if (slots && !slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr))))
- continue;
- int exists_in_emptied = dbFind(emptied, key->ptr) != NULL;
- if (exists_in_emptied ||
- (replaced_with && dbFind(replaced_with, key->ptr) != NULL))
- {
- list *clients = dictGetVal(de);
- if (!clients) continue;
- listRewind(clients,&li);
- while((ln = listNext(&li))) {
- watchedKey *wk = redis_member2struct(watchedKey, node, ln);
- if (wk->expired) {
- if (!replaced_with || !dbFind(replaced_with, key->ptr)) {
- /* Expired key now deleted. No logical change. Clear the
- * flag. Deleted keys are not flagged as expired. */
- wk->expired = 0;
- continue;
- } else if (keyIsExpired(replaced_with, key->ptr, NULL)) {
- /* Expired key remains expired. */
- continue;
- }
- } else if (!exists_in_emptied && keyIsExpired(replaced_with, key->ptr, NULL)) {
- /* Non-existing key is replaced with an expired key. */
- wk->expired = 1;
- continue;
- }
- client *c = wk->client;
- c->flags |= CLIENT_DIRTY_CAS;
- /* Note - we could potentially call unwatchAllKeys for this specific client in order to reduce
- * the total number of iterations. BUT this could also free the current next entry pointer
- * held by the iterator and can lead to use-after-free. */
- }
- }
- }
- dictResetIterator(&di);
-}
-
-void watchCommand(client *c) {
- int j;
-
- if (c->flags & CLIENT_MULTI) {
- addReplyError(c,"WATCH inside MULTI is not allowed");
- return;
- }
- /* No point in watching if the client is already dirty. */
- if (c->flags & CLIENT_DIRTY_CAS) {
- addReply(c,shared.ok);
- return;
- }
- for (j = 1; j < c->argc; j++)
- watchForKey(c,c->argv[j]);
- addReply(c,shared.ok);
-}
-
-void unwatchCommand(client *c) {
- unwatchAllKeys(c);
- c->flags &= (~CLIENT_DIRTY_CAS);
- addReply(c,shared.ok);
-}
-
-size_t multiStateMemOverhead(client *c) {
- size_t mem = c->mstate.argv_len_sums;
- /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
- mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
- /* Reserved memory for queued multi commands. */
- mem += c->mstate.alloc_count * sizeof(pendingCommand);
- return mem;
-}