summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/timeout.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/timeout.c')
-rw-r--r--examples/redis-unstable/src/timeout.c183
1 files changed, 183 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/timeout.c b/examples/redis-unstable/src/timeout.c
new file mode 100644
index 0000000..01848ce
--- /dev/null
+++ b/examples/redis-unstable/src/timeout.c
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2009-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ */
+
+#include "server.h"
+#include "cluster.h"
+
+#include <math.h>
+
+/* ========================== Clients timeouts ============================= */
+
+/* Check if this blocked client timedout (does nothing if the client is
+ * not blocked right now). If so send a reply, unblock it, and return 1.
+ * Otherwise 0 is returned and no operation is performed. */
+int checkBlockedClientTimeout(client *c, mstime_t now) {
+ if (c->flags & CLIENT_BLOCKED &&
+ c->bstate.timeout != 0
+ && c->bstate.timeout < now)
+ {
+ /* Handle blocking operation specific timeout. */
+ unblockClientOnTimeout(c);
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+/* Check for timeouts. Returns non-zero if the client was terminated.
+ * The function gets the current time in milliseconds as argument since
+ * it gets called multiple times in a loop, so calling gettimeofday() for
+ * each iteration would be costly without any actual gain. */
+int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
+ time_t now = now_ms/1000;
+
+ if (server.maxidletime &&
+ /* This handles the idle clients connection timeout if set. */
+ !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
+ !mustObeyClient(c) && /* No timeout for masters and AOF */
+ !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
+ !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
+ (now - c->lastinteraction > server.maxidletime))
+ {
+ serverLog(LL_VERBOSE,"Closing idle client");
+ freeClient(c);
+ return 1;
+ } else if (c->flags & CLIENT_BLOCKED) {
+ /* Cluster: handle unblock & redirect of clients blocked
+ * into keys no longer served by this server. */
+ if (server.cluster_enabled) {
+ if (clusterRedirectBlockedClientIfNeeded(c))
+ unblockClientOnError(c, NULL);
+ }
+ }
+ return 0;
+}
+
+/* For blocked clients timeouts we populate a radix tree of 128 bit keys
+ * composed as such:
+ *
+ * [8 byte big endian expire time]+[8 byte client ID]
+ *
+ * We don't do any cleanup in the Radix tree: when we run the clients that
+ * reached the timeout already, if they are no longer existing or no longer
+ * blocked with such timeout, we just go forward.
+ *
+ * Every time a client blocks with a timeout, we add the client in
+ * the tree. In beforeSleep() we call handleBlockedClientsTimeout() to run
+ * the tree and unblock the clients. */
+
+#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
+
+/* Given client ID and timeout, write the resulting radix tree key in buf. */
+void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, client *c) {
+ timeout = htonu64(timeout);
+ memcpy(buf,&timeout,sizeof(timeout));
+ memcpy(buf+8,&c,sizeof(c));
+ if (sizeof(c) == 4) memset(buf+12,0,4); /* Zero padding for 32bit target. */
+}
+
+/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
+ * the timeout into *toptr and the client pointer into *cptr. */
+void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) {
+ memcpy(toptr,buf,sizeof(*toptr));
+ *toptr = ntohu64(*toptr);
+ memcpy(cptr,buf+8,sizeof(*cptr));
+}
+
+/* Add the specified client id / timeout as a key in the radix tree we use
+ * to handle blocked clients timeouts. The client is not added to the list
+ * if its timeout is zero (block forever). */
+void addClientToTimeoutTable(client *c) {
+ if (c->bstate.timeout == 0) return;
+ uint64_t timeout = c->bstate.timeout;
+ unsigned char buf[CLIENT_ST_KEYLEN];
+ encodeTimeoutKey(buf,timeout,c);
+ if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
+ c->flags |= CLIENT_IN_TO_TABLE;
+}
+
+/* Remove the client from the table when it is unblocked for reasons
+ * different than timing out. */
+void removeClientFromTimeoutTable(client *c) {
+ if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
+ c->flags &= ~CLIENT_IN_TO_TABLE;
+ uint64_t timeout = c->bstate.timeout;
+ unsigned char buf[CLIENT_ST_KEYLEN];
+ encodeTimeoutKey(buf,timeout,c);
+ raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
+}
+
+/* This function is called in beforeSleep() in order to unblock clients
+ * that are waiting in blocking operations with a timeout set. */
+void handleBlockedClientsTimeout(void) {
+ if (raxSize(server.clients_timeout_table) == 0) return;
+ uint64_t now = mstime();
+ raxIterator ri;
+ raxStart(&ri,server.clients_timeout_table);
+ raxSeek(&ri,"^",NULL,0);
+
+ while(raxNext(&ri)) {
+ uint64_t timeout;
+ client *c;
+ decodeTimeoutKey(ri.key,&timeout,&c);
+ if (timeout >= now) break; /* All the timeouts are in the future. */
+ c->flags &= ~CLIENT_IN_TO_TABLE;
+ checkBlockedClientTimeout(c,now);
+ raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
+ raxSeek(&ri,"^",NULL,0);
+ }
+ raxStop(&ri);
+}
+
+/* Get a timeout value from an object and store it into 'timeout'.
+ * The final timeout is always stored as milliseconds as a time where the
+ * timeout will expire, however the parsing is performed according to
+ * the 'unit' that can be seconds or milliseconds.
+ *
+ * Note that if the timeout is zero (usually from the point of view of
+ * commands API this means no timeout) the value stored into 'timeout'
+ * is zero. */
+int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
+ long long tval;
+ long double ftval;
+ mstime_t now = commandTimeSnapshot();
+
+ if (unit == UNIT_SECONDS) {
+ if (getLongDoubleFromObjectOrReply(c,object,&ftval,
+ "timeout is not a float or out of range") != C_OK)
+ return C_ERR;
+
+ ftval *= 1000.0; /* seconds => millisec */
+ if (ftval > LLONG_MAX) {
+ addReplyError(c, "timeout is out of range");
+ return C_ERR;
+ }
+ tval = (long long) ceill(ftval);
+ } else {
+ if (getLongLongFromObjectOrReply(c,object,&tval,
+ "timeout is not an integer or out of range") != C_OK)
+ return C_ERR;
+ }
+
+ if (tval < 0) {
+ addReplyError(c,"timeout is negative");
+ return C_ERR;
+ }
+
+ if (tval > 0) {
+ if (tval > LLONG_MAX - now) {
+ addReplyError(c,"timeout is out of range"); /* 'tval+now' would overflow */
+ return C_ERR;
+ }
+ tval += now;
+ }
+ *timeout = tval;
+
+ return C_OK;
+}