aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/blocked.c')
-rw-r--r--examples/redis-unstable/src/blocked.c787
1 files changed, 787 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/blocked.c b/examples/redis-unstable/src/blocked.c
new file mode 100644
index 0000000..4f518c9
--- /dev/null
+++ b/examples/redis-unstable/src/blocked.c
@@ -0,0 +1,787 @@
1/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
2 *
3 * Copyright (c) 2009-Present, Redis Ltd.
4 * All rights reserved.
5 *
6 * Copyright (c) 2024-present, Valkey contributors.
7 * All rights reserved.
8 *
9 * Licensed under your choice of (a) the Redis Source Available License 2.0
10 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
11 * GNU Affero General Public License v3 (AGPLv3).
12 *
13 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
14 *
15 * ---------------------------------------------------------------------------
16 *
17 * API:
18 *
19 * blockClient() set the CLIENT_BLOCKED flag in the client, and set the
20 * specified block type 'btype' filed to one of BLOCKED_* macros.
21 *
22 * unblockClient() unblocks the client doing the following:
23 * 1) It calls the btype-specific function to cleanup the state.
24 * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
25 * 3) It puts the client into a list of just unblocked clients that are
26 * processed ASAP in the beforeSleep() event loop callback, so that
27 * if there is some query buffer to process, we do it. This is also
28 * required because otherwise there is no 'readable' event fired, we
29 * already read the pending commands. We also set the CLIENT_UNBLOCKED
30 * flag to remember the client is in the unblocked_clients list.
31 *
32 * processUnblockedClients() is called inside the beforeSleep() function
33 * to process the query buffer from unblocked clients and remove the clients
34 * from the blocked_clients queue.
35 *
36 * replyToBlockedClientTimedOut() is called by the cron function when
37 * a client blocked reaches the specified timeout (if the timeout is set
38 * to 0, no timeout is processed).
39 * It usually just needs to send a reply to the client.
40 *
41 * When implementing a new type of blocking operation, the implementation
42 * should modify unblockClient() and replyToBlockedClientTimedOut() in order
43 * to handle the btype-specific behavior of this two functions.
44 * If the blocking operation waits for certain keys to change state, the
45 * clusterRedirectBlockedClientIfNeeded() function should also be updated.
46 */
47
48#include "server.h"
49#include "slowlog.h"
50#include "latency.h"
51#include "monotonic.h"
52#include "cluster_slot_stats.h"
53
54/* forward declarations */
55static void unblockClientWaitingData(client *c);
56static void handleClientsBlockedOnKey(readyList *rl);
57static void unblockClientOnKey(client *c, robj *key);
58static void moduleUnblockClientOnKey(client *c, robj *key);
59static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
60
61void initClientBlockingState(client *c) {
62 c->bstate.btype = BLOCKED_NONE;
63 c->bstate.timeout = 0;
64 c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
65 c->bstate.numreplicas = 0;
66 c->bstate.reploffset = 0;
67 c->bstate.unblock_on_nokey = 0;
68 c->bstate.async_rm_call_handle = NULL;
69}
70
71/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
72 * flag is set client query buffer is not longer processed, but accumulated,
73 * and will be processed when the client is unblocked. */
74void blockClient(client *c, int btype) {
75 /* Master client should never be blocked unless pause or module */
76 serverAssert(!(c->flags & CLIENT_MASTER &&
77 btype != BLOCKED_MODULE &&
78 btype != BLOCKED_LAZYFREE &&
79 btype != BLOCKED_POSTPONE &&
80 btype != BLOCKED_POSTPONE_TRIM));
81
82 c->flags |= CLIENT_BLOCKED;
83 c->bstate.btype = btype;
84 if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
85 server.blocked_clients_by_type[btype]++;
86 addClientToTimeoutTable(c);
87}
88
89/* Usually when a client is unblocked due to being blocked while processing some command
90 * he will attempt to reprocess the command which will update the statistics.
91 * However in case the client was timed out or in case of module blocked client is being unblocked
92 * the command will not be reprocessed and we need to make stats update.
93 * This function will make updates to the commandstats, slowlog and monitors.*/
94void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){
95 const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
96 clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
97 c->lastcmd->microseconds += total_cmd_duration;
98 c->lastcmd->calls++;
99 c->commands_processed++;
100 server.stat_numcommands++;
101 if (had_errors)
102 c->lastcmd->failed_calls++;
103 if (server.latency_tracking_enabled)
104 updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000);
105 /* Log the command into the Slow log if needed. */
106 slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
107 c->duration = 0;
108 /* Log the reply duration event. */
109 latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
110}
111
112/* This function is called in the beforeSleep() function of the event loop
113 * in order to process the pending input buffer of clients that were
114 * unblocked after a blocking operation. */
115void processUnblockedClients(void) {
116 listNode *ln;
117 client *c;
118
119 while (listLength(server.unblocked_clients)) {
120 ln = listFirst(server.unblocked_clients);
121 serverAssert(ln != NULL);
122 c = ln->value;
123 listDelNode(server.unblocked_clients,ln);
124 c->flags &= ~CLIENT_UNBLOCKED;
125
126 /* Reset the client for a new query, unless the client has pending command to process. */
127 if (!(c->flags & CLIENT_PENDING_COMMAND)) {
128 freeClientOriginalArgv(c);
129 /* Clients that are not blocked on keys are not reprocessed so we must
130 * call reqresAppendResponse here (for clients blocked on key,
131 * unblockClientOnKey is called, which eventually calls processCommand,
132 * which calls reqresAppendResponse) */
133 prepareForNextCommand(c, 0);
134 }
135
136 if (c->flags & CLIENT_MODULE) {
137 if (!(c->flags & CLIENT_BLOCKED)) {
138 moduleCallCommandUnblockedHandler(c);
139 }
140 continue;
141 }
142
143 /* Process remaining data in the input buffer, unless the client
144 * is blocked again. Actually processInputBuffer() checks that the
145 * client is not blocked before to proceed, but things may change and
146 * the code is conceptually more correct this way. */
147 if (!(c->flags & CLIENT_BLOCKED)) {
148 /* If we have a queued command, execute it now. */
149 if (processPendingCommandAndInputBuffer(c) == C_ERR) {
150 c = NULL;
151 }
152 }
153 beforeNextClient(c);
154 }
155}
156
157/* This function will schedule the client for reprocessing at a safe time.
158 *
159 * This is useful when a client was blocked for some reason (blocking operation,
160 * CLIENT PAUSE, or whatever), because it may end with some accumulated query
161 * buffer that needs to be processed ASAP:
162 *
163 * 1. When a client is blocked, its readable handler is still active.
164 * 2. However in this case it only gets data into the query buffer, but the
165 * query is not parsed or executed once there is enough to proceed as
166 * usually (because the client is blocked... so we can't execute commands).
167 * 3. When the client is unblocked, without this function, the client would
168 * have to write some query in order for the readable handler to finally
169 * call processQueryBuffer*() on it.
170 * 4. With this function instead we can put the client in a queue that will
171 * process it for queries ready to be executed at a safe time.
172 */
173void queueClientForReprocessing(client *c) {
174 /* The client may already be into the unblocked list because of a previous
175 * blocking operation, don't add back it into the list multiple times. */
176 if (!(c->flags & CLIENT_UNBLOCKED)) {
177 c->flags |= CLIENT_UNBLOCKED;
178 listAddNodeTail(server.unblocked_clients,c);
179 }
180}
181
182/* Unblock a client calling the right function depending on the kind
183 * of operation the client is blocking for. */
184void unblockClient(client *c, int queue_for_reprocessing) {
185 if (c->bstate.btype == BLOCKED_LIST ||
186 c->bstate.btype == BLOCKED_ZSET ||
187 c->bstate.btype == BLOCKED_STREAM) {
188 unblockClientWaitingData(c);
189 } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
190 unblockClientWaitingReplicas(c);
191 } else if (c->bstate.btype == BLOCKED_MODULE) {
192 if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
193 unblockClientFromModule(c);
194 } else if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) {
195 listDelNode(server.postponed_clients,c->postponed_list_node);
196 c->postponed_list_node = NULL;
197 } else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
198 /* No special cleanup. */
199 } else if (c->bstate.btype == BLOCKED_LAZYFREE) {
200 /* No special cleanup. */
201 } else {
202 serverPanic("Unknown btype in unblockClient().");
203 }
204
205
206 /* Clear the flags, and put the client in the unblocked list so that
207 * we'll process new commands in its query buffer ASAP. */
208 if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
209 server.blocked_clients_by_type[c->bstate.btype]--;
210 c->flags &= ~CLIENT_BLOCKED;
211 c->bstate.btype = BLOCKED_NONE;
212 c->bstate.unblock_on_nokey = 0;
213 removeClientFromTimeoutTable(c);
214 if (queue_for_reprocessing) queueClientForReprocessing(c);
215}
216
217/* Check if the specified client can be safely timed out using
218 * unblockClientOnTimeout(). */
219int blockedClientMayTimeout(client *c) {
220 if (c->bstate.btype == BLOCKED_MODULE) {
221 return moduleBlockedClientMayTimeout(c);
222 }
223
224 if (c->bstate.btype == BLOCKED_LIST ||
225 c->bstate.btype == BLOCKED_ZSET ||
226 c->bstate.btype == BLOCKED_STREAM ||
227 c->bstate.btype == BLOCKED_WAIT ||
228 c->bstate.btype == BLOCKED_WAITAOF)
229 {
230 return 1;
231 }
232 return 0;
233}
234
235/* This function gets called when a blocked client timed out in order to
236 * send it a reply of some kind. After this function is called,
237 * unblockClient() will be called with the same client as argument. */
238void replyToBlockedClientTimedOut(client *c) {
239 if (c->bstate.btype == BLOCKED_LAZYFREE) {
240 addReply(c, shared.ok); /* No reason lazy-free to fail */
241 } else if (c->bstate.btype == BLOCKED_LIST ||
242 c->bstate.btype == BLOCKED_ZSET ||
243 c->bstate.btype == BLOCKED_STREAM) {
244 addReplyNullArray(c);
245 updateStatsOnUnblock(c, 0, 0, 0);
246 } else if (c->bstate.btype == BLOCKED_WAIT) {
247 addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset));
248 } else if (c->bstate.btype == BLOCKED_WAITAOF) {
249 addReplyArrayLen(c,2);
250 addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
251 addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
252 } else if (c->bstate.btype == BLOCKED_MODULE) {
253 moduleBlockedClientTimedOut(c);
254 } else {
255 serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
256 }
257}
258
259/* If one or more clients are blocked on the SHUTDOWN command, this function
260 * sends them an error reply and unblocks them. */
261void replyToClientsBlockedOnShutdown(void) {
262 if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return;
263 listNode *ln;
264 listIter li;
265 listRewind(server.clients, &li);
266 while((ln = listNext(&li))) {
267 client *c = listNodeValue(ln);
268 if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
269 c->duration = 0;
270 addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
271 unblockClient(c, 1);
272 }
273 }
274}
275
276/* Mass-unblock clients because something changed in the instance that makes
277 * blocking no longer safe. For example clients blocked in list operations
278 * in an instance which turns from master to slave is unsafe, so this function
279 * is called when a master turns into a slave.
280 *
281 * The semantics is to send an -UNBLOCKED error to the client, disconnecting
282 * it at the same time. */
283void disconnectAllBlockedClients(void) {
284 listNode *ln;
285 listIter li;
286
287 listRewind(server.clients,&li);
288 while((ln = listNext(&li))) {
289 client *c = listNodeValue(ln);
290
291 if (c->flags & CLIENT_BLOCKED) {
292 /* POSTPONEd clients are an exception, when they'll be unblocked, the
293 * command processing will start from scratch, and the command will
294 * be either executed or rejected. (unlike LIST blocked clients for
295 * which the command is already in progress in a way. */
296 if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM)
297 continue;
298
299 if (c->bstate.btype == BLOCKED_LAZYFREE) {
300 addReply(c, shared.ok); /* No reason lazy-free to fail */
301 updateStatsOnUnblock(c, 0, 0, 0);
302 c->flags &= ~CLIENT_PENDING_COMMAND;
303 unblockClient(c, 1);
304 } else {
305
306 unblockClientOnError(c,
307 "-UNBLOCKED force unblock from blocking operation, "
308 "instance state changed (master -> replica?)");
309 }
310 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
311 }
312 }
313}
314
315/* This function should be called by Redis every time a single command,
316 * a MULTI/EXEC block, or a Lua script, terminated its execution after
317 * being called by a client. It handles serving clients blocked in all scenarios
318 * where a specific key access requires to block until that key is available.
319 *
320 * All the keys with at least one client blocked that are signaled as ready
321 * are accumulated into the server.ready_keys list. This function will run
322 * the list and will serve clients accordingly.
323 * Note that the function will iterate again and again (for example as a result of serving BLMOVE
324 * we can have new blocking clients to serve because of the PUSH side of BLMOVE.)
325 *
326 * This function is normally "fair", that is, it will serve clients
327 * using a FIFO behavior. However this fairness is violated in certain
328 * edge cases, that is, when we have clients blocked at the same time
329 * in a sorted set and in a list, for the same key (a very odd thing to
330 * do client side, indeed!). Because mismatching clients (blocking for
331 * a different type compared to the current key type) are moved in the
332 * other side of the linked list. However as long as the key starts to
333 * be used only for a single type, like virtually any Redis application will
334 * do, the function is already fair. */
335void handleClientsBlockedOnKeys(void) {
336
337 /* In case we are already in the process of unblocking clients we should
338 * not make a recursive call, in order to prevent breaking fairness. */
339 static int in_handling_blocked_clients = 0;
340 if (in_handling_blocked_clients)
341 return;
342 in_handling_blocked_clients = 1;
343
344 /* This function is called only when also_propagate is in its basic state
345 * (i.e. not from call(), module context, etc.) */
346 serverAssert(server.also_propagate.numops == 0);
347
348 /* If a command being unblocked causes another command to get unblocked,
349 * like a BLMOVE would do, then the new unblocked command will get processed
350 * right away rather than wait for later. */
351 while(listLength(server.ready_keys) != 0) {
352 list *l;
353
354 /* Point server.ready_keys to a fresh list and save the current one
355 * locally. This way as we run the old list we are free to call
356 * signalKeyAsReady() that may push new elements in server.ready_keys
357 * when handling clients blocked into BLMOVE. */
358 l = server.ready_keys;
359 server.ready_keys = listCreate();
360
361 while(listLength(l) != 0) {
362 listNode *ln = listFirst(l);
363 readyList *rl = ln->value;
364
365 /* First of all remove this key from db->ready_keys so that
366 * we can safely call signalKeyAsReady() against this key. */
367 dictDelete(rl->db->ready_keys,rl->key);
368
369 handleClientsBlockedOnKey(rl);
370
371 /* Free this item. */
372 decrRefCount(rl->key);
373 zfree(rl);
374 listDelNode(l,ln);
375 }
376 listRelease(l); /* We have the new list on place at this point. */
377 }
378 in_handling_blocked_clients = 0;
379}
380
381/* Set a client in blocking mode for the specified key, with the specified timeout.
382 * The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
383 * waiting for an empty key in order to awake the client. The client is blocked
384 * for all the 'numkeys' keys as in the 'keys' argument.
385 * The client will unblocked as soon as one of the keys in 'keys' value was updated.
386 * the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key
387 * is updated to become unavailable, either by type change (override), deletion or swapdb */
388void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) {
389 dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry;
390 list *l;
391 int j;
392
393 if (!(c->flags & CLIENT_REEXECUTING_COMMAND)) {
394 /* If the client is re-processing the command, we do not set the timeout
395 * because we need to retain the client's original timeout. */
396 c->bstate.timeout = timeout;
397 }
398
399 for (j = 0; j < numkeys; j++) {
400 /* If the key already exists in the dictionary ignore it. */
401 if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
402 continue;
403 }
404 incrRefCount(keys[j]);
405
406 /* And in the other "side", to map keys -> clients */
407 db_blocked_entry = dictAddRaw(c->db->blocking_keys,keys[j], &db_blocked_existing_entry);
408
409 /* In case key[j] did not have blocking clients yet, we need to create a new list */
410 if (db_blocked_entry != NULL) {
411 l = listCreate();
412 dictSetVal(c->db->blocking_keys, db_blocked_entry, l);
413 incrRefCount(keys[j]);
414 } else {
415 l = dictGetVal(db_blocked_existing_entry);
416 }
417 listAddNodeTail(l,c);
418 dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));
419
420 /* We need to add the key to blocking_keys_unblock_on_nokey, if the client
421 * wants to be awakened if key is deleted (like XREADGROUP) */
422 if (unblock_on_nokey) {
423 db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry);
424 if (db_blocked_entry) {
425 incrRefCount(keys[j]);
426 dictSetUnsignedIntegerVal(db_blocked_entry, 1);
427 } else {
428 dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1);
429 }
430 }
431 }
432 c->bstate.unblock_on_nokey = unblock_on_nokey;
433 /* Currently we assume key blocking will require reprocessing the command.
434 * However in case of modules, they have a different way to handle the reprocessing
435 * which does not require setting the pending command flag */
436 if (btype != BLOCKED_MODULE)
437 c->flags |= CLIENT_PENDING_COMMAND;
438 blockClient(c,btype);
439}
440
441/* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP.
442 * Internal function for unblockClient() */
443static void unblockClientWaitingData(client *c) {
444 dictEntry *de;
445 dictIterator di;
446
447 if (dictSize(c->bstate.keys) == 0)
448 return;
449
450 dictInitIterator(&di, c->bstate.keys);
451 /* The client may wait for multiple keys, so unblock it for every key. */
452 while((de = dictNext(&di)) != NULL) {
453 releaseBlockedEntry(c, de, 0);
454 }
455 dictResetIterator(&di);
456 dictEmpty(c->bstate.keys, NULL);
457}
458
459static blocking_type getBlockedTypeByType(int type) {
460 switch (type) {
461 case OBJ_LIST: return BLOCKED_LIST;
462 case OBJ_ZSET: return BLOCKED_ZSET;
463 case OBJ_MODULE: return BLOCKED_MODULE;
464 case OBJ_STREAM: return BLOCKED_STREAM;
465 default: return BLOCKED_NONE;
466 }
467}
468
469/* If the specified key has clients blocked waiting for list pushes, this
470 * function will put the key reference into the server.ready_keys list.
471 * Note that db->ready_keys is a hash table that allows us to avoid putting
472 * the same key again and again in the list in case of multiple pushes
473 * made by a script or in the context of MULTI/EXEC.
474 *
475 * The list will be finally processed by handleClientsBlockedOnKeys() */
476static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) {
477 readyList *rl;
478
479 /* Quick returns. */
480 int btype = getBlockedTypeByType(type);
481 if (btype == BLOCKED_NONE) {
482 /* The type can never block. */
483 return;
484 }
485 if (!server.blocked_clients_by_type[btype] &&
486 !server.blocked_clients_by_type[BLOCKED_MODULE]) {
487 /* No clients block on this type. Note: Blocked modules are represented
488 * by BLOCKED_MODULE, even if the intention is to wake up by normal
489 * types (list, zset, stream), so we need to check that there are no
490 * blocked modules before we do a quick return here. */
491 return;
492 }
493
494 if (deleted) {
495 /* Key deleted and no clients blocking for this key? No need to queue it. */
496 if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL)
497 return;
498 /* Note: if we made it here it means the key is also present in db->blocking_keys */
499 } else {
500 /* No clients blocking for this key? No need to queue it. */
501 if (dictFind(db->blocking_keys,key) == NULL)
502 return;
503 }
504
505 dictEntry *de, *existing;
506 de = dictAddRaw(db->ready_keys, key, &existing);
507 if (de) {
508 /* We add the key in the db->ready_keys dictionary in order
509 * to avoid adding it multiple times into a list with a simple O(1)
510 * check. */
511 incrRefCount(key);
512 } else {
513 /* Key was already signaled? No need to queue it again. */
514 return;
515 }
516
517 /* Ok, we need to queue this key into server.ready_keys. */
518 rl = zmalloc(sizeof(*rl));
519 rl->key = key;
520 rl->db = db;
521 incrRefCount(key);
522 listAddNodeTail(server.ready_keys,rl);
523}
524
525/* Helper function to wrap the logic of removing a client blocked key entry
526 * In this case we would like to do the following:
527 * 1. unlink the client from the global DB locked client list
528 * 2. remove the entry from the global db blocking list in case the list is empty
529 * 3. in case the global list is empty, also remove the key from the global dict of keys
530 * which should trigger unblock on key deletion
531 * 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys,
532 * but unblocked when only one of them is triggered, we would like to avoid deleting each key separately
533 * and instead clear the dictionary in one-shot. this is why the remove_key argument is provided
534 * to support this logic in unblockClientWaitingData
535 */
536static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
537 list *l;
538 listNode *pos;
539 void *key;
540 dictEntry *unblock_on_nokey_entry;
541
542 key = dictGetKey(de);
543 pos = dictGetVal(de);
544 /* Remove this client from the list of clients waiting for this key. */
545 l = dictFetchValue(c->db->blocking_keys, key);
546 serverAssertWithInfo(c,key,l != NULL);
547 listUnlinkNode(l,pos);
548 /* If the list is empty we need to remove it to avoid wasting memory
549 * We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict.
550 * However, in case the list is not empty, we will have to still perform reference accounting
551 * on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference.
552 * Why? because it is possible that some more clients are blocked on the same key but without
553 * require to be triggered on key deletion, we do not want these to be later triggered by the
554 * signalDeletedKeyAsReady. */
555 if (listLength(l) == 0) {
556 dictDelete(c->db->blocking_keys, key);
557 dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
558 } else if (c->bstate.unblock_on_nokey) {
559 unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key);
560 /* it is not possible to have a client blocked on nokey with no matching entry */
561 serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL);
562 if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) {
563 /* in case the count is zero, we can delete the entry */
564 dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
565 }
566 }
567 if (remove_key)
568 dictDelete(c->bstate.keys, key);
569}
570
571void signalKeyAsReady(redisDb *db, robj *key, int type) {
572 signalKeyAsReadyLogic(db, key, type, 0);
573}
574
575void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) {
576 signalKeyAsReadyLogic(db, key, type, 1);
577}
578
579/* Helper function for handleClientsBlockedOnKeys(). This function is called
580 * whenever a key is ready. we iterate over all the clients blocked on this key
581 * and try to re-execute the command (in case the key is still available). */
582static void handleClientsBlockedOnKey(readyList *rl) {
583
584 /* We serve clients in the same order they blocked for
585 * this key, from the first blocked to the last. */
586 dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
587
588 if (de) {
589 list *clients = dictGetVal(de);
590 listNode *ln;
591 listIter li;
592 listRewind(clients,&li);
593
594 /* Avoid processing more than the initial count so that we're not stuck
595 * in an endless loop in case the reprocessing of the command blocks again. */
596 long count = listLength(clients);
597 while ((ln = listNext(&li)) && count--) {
598 client *receiver = listNodeValue(ln);
599 kvobj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS);
600 /* 1. In case new key was added/touched we need to verify it satisfy the
601 * blocked type, since we might process the wrong key type.
602 * 2. We want to serve clients blocked on module keys
603 * regardless of the object type: we don't know what the
604 * module is trying to accomplish right now.
605 * 3. In case of XREADGROUP call we will want to unblock on any change in object type
606 * or in case the key was deleted, since the group is no longer valid. */
607 if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) ||
608 (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) ||
609 (receiver->bstate.unblock_on_nokey))
610 {
611 if (receiver->bstate.btype != BLOCKED_MODULE)
612 unblockClientOnKey(receiver, rl->key);
613 else
614 moduleUnblockClientOnKey(receiver, rl->key);
615 }
616 }
617 }
618}
619
620/* block a client due to wait command */
621void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
622 c->bstate.timeout = timeout;
623 c->bstate.reploffset = offset;
624 c->bstate.numreplicas = numreplicas;
625 listAddNodeHead(server.clients_waiting_acks,c);
626 blockClient(c,BLOCKED_WAIT);
627}
628
629/* block a client due to waitaof command */
630void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
631 c->bstate.timeout = timeout;
632 c->bstate.reploffset = offset;
633 c->bstate.numreplicas = numreplicas;
634 c->bstate.numlocal = numlocal;
635 listAddNodeHead(server.clients_waiting_acks,c);
636 blockClient(c,BLOCKED_WAITAOF);
637}
638
639/* Postpone client from executing a command. For example the server might be busy
640 * requesting to avoid processing clients commands which will be processed later
641 * when the it is ready to accept them. */
642void blockPostponeClientWithType(client *c, int btype) {
643 serverAssert(btype == BLOCKED_POSTPONE || btype == BLOCKED_POSTPONE_TRIM);
644 c->bstate.timeout = 0;
645 blockClient(c, btype);
646 listAddNodeTail(server.postponed_clients, c);
647 c->postponed_list_node = listLast(server.postponed_clients);
648 /* Mark this client to execute its command */
649 c->flags |= CLIENT_PENDING_COMMAND;
650}
651
652/* Postpone client from executing a command. */
653void blockPostponeClient(client *c) {
654 blockPostponeClientWithType(c, BLOCKED_POSTPONE);
655}
656
657/* Block client due to shutdown command */
658void blockClientShutdown(client *c) {
659 blockClient(c, BLOCKED_SHUTDOWN);
660}
661
662/* Unblock a client once a specific key became available for it.
663 * This function will remove the client from the list of clients blocked on this key
664 * and also remove the key from the dictionary of keys this client is blocked on.
665 * in case the client has a command pending it will process it immediately. */
666static void unblockClientOnKey(client *c, robj *key) {
667 dictEntry *de;
668
669 de = dictFind(c->bstate.keys, key);
670 releaseBlockedEntry(c, de, 1);
671
672 /* Only in case of blocking API calls, we might be blocked on several keys.
673 however we should force unblock the entire blocking keys */
674 serverAssert(c->bstate.btype == BLOCKED_STREAM ||
675 c->bstate.btype == BLOCKED_LIST ||
676 c->bstate.btype == BLOCKED_ZSET);
677
678 /* We need to unblock the client before calling processCommandAndResetClient
679 * because it checks the CLIENT_BLOCKED flag */
680 unblockClient(c, 0);
681 /* In case this client was blocked on keys during command
682 * we need to re process the command again */
683 if (c->flags & CLIENT_PENDING_COMMAND) {
684 c->flags &= ~CLIENT_PENDING_COMMAND;
685 c->flags |= CLIENT_REEXECUTING_COMMAND;
686 /* We want the command processing and the unblock handler (see RM_Call 'K' option)
687 * to run atomically, this is why we must enter the execution unit here before
688 * running the command, and exit the execution unit after calling the unblock handler (if exists).
689 * Notice that we also must set the current client so it will be available
690 * when we will try to send the client side caching notification (done on 'afterCommand'). */
691 client *old_client = server.current_client;
692 server.current_client = c;
693 enterExecutionUnit(1, 0);
694 processCommandAndResetClient(c);
695 if (!(c->flags & CLIENT_BLOCKED)) {
696 if (c->flags & CLIENT_MODULE) {
697 moduleCallCommandUnblockedHandler(c);
698 } else {
699 queueClientForReprocessing(c);
700 }
701 }
702 exitExecutionUnit();
703 afterCommand(c);
704 /* Clear the CLIENT_REEXECUTING_COMMAND flag after the proc is executed. */
705 c->flags &= ~CLIENT_REEXECUTING_COMMAND;
706 server.current_client = old_client;
707 }
708}
709
710/* Unblock a client blocked on the specific key from module context.
711 * This function will try to serve the module call, and in case it succeeds,
712 * it will add the client to the list of module unblocked clients which will
713 * be processed in moduleHandleBlockedClients. */
714static void moduleUnblockClientOnKey(client *c, robj *key) {
715 long long prev_error_replies = server.stat_total_error_replies;
716 client *old_client = server.current_client;
717 server.current_client = c;
718 monotime replyTimer;
719 elapsedStart(&replyTimer);
720
721 if (moduleTryServeClientBlockedOnKey(c, key)) {
722 updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
723 moduleUnblockClient(c);
724 }
725 /* We need to call afterCommand even if the client was not unblocked
726 * in order to propagate any changes that could have been done inside
727 * moduleTryServeClientBlockedOnKey */
728 afterCommand(c);
729 server.current_client = old_client;
730}
731
732/* Unblock a client which is currently Blocked on and provided a timeout.
733 * The implementation will first reply to the blocked client with null response
734 * or, in case of module blocked client the timeout callback will be used.
735 * In this case since we might have a command pending
736 * we want to remove the pending flag to indicate we already responded to the
737 * command with timeout reply. */
738void unblockClientOnTimeout(client *c) {
739 /* The client has been unlocked (in the moduleUnblocked list), return ASAP. */
740 if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
741
742 replyToBlockedClientTimedOut(c);
743 if (c->flags & CLIENT_PENDING_COMMAND)
744 c->flags &= ~CLIENT_PENDING_COMMAND;
745 unblockClient(c, 1);
746}
747
748/* Unblock a client which is currently Blocked with error.
749 * If err_str is provided it will be used to reply to the blocked client */
750void unblockClientOnError(client *c, const char *err_str) {
751 if (err_str)
752 addReplyError(c, err_str);
753 updateStatsOnUnblock(c, 0, 0, 1);
754 if (c->flags & CLIENT_PENDING_COMMAND)
755 c->flags &= ~CLIENT_PENDING_COMMAND;
756 unblockClient(c, 1);
757}
758
759void blockedBeforeSleep(void) {
760 /* Handle precise timeouts of blocked clients. */
761 handleBlockedClientsTimeout();
762
763 /* Handle for expired pending entries. */
764 handleClaimableStreamEntries();
765
766 /* Unblock all the clients blocked for synchronous replication
767 * in WAIT or WAITAOF. */
768 if (listLength(server.clients_waiting_acks))
769 processClientsWaitingReplicas();
770
771 /* Try to process blocked clients every once in while.
772 *
773 * Example: A module calls RM_SignalKeyAsReady from within a timer callback
774 * (So we don't visit processCommand() at all).
775 *
776 * This may unblock clients, so must be done before processUnblockedClients */
777 handleClientsBlockedOnKeys();
778
779 /* Check if there are clients unblocked by modules that implement
780 * blocking commands. */
781 if (moduleCount())
782 moduleHandleBlockedClients();
783
784 /* Try to process pending commands for clients that were just unblocked. */
785 if (listLength(server.unblocked_clients))
786 processUnblockedClients();
787}