diff options
Diffstat (limited to 'examples/redis-unstable/src/multi.c')
| -rw-r--r-- | examples/redis-unstable/src/multi.c | 509 |
1 files changed, 0 insertions, 509 deletions
diff --git a/examples/redis-unstable/src/multi.c b/examples/redis-unstable/src/multi.c deleted file mode 100644 index cd8783d..0000000 --- a/examples/redis-unstable/src/multi.c +++ /dev/null | |||
| @@ -1,509 +0,0 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (c) 2009-Present, Redis Ltd. | ||
| 3 | * All rights reserved. | ||
| 4 | * | ||
| 5 | * Licensed under your choice of (a) the Redis Source Available License 2.0 | ||
| 6 | * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the | ||
| 7 | * GNU Affero General Public License v3 (AGPLv3). | ||
| 8 | */ | ||
| 9 | |||
| 10 | #include "server.h" | ||
| 11 | #include "cluster.h" | ||
| 12 | |||
| 13 | /* ================================ MULTI/EXEC ============================== */ | ||
| 14 | |||
| 15 | /* Client state initialization for MULTI/EXEC */ | ||
| 16 | void initClientMultiState(client *c) { | ||
| 17 | c->mstate.commands = NULL; | ||
| 18 | c->mstate.count = 0; | ||
| 19 | c->mstate.cmd_flags = 0; | ||
| 20 | c->mstate.cmd_inv_flags = 0; | ||
| 21 | c->mstate.argv_len_sums = 0; | ||
| 22 | c->mstate.alloc_count = 0; | ||
| 23 | c->mstate.executing_cmd = -1; | ||
| 24 | } | ||
| 25 | |||
| 26 | /* Release all the resources associated with MULTI/EXEC state */ | ||
| 27 | void freeClientMultiState(client *c) { | ||
| 28 | for (int i = 0; i < c->mstate.count; i++) { | ||
| 29 | freePendingCommand(c, c->mstate.commands[i]); | ||
| 30 | } | ||
| 31 | zfree(c->mstate.commands); | ||
| 32 | } | ||
| 33 | |||
| 34 | /* Add a new command into the MULTI commands queue */ | ||
| 35 | void queueMultiCommand(client *c, uint64_t cmd_flags) { | ||
| 36 | /* No sense to waste memory if the transaction is already aborted. | ||
| 37 | * this is useful in case client sends these in a pipeline, or doesn't | ||
| 38 | * bother to read previous responses and didn't notice the multi was already | ||
| 39 | * aborted. */ | ||
| 40 | if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) | ||
| 41 | return; | ||
| 42 | if (c->mstate.count == 0) { | ||
| 43 | /* If a client is using multi/exec, assuming it is used to execute at least | ||
| 44 | * two commands. Hence, creating by default size of 2. */ | ||
| 45 | c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2); | ||
| 46 | c->mstate.alloc_count = 2; | ||
| 47 | } | ||
| 48 | if (c->mstate.count == c->mstate.alloc_count) { | ||
| 49 | c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX; | ||
| 50 | c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count)); | ||
| 51 | } | ||
| 52 | |||
| 53 | /* Move the pending command into the multi-state. | ||
| 54 | * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up | ||
| 55 | * later, but set the value to NULL to indicate it has been moved out and should not be freed. */ | ||
| 56 | pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds); | ||
| 57 | c->current_pending_cmd = NULL; | ||
| 58 | pendingCommand **mc = c->mstate.commands + c->mstate.count; | ||
| 59 | *mc = pcmd; | ||
| 60 | |||
| 61 | c->mstate.count++; | ||
| 62 | c->mstate.cmd_flags |= cmd_flags; | ||
| 63 | c->mstate.cmd_inv_flags |= ~cmd_flags; | ||
| 64 | c->mstate.argv_len_sums += (*mc)->argv_len_sum; | ||
| 65 | c->all_argv_len_sum -= (*mc)->argv_len_sum; | ||
| 66 | |||
| 67 | (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */ | ||
| 68 | /* to subtract it from there later. */ | ||
| 69 | |||
| 70 | /* Reset the client's args since we moved them into the mstate and shouldn't | ||
| 71 | * reference them from 'c' anymore. */ | ||
| 72 | c->argv = NULL; | ||
| 73 | c->argc = 0; | ||
| 74 | c->argv_len = 0; | ||
| 75 | } | ||
| 76 | |||
| 77 | void discardTransaction(client *c) { | ||
| 78 | freeClientMultiState(c); | ||
| 79 | initClientMultiState(c); | ||
| 80 | c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); | ||
| 81 | unwatchAllKeys(c); | ||
| 82 | } | ||
| 83 | |||
| 84 | /* Flag the transaction as DIRTY_EXEC so that EXEC will fail. | ||
| 85 | * Should be called every time there is an error while queueing a command. */ | ||
| 86 | void flagTransaction(client *c) { | ||
| 87 | if (c->flags & CLIENT_MULTI) | ||
| 88 | c->flags |= CLIENT_DIRTY_EXEC; | ||
| 89 | } | ||
| 90 | |||
| 91 | void multiCommand(client *c) { | ||
| 92 | if (c->flags & CLIENT_MULTI) { | ||
| 93 | addReplyError(c,"MULTI calls can not be nested"); | ||
| 94 | return; | ||
| 95 | } | ||
| 96 | c->flags |= CLIENT_MULTI; | ||
| 97 | |||
| 98 | addReply(c,shared.ok); | ||
| 99 | } | ||
| 100 | |||
| 101 | void discardCommand(client *c) { | ||
| 102 | if (!(c->flags & CLIENT_MULTI)) { | ||
| 103 | addReplyError(c,"DISCARD without MULTI"); | ||
| 104 | return; | ||
| 105 | } | ||
| 106 | discardTransaction(c); | ||
| 107 | addReply(c,shared.ok); | ||
| 108 | } | ||
| 109 | |||
| 110 | /* Aborts a transaction, with a specific error message. | ||
| 111 | * The transaction is always aborted with -EXECABORT so that the client knows | ||
| 112 | * the server exited the multi state, but the actual reason for the abort is | ||
| 113 | * included too. | ||
| 114 | * Note: 'error' may or may not end with \r\n. see addReplyErrorFormat. */ | ||
| 115 | void execCommandAbort(client *c, sds error) { | ||
| 116 | discardTransaction(c); | ||
| 117 | |||
| 118 | if (error[0] == '-') error++; | ||
| 119 | addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error); | ||
| 120 | |||
| 121 | /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI | ||
| 122 | * already, and didn't send any of the queued commands, now we'll just send | ||
| 123 | * EXEC so it is clear that the transaction is over. */ | ||
| 124 | replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); | ||
| 125 | } | ||
| 126 | |||
| 127 | void execCommand(client *c) { | ||
| 128 | int j; | ||
| 129 | robj **orig_argv; | ||
| 130 | int orig_argc, orig_argv_len; | ||
| 131 | size_t orig_all_argv_len_sum; | ||
| 132 | struct redisCommand *orig_cmd; | ||
| 133 | |||
| 134 | if (!(c->flags & CLIENT_MULTI)) { | ||
| 135 | addReplyError(c,"EXEC without MULTI"); | ||
| 136 | return; | ||
| 137 | } | ||
| 138 | |||
| 139 | /* EXEC with expired watched key is disallowed*/ | ||
| 140 | if (isWatchedKeyExpired(c)) { | ||
| 141 | c->flags |= (CLIENT_DIRTY_CAS); | ||
| 142 | } | ||
| 143 | |||
| 144 | /* Check if we need to abort the EXEC because: | ||
| 145 | * 1) Some WATCHed key was touched. | ||
| 146 | * 2) There was a previous error while queueing commands. | ||
| 147 | * A failed EXEC in the first case returns a multi bulk nil object | ||
| 148 | * (technically it is not an error but a special behavior), while | ||
| 149 | * in the second an EXECABORT error is returned. */ | ||
| 150 | if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) { | ||
| 151 | if (c->flags & CLIENT_DIRTY_EXEC) { | ||
| 152 | addReplyErrorObject(c, shared.execaborterr); | ||
| 153 | } else { | ||
| 154 | addReply(c, shared.nullarray[c->resp]); | ||
| 155 | } | ||
| 156 | |||
| 157 | discardTransaction(c); | ||
| 158 | return; | ||
| 159 | } | ||
| 160 | |||
| 161 | uint64_t old_flags = c->flags; | ||
| 162 | |||
| 163 | /* we do not want to allow blocking commands inside multi */ | ||
| 164 | c->flags |= CLIENT_DENY_BLOCKING; | ||
| 165 | |||
| 166 | /* Exec all the queued commands */ | ||
| 167 | unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ | ||
| 168 | |||
| 169 | server.in_exec = 1; | ||
| 170 | |||
| 171 | orig_argv = c->argv; | ||
| 172 | orig_argv_len = c->argv_len; | ||
| 173 | orig_argc = c->argc; | ||
| 174 | orig_cmd = c->cmd; | ||
| 175 | |||
| 176 | /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field. | ||
| 177 | * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */ | ||
| 178 | orig_all_argv_len_sum = c->all_argv_len_sum; | ||
| 179 | |||
| 180 | c->all_argv_len_sum = c->mstate.argv_len_sums; | ||
| 181 | |||
| 182 | /* Skip ACL check for the AOF client while server loading. */ | ||
| 183 | int skip_acl_check = server.loading && c->id == CLIENT_ID_AOF; | ||
| 184 | |||
| 185 | addReplyArrayLen(c,c->mstate.count); | ||
| 186 | for (j = 0; j < c->mstate.count; j++) { | ||
| 187 | c->argc = c->mstate.commands[j]->argc; | ||
| 188 | c->argv = c->mstate.commands[j]->argv; | ||
| 189 | c->argv_len = c->mstate.commands[j]->argv_len; | ||
| 190 | c->cmd = c->realcmd = c->mstate.commands[j]->cmd; | ||
| 191 | |||
| 192 | /* ACL permissions are also checked at the time of execution in case | ||
| 193 | * they were changed after the commands were queued. */ | ||
| 194 | int acl_errpos; | ||
| 195 | int acl_retval = ACL_OK; | ||
| 196 | if (!skip_acl_check) { | ||
| 197 | acl_retval = ACLCheckAllPerm(c,&acl_errpos); | ||
| 198 | } | ||
| 199 | if (acl_retval != ACL_OK) { | ||
| 200 | char *reason; | ||
| 201 | switch (acl_retval) { | ||
| 202 | case ACL_DENIED_CMD: | ||
| 203 | reason = "no permission to execute the command or subcommand"; | ||
| 204 | break; | ||
| 205 | case ACL_DENIED_KEY: | ||
| 206 | reason = "no permission to touch the specified keys"; | ||
| 207 | break; | ||
| 208 | case ACL_DENIED_CHANNEL: | ||
| 209 | reason = "no permission to access one of the channels used " | ||
| 210 | "as arguments"; | ||
| 211 | break; | ||
| 212 | default: | ||
| 213 | reason = "no permission"; | ||
| 214 | break; | ||
| 215 | } | ||
| 216 | addACLLogEntry(c,acl_retval,ACL_LOG_CTX_MULTI,acl_errpos,NULL,NULL); | ||
| 217 | addReplyErrorFormat(c, | ||
| 218 | "-NOPERM ACLs rules changed between the moment the " | ||
| 219 | "transaction was accumulated and the EXEC call. " | ||
| 220 | "This command is no longer allowed for the " | ||
| 221 | "following reason: %s", reason); | ||
| 222 | } else { | ||
| 223 | c->mstate.executing_cmd = j; | ||
| 224 | if (c->id == CLIENT_ID_AOF) | ||
| 225 | call(c,CMD_CALL_NONE); | ||
| 226 | else | ||
| 227 | call(c,CMD_CALL_FULL); | ||
| 228 | |||
| 229 | serverAssert((c->flags & CLIENT_BLOCKED) == 0); | ||
| 230 | } | ||
| 231 | |||
| 232 | /* Commands may alter argc/argv, restore mstate. */ | ||
| 233 | c->mstate.commands[j]->argc = c->argc; | ||
| 234 | c->mstate.commands[j]->argv = c->argv; | ||
| 235 | c->mstate.commands[j]->argv_len = c->argv_len; | ||
| 236 | c->mstate.commands[j]->cmd = c->cmd; | ||
| 237 | } | ||
| 238 | |||
| 239 | // restore old DENY_BLOCKING value | ||
| 240 | if (!(old_flags & CLIENT_DENY_BLOCKING)) | ||
| 241 | c->flags &= ~CLIENT_DENY_BLOCKING; | ||
| 242 | |||
| 243 | c->argv = orig_argv; | ||
| 244 | c->argv_len = orig_argv_len; | ||
| 245 | c->argc = orig_argc; | ||
| 246 | c->cmd = c->realcmd = orig_cmd; | ||
| 247 | c->all_argv_len_sum = orig_all_argv_len_sum; | ||
| 248 | discardTransaction(c); | ||
| 249 | |||
| 250 | server.in_exec = 0; | ||
| 251 | } | ||
| 252 | |||
| 253 | /* ===================== WATCH (CAS alike for MULTI/EXEC) =================== | ||
| 254 | * | ||
| 255 | * The implementation uses a per-DB hash table mapping keys to list of clients | ||
| 256 | * WATCHing those keys, so that given a key that is going to be modified | ||
| 257 | * we can mark all the associated clients as dirty. | ||
| 258 | * | ||
| 259 | * Also every client contains a list of WATCHed keys so that's possible to | ||
| 260 | * un-watch such keys when the client is freed or when UNWATCH is called. */ | ||
| 261 | |||
| 262 | /* The watchedKey struct is included in two lists: the client->watched_keys list, | ||
| 263 | * and db->watched_keys dict (each value in that dict is a list of watchedKey structs). | ||
| 264 | * The list in the client struct is a plain list, where each node's value is a pointer to a watchedKey. | ||
| 265 | * The list in the db db->watched_keys is different, the listnode member that's embedded in this struct | ||
| 266 | * is the node in the dict. And the value inside that listnode is a pointer to the that list, and we can use | ||
| 267 | * struct member offset math to get from the listnode to the watchedKey struct. | ||
| 268 | * This is done to avoid the need for listSearchKey and dictFind when we remove from the list. */ | ||
| 269 | typedef struct watchedKey { | ||
| 270 | listNode node; | ||
| 271 | robj *key; | ||
| 272 | redisDb *db; | ||
| 273 | client *client; | ||
| 274 | unsigned expired:1; /* Flag that we're watching an already expired key. */ | ||
| 275 | } watchedKey; | ||
| 276 | |||
| 277 | /* Attach a watchedKey to the list of clients watching that key. */ | ||
| 278 | static inline void watchedKeyLinkToClients(list *clients, watchedKey *wk) { | ||
| 279 | wk->node.value = clients; /* Point the value back to the list */ | ||
| 280 | listLinkNodeTail(clients, &wk->node); /* Link the embedded node */ | ||
| 281 | } | ||
| 282 | |||
| 283 | /* Get the list of clients watching that key. */ | ||
| 284 | static inline list *watchedKeyGetClients(watchedKey *wk) { | ||
| 285 | return listNodeValue(&wk->node); /* embedded node->value points back to the list */ | ||
| 286 | } | ||
| 287 | |||
| 288 | /* Get the node with wk->client in the list of clients watching that key. Actually it | ||
| 289 | * is just the embedded node. */ | ||
| 290 | static inline listNode *watchedKeyGetClientNode(watchedKey *wk) { | ||
| 291 | return &wk->node; | ||
| 292 | } | ||
| 293 | |||
| 294 | /* Watch for the specified key */ | ||
| 295 | void watchForKey(client *c, robj *key) { | ||
| 296 | list *clients = NULL; | ||
| 297 | listIter li; | ||
| 298 | listNode *ln; | ||
| 299 | watchedKey *wk; | ||
| 300 | |||
| 301 | if (listLength(c->watched_keys) == 0) server.watching_clients++; | ||
| 302 | |||
| 303 | /* Check if we are already watching for this key */ | ||
| 304 | listRewind(c->watched_keys,&li); | ||
| 305 | while((ln = listNext(&li))) { | ||
| 306 | wk = listNodeValue(ln); | ||
| 307 | if (wk->db == c->db && equalStringObjects(key,wk->key)) | ||
| 308 | return; /* Key already watched */ | ||
| 309 | } | ||
| 310 | /* This key is not already watched in this DB. Let's add it */ | ||
| 311 | clients = dictFetchValue(c->db->watched_keys,key); | ||
| 312 | if (!clients) { | ||
| 313 | clients = listCreate(); | ||
| 314 | dictAdd(c->db->watched_keys,key,clients); | ||
| 315 | incrRefCount(key); | ||
| 316 | } | ||
| 317 | /* Add the new key to the list of keys watched by this client */ | ||
| 318 | wk = zmalloc(sizeof(*wk)); | ||
| 319 | wk->key = key; | ||
| 320 | wk->client = c; | ||
| 321 | wk->db = c->db; | ||
| 322 | wk->expired = keyIsExpired(c->db, key->ptr, NULL); | ||
| 323 | incrRefCount(key); | ||
| 324 | listAddNodeTail(c->watched_keys, wk); | ||
| 325 | watchedKeyLinkToClients(clients, wk); | ||
| 326 | } | ||
| 327 | |||
| 328 | /* Unwatch all the keys watched by this client. To clean the EXEC dirty | ||
| 329 | * flag is up to the caller. */ | ||
| 330 | void unwatchAllKeys(client *c) { | ||
| 331 | listIter li; | ||
| 332 | listNode *ln; | ||
| 333 | |||
| 334 | if (listLength(c->watched_keys) == 0) return; | ||
| 335 | listRewind(c->watched_keys,&li); | ||
| 336 | while((ln = listNext(&li))) { | ||
| 337 | list *clients; | ||
| 338 | watchedKey *wk; | ||
| 339 | |||
| 340 | /* Remove the client's wk from the list of clients watching the key. */ | ||
| 341 | wk = listNodeValue(ln); | ||
| 342 | clients = watchedKeyGetClients(wk); | ||
| 343 | serverAssertWithInfo(c,NULL,clients != NULL); | ||
| 344 | listUnlinkNode(clients, watchedKeyGetClientNode(wk)); | ||
| 345 | /* Kill the entry at all if this was the only client */ | ||
| 346 | if (listLength(clients) == 0) | ||
| 347 | dictDelete(wk->db->watched_keys, wk->key); | ||
| 348 | /* Remove this watched key from the client->watched list */ | ||
| 349 | listDelNode(c->watched_keys,ln); | ||
| 350 | decrRefCount(wk->key); | ||
| 351 | zfree(wk); | ||
| 352 | } | ||
| 353 | server.watching_clients--; | ||
| 354 | } | ||
| 355 | |||
| 356 | /* Iterates over the watched_keys list and looks for an expired key. Keys which | ||
| 357 | * were expired already when WATCH was called are ignored. */ | ||
| 358 | int isWatchedKeyExpired(client *c) { | ||
| 359 | listIter li; | ||
| 360 | listNode *ln; | ||
| 361 | watchedKey *wk; | ||
| 362 | if (listLength(c->watched_keys) == 0) return 0; | ||
| 363 | listRewind(c->watched_keys,&li); | ||
| 364 | while ((ln = listNext(&li))) { | ||
| 365 | wk = listNodeValue(ln); | ||
| 366 | if (wk->expired) continue; /* was expired when WATCH was called */ | ||
| 367 | if (keyIsExpired(wk->db, wk->key->ptr, NULL)) return 1; | ||
| 368 | } | ||
| 369 | |||
| 370 | return 0; | ||
| 371 | } | ||
| 372 | |||
| 373 | /* "Touch" a key, so that if this key is being WATCHed by some client the | ||
| 374 | * next EXEC will fail. | ||
| 375 | * | ||
| 376 | * Sanitizer suppression: IO threads also read c->flags, but never modify | ||
| 377 | * it or read the CLIENT_DIRTY_CAS bit, main thread just only modifies | ||
| 378 | * this bit, so there is actually no real data race. */ | ||
| 379 | REDIS_NO_SANITIZE("thread") | ||
| 380 | void touchWatchedKey(redisDb *db, robj *key) { | ||
| 381 | list *clients; | ||
| 382 | listIter li; | ||
| 383 | listNode *ln; | ||
| 384 | |||
| 385 | if (dictSize(db->watched_keys) == 0) return; | ||
| 386 | clients = dictFetchValue(db->watched_keys, key); | ||
| 387 | if (!clients) return; | ||
| 388 | |||
| 389 | /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */ | ||
| 390 | /* Check if we are already watching for this key */ | ||
| 391 | listRewind(clients,&li); | ||
| 392 | while((ln = listNext(&li))) { | ||
| 393 | watchedKey *wk = redis_member2struct(watchedKey, node, ln); | ||
| 394 | client *c = wk->client; | ||
| 395 | |||
| 396 | if (wk->expired) { | ||
| 397 | /* The key was already expired when WATCH was called. */ | ||
| 398 | if (db == wk->db && | ||
| 399 | equalStringObjects(key, wk->key) && | ||
| 400 | dbFind(db, key->ptr) == NULL) | ||
| 401 | { | ||
| 402 | /* Already expired key is deleted, so logically no change. Clear | ||
| 403 | * the flag. Deleted keys are not flagged as expired. */ | ||
| 404 | wk->expired = 0; | ||
| 405 | goto skip_client; | ||
| 406 | } | ||
| 407 | break; | ||
| 408 | } | ||
| 409 | |||
| 410 | c->flags |= CLIENT_DIRTY_CAS; | ||
| 411 | /* As the client is marked as dirty, there is no point in getting here | ||
| 412 | * again in case that key (or others) are modified again (or keep the | ||
| 413 | * memory overhead till EXEC). */ | ||
| 414 | unwatchAllKeys(c); | ||
| 415 | |||
| 416 | skip_client: | ||
| 417 | continue; | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | /* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty. | ||
| 422 | * It may happen in the following situations: | ||
| 423 | * - FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication. | ||
| 424 | * - Atomic slot migration trimming phase. In this case, 'slots' is set and only | ||
| 425 | * keys in the specified slots are touched. | ||
| 426 | * | ||
| 427 | * replaced_with: for SWAPDB, the WATCH should be invalidated if | ||
| 428 | * the key exists in either of them, and skipped only if it | ||
| 429 | * doesn't exist in both. */ | ||
| 430 | REDIS_NO_SANITIZE("thread") | ||
| 431 | void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with, struct slotRangeArray *slots) { | ||
| 432 | listIter li; | ||
| 433 | listNode *ln; | ||
| 434 | dictEntry *de; | ||
| 435 | |||
| 436 | if (dictSize(emptied->watched_keys) == 0) return; | ||
| 437 | |||
| 438 | dictIterator di; | ||
| 439 | dictInitSafeIterator(&di, emptied->watched_keys); | ||
| 440 | while((de = dictNext(&di)) != NULL) { | ||
| 441 | robj *key = dictGetKey(de); | ||
| 442 | if (slots && !slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr)))) | ||
| 443 | continue; | ||
| 444 | int exists_in_emptied = dbFind(emptied, key->ptr) != NULL; | ||
| 445 | if (exists_in_emptied || | ||
| 446 | (replaced_with && dbFind(replaced_with, key->ptr) != NULL)) | ||
| 447 | { | ||
| 448 | list *clients = dictGetVal(de); | ||
| 449 | if (!clients) continue; | ||
| 450 | listRewind(clients,&li); | ||
| 451 | while((ln = listNext(&li))) { | ||
| 452 | watchedKey *wk = redis_member2struct(watchedKey, node, ln); | ||
| 453 | if (wk->expired) { | ||
| 454 | if (!replaced_with || !dbFind(replaced_with, key->ptr)) { | ||
| 455 | /* Expired key now deleted. No logical change. Clear the | ||
| 456 | * flag. Deleted keys are not flagged as expired. */ | ||
| 457 | wk->expired = 0; | ||
| 458 | continue; | ||
| 459 | } else if (keyIsExpired(replaced_with, key->ptr, NULL)) { | ||
| 460 | /* Expired key remains expired. */ | ||
| 461 | continue; | ||
| 462 | } | ||
| 463 | } else if (!exists_in_emptied && keyIsExpired(replaced_with, key->ptr, NULL)) { | ||
| 464 | /* Non-existing key is replaced with an expired key. */ | ||
| 465 | wk->expired = 1; | ||
| 466 | continue; | ||
| 467 | } | ||
| 468 | client *c = wk->client; | ||
| 469 | c->flags |= CLIENT_DIRTY_CAS; | ||
| 470 | /* Note - we could potentially call unwatchAllKeys for this specific client in order to reduce | ||
| 471 | * the total number of iterations. BUT this could also free the current next entry pointer | ||
| 472 | * held by the iterator and can lead to use-after-free. */ | ||
| 473 | } | ||
| 474 | } | ||
| 475 | } | ||
| 476 | dictResetIterator(&di); | ||
| 477 | } | ||
| 478 | |||
| 479 | void watchCommand(client *c) { | ||
| 480 | int j; | ||
| 481 | |||
| 482 | if (c->flags & CLIENT_MULTI) { | ||
| 483 | addReplyError(c,"WATCH inside MULTI is not allowed"); | ||
| 484 | return; | ||
| 485 | } | ||
| 486 | /* No point in watching if the client is already dirty. */ | ||
| 487 | if (c->flags & CLIENT_DIRTY_CAS) { | ||
| 488 | addReply(c,shared.ok); | ||
| 489 | return; | ||
| 490 | } | ||
| 491 | for (j = 1; j < c->argc; j++) | ||
| 492 | watchForKey(c,c->argv[j]); | ||
| 493 | addReply(c,shared.ok); | ||
| 494 | } | ||
| 495 | |||
| 496 | void unwatchCommand(client *c) { | ||
| 497 | unwatchAllKeys(c); | ||
| 498 | c->flags &= (~CLIENT_DIRTY_CAS); | ||
| 499 | addReply(c,shared.ok); | ||
| 500 | } | ||
| 501 | |||
| 502 | size_t multiStateMemOverhead(client *c) { | ||
| 503 | size_t mem = c->mstate.argv_len_sums; | ||
| 504 | /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ | ||
| 505 | mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); | ||
| 506 | /* Reserved memory for queued multi commands. */ | ||
| 507 | mem += c->mstate.alloc_count * sizeof(pendingCommand); | ||
| 508 | return mem; | ||
| 509 | } | ||
