1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
#include "server.h"
#include "cluster.h"
#include <math.h>
/* ========================== Clients timeouts ============================= */
/* Fire the timeout of a blocked client when it is due.
 *
 * The client is timed out only when all of the following hold: it is
 * currently flagged as blocked, its blocking state carries a non-zero
 * timeout, and that timeout is strictly before 'now'. In that case
 * unblockClientOnTimeout() is called and 1 is returned. In every other
 * case nothing is done and 0 is returned. */
int checkBlockedClientTimeout(client *c, mstime_t now) {
    /* Not blocked at all: nothing to time out. */
    if (!(c->flags & CLIENT_BLOCKED)) return 0;

    /* A zero timeout is never considered expired. */
    if (c->bstate.timeout == 0) return 0;

    /* The timeout has not been reached yet. */
    if (c->bstate.timeout >= now) return 0;

    /* Handle blocking operation specific timeout. */
    unblockClientOnTimeout(c);
    return 1;
}
/* Periodic per-client timeout handling.
 *
 * 'now_ms' is the current time in milliseconds. It is passed in by the
 * caller because this function runs many times in a loop, and fetching the
 * time on every iteration would be wasted work.
 *
 * Returns 1 if the client was closed for being idle (the client is freed
 * and must not be used afterwards), 0 otherwise. */
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
    time_t now = now_ms/1000;

    /* Idle connection timeout, only evaluated when maxidletime is set. */
    int idle_expired =
        server.maxidletime &&
        !(c->flags & CLIENT_SLAVE) &&   /* No timeout for slaves and monitors */
        !mustObeyClient(c) &&           /* No timeout for masters and AOF */
        !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
        !(c->flags & CLIENT_PUBSUB) &&  /* No timeout for Pub/Sub clients */
        (now - c->lastinteraction > server.maxidletime);

    if (idle_expired) {
        serverLog(LL_VERBOSE,"Closing idle client");
        freeClient(c);
        return 1;
    }

    /* Cluster: unblock (with a redirection) the clients that are blocked on
     * keys this server no longer serves. */
    if ((c->flags & CLIENT_BLOCKED) &&
        server.cluster_enabled &&
        clusterRedirectBlockedClientIfNeeded(c))
    {
        unblockClientOnError(c, NULL);
    }
    return 0;
}
/* For blocked clients timeouts we populate a radix tree of 128 bit keys
* composed as such:
*
* [8 byte big endian expire time]+[8 byte client pointer]
*
* We don't do any cleanup in the radix tree: when we walk the entries whose
* timeout was already reached, if the clients no longer exist or are no
* longer blocked with such timeout, we just move on.
*
* Every time a client blocks with a timeout, we add the client to
* the tree. In beforeSleep() we call handleBlockedClientsTimeout() to walk
* the tree and unblock the clients. */
/* Length in bytes of a timeout table key as written by encodeTimeoutKey(). */
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client pointer. */
/* Write into 'buf' the radix tree key for the client 'c' and the given
 * 'timeout'.
 *
 * Layout: the first 8 bytes hold the timeout after htonu64(), and the bytes
 * starting at offset 8 hold the raw value of the pointer 'c'. 'buf' must
 * have room for 16 bytes. */
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, client *c) {
    uint64_t be_timeout = htonu64(timeout);
    unsigned char *ptr_field = buf + 8;

    memcpy(buf, &be_timeout, sizeof(be_timeout));
    memcpy(ptr_field, &c, sizeof(c));

    /* With 4 byte pointers the second half of the pointer field is zeroed,
     * so that the whole 16 byte key is always initialized. */
    if (sizeof(c) == 4) memset(ptr_field + 4, 0, 4);
}
/* Inverse of encodeTimeoutKey(): split the key stored in 'buf' into its two
 * fields. The timeout (converted back with ntohu64()) is stored into
 * '*toptr' and the client pointer into '*cptr'. */
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) {
    uint64_t raw_timeout;

    /* First 8 bytes: the expire time. */
    memcpy(&raw_timeout, buf, sizeof(raw_timeout));
    *toptr = ntohu64(raw_timeout);

    /* Bytes from offset 8: the client pointer. */
    memcpy(cptr, buf + 8, sizeof(*cptr));
}
/* Register the client 'c' in the radix tree used to handle blocked clients
 * timeouts, using a key built from its timeout and its pointer.
 *
 * Clients with a zero timeout (block forever) are not added. The
 * CLIENT_IN_TO_TABLE flag is set only when raxTryInsert() reports success. */
void addClientToTimeoutTable(client *c) {
    unsigned char key[CLIENT_ST_KEYLEN];

    if (c->bstate.timeout != 0) {
        encodeTimeoutKey(key, c->bstate.timeout, c);
        if (raxTryInsert(server.clients_timeout_table, key, sizeof(key),
                         NULL, NULL))
        {
            c->flags |= CLIENT_IN_TO_TABLE;
        }
    }
}
/* Drop the client 'c' from the timeout table. This is used when the client
 * is unblocked for a reason other than its timeout expiring.
 *
 * Nothing is done unless the client carries the CLIENT_IN_TO_TABLE flag.
 * The flag is cleared before the key is removed from the radix tree. */
void removeClientFromTimeoutTable(client *c) {
    unsigned char key[CLIENT_ST_KEYLEN];

    if (c->flags & CLIENT_IN_TO_TABLE) {
        c->flags &= ~CLIENT_IN_TO_TABLE;
        encodeTimeoutKey(key, c->bstate.timeout, c);
        raxRemove(server.clients_timeout_table, key, sizeof(key), NULL);
    }
}
/* Called in beforeSleep(): unblock the clients waiting in a blocking
 * operation whose timeout has been reached.
 *
 * Keys start with the big endian expire time, so the tree is visited from
 * its first element and the scan stops at the first entry whose timeout is
 * not in the past. Each processed entry is removed from the tree and the
 * iterator is re-seeked to the first element before continuing. */
void handleBlockedClientsTimeout(void) {
    raxIterator iter;
    uint64_t current_ms;

    /* Nothing to do when the table is empty. */
    if (raxSize(server.clients_timeout_table) == 0) return;

    current_ms = mstime();
    raxStart(&iter,server.clients_timeout_table);
    raxSeek(&iter,"^",NULL,0);
    for (;;) {
        uint64_t expire_ms;
        client *blocked;

        if (!raxNext(&iter)) break;
        decodeTimeoutKey(iter.key,&expire_ms,&blocked);

        /* All the remaining timeouts are in the future. */
        if (expire_ms >= current_ms) break;

        /* The flag is cleared before the timeout check runs. */
        blocked->flags &= ~CLIENT_IN_TO_TABLE;
        checkBlockedClientTimeout(blocked,current_ms);
        raxRemove(server.clients_timeout_table,iter.key,iter.key_len,NULL);

        /* The tree was modified: restart from the first element. */
        raxSeek(&iter,"^",NULL,0);
    }
    raxStop(&iter);
}
/* Parse a timeout from 'object' and store it into '*timeout'.
 *
 * The value is parsed according to 'unit': with UNIT_SECONDS it is read as
 * a floating point number of seconds and rounded up to whole milliseconds,
 * otherwise it is read as an integer number of milliseconds.
 *
 * The stored timeout is always an absolute time in milliseconds: the parsed
 * value plus the time returned by commandTimeSnapshot(). The only exception
 * is a parsed value of zero (usually meaning "no timeout" from the point of
 * view of the commands API), which is stored as zero.
 *
 * Returns C_OK on success. On failure an error is replied to the client 'c'
 * and C_ERR is returned; this happens when the value cannot be parsed, is
 * negative, or is too large to be represented as an absolute time. */
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
    long long tval;
    long double ftval;
    mstime_t now = commandTimeSnapshot();

    if (unit == UNIT_SECONDS) {
        /* 2^63: the first value above the long long range. Unlike LLONG_MAX
         * it is exactly representable whatever the precision of
         * long double is, so the range check below is exact. */
        const long double llong_limit = -(long double)LLONG_MIN;

        if (getLongDoubleFromObjectOrReply(c,object,&ftval,
            "timeout is not a float or out of range") != C_OK)
            return C_ERR;

        ftval *= 1000.0; /* seconds => millisec */
        ftval = ceill(ftval);

        /* Converting a floating point value that does not fit into the
         * destination integer type is undefined behavior, so both bounds
         * are validated before the cast. The NaN test is defensive. */
        if (isnan(ftval) || ftval >= llong_limit) {
            addReplyError(c, "timeout is out of range");
            return C_ERR;
        }
        if (ftval < 0) {
            addReplyError(c,"timeout is negative");
            return C_ERR;
        }
        tval = (long long) ftval;
    } else {
        if (getLongLongFromObjectOrReply(c,object,&tval,
            "timeout is not an integer or out of range") != C_OK)
            return C_ERR;
    }

    if (tval < 0) {
        addReplyError(c,"timeout is negative");
        return C_ERR;
    }

    if (tval > 0) {
        if (tval > LLONG_MAX - now) {
            addReplyError(c,"timeout is out of range"); /* 'tval+now' would overflow */
            return C_ERR;
        }
        tval += now;
    }
    *timeout = tval;
    return C_OK;
}
|