aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/deps/hiredis/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/deps/hiredis/async.c')
-rw-r--r--examples/redis-unstable/deps/hiredis/async.c1034
1 files changed, 1034 insertions, 0 deletions
diff --git a/examples/redis-unstable/deps/hiredis/async.c b/examples/redis-unstable/deps/hiredis/async.c
new file mode 100644
index 0000000..ac56353
--- /dev/null
+++ b/examples/redis-unstable/deps/hiredis/async.c
@@ -0,0 +1,1034 @@
1/*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "fmacros.h"
33#include "alloc.h"
34#include <stdlib.h>
35#include <string.h>
36#ifndef _MSC_VER
37#include <strings.h>
38#endif
39#include <assert.h>
40#include <ctype.h>
41#include <errno.h>
42#include "async.h"
43#include "net.h"
44#include "dict.c"
45#include "sds.h"
46#include "win32.h"
47
48#include "async_private.h"
49
50#ifdef NDEBUG
51#undef assert
52#define assert(e) (void)(e)
53#endif
54
55/* Forward declarations of hiredis.c functions */
56int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
57void __redisSetError(redisContext *c, int type, const char *str);
58
59/* Functions managing dictionary of callbacks for pub/sub. */
60static unsigned int callbackHash(const void *key) {
61 return dictGenHashFunction((const unsigned char *)key,
62 hi_sdslen((const hisds)key));
63}
64
65static void *callbackValDup(void *privdata, const void *src) {
66 ((void) privdata);
67 redisCallback *dup;
68
69 dup = hi_malloc(sizeof(*dup));
70 if (dup == NULL)
71 return NULL;
72
73 memcpy(dup,src,sizeof(*dup));
74 return dup;
75}
76
77static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
78 int l1, l2;
79 ((void) privdata);
80
81 l1 = hi_sdslen((const hisds)key1);
82 l2 = hi_sdslen((const hisds)key2);
83 if (l1 != l2) return 0;
84 return memcmp(key1,key2,l1) == 0;
85}
86
87static void callbackKeyDestructor(void *privdata, void *key) {
88 ((void) privdata);
89 hi_sdsfree((hisds)key);
90}
91
92static void callbackValDestructor(void *privdata, void *val) {
93 ((void) privdata);
94 hi_free(val);
95}
96
97static dictType callbackDict = {
98 callbackHash,
99 NULL,
100 callbackValDup,
101 callbackKeyCompare,
102 callbackKeyDestructor,
103 callbackValDestructor
104};
105
106static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
107 redisAsyncContext *ac;
108 dict *channels = NULL, *patterns = NULL;
109
110 channels = dictCreate(&callbackDict,NULL);
111 if (channels == NULL)
112 goto oom;
113
114 patterns = dictCreate(&callbackDict,NULL);
115 if (patterns == NULL)
116 goto oom;
117
118 ac = hi_realloc(c,sizeof(redisAsyncContext));
119 if (ac == NULL)
120 goto oom;
121
122 c = &(ac->c);
123
124 /* The regular connect functions will always set the flag REDIS_CONNECTED.
125 * For the async API, we want to wait until the first write event is
126 * received up before setting this flag, so reset it here. */
127 c->flags &= ~REDIS_CONNECTED;
128
129 ac->err = 0;
130 ac->errstr = NULL;
131 ac->data = NULL;
132 ac->dataCleanup = NULL;
133
134 ac->ev.data = NULL;
135 ac->ev.addRead = NULL;
136 ac->ev.delRead = NULL;
137 ac->ev.addWrite = NULL;
138 ac->ev.delWrite = NULL;
139 ac->ev.cleanup = NULL;
140 ac->ev.scheduleTimer = NULL;
141
142 ac->onConnect = NULL;
143 ac->onConnectNC = NULL;
144 ac->onDisconnect = NULL;
145
146 ac->replies.head = NULL;
147 ac->replies.tail = NULL;
148 ac->sub.replies.head = NULL;
149 ac->sub.replies.tail = NULL;
150 ac->sub.channels = channels;
151 ac->sub.patterns = patterns;
152 ac->sub.pending_unsubs = 0;
153
154 return ac;
155oom:
156 if (channels) dictRelease(channels);
157 if (patterns) dictRelease(patterns);
158 return NULL;
159}
160
161/* We want the error field to be accessible directly instead of requiring
162 * an indirection to the redisContext struct. */
163static void __redisAsyncCopyError(redisAsyncContext *ac) {
164 if (!ac)
165 return;
166
167 redisContext *c = &(ac->c);
168 ac->err = c->err;
169 ac->errstr = c->errstr;
170}
171
172redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
173 redisOptions myOptions = *options;
174 redisContext *c;
175 redisAsyncContext *ac;
176
177 /* Clear any erroneously set sync callback and flag that we don't want to
178 * use freeReplyObject by default. */
179 myOptions.push_cb = NULL;
180 myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
181
182 myOptions.options |= REDIS_OPT_NONBLOCK;
183 c = redisConnectWithOptions(&myOptions);
184 if (c == NULL) {
185 return NULL;
186 }
187
188 ac = redisAsyncInitialize(c);
189 if (ac == NULL) {
190 redisFree(c);
191 return NULL;
192 }
193
194 /* Set any configured async push handler */
195 redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
196
197 __redisAsyncCopyError(ac);
198 return ac;
199}
200
201redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
202 redisOptions options = {0};
203 REDIS_OPTIONS_SET_TCP(&options, ip, port);
204 return redisAsyncConnectWithOptions(&options);
205}
206
207redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
208 const char *source_addr) {
209 redisOptions options = {0};
210 REDIS_OPTIONS_SET_TCP(&options, ip, port);
211 options.endpoint.tcp.source_addr = source_addr;
212 return redisAsyncConnectWithOptions(&options);
213}
214
215redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
216 const char *source_addr) {
217 redisOptions options = {0};
218 REDIS_OPTIONS_SET_TCP(&options, ip, port);
219 options.options |= REDIS_OPT_REUSEADDR;
220 options.endpoint.tcp.source_addr = source_addr;
221 return redisAsyncConnectWithOptions(&options);
222}
223
224redisAsyncContext *redisAsyncConnectUnix(const char *path) {
225 redisOptions options = {0};
226 REDIS_OPTIONS_SET_UNIX(&options, path);
227 return redisAsyncConnectWithOptions(&options);
228}
229
230static int
231redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn,
232 redisConnectCallbackNC *fn_nc)
233{
234 /* If either are already set, this is an error */
235 if (ac->onConnect || ac->onConnectNC)
236 return REDIS_ERR;
237
238 if (fn) {
239 ac->onConnect = fn;
240 } else if (fn_nc) {
241 ac->onConnectNC = fn_nc;
242 }
243
244 /* The common way to detect an established connection is to wait for
245 * the first write event to be fired. This assumes the related event
246 * library functions are already set. */
247 _EL_ADD_WRITE(ac);
248
249 return REDIS_OK;
250}
251
252int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
253 return redisAsyncSetConnectCallbackImpl(ac, fn, NULL);
254}
255
256int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn) {
257 return redisAsyncSetConnectCallbackImpl(ac, NULL, fn);
258}
259
260int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
261 if (ac->onDisconnect == NULL) {
262 ac->onDisconnect = fn;
263 return REDIS_OK;
264 }
265 return REDIS_ERR;
266}
267
268/* Helper functions to push/shift callbacks */
269static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
270 redisCallback *cb;
271
272 /* Copy callback from stack to heap */
273 cb = hi_malloc(sizeof(*cb));
274 if (cb == NULL)
275 return REDIS_ERR_OOM;
276
277 if (source != NULL) {
278 memcpy(cb,source,sizeof(*cb));
279 cb->next = NULL;
280 }
281
282 /* Store callback in list */
283 if (list->head == NULL)
284 list->head = cb;
285 if (list->tail != NULL)
286 list->tail->next = cb;
287 list->tail = cb;
288 return REDIS_OK;
289}
290
291static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
292 redisCallback *cb = list->head;
293 if (cb != NULL) {
294 list->head = cb->next;
295 if (cb == list->tail)
296 list->tail = NULL;
297
298 /* Copy callback from heap to stack */
299 if (target != NULL)
300 memcpy(target,cb,sizeof(*cb));
301 hi_free(cb);
302 return REDIS_OK;
303 }
304 return REDIS_ERR;
305}
306
307static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
308 redisContext *c = &(ac->c);
309 if (cb->fn != NULL) {
310 c->flags |= REDIS_IN_CALLBACK;
311 cb->fn(ac,reply,cb->privdata);
312 c->flags &= ~REDIS_IN_CALLBACK;
313 }
314}
315
316static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
317 if (ac->push_cb != NULL) {
318 ac->c.flags |= REDIS_IN_CALLBACK;
319 ac->push_cb(ac, reply);
320 ac->c.flags &= ~REDIS_IN_CALLBACK;
321 }
322}
323
324static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
325{
326 if (ac->onConnect == NULL && ac->onConnectNC == NULL)
327 return;
328
329 if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
330 ac->c.flags |= REDIS_IN_CALLBACK;
331 if (ac->onConnect) {
332 ac->onConnect(ac, status);
333 } else {
334 ac->onConnectNC(ac, status);
335 }
336 ac->c.flags &= ~REDIS_IN_CALLBACK;
337 } else {
338 /* already in callback */
339 if (ac->onConnect) {
340 ac->onConnect(ac, status);
341 } else {
342 ac->onConnectNC(ac, status);
343 }
344 }
345}
346
347static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
348{
349 if (ac->onDisconnect) {
350 if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
351 ac->c.flags |= REDIS_IN_CALLBACK;
352 ac->onDisconnect(ac, status);
353 ac->c.flags &= ~REDIS_IN_CALLBACK;
354 } else {
355 /* already in callback */
356 ac->onDisconnect(ac, status);
357 }
358 }
359}
360
361/* Helper function to free the context. */
362static void __redisAsyncFree(redisAsyncContext *ac) {
363 redisContext *c = &(ac->c);
364 redisCallback cb;
365 dictIterator it;
366 dictEntry *de;
367
368 /* Execute pending callbacks with NULL reply. */
369 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
370 __redisRunCallback(ac,&cb,NULL);
371 while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
372 __redisRunCallback(ac,&cb,NULL);
373
374 /* Run subscription callbacks with NULL reply */
375 if (ac->sub.channels) {
376 dictInitIterator(&it,ac->sub.channels);
377 while ((de = dictNext(&it)) != NULL)
378 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
379
380 dictRelease(ac->sub.channels);
381 }
382
383 if (ac->sub.patterns) {
384 dictInitIterator(&it,ac->sub.patterns);
385 while ((de = dictNext(&it)) != NULL)
386 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
387
388 dictRelease(ac->sub.patterns);
389 }
390
391 /* Signal event lib to clean up */
392 _EL_CLEANUP(ac);
393
394 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
395 * this context, the status will always be REDIS_OK. */
396 if (c->flags & REDIS_CONNECTED) {
397 int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
398 if (c->flags & REDIS_FREEING)
399 status = REDIS_OK;
400 __redisRunDisconnectCallback(ac, status);
401 }
402
403 if (ac->dataCleanup) {
404 ac->dataCleanup(ac->data);
405 }
406
407 /* Cleanup self */
408 redisFree(c);
409}
410
411/* Free the async context. When this function is called from a callback,
412 * control needs to be returned to redisProcessCallbacks() before actual
413 * free'ing. To do so, a flag is set on the context which is picked up by
414 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
415void redisAsyncFree(redisAsyncContext *ac) {
416 if (ac == NULL)
417 return;
418
419 redisContext *c = &(ac->c);
420
421 c->flags |= REDIS_FREEING;
422 if (!(c->flags & REDIS_IN_CALLBACK))
423 __redisAsyncFree(ac);
424}
425
426/* Helper function to make the disconnect happen and clean up. */
427void __redisAsyncDisconnect(redisAsyncContext *ac) {
428 redisContext *c = &(ac->c);
429
430 /* Make sure error is accessible if there is any */
431 __redisAsyncCopyError(ac);
432
433 if (ac->err == 0) {
434 /* For clean disconnects, there should be no pending callbacks. */
435 int ret = __redisShiftCallback(&ac->replies,NULL);
436 assert(ret == REDIS_ERR);
437 } else {
438 /* Disconnection is caused by an error, make sure that pending
439 * callbacks cannot call new commands. */
440 c->flags |= REDIS_DISCONNECTING;
441 }
442
443 /* cleanup event library on disconnect.
444 * this is safe to call multiple times */
445 _EL_CLEANUP(ac);
446
447 /* For non-clean disconnects, __redisAsyncFree() will execute pending
448 * callbacks with a NULL-reply. */
449 if (!(c->flags & REDIS_NO_AUTO_FREE)) {
450 __redisAsyncFree(ac);
451 }
452}
453
454/* Tries to do a clean disconnect from Redis, meaning it stops new commands
455 * from being issued, but tries to flush the output buffer and execute
456 * callbacks for all remaining replies. When this function is called from a
457 * callback, there might be more replies and we can safely defer disconnecting
458 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
459 * when there are no pending callbacks. */
460void redisAsyncDisconnect(redisAsyncContext *ac) {
461 redisContext *c = &(ac->c);
462 c->flags |= REDIS_DISCONNECTING;
463
464 /** unset the auto-free flag here, because disconnect undoes this */
465 c->flags &= ~REDIS_NO_AUTO_FREE;
466 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
467 __redisAsyncDisconnect(ac);
468}
469
470static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
471 redisContext *c = &(ac->c);
472 dict *callbacks;
473 redisCallback *cb = NULL;
474 dictEntry *de;
475 int pvariant;
476 char *stype;
477 hisds sname = NULL;
478
479 /* Match reply with the expected format of a pushed message.
480 * The type and number of elements (3 to 4) are specified at:
481 * https://redis.io/docs/latest/develop/interact/pubsub/#format-of-pushed-messages */
482 if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
483 reply->type == REDIS_REPLY_PUSH) {
484 assert(reply->element[0]->type == REDIS_REPLY_STRING);
485 stype = reply->element[0]->str;
486 pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
487
488 if (pvariant)
489 callbacks = ac->sub.patterns;
490 else
491 callbacks = ac->sub.channels;
492
493 /* Locate the right callback */
494 if (reply->element[1]->type == REDIS_REPLY_STRING) {
495 sname = hi_sdsnewlen(reply->element[1]->str,reply->element[1]->len);
496 if (sname == NULL) goto oom;
497
498 if ((de = dictFind(callbacks,sname)) != NULL) {
499 cb = dictGetEntryVal(de);
500 memcpy(dstcb,cb,sizeof(*dstcb));
501 }
502 }
503
504 /* If this is an subscribe reply decrease pending counter. */
505 if (strcasecmp(stype+pvariant,"subscribe") == 0) {
506 assert(cb != NULL);
507 cb->pending_subs -= 1;
508
509 } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
510 if (cb == NULL)
511 ac->sub.pending_unsubs -= 1;
512 else if (cb->pending_subs == 0)
513 dictDelete(callbacks,sname);
514
515 /* If this was the last unsubscribe message, revert to
516 * non-subscribe mode. */
517 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
518
519 /* Unset subscribed flag only when no pipelined pending subscribe
520 * or pending unsubscribe replies. */
521 if (reply->element[2]->integer == 0
522 && dictSize(ac->sub.channels) == 0
523 && dictSize(ac->sub.patterns) == 0
524 && ac->sub.pending_unsubs == 0) {
525 c->flags &= ~REDIS_SUBSCRIBED;
526
527 /* Move ongoing regular command callbacks. */
528 redisCallback cb;
529 while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
530 __redisPushCallback(&ac->replies,&cb);
531 }
532 }
533 }
534 hi_sdsfree(sname);
535 } else {
536 /* Shift callback for pending command in subscribed context. */
537 __redisShiftCallback(&ac->sub.replies,dstcb);
538 }
539 return REDIS_OK;
540oom:
541 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
542 __redisAsyncCopyError(ac);
543 return REDIS_ERR;
544}
545
546#define redisIsSpontaneousPushReply(r) \
547 (redisIsPushReply(r) && !redisIsSubscribeReply(r))
548
549static int redisIsSubscribeReply(redisReply *reply) {
550 char *str;
551 size_t len, off;
552
553 /* We will always have at least one string with the subscribe/message type */
554 if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
555 reply->element[0]->len < sizeof("message") - 1)
556 {
557 return 0;
558 }
559
560 /* Get the string/len moving past 'p' if needed */
561 off = tolower(reply->element[0]->str[0]) == 'p';
562 str = reply->element[0]->str + off;
563 len = reply->element[0]->len - off;
564
565 return !strncasecmp(str, "subscribe", len) ||
566 !strncasecmp(str, "message", len) ||
567 !strncasecmp(str, "unsubscribe", len);
568}
569
570void redisProcessCallbacks(redisAsyncContext *ac) {
571 redisContext *c = &(ac->c);
572 void *reply = NULL;
573 int status;
574
575 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
576 if (reply == NULL) {
577 /* When the connection is being disconnected and there are
578 * no more replies, this is the cue to really disconnect. */
579 if (c->flags & REDIS_DISCONNECTING && hi_sdslen(c->obuf) == 0
580 && ac->replies.head == NULL) {
581 __redisAsyncDisconnect(ac);
582 return;
583 }
584 /* When the connection is not being disconnected, simply stop
585 * trying to get replies and wait for the next loop tick. */
586 break;
587 }
588
589 /* Keep track of push message support for subscribe handling */
590 if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
591
592 /* Send any non-subscribe related PUSH messages to our PUSH handler
593 * while allowing subscribe related PUSH messages to pass through.
594 * This allows existing code to be backward compatible and work in
595 * either RESP2 or RESP3 mode. */
596 if (redisIsSpontaneousPushReply(reply)) {
597 __redisRunPushCallback(ac, reply);
598 c->reader->fn->freeObject(reply);
599 continue;
600 }
601
602 /* Even if the context is subscribed, pending regular
603 * callbacks will get a reply before pub/sub messages arrive. */
604 redisCallback cb = {NULL, NULL, 0, 0, NULL};
605 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
606 /*
607 * A spontaneous reply in a not-subscribed context can be the error
608 * reply that is sent when a new connection exceeds the maximum
609 * number of allowed connections on the server side.
610 *
611 * This is seen as an error instead of a regular reply because the
612 * server closes the connection after sending it.
613 *
614 * To prevent the error from being overwritten by an EOF error the
615 * connection is closed here. See issue #43.
616 *
617 * Another possibility is that the server is loading its dataset.
618 * In this case we also want to close the connection, and have the
619 * user wait until the server is ready to take our request.
620 */
621 if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
622 c->err = REDIS_ERR_OTHER;
623 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
624 c->reader->fn->freeObject(reply);
625 __redisAsyncDisconnect(ac);
626 return;
627 }
628 /* No more regular callbacks and no errors, the context *must* be subscribed. */
629 assert(c->flags & REDIS_SUBSCRIBED);
630 if (c->flags & REDIS_SUBSCRIBED)
631 __redisGetSubscribeCallback(ac,reply,&cb);
632 }
633
634 if (cb.fn != NULL) {
635 __redisRunCallback(ac,&cb,reply);
636 if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
637 c->reader->fn->freeObject(reply);
638 }
639
640 /* Proceed with free'ing when redisAsyncFree() was called. */
641 if (c->flags & REDIS_FREEING) {
642 __redisAsyncFree(ac);
643 return;
644 }
645 } else {
646 /* No callback for this reply. This can either be a NULL callback,
647 * or there were no callbacks to begin with. Either way, don't
648 * abort with an error, but simply ignore it because the client
649 * doesn't know what the server will spit out over the wire. */
650 c->reader->fn->freeObject(reply);
651 }
652
653 /* If in monitor mode, repush the callback */
654 if (c->flags & REDIS_MONITORING) {
655 __redisPushCallback(&ac->replies,&cb);
656 }
657 }
658
659 /* Disconnect when there was an error reading the reply */
660 if (status != REDIS_OK)
661 __redisAsyncDisconnect(ac);
662}
663
664static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
665 __redisRunConnectCallback(ac, REDIS_ERR);
666 __redisAsyncDisconnect(ac);
667}
668
669/* Internal helper function to detect socket status the first time a read or
670 * write event fires. When connecting was not successful, the connect callback
671 * is called with a REDIS_ERR status and the context is free'd. */
672static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
673 int completed = 0;
674 redisContext *c = &(ac->c);
675
676 if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
677 /* Error! */
678 if (redisCheckSocketError(c) == REDIS_ERR)
679 __redisAsyncCopyError(ac);
680 __redisAsyncHandleConnectFailure(ac);
681 return REDIS_ERR;
682 } else if (completed == 1) {
683 /* connected! */
684 if (c->connection_type == REDIS_CONN_TCP &&
685 redisSetTcpNoDelay(c) == REDIS_ERR) {
686 __redisAsyncHandleConnectFailure(ac);
687 return REDIS_ERR;
688 }
689
690 /* flag us as fully connect, but allow the callback
691 * to disconnect. For that reason, permit the function
692 * to delete the context here after callback return.
693 */
694 c->flags |= REDIS_CONNECTED;
695 __redisRunConnectCallback(ac, REDIS_OK);
696 if ((ac->c.flags & REDIS_DISCONNECTING)) {
697 redisAsyncDisconnect(ac);
698 return REDIS_ERR;
699 } else if ((ac->c.flags & REDIS_FREEING)) {
700 redisAsyncFree(ac);
701 return REDIS_ERR;
702 }
703 return REDIS_OK;
704 } else {
705 return REDIS_OK;
706 }
707}
708
709void redisAsyncRead(redisAsyncContext *ac) {
710 redisContext *c = &(ac->c);
711
712 if (redisBufferRead(c) == REDIS_ERR) {
713 __redisAsyncDisconnect(ac);
714 } else {
715 /* Always re-schedule reads */
716 _EL_ADD_READ(ac);
717 redisProcessCallbacks(ac);
718 }
719}
720
721/* This function should be called when the socket is readable.
722 * It processes all replies that can be read and executes their callbacks.
723 */
724void redisAsyncHandleRead(redisAsyncContext *ac) {
725 redisContext *c = &(ac->c);
726 /* must not be called from a callback */
727 assert(!(c->flags & REDIS_IN_CALLBACK));
728
729 if (!(c->flags & REDIS_CONNECTED)) {
730 /* Abort connect was not successful. */
731 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
732 return;
733 /* Try again later when the context is still not connected. */
734 if (!(c->flags & REDIS_CONNECTED))
735 return;
736 }
737
738 c->funcs->async_read(ac);
739}
740
741void redisAsyncWrite(redisAsyncContext *ac) {
742 redisContext *c = &(ac->c);
743 int done = 0;
744
745 if (redisBufferWrite(c,&done) == REDIS_ERR) {
746 __redisAsyncDisconnect(ac);
747 } else {
748 /* Continue writing when not done, stop writing otherwise */
749 if (!done)
750 _EL_ADD_WRITE(ac);
751 else
752 _EL_DEL_WRITE(ac);
753
754 /* Always schedule reads after writes */
755 _EL_ADD_READ(ac);
756 }
757}
758
759void redisAsyncHandleWrite(redisAsyncContext *ac) {
760 redisContext *c = &(ac->c);
761 /* must not be called from a callback */
762 assert(!(c->flags & REDIS_IN_CALLBACK));
763
764 if (!(c->flags & REDIS_CONNECTED)) {
765 /* Abort connect was not successful. */
766 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
767 return;
768 /* Try again later when the context is still not connected. */
769 if (!(c->flags & REDIS_CONNECTED))
770 return;
771 }
772
773 c->funcs->async_write(ac);
774}
775
776void redisAsyncHandleTimeout(redisAsyncContext *ac) {
777 redisContext *c = &(ac->c);
778 redisCallback cb;
779 /* must not be called from a callback */
780 assert(!(c->flags & REDIS_IN_CALLBACK));
781
782 if ((c->flags & REDIS_CONNECTED)) {
783 if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
784 /* Nothing to do - just an idle timeout */
785 return;
786 }
787
788 if (!ac->c.command_timeout ||
789 (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) {
790 /* A belated connect timeout arriving, ignore */
791 return;
792 }
793 }
794
795 if (!c->err) {
796 __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
797 __redisAsyncCopyError(ac);
798 }
799
800 if (!(c->flags & REDIS_CONNECTED)) {
801 __redisRunConnectCallback(ac, REDIS_ERR);
802 }
803
804 while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
805 __redisRunCallback(ac, &cb, NULL);
806 }
807
808 /**
809 * TODO: Don't automatically sever the connection,
810 * rather, allow to ignore <x> responses before the queue is clear
811 */
812 __redisAsyncDisconnect(ac);
813}
814
815/* Sets a pointer to the first argument and its length starting at p. Returns
816 * the number of bytes to skip to get to the following argument. */
817static const char *nextArgument(const char *start, const char **str, size_t *len) {
818 const char *p = start;
819 if (p[0] != '$') {
820 p = strchr(p,'$');
821 if (p == NULL) return NULL;
822 }
823
824 *len = (int)strtol(p+1,NULL,10);
825 p = strchr(p,'\r');
826 assert(p);
827 *str = p+2;
828 return p+2+(*len)+2;
829}
830
831/* Helper function for the redisAsyncCommand* family of functions. Writes a
832 * formatted command to the output buffer and registers the provided callback
833 * function with the context. */
834static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
835 redisContext *c = &(ac->c);
836 redisCallback cb;
837 struct dict *cbdict;
838 dictIterator it;
839 dictEntry *de;
840 redisCallback *existcb;
841 int pvariant, hasnext;
842 const char *cstr, *astr;
843 size_t clen, alen;
844 const char *p;
845 hisds sname;
846 int ret;
847
848 /* Don't accept new commands when the connection is about to be closed. */
849 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
850
851 /* Setup callback */
852 cb.fn = fn;
853 cb.privdata = privdata;
854 cb.pending_subs = 1;
855 cb.unsubscribe_sent = 0;
856
857 /* Find out which command will be appended. */
858 p = nextArgument(cmd,&cstr,&clen);
859 assert(p != NULL);
860 hasnext = (p[0] == '$');
861 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
862 cstr += pvariant;
863 clen -= pvariant;
864
865 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
866 c->flags |= REDIS_SUBSCRIBED;
867
868 /* Add every channel/pattern to the list of subscription callbacks. */
869 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
870 sname = hi_sdsnewlen(astr,alen);
871 if (sname == NULL)
872 goto oom;
873
874 if (pvariant)
875 cbdict = ac->sub.patterns;
876 else
877 cbdict = ac->sub.channels;
878
879 de = dictFind(cbdict,sname);
880
881 if (de != NULL) {
882 existcb = dictGetEntryVal(de);
883 cb.pending_subs = existcb->pending_subs + 1;
884 }
885
886 ret = dictReplace(cbdict,sname,&cb);
887
888 if (ret == 0) hi_sdsfree(sname);
889 }
890 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
891 /* It is only useful to call (P)UNSUBSCRIBE when the context is
892 * subscribed to one or more channels or patterns. */
893 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
894
895 if (pvariant)
896 cbdict = ac->sub.patterns;
897 else
898 cbdict = ac->sub.channels;
899
900 if (hasnext) {
901 /* Send an unsubscribe with specific channels/patterns.
902 * Bookkeeping the number of expected replies */
903 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
904 sname = hi_sdsnewlen(astr,alen);
905 if (sname == NULL)
906 goto oom;
907
908 de = dictFind(cbdict,sname);
909 if (de != NULL) {
910 existcb = dictGetEntryVal(de);
911 if (existcb->unsubscribe_sent == 0)
912 existcb->unsubscribe_sent = 1;
913 else
914 /* Already sent, reply to be ignored */
915 ac->sub.pending_unsubs += 1;
916 } else {
917 /* Not subscribed to, reply to be ignored */
918 ac->sub.pending_unsubs += 1;
919 }
920 hi_sdsfree(sname);
921 }
922 } else {
923 /* Send an unsubscribe without specific channels/patterns.
924 * Bookkeeping the number of expected replies */
925 int no_subs = 1;
926 dictInitIterator(&it,cbdict);
927 while ((de = dictNext(&it)) != NULL) {
928 existcb = dictGetEntryVal(de);
929 if (existcb->unsubscribe_sent == 0) {
930 existcb->unsubscribe_sent = 1;
931 no_subs = 0;
932 }
933 }
934 /* Unsubscribing to all channels/patterns, where none is
935 * subscribed to, results in a single reply to be ignored. */
936 if (no_subs == 1)
937 ac->sub.pending_unsubs += 1;
938 }
939
940 /* (P)UNSUBSCRIBE does not have its own response: every channel or
941 * pattern that is unsubscribed will receive a message. This means we
942 * should not append a callback function for this command. */
943 } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
944 /* Set monitor flag and push callback */
945 c->flags |= REDIS_MONITORING;
946 if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
947 goto oom;
948 } else {
949 if (c->flags & REDIS_SUBSCRIBED) {
950 if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
951 goto oom;
952 } else {
953 if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
954 goto oom;
955 }
956 }
957
958 __redisAppendCommand(c,cmd,len);
959
960 /* Always schedule a write when the write buffer is non-empty */
961 _EL_ADD_WRITE(ac);
962
963 return REDIS_OK;
964oom:
965 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
966 __redisAsyncCopyError(ac);
967 return REDIS_ERR;
968}
969
970int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
971 char *cmd;
972 int len;
973 int status;
974 len = redisvFormatCommand(&cmd,format,ap);
975
976 /* We don't want to pass -1 or -2 to future functions as a length. */
977 if (len < 0)
978 return REDIS_ERR;
979
980 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
981 hi_free(cmd);
982 return status;
983}
984
985int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
986 va_list ap;
987 int status;
988 va_start(ap,format);
989 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
990 va_end(ap);
991 return status;
992}
993
994int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
995 hisds cmd;
996 long long len;
997 int status;
998 len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
999 if (len < 0)
1000 return REDIS_ERR;
1001 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
1002 hi_sdsfree(cmd);
1003 return status;
1004}
1005
1006int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
1007 int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
1008 return status;
1009}
1010
1011redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
1012 redisAsyncPushFn *old = ac->push_cb;
1013 ac->push_cb = fn;
1014 return old;
1015}
1016
1017int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
1018 if (!ac->c.command_timeout) {
1019 ac->c.command_timeout = hi_calloc(1, sizeof(tv));
1020 if (ac->c.command_timeout == NULL) {
1021 __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
1022 __redisAsyncCopyError(ac);
1023 return REDIS_ERR;
1024 }
1025 }
1026
1027 if (tv.tv_sec != ac->c.command_timeout->tv_sec ||
1028 tv.tv_usec != ac->c.command_timeout->tv_usec)
1029 {
1030 *ac->c.command_timeout = tv;
1031 }
1032
1033 return REDIS_OK;
1034}