diff options
Diffstat (limited to 'examples/redis-unstable/src/blocked.c')
| -rw-r--r-- | examples/redis-unstable/src/blocked.c | 787 |
1 files changed, 787 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/blocked.c b/examples/redis-unstable/src/blocked.c new file mode 100644 index 0000000..4f518c9 --- /dev/null +++ b/examples/redis-unstable/src/blocked.c | |||
| @@ -0,0 +1,787 @@ | |||
| 1 | /* blocked.c - generic support for blocking operations like BLPOP & WAIT. | ||
| 2 | * | ||
| 3 | * Copyright (c) 2009-Present, Redis Ltd. | ||
| 4 | * All rights reserved. | ||
| 5 | * | ||
| 6 | * Copyright (c) 2024-present, Valkey contributors. | ||
| 7 | * All rights reserved. | ||
| 8 | * | ||
| 9 | * Licensed under your choice of (a) the Redis Source Available License 2.0 | ||
| 10 | * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the | ||
| 11 | * GNU Affero General Public License v3 (AGPLv3). | ||
| 12 | * | ||
| 13 | * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. | ||
| 14 | * | ||
| 15 | * --------------------------------------------------------------------------- | ||
| 16 | * | ||
| 17 | * API: | ||
| 18 | * | ||
| 19 | * blockClient() sets the CLIENT_BLOCKED flag in the client, and sets the | ||
| 20 | * client's block type field 'btype' to one of the BLOCKED_* macros. | ||
| 21 | * | ||
| 22 | * unblockClient() unblocks the client doing the following: | ||
| 23 | * 1) It calls the btype-specific function to cleanup the state. | ||
| 24 | * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag. | ||
| 25 | * 3) It puts the client into a list of just unblocked clients that are | ||
| 26 | * processed ASAP in the beforeSleep() event loop callback, so that | ||
| 27 | * if there is some query buffer to process, we do it. This is also | ||
| 28 | * required because otherwise there is no 'readable' event fired, we | ||
| 29 | * already read the pending commands. We also set the CLIENT_UNBLOCKED | ||
| 30 | * flag to remember the client is in the unblocked_clients list. | ||
| 31 | * | ||
| 32 | * processUnblockedClients() is called inside the beforeSleep() function | ||
| 33 | * to process the query buffer from unblocked clients and remove the clients | ||
| 34 | * from the unblocked_clients list. | ||
| 35 | * | ||
| 36 | * replyToBlockedClientTimedOut() is called by the cron function when | ||
| 37 | * a client blocked reaches the specified timeout (if the timeout is set | ||
| 38 | * to 0, no timeout is processed). | ||
| 39 | * It usually just needs to send a reply to the client. | ||
| 40 | * | ||
| 41 | * When implementing a new type of blocking operation, the implementation | ||
| 42 | * should modify unblockClient() and replyToBlockedClientTimedOut() in order | ||
| 43 | * to handle the btype-specific behavior of these two functions. | ||
| 44 | * If the blocking operation waits for certain keys to change state, the | ||
| 45 | * clusterRedirectBlockedClientIfNeeded() function should also be updated. | ||
| 46 | */ | ||
| 47 | |||
| 48 | #include "server.h" | ||
| 49 | #include "slowlog.h" | ||
| 50 | #include "latency.h" | ||
| 51 | #include "monotonic.h" | ||
| 52 | #include "cluster_slot_stats.h" | ||
| 53 | |||
/* Forward declarations of file-local helpers that are referenced before
 * their definitions. */
/* Releases every key the client is blocked on; called from unblockClient(). */
static void unblockClientWaitingData(client *c);
/* Serves the clients blocked on one ready key; called from
 * handleClientsBlockedOnKeys(). */
static void handleClientsBlockedOnKey(readyList *rl);
/* Invoked by handleClientsBlockedOnKey() for non-module clients. */
static void unblockClientOnKey(client *c, robj *key);
/* Invoked by handleClientsBlockedOnKey() for clients blocked as BLOCKED_MODULE. */
static void moduleUnblockClientOnKey(client *c, robj *key);
/* Removes the per-db bookkeeping for one key the client is blocked on. */
static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
| 60 | |||
| 61 | void initClientBlockingState(client *c) { | ||
| 62 | c->bstate.btype = BLOCKED_NONE; | ||
| 63 | c->bstate.timeout = 0; | ||
| 64 | c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType); | ||
| 65 | c->bstate.numreplicas = 0; | ||
| 66 | c->bstate.reploffset = 0; | ||
| 67 | c->bstate.unblock_on_nokey = 0; | ||
| 68 | c->bstate.async_rm_call_handle = NULL; | ||
| 69 | } | ||
| 70 | |||
| 71 | /* Block a client for the specific operation type. Once the CLIENT_BLOCKED | ||
| 72 | * flag is set client query buffer is not longer processed, but accumulated, | ||
| 73 | * and will be processed when the client is unblocked. */ | ||
| 74 | void blockClient(client *c, int btype) { | ||
| 75 | /* Master client should never be blocked unless pause or module */ | ||
| 76 | serverAssert(!(c->flags & CLIENT_MASTER && | ||
| 77 | btype != BLOCKED_MODULE && | ||
| 78 | btype != BLOCKED_LAZYFREE && | ||
| 79 | btype != BLOCKED_POSTPONE && | ||
| 80 | btype != BLOCKED_POSTPONE_TRIM)); | ||
| 81 | |||
| 82 | c->flags |= CLIENT_BLOCKED; | ||
| 83 | c->bstate.btype = btype; | ||
| 84 | if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */ | ||
| 85 | server.blocked_clients_by_type[btype]++; | ||
| 86 | addClientToTimeoutTable(c); | ||
| 87 | } | ||
| 88 | |||
| 89 | /* Usually when a client is unblocked due to being blocked while processing some command | ||
| 90 | * he will attempt to reprocess the command which will update the statistics. | ||
| 91 | * However in case the client was timed out or in case of module blocked client is being unblocked | ||
| 92 | * the command will not be reprocessed and we need to make stats update. | ||
| 93 | * This function will make updates to the commandstats, slowlog and monitors.*/ | ||
| 94 | void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ | ||
| 95 | const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; | ||
| 96 | clusterSlotStatsAddCpuDuration(c, total_cmd_duration); | ||
| 97 | c->lastcmd->microseconds += total_cmd_duration; | ||
| 98 | c->lastcmd->calls++; | ||
| 99 | c->commands_processed++; | ||
| 100 | server.stat_numcommands++; | ||
| 101 | if (had_errors) | ||
| 102 | c->lastcmd->failed_calls++; | ||
| 103 | if (server.latency_tracking_enabled) | ||
| 104 | updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000); | ||
| 105 | /* Log the command into the Slow log if needed. */ | ||
| 106 | slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration); | ||
| 107 | c->duration = 0; | ||
| 108 | /* Log the reply duration event. */ | ||
| 109 | latencyAddSampleIfNeeded("command-unblocking",reply_us/1000); | ||
| 110 | } | ||
| 111 | |||
| 112 | /* This function is called in the beforeSleep() function of the event loop | ||
| 113 | * in order to process the pending input buffer of clients that were | ||
| 114 | * unblocked after a blocking operation. */ | ||
| 115 | void processUnblockedClients(void) { | ||
| 116 | listNode *ln; | ||
| 117 | client *c; | ||
| 118 | |||
| 119 | while (listLength(server.unblocked_clients)) { | ||
| 120 | ln = listFirst(server.unblocked_clients); | ||
| 121 | serverAssert(ln != NULL); | ||
| 122 | c = ln->value; | ||
| 123 | listDelNode(server.unblocked_clients,ln); | ||
| 124 | c->flags &= ~CLIENT_UNBLOCKED; | ||
| 125 | |||
| 126 | /* Reset the client for a new query, unless the client has pending command to process. */ | ||
| 127 | if (!(c->flags & CLIENT_PENDING_COMMAND)) { | ||
| 128 | freeClientOriginalArgv(c); | ||
| 129 | /* Clients that are not blocked on keys are not reprocessed so we must | ||
| 130 | * call reqresAppendResponse here (for clients blocked on key, | ||
| 131 | * unblockClientOnKey is called, which eventually calls processCommand, | ||
| 132 | * which calls reqresAppendResponse) */ | ||
| 133 | prepareForNextCommand(c, 0); | ||
| 134 | } | ||
| 135 | |||
| 136 | if (c->flags & CLIENT_MODULE) { | ||
| 137 | if (!(c->flags & CLIENT_BLOCKED)) { | ||
| 138 | moduleCallCommandUnblockedHandler(c); | ||
| 139 | } | ||
| 140 | continue; | ||
| 141 | } | ||
| 142 | |||
| 143 | /* Process remaining data in the input buffer, unless the client | ||
| 144 | * is blocked again. Actually processInputBuffer() checks that the | ||
| 145 | * client is not blocked before to proceed, but things may change and | ||
| 146 | * the code is conceptually more correct this way. */ | ||
| 147 | if (!(c->flags & CLIENT_BLOCKED)) { | ||
| 148 | /* If we have a queued command, execute it now. */ | ||
| 149 | if (processPendingCommandAndInputBuffer(c) == C_ERR) { | ||
| 150 | c = NULL; | ||
| 151 | } | ||
| 152 | } | ||
| 153 | beforeNextClient(c); | ||
| 154 | } | ||
| 155 | } | ||
| 156 | |||
| 157 | /* This function will schedule the client for reprocessing at a safe time. | ||
| 158 | * | ||
| 159 | * This is useful when a client was blocked for some reason (blocking operation, | ||
| 160 | * CLIENT PAUSE, or whatever), because it may end with some accumulated query | ||
| 161 | * buffer that needs to be processed ASAP: | ||
| 162 | * | ||
| 163 | * 1. When a client is blocked, its readable handler is still active. | ||
| 164 | * 2. However in this case it only gets data into the query buffer, but the | ||
| 165 | * query is not parsed or executed once there is enough to proceed as | ||
| 166 | * usually (because the client is blocked... so we can't execute commands). | ||
| 167 | * 3. When the client is unblocked, without this function, the client would | ||
| 168 | * have to write some query in order for the readable handler to finally | ||
| 169 | * call processQueryBuffer*() on it. | ||
| 170 | * 4. With this function instead we can put the client in a queue that will | ||
| 171 | * process it for queries ready to be executed at a safe time. | ||
| 172 | */ | ||
| 173 | void queueClientForReprocessing(client *c) { | ||
| 174 | /* The client may already be into the unblocked list because of a previous | ||
| 175 | * blocking operation, don't add back it into the list multiple times. */ | ||
| 176 | if (!(c->flags & CLIENT_UNBLOCKED)) { | ||
| 177 | c->flags |= CLIENT_UNBLOCKED; | ||
| 178 | listAddNodeTail(server.unblocked_clients,c); | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | /* Unblock a client calling the right function depending on the kind | ||
| 183 | * of operation the client is blocking for. */ | ||
| 184 | void unblockClient(client *c, int queue_for_reprocessing) { | ||
| 185 | if (c->bstate.btype == BLOCKED_LIST || | ||
| 186 | c->bstate.btype == BLOCKED_ZSET || | ||
| 187 | c->bstate.btype == BLOCKED_STREAM) { | ||
| 188 | unblockClientWaitingData(c); | ||
| 189 | } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) { | ||
| 190 | unblockClientWaitingReplicas(c); | ||
| 191 | } else if (c->bstate.btype == BLOCKED_MODULE) { | ||
| 192 | if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); | ||
| 193 | unblockClientFromModule(c); | ||
| 194 | } else if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) { | ||
| 195 | listDelNode(server.postponed_clients,c->postponed_list_node); | ||
| 196 | c->postponed_list_node = NULL; | ||
| 197 | } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { | ||
| 198 | /* No special cleanup. */ | ||
| 199 | } else if (c->bstate.btype == BLOCKED_LAZYFREE) { | ||
| 200 | /* No special cleanup. */ | ||
| 201 | } else { | ||
| 202 | serverPanic("Unknown btype in unblockClient()."); | ||
| 203 | } | ||
| 204 | |||
| 205 | |||
| 206 | /* Clear the flags, and put the client in the unblocked list so that | ||
| 207 | * we'll process new commands in its query buffer ASAP. */ | ||
| 208 | if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */ | ||
| 209 | server.blocked_clients_by_type[c->bstate.btype]--; | ||
| 210 | c->flags &= ~CLIENT_BLOCKED; | ||
| 211 | c->bstate.btype = BLOCKED_NONE; | ||
| 212 | c->bstate.unblock_on_nokey = 0; | ||
| 213 | removeClientFromTimeoutTable(c); | ||
| 214 | if (queue_for_reprocessing) queueClientForReprocessing(c); | ||
| 215 | } | ||
| 216 | |||
| 217 | /* Check if the specified client can be safely timed out using | ||
| 218 | * unblockClientOnTimeout(). */ | ||
| 219 | int blockedClientMayTimeout(client *c) { | ||
| 220 | if (c->bstate.btype == BLOCKED_MODULE) { | ||
| 221 | return moduleBlockedClientMayTimeout(c); | ||
| 222 | } | ||
| 223 | |||
| 224 | if (c->bstate.btype == BLOCKED_LIST || | ||
| 225 | c->bstate.btype == BLOCKED_ZSET || | ||
| 226 | c->bstate.btype == BLOCKED_STREAM || | ||
| 227 | c->bstate.btype == BLOCKED_WAIT || | ||
| 228 | c->bstate.btype == BLOCKED_WAITAOF) | ||
| 229 | { | ||
| 230 | return 1; | ||
| 231 | } | ||
| 232 | return 0; | ||
| 233 | } | ||
| 234 | |||
| 235 | /* This function gets called when a blocked client timed out in order to | ||
| 236 | * send it a reply of some kind. After this function is called, | ||
| 237 | * unblockClient() will be called with the same client as argument. */ | ||
| 238 | void replyToBlockedClientTimedOut(client *c) { | ||
| 239 | if (c->bstate.btype == BLOCKED_LAZYFREE) { | ||
| 240 | addReply(c, shared.ok); /* No reason lazy-free to fail */ | ||
| 241 | } else if (c->bstate.btype == BLOCKED_LIST || | ||
| 242 | c->bstate.btype == BLOCKED_ZSET || | ||
| 243 | c->bstate.btype == BLOCKED_STREAM) { | ||
| 244 | addReplyNullArray(c); | ||
| 245 | updateStatsOnUnblock(c, 0, 0, 0); | ||
| 246 | } else if (c->bstate.btype == BLOCKED_WAIT) { | ||
| 247 | addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset)); | ||
| 248 | } else if (c->bstate.btype == BLOCKED_WAITAOF) { | ||
| 249 | addReplyArrayLen(c,2); | ||
| 250 | addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset); | ||
| 251 | addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); | ||
| 252 | } else if (c->bstate.btype == BLOCKED_MODULE) { | ||
| 253 | moduleBlockedClientTimedOut(c); | ||
| 254 | } else { | ||
| 255 | serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); | ||
| 256 | } | ||
| 257 | } | ||
| 258 | |||
| 259 | /* If one or more clients are blocked on the SHUTDOWN command, this function | ||
| 260 | * sends them an error reply and unblocks them. */ | ||
| 261 | void replyToClientsBlockedOnShutdown(void) { | ||
| 262 | if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return; | ||
| 263 | listNode *ln; | ||
| 264 | listIter li; | ||
| 265 | listRewind(server.clients, &li); | ||
| 266 | while((ln = listNext(&li))) { | ||
| 267 | client *c = listNodeValue(ln); | ||
| 268 | if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) { | ||
| 269 | c->duration = 0; | ||
| 270 | addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); | ||
| 271 | unblockClient(c, 1); | ||
| 272 | } | ||
| 273 | } | ||
| 274 | } | ||
| 275 | |||
| 276 | /* Mass-unblock clients because something changed in the instance that makes | ||
| 277 | * blocking no longer safe. For example clients blocked in list operations | ||
| 278 | * in an instance which turns from master to slave is unsafe, so this function | ||
| 279 | * is called when a master turns into a slave. | ||
| 280 | * | ||
| 281 | * The semantics is to send an -UNBLOCKED error to the client, disconnecting | ||
| 282 | * it at the same time. */ | ||
| 283 | void disconnectAllBlockedClients(void) { | ||
| 284 | listNode *ln; | ||
| 285 | listIter li; | ||
| 286 | |||
| 287 | listRewind(server.clients,&li); | ||
| 288 | while((ln = listNext(&li))) { | ||
| 289 | client *c = listNodeValue(ln); | ||
| 290 | |||
| 291 | if (c->flags & CLIENT_BLOCKED) { | ||
| 292 | /* POSTPONEd clients are an exception, when they'll be unblocked, the | ||
| 293 | * command processing will start from scratch, and the command will | ||
| 294 | * be either executed or rejected. (unlike LIST blocked clients for | ||
| 295 | * which the command is already in progress in a way. */ | ||
| 296 | if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) | ||
| 297 | continue; | ||
| 298 | |||
| 299 | if (c->bstate.btype == BLOCKED_LAZYFREE) { | ||
| 300 | addReply(c, shared.ok); /* No reason lazy-free to fail */ | ||
| 301 | updateStatsOnUnblock(c, 0, 0, 0); | ||
| 302 | c->flags &= ~CLIENT_PENDING_COMMAND; | ||
| 303 | unblockClient(c, 1); | ||
| 304 | } else { | ||
| 305 | |||
| 306 | unblockClientOnError(c, | ||
| 307 | "-UNBLOCKED force unblock from blocking operation, " | ||
| 308 | "instance state changed (master -> replica?)"); | ||
| 309 | } | ||
| 310 | c->flags |= CLIENT_CLOSE_AFTER_REPLY; | ||
| 311 | } | ||
| 312 | } | ||
| 313 | } | ||
| 314 | |||
| 315 | /* This function should be called by Redis every time a single command, | ||
| 316 | * a MULTI/EXEC block, or a Lua script, terminated its execution after | ||
| 317 | * being called by a client. It handles serving clients blocked in all scenarios | ||
| 318 | * where a specific key access requires to block until that key is available. | ||
| 319 | * | ||
| 320 | * All the keys with at least one client blocked that are signaled as ready | ||
| 321 | * are accumulated into the server.ready_keys list. This function will run | ||
| 322 | * the list and will serve clients accordingly. | ||
| 323 | * Note that the function will iterate again and again (for example as a result of serving BLMOVE | ||
| 324 | * we can have new blocking clients to serve because of the PUSH side of BLMOVE.) | ||
| 325 | * | ||
| 326 | * This function is normally "fair", that is, it will serve clients | ||
| 327 | * using a FIFO behavior. However this fairness is violated in certain | ||
| 328 | * edge cases, that is, when we have clients blocked at the same time | ||
| 329 | * in a sorted set and in a list, for the same key (a very odd thing to | ||
| 330 | * do client side, indeed!). Because mismatching clients (blocking for | ||
| 331 | * a different type compared to the current key type) are moved in the | ||
| 332 | * other side of the linked list. However as long as the key starts to | ||
| 333 | * be used only for a single type, like virtually any Redis application will | ||
| 334 | * do, the function is already fair. */ | ||
| 335 | void handleClientsBlockedOnKeys(void) { | ||
| 336 | |||
| 337 | /* In case we are already in the process of unblocking clients we should | ||
| 338 | * not make a recursive call, in order to prevent breaking fairness. */ | ||
| 339 | static int in_handling_blocked_clients = 0; | ||
| 340 | if (in_handling_blocked_clients) | ||
| 341 | return; | ||
| 342 | in_handling_blocked_clients = 1; | ||
| 343 | |||
| 344 | /* This function is called only when also_propagate is in its basic state | ||
| 345 | * (i.e. not from call(), module context, etc.) */ | ||
| 346 | serverAssert(server.also_propagate.numops == 0); | ||
| 347 | |||
| 348 | /* If a command being unblocked causes another command to get unblocked, | ||
| 349 | * like a BLMOVE would do, then the new unblocked command will get processed | ||
| 350 | * right away rather than wait for later. */ | ||
| 351 | while(listLength(server.ready_keys) != 0) { | ||
| 352 | list *l; | ||
| 353 | |||
| 354 | /* Point server.ready_keys to a fresh list and save the current one | ||
| 355 | * locally. This way as we run the old list we are free to call | ||
| 356 | * signalKeyAsReady() that may push new elements in server.ready_keys | ||
| 357 | * when handling clients blocked into BLMOVE. */ | ||
| 358 | l = server.ready_keys; | ||
| 359 | server.ready_keys = listCreate(); | ||
| 360 | |||
| 361 | while(listLength(l) != 0) { | ||
| 362 | listNode *ln = listFirst(l); | ||
| 363 | readyList *rl = ln->value; | ||
| 364 | |||
| 365 | /* First of all remove this key from db->ready_keys so that | ||
| 366 | * we can safely call signalKeyAsReady() against this key. */ | ||
| 367 | dictDelete(rl->db->ready_keys,rl->key); | ||
| 368 | |||
| 369 | handleClientsBlockedOnKey(rl); | ||
| 370 | |||
| 371 | /* Free this item. */ | ||
| 372 | decrRefCount(rl->key); | ||
| 373 | zfree(rl); | ||
| 374 | listDelNode(l,ln); | ||
| 375 | } | ||
| 376 | listRelease(l); /* We have the new list on place at this point. */ | ||
| 377 | } | ||
| 378 | in_handling_blocked_clients = 0; | ||
| 379 | } | ||
| 380 | |||
| 381 | /* Set a client in blocking mode for the specified key, with the specified timeout. | ||
| 382 | * The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are | ||
| 383 | * waiting for an empty key in order to awake the client. The client is blocked | ||
| 384 | * for all the 'numkeys' keys as in the 'keys' argument. | ||
| 385 | * The client will unblocked as soon as one of the keys in 'keys' value was updated. | ||
| 386 | * the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key | ||
| 387 | * is updated to become unavailable, either by type change (override), deletion or swapdb */ | ||
| 388 | void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) { | ||
| 389 | dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry; | ||
| 390 | list *l; | ||
| 391 | int j; | ||
| 392 | |||
| 393 | if (!(c->flags & CLIENT_REEXECUTING_COMMAND)) { | ||
| 394 | /* If the client is re-processing the command, we do not set the timeout | ||
| 395 | * because we need to retain the client's original timeout. */ | ||
| 396 | c->bstate.timeout = timeout; | ||
| 397 | } | ||
| 398 | |||
| 399 | for (j = 0; j < numkeys; j++) { | ||
| 400 | /* If the key already exists in the dictionary ignore it. */ | ||
| 401 | if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) { | ||
| 402 | continue; | ||
| 403 | } | ||
| 404 | incrRefCount(keys[j]); | ||
| 405 | |||
| 406 | /* And in the other "side", to map keys -> clients */ | ||
| 407 | db_blocked_entry = dictAddRaw(c->db->blocking_keys,keys[j], &db_blocked_existing_entry); | ||
| 408 | |||
| 409 | /* In case key[j] did not have blocking clients yet, we need to create a new list */ | ||
| 410 | if (db_blocked_entry != NULL) { | ||
| 411 | l = listCreate(); | ||
| 412 | dictSetVal(c->db->blocking_keys, db_blocked_entry, l); | ||
| 413 | incrRefCount(keys[j]); | ||
| 414 | } else { | ||
| 415 | l = dictGetVal(db_blocked_existing_entry); | ||
| 416 | } | ||
| 417 | listAddNodeTail(l,c); | ||
| 418 | dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l)); | ||
| 419 | |||
| 420 | /* We need to add the key to blocking_keys_unblock_on_nokey, if the client | ||
| 421 | * wants to be awakened if key is deleted (like XREADGROUP) */ | ||
| 422 | if (unblock_on_nokey) { | ||
| 423 | db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry); | ||
| 424 | if (db_blocked_entry) { | ||
| 425 | incrRefCount(keys[j]); | ||
| 426 | dictSetUnsignedIntegerVal(db_blocked_entry, 1); | ||
| 427 | } else { | ||
| 428 | dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1); | ||
| 429 | } | ||
| 430 | } | ||
| 431 | } | ||
| 432 | c->bstate.unblock_on_nokey = unblock_on_nokey; | ||
| 433 | /* Currently we assume key blocking will require reprocessing the command. | ||
| 434 | * However in case of modules, they have a different way to handle the reprocessing | ||
| 435 | * which does not require setting the pending command flag */ | ||
| 436 | if (btype != BLOCKED_MODULE) | ||
| 437 | c->flags |= CLIENT_PENDING_COMMAND; | ||
| 438 | blockClient(c,btype); | ||
| 439 | } | ||
| 440 | |||
| 441 | /* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP. | ||
| 442 | * Internal function for unblockClient() */ | ||
| 443 | static void unblockClientWaitingData(client *c) { | ||
| 444 | dictEntry *de; | ||
| 445 | dictIterator di; | ||
| 446 | |||
| 447 | if (dictSize(c->bstate.keys) == 0) | ||
| 448 | return; | ||
| 449 | |||
| 450 | dictInitIterator(&di, c->bstate.keys); | ||
| 451 | /* The client may wait for multiple keys, so unblock it for every key. */ | ||
| 452 | while((de = dictNext(&di)) != NULL) { | ||
| 453 | releaseBlockedEntry(c, de, 0); | ||
| 454 | } | ||
| 455 | dictResetIterator(&di); | ||
| 456 | dictEmpty(c->bstate.keys, NULL); | ||
| 457 | } | ||
| 458 | |||
| 459 | static blocking_type getBlockedTypeByType(int type) { | ||
| 460 | switch (type) { | ||
| 461 | case OBJ_LIST: return BLOCKED_LIST; | ||
| 462 | case OBJ_ZSET: return BLOCKED_ZSET; | ||
| 463 | case OBJ_MODULE: return BLOCKED_MODULE; | ||
| 464 | case OBJ_STREAM: return BLOCKED_STREAM; | ||
| 465 | default: return BLOCKED_NONE; | ||
| 466 | } | ||
| 467 | } | ||
| 468 | |||
| 469 | /* If the specified key has clients blocked waiting for list pushes, this | ||
| 470 | * function will put the key reference into the server.ready_keys list. | ||
| 471 | * Note that db->ready_keys is a hash table that allows us to avoid putting | ||
| 472 | * the same key again and again in the list in case of multiple pushes | ||
| 473 | * made by a script or in the context of MULTI/EXEC. | ||
| 474 | * | ||
| 475 | * The list will be finally processed by handleClientsBlockedOnKeys() */ | ||
| 476 | static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) { | ||
| 477 | readyList *rl; | ||
| 478 | |||
| 479 | /* Quick returns. */ | ||
| 480 | int btype = getBlockedTypeByType(type); | ||
| 481 | if (btype == BLOCKED_NONE) { | ||
| 482 | /* The type can never block. */ | ||
| 483 | return; | ||
| 484 | } | ||
| 485 | if (!server.blocked_clients_by_type[btype] && | ||
| 486 | !server.blocked_clients_by_type[BLOCKED_MODULE]) { | ||
| 487 | /* No clients block on this type. Note: Blocked modules are represented | ||
| 488 | * by BLOCKED_MODULE, even if the intention is to wake up by normal | ||
| 489 | * types (list, zset, stream), so we need to check that there are no | ||
| 490 | * blocked modules before we do a quick return here. */ | ||
| 491 | return; | ||
| 492 | } | ||
| 493 | |||
| 494 | if (deleted) { | ||
| 495 | /* Key deleted and no clients blocking for this key? No need to queue it. */ | ||
| 496 | if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL) | ||
| 497 | return; | ||
| 498 | /* Note: if we made it here it means the key is also present in db->blocking_keys */ | ||
| 499 | } else { | ||
| 500 | /* No clients blocking for this key? No need to queue it. */ | ||
| 501 | if (dictFind(db->blocking_keys,key) == NULL) | ||
| 502 | return; | ||
| 503 | } | ||
| 504 | |||
| 505 | dictEntry *de, *existing; | ||
| 506 | de = dictAddRaw(db->ready_keys, key, &existing); | ||
| 507 | if (de) { | ||
| 508 | /* We add the key in the db->ready_keys dictionary in order | ||
| 509 | * to avoid adding it multiple times into a list with a simple O(1) | ||
| 510 | * check. */ | ||
| 511 | incrRefCount(key); | ||
| 512 | } else { | ||
| 513 | /* Key was already signaled? No need to queue it again. */ | ||
| 514 | return; | ||
| 515 | } | ||
| 516 | |||
| 517 | /* Ok, we need to queue this key into server.ready_keys. */ | ||
| 518 | rl = zmalloc(sizeof(*rl)); | ||
| 519 | rl->key = key; | ||
| 520 | rl->db = db; | ||
| 521 | incrRefCount(key); | ||
| 522 | listAddNodeTail(server.ready_keys,rl); | ||
| 523 | } | ||
| 524 | |||
| 525 | /* Helper function to wrap the logic of removing a client blocked key entry | ||
| 526 | * In this case we would like to do the following: | ||
| 527 | * 1. unlink the client from the global DB locked client list | ||
| 528 | * 2. remove the entry from the global db blocking list in case the list is empty | ||
| 529 | * 3. in case the global list is empty, also remove the key from the global dict of keys | ||
| 530 | * which should trigger unblock on key deletion | ||
| 531 | * 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys, | ||
| 532 | * but unblocked when only one of them is triggered, we would like to avoid deleting each key separately | ||
| 533 | * and instead clear the dictionary in one-shot. this is why the remove_key argument is provided | ||
| 534 | * to support this logic in unblockClientWaitingData | ||
| 535 | */ | ||
| 536 | static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) { | ||
| 537 | list *l; | ||
| 538 | listNode *pos; | ||
| 539 | void *key; | ||
| 540 | dictEntry *unblock_on_nokey_entry; | ||
| 541 | |||
| 542 | key = dictGetKey(de); | ||
| 543 | pos = dictGetVal(de); | ||
| 544 | /* Remove this client from the list of clients waiting for this key. */ | ||
| 545 | l = dictFetchValue(c->db->blocking_keys, key); | ||
| 546 | serverAssertWithInfo(c,key,l != NULL); | ||
| 547 | listUnlinkNode(l,pos); | ||
| 548 | /* If the list is empty we need to remove it to avoid wasting memory | ||
| 549 | * We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict. | ||
| 550 | * However, in case the list is not empty, we will have to still perform reference accounting | ||
| 551 | * on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference. | ||
| 552 | * Why? because it is possible that some more clients are blocked on the same key but without | ||
| 553 | * require to be triggered on key deletion, we do not want these to be later triggered by the | ||
| 554 | * signalDeletedKeyAsReady. */ | ||
| 555 | if (listLength(l) == 0) { | ||
| 556 | dictDelete(c->db->blocking_keys, key); | ||
| 557 | dictDelete(c->db->blocking_keys_unblock_on_nokey,key); | ||
| 558 | } else if (c->bstate.unblock_on_nokey) { | ||
| 559 | unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key); | ||
| 560 | /* it is not possible to have a client blocked on nokey with no matching entry */ | ||
| 561 | serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL); | ||
| 562 | if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) { | ||
| 563 | /* in case the count is zero, we can delete the entry */ | ||
| 564 | dictDelete(c->db->blocking_keys_unblock_on_nokey,key); | ||
| 565 | } | ||
| 566 | } | ||
| 567 | if (remove_key) | ||
| 568 | dictDelete(c->bstate.keys, key); | ||
| 569 | } | ||
| 570 | |||
| 571 | void signalKeyAsReady(redisDb *db, robj *key, int type) { | ||
| 572 | signalKeyAsReadyLogic(db, key, type, 0); | ||
| 573 | } | ||
| 574 | |||
| 575 | void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) { | ||
| 576 | signalKeyAsReadyLogic(db, key, type, 1); | ||
| 577 | } | ||
| 578 | |||
| 579 | /* Helper function for handleClientsBlockedOnKeys(). This function is called | ||
| 580 | * whenever a key is ready. we iterate over all the clients blocked on this key | ||
| 581 | * and try to re-execute the command (in case the key is still available). */ | ||
| 582 | static void handleClientsBlockedOnKey(readyList *rl) { | ||
| 583 | |||
| 584 | /* We serve clients in the same order they blocked for | ||
| 585 | * this key, from the first blocked to the last. */ | ||
| 586 | dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); | ||
| 587 | |||
| 588 | if (de) { | ||
| 589 | list *clients = dictGetVal(de); | ||
| 590 | listNode *ln; | ||
| 591 | listIter li; | ||
| 592 | listRewind(clients,&li); | ||
| 593 | |||
| 594 | /* Avoid processing more than the initial count so that we're not stuck | ||
| 595 | * in an endless loop in case the reprocessing of the command blocks again. */ | ||
| 596 | long count = listLength(clients); | ||
| 597 | while ((ln = listNext(&li)) && count--) { | ||
| 598 | client *receiver = listNodeValue(ln); | ||
| 599 | kvobj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS); | ||
| 600 | /* 1. In case new key was added/touched we need to verify it satisfy the | ||
| 601 | * blocked type, since we might process the wrong key type. | ||
| 602 | * 2. We want to serve clients blocked on module keys | ||
| 603 | * regardless of the object type: we don't know what the | ||
| 604 | * module is trying to accomplish right now. | ||
| 605 | * 3. In case of XREADGROUP call we will want to unblock on any change in object type | ||
| 606 | * or in case the key was deleted, since the group is no longer valid. */ | ||
| 607 | if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) || | ||
| 608 | (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || | ||
| 609 | (receiver->bstate.unblock_on_nokey)) | ||
| 610 | { | ||
| 611 | if (receiver->bstate.btype != BLOCKED_MODULE) | ||
| 612 | unblockClientOnKey(receiver, rl->key); | ||
| 613 | else | ||
| 614 | moduleUnblockClientOnKey(receiver, rl->key); | ||
| 615 | } | ||
| 616 | } | ||
| 617 | } | ||
| 618 | } | ||
| 619 | |||
| 620 | /* block a client due to wait command */ | ||
| 621 | void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { | ||
| 622 | c->bstate.timeout = timeout; | ||
| 623 | c->bstate.reploffset = offset; | ||
| 624 | c->bstate.numreplicas = numreplicas; | ||
| 625 | listAddNodeHead(server.clients_waiting_acks,c); | ||
| 626 | blockClient(c,BLOCKED_WAIT); | ||
| 627 | } | ||
| 628 | |||
| 629 | /* block a client due to waitaof command */ | ||
| 630 | void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { | ||
| 631 | c->bstate.timeout = timeout; | ||
| 632 | c->bstate.reploffset = offset; | ||
| 633 | c->bstate.numreplicas = numreplicas; | ||
| 634 | c->bstate.numlocal = numlocal; | ||
| 635 | listAddNodeHead(server.clients_waiting_acks,c); | ||
| 636 | blockClient(c,BLOCKED_WAITAOF); | ||
| 637 | } | ||
| 638 | |||
| 639 | /* Postpone client from executing a command. For example the server might be busy | ||
| 640 | * requesting to avoid processing clients commands which will be processed later | ||
| 641 | * when the it is ready to accept them. */ | ||
| 642 | void blockPostponeClientWithType(client *c, int btype) { | ||
| 643 | serverAssert(btype == BLOCKED_POSTPONE || btype == BLOCKED_POSTPONE_TRIM); | ||
| 644 | c->bstate.timeout = 0; | ||
| 645 | blockClient(c, btype); | ||
| 646 | listAddNodeTail(server.postponed_clients, c); | ||
| 647 | c->postponed_list_node = listLast(server.postponed_clients); | ||
| 648 | /* Mark this client to execute its command */ | ||
| 649 | c->flags |= CLIENT_PENDING_COMMAND; | ||
| 650 | } | ||
| 651 | |||
| 652 | /* Postpone client from executing a command. */ | ||
| 653 | void blockPostponeClient(client *c) { | ||
| 654 | blockPostponeClientWithType(c, BLOCKED_POSTPONE); | ||
| 655 | } | ||
| 656 | |||
| 657 | /* Block client due to shutdown command */ | ||
| 658 | void blockClientShutdown(client *c) { | ||
| 659 | blockClient(c, BLOCKED_SHUTDOWN); | ||
| 660 | } | ||
| 661 | |||
| 662 | /* Unblock a client once a specific key became available for it. | ||
| 663 | * This function will remove the client from the list of clients blocked on this key | ||
| 664 | * and also remove the key from the dictionary of keys this client is blocked on. | ||
| 665 | * in case the client has a command pending it will process it immediately. */ | ||
| 666 | static void unblockClientOnKey(client *c, robj *key) { | ||
| 667 | dictEntry *de; | ||
| 668 | |||
| 669 | de = dictFind(c->bstate.keys, key); | ||
| 670 | releaseBlockedEntry(c, de, 1); | ||
| 671 | |||
| 672 | /* Only in case of blocking API calls, we might be blocked on several keys. | ||
| 673 | however we should force unblock the entire blocking keys */ | ||
| 674 | serverAssert(c->bstate.btype == BLOCKED_STREAM || | ||
| 675 | c->bstate.btype == BLOCKED_LIST || | ||
| 676 | c->bstate.btype == BLOCKED_ZSET); | ||
| 677 | |||
| 678 | /* We need to unblock the client before calling processCommandAndResetClient | ||
| 679 | * because it checks the CLIENT_BLOCKED flag */ | ||
| 680 | unblockClient(c, 0); | ||
| 681 | /* In case this client was blocked on keys during command | ||
| 682 | * we need to re process the command again */ | ||
| 683 | if (c->flags & CLIENT_PENDING_COMMAND) { | ||
| 684 | c->flags &= ~CLIENT_PENDING_COMMAND; | ||
| 685 | c->flags |= CLIENT_REEXECUTING_COMMAND; | ||
| 686 | /* We want the command processing and the unblock handler (see RM_Call 'K' option) | ||
| 687 | * to run atomically, this is why we must enter the execution unit here before | ||
| 688 | * running the command, and exit the execution unit after calling the unblock handler (if exists). | ||
| 689 | * Notice that we also must set the current client so it will be available | ||
| 690 | * when we will try to send the client side caching notification (done on 'afterCommand'). */ | ||
| 691 | client *old_client = server.current_client; | ||
| 692 | server.current_client = c; | ||
| 693 | enterExecutionUnit(1, 0); | ||
| 694 | processCommandAndResetClient(c); | ||
| 695 | if (!(c->flags & CLIENT_BLOCKED)) { | ||
| 696 | if (c->flags & CLIENT_MODULE) { | ||
| 697 | moduleCallCommandUnblockedHandler(c); | ||
| 698 | } else { | ||
| 699 | queueClientForReprocessing(c); | ||
| 700 | } | ||
| 701 | } | ||
| 702 | exitExecutionUnit(); | ||
| 703 | afterCommand(c); | ||
| 704 | /* Clear the CLIENT_REEXECUTING_COMMAND flag after the proc is executed. */ | ||
| 705 | c->flags &= ~CLIENT_REEXECUTING_COMMAND; | ||
| 706 | server.current_client = old_client; | ||
| 707 | } | ||
| 708 | } | ||
| 709 | |||
| 710 | /* Unblock a client blocked on the specific key from module context. | ||
| 711 | * This function will try to serve the module call, and in case it succeeds, | ||
| 712 | * it will add the client to the list of module unblocked clients which will | ||
| 713 | * be processed in moduleHandleBlockedClients. */ | ||
| 714 | static void moduleUnblockClientOnKey(client *c, robj *key) { | ||
| 715 | long long prev_error_replies = server.stat_total_error_replies; | ||
| 716 | client *old_client = server.current_client; | ||
| 717 | server.current_client = c; | ||
| 718 | monotime replyTimer; | ||
| 719 | elapsedStart(&replyTimer); | ||
| 720 | |||
| 721 | if (moduleTryServeClientBlockedOnKey(c, key)) { | ||
| 722 | updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); | ||
| 723 | moduleUnblockClient(c); | ||
| 724 | } | ||
| 725 | /* We need to call afterCommand even if the client was not unblocked | ||
| 726 | * in order to propagate any changes that could have been done inside | ||
| 727 | * moduleTryServeClientBlockedOnKey */ | ||
| 728 | afterCommand(c); | ||
| 729 | server.current_client = old_client; | ||
| 730 | } | ||
| 731 | |||
| 732 | /* Unblock a client which is currently Blocked on and provided a timeout. | ||
| 733 | * The implementation will first reply to the blocked client with null response | ||
| 734 | * or, in case of module blocked client the timeout callback will be used. | ||
| 735 | * In this case since we might have a command pending | ||
| 736 | * we want to remove the pending flag to indicate we already responded to the | ||
| 737 | * command with timeout reply. */ | ||
| 738 | void unblockClientOnTimeout(client *c) { | ||
| 739 | /* The client has been unlocked (in the moduleUnblocked list), return ASAP. */ | ||
| 740 | if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return; | ||
| 741 | |||
| 742 | replyToBlockedClientTimedOut(c); | ||
| 743 | if (c->flags & CLIENT_PENDING_COMMAND) | ||
| 744 | c->flags &= ~CLIENT_PENDING_COMMAND; | ||
| 745 | unblockClient(c, 1); | ||
| 746 | } | ||
| 747 | |||
| 748 | /* Unblock a client which is currently Blocked with error. | ||
| 749 | * If err_str is provided it will be used to reply to the blocked client */ | ||
| 750 | void unblockClientOnError(client *c, const char *err_str) { | ||
| 751 | if (err_str) | ||
| 752 | addReplyError(c, err_str); | ||
| 753 | updateStatsOnUnblock(c, 0, 0, 1); | ||
| 754 | if (c->flags & CLIENT_PENDING_COMMAND) | ||
| 755 | c->flags &= ~CLIENT_PENDING_COMMAND; | ||
| 756 | unblockClient(c, 1); | ||
| 757 | } | ||
| 758 | |||
| 759 | void blockedBeforeSleep(void) { | ||
| 760 | /* Handle precise timeouts of blocked clients. */ | ||
| 761 | handleBlockedClientsTimeout(); | ||
| 762 | |||
| 763 | /* Handle for expired pending entries. */ | ||
| 764 | handleClaimableStreamEntries(); | ||
| 765 | |||
| 766 | /* Unblock all the clients blocked for synchronous replication | ||
| 767 | * in WAIT or WAITAOF. */ | ||
| 768 | if (listLength(server.clients_waiting_acks)) | ||
| 769 | processClientsWaitingReplicas(); | ||
| 770 | |||
| 771 | /* Try to process blocked clients every once in while. | ||
| 772 | * | ||
| 773 | * Example: A module calls RM_SignalKeyAsReady from within a timer callback | ||
| 774 | * (So we don't visit processCommand() at all). | ||
| 775 | * | ||
| 776 | * This may unblock clients, so must be done before processUnblockedClients */ | ||
| 777 | handleClientsBlockedOnKeys(); | ||
| 778 | |||
| 779 | /* Check if there are clients unblocked by modules that implement | ||
| 780 | * blocking commands. */ | ||
| 781 | if (moduleCount()) | ||
| 782 | moduleHandleBlockedClients(); | ||
| 783 | |||
| 784 | /* Try to process pending commands for clients that were just unblocked. */ | ||
| 785 | if (listLength(server.unblocked_clients)) | ||
| 786 | processUnblockedClients(); | ||
| 787 | } | ||
